123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254 |
- package service
- import (
- "bytes"
- "context"
- "crypto/md5"
- "fmt"
- "os"
- "strconv"
- "strings"
- "time"
- pamdl "go-common/app/admin/main/push/model"
- "go-common/app/job/main/push/model"
- pushmdl "go-common/app/service/main/push/model"
- xsql "go-common/library/database/sql"
- "go-common/library/log"
- )
- // txCond get a new condition by tx.
- func (s *Service) txCond(oldStatus, newStatus int) (cond *pamdl.DPCondition, err error) {
- ctx := context.Background()
- var tx *xsql.Tx
- if tx, err = s.dao.BeginTx(ctx); err != nil {
- log.Error("tx.BeginTx() error(%v)", err)
- return
- }
- if cond, err = s.dao.TxCondByStatus(tx, oldStatus); err != nil || cond == nil {
- if e := tx.Rollback(); e != nil {
- log.Error("tx.Rollback() error(%v)", e)
- }
- return
- }
- if err = s.dao.TxUpdateCondStatus(tx, cond.ID, newStatus); err != nil {
- if e := tx.Rollback(); e != nil {
- log.Error("tx.Rollback() error(%v)", e)
- }
- return
- }
- if err = tx.Commit(); err != nil {
- log.Error("tx.Commit() error(%v)", err)
- }
- return
- }
- // data platform query
- func (s *Service) dpQueryproc() {
- defer s.waiter.Done()
- for {
- if s.closed {
- return
- }
- cond, err := s.txCond(pushmdl.DpCondStatusPrepared, pushmdl.DpCondStatusSubmitting)
- if err != nil || cond == nil {
- time.Sleep(time.Second)
- continue
- }
- for i := 0; i < _retry; i++ {
- if cond.StatusURL, err = s.dao.DpSubmitQuery(context.Background(), cond.SQL); err == nil {
- break
- }
- time.Sleep(time.Second)
- }
- if err != nil {
- log.Error("data platform add query(%+v) error(%v)", cond, err)
- s.dao.UpdateDpCondStatus(context.Background(), cond.ID, pushmdl.DpCondStatusFailed)
- s.dao.UpdateTaskStatus(context.Background(), cond.Task, pushmdl.TaskStatusFailed)
- continue
- }
- cond.Status = pushmdl.DpCondStatusSubmitted
- for i := 0; i < _retry; i++ {
- if err = s.dao.UpdateDpCond(context.Background(), cond); err == nil {
- break
- }
- time.Sleep(10 * time.Millisecond)
- }
- if err != nil {
- log.Error("data platform update condition(%+v) error(%v)", cond, err)
- }
- time.Sleep(time.Second)
- }
- }
- // data platform get file
- func (s *Service) dpFileproc() {
- defer s.waiter.Done()
- for {
- if s.closed {
- return
- }
- cond, err := s.txCond(pushmdl.DpCondStatusSubmitted, pushmdl.DpCondStatusPolling)
- if err != nil || cond == nil {
- time.Sleep(time.Second)
- continue
- }
- var (
- path string
- files []string
- )
- if files = s.dpCheckJob(cond); len(files) == 0 {
- continue
- }
- for i := 0; i < _retry; i++ {
- if path, err = s.dpDownloadFiles(cond, files); err == nil {
- break
- }
- time.Sleep(10 * time.Millisecond)
- }
- if err != nil || path == "" {
- log.Error("data platform download query(%+v) file error(%v)", cond, err)
- s.dao.UpdateDpCondStatus(context.Background(), cond.ID, pushmdl.DpCondStatusFailed)
- s.dao.UpdateTaskStatus(context.Background(), cond.Task, pushmdl.TaskStatusFailed)
- continue
- }
- cond.File = path
- cond.Status = pushmdl.DpCondStatusDone
- for i := 0; i < _retry; i++ {
- if err = s.dao.UpdateDpCond(context.Background(), cond); err == nil {
- break
- }
- time.Sleep(10 * time.Millisecond)
- }
- if err != nil {
- log.Error("data platform UpdateDpCond(%+v) error(%v)", cond, err)
- continue
- }
- for i := 0; i < _retry; i++ {
- if err = s.dao.UpdateTask(context.Background(), strconv.FormatInt(cond.Task, 10), path, pushmdl.TaskStatusPretreatmentPrepared); err == nil {
- break
- }
- time.Sleep(10 * time.Millisecond)
- }
- if err != nil {
- log.Error("s.dao.UpdateTask(%d,%s,%d) error(%v)", cond.Task, path, pushmdl.TaskStatusPretreatmentPrepared)
- }
- time.Sleep(time.Second)
- }
- }
- func (s *Service) dpCheckJob(cond *pamdl.DPCondition) (files []string) {
- now := time.Now()
- for {
- if time.Since(now) > time.Duration(s.c.Job.DpPollingTime) {
- log.Error("polling stoped, more over than dpPollingTime, give job up")
- s.dao.UpdateDpCondStatus(context.Background(), cond.ID, pushmdl.DpCondStatusFailed)
- break
- }
- res, err := s.dao.DpCheckJob(context.Background(), cond.StatusURL)
- if err != nil {
- log.Error("s.dao.DpCheckJob(%s) error(%v)", cond.StatusURL, err)
- time.Sleep(time.Second)
- continue
- }
- if res.StatusID == model.CheckJobStatusDoing || res.StatusID == model.CheckJobStatusPending {
- log.Info("polling (%s) ing..., status(%d)", cond.StatusURL, res.StatusID)
- time.Sleep(5 * time.Second)
- continue
- }
- if res.StatusID == model.CheckJobStatusOk {
- if len(res.Files) == 0 {
- log.Info("polling (%s) success, no files found", cond.StatusURL)
- s.dao.UpdateDpCondStatus(context.Background(), cond.ID, pushmdl.DpCondStatusNoFile)
- break
- }
- files = res.Files
- log.Info("polling (%s) success, files(%d)", cond.StatusURL, len(files))
- return
- }
- if res.StatusID == model.CheckJobStatusErr {
- log.Error("polling (%s) error, res(%+v)", cond.StatusURL, res)
- s.dao.UpdateDpCondStatus(context.Background(), cond.ID, pushmdl.DpCondStatusFailed)
- break
- }
- }
- log.Error("polling cond(%d) error", cond.ID)
- s.dao.UpdateTaskStatus(context.Background(), cond.Task, pushmdl.TaskStatusFailed)
- return
- }
- func (s *Service) dpDownloadFiles(cond *pamdl.DPCondition, files []string) (path string, err error) {
- for i := 0; i < _retry; i++ {
- if err = s.dao.UpdateDpCondStatus(context.Background(), cond.ID, pushmdl.DpCondStatusDownloading); err == nil {
- break
- }
- time.Sleep(10 * time.Millisecond)
- }
- if err != nil {
- return
- }
- dir := fmt.Sprintf("%s/%s", strings.TrimSuffix(s.c.Job.MountDir, "/"), time.Now().Format("20060102"))
- if _, err = os.Stat(dir); err != nil {
- if !os.IsNotExist(err) {
- log.Error("os.IsNotExist(%s) error(%v)", dir, err)
- return
- }
- if err = os.MkdirAll(dir, 0777); err != nil {
- log.Error("os.MkdirAll(%s) error(%v)", dir, err)
- return
- }
- }
- name := strconv.FormatInt(time.Now().UnixNano(), 10)
- path = fmt.Sprintf("%s/%x", dir, md5.Sum([]byte(name)))
- for _, f := range files {
- if err = s.dpDownloadFile(f, path); err != nil {
- return
- }
- }
- return
- }
- func (s *Service) dpDownloadFile(url, path string) (err error) {
- var (
- res []byte
- content [][]byte
- )
- for i := 0; i < _retry; i++ {
- if res, err = s.dao.DpDownloadFile(context.Background(), url); err == nil {
- break
- }
- time.Sleep(10 * time.Millisecond)
- }
- if err != nil {
- log.Error("s.dao.DpDownloadFile(%s) error(%v)", url, err)
- return
- }
- for _, bs := range bytes.Split(res, []byte("\n")) {
- n := bytes.Split(bs, []byte("\u0001"))
- content = append(content, bytes.Join(n, []byte(" ")))
- }
- for i := 0; i < _retry; i++ {
- if err = s.saveDpFile(path, bytes.Join(content, []byte("\n"))); err == nil {
- break
- }
- time.Sleep(10 * time.Millisecond)
- }
- if err != nil {
- log.Error("s.saveNASFile(%s) error(%v)", url, err)
- }
- return
- }
- // saveDpFile writes data platform data into NAS.
- func (s *Service) saveDpFile(path string, data []byte) (err error) {
- f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
- if err != nil {
- log.Error("s.saveDpFile(%s) OpenFile() error(%v)", path, err)
- return
- }
- defer f.Close()
- if _, err = f.Write(data); err != nil {
- log.Error("s.saveDpFile(%s) f.Write() error(%v)", path, err)
- }
- return
- }
|