|
- package service
- import (
- "bufio"
- "context"
- "crypto/md5"
- "errors"
- "fmt"
- "io"
- "os"
- "strconv"
- "strings"
- "sync"
- "time"
- "go-common/app/job/main/push/dao"
- pb "go-common/app/service/main/push/api/grpc/v1"
- pushmdl "go-common/app/service/main/push/model"
- xsql "go-common/library/database/sql"
- "go-common/library/log"
- "go-common/library/sync/errgroup"
- )
- const (
- _delTaskLimit = 5000
- )
- var (
- errEmptyLine = errors.New("empty line")
- errInvalidMid = errors.New("invalid mid format")
- errInvalidToken = errors.New("invalid token format")
- )
- func (s *Service) addTaskproc() {
- defer s.addTaskWg.Done()
- var err error
- for {
- task, ok := <-s.addTaskCh
- if !ok {
- log.Info("add task channel exit")
- return
- }
- if task == nil {
- continue
- }
- task.Status = pushmdl.TaskStatusPrepared
- for i := 0; i < _retry; i++ {
- if err = s.dao.AddTask(context.Background(), task); err == nil {
- break
- }
- }
- if err != nil {
- log.Error("add task(%+v) error(%v)", task, err)
- s.cache.Save(func() {
- s.dao.SendWechat(fmt.Sprintf("add task(%d)", task.Job))
- })
- continue
- }
- dao.PromInfo("add task")
- time.Sleep(time.Millisecond)
- }
- }
- func (s *Service) delTasksproc() {
- for {
- now := time.Now()
- // 每天2点时删除一个月前的task数据
- if now.Hour() != 2 {
- time.Sleep(time.Minute)
- continue
- }
- var (
- err error
- deleted int64
- b = now.Add(time.Duration(-s.c.Job.DelTaskInterval*24) * time.Hour)
- loc, _ = time.LoadLocation("Local")
- t = time.Date(b.Year(), b.Month(), b.Day(), 23, 59, 59, 0, loc)
- )
- for {
- if deleted, err = s.dao.DelTasks(context.TODO(), t, _delTaskLimit); err != nil {
- log.Error("s.delTasks(%v) error(%v)", t, err)
- s.dao.SendWechat("DB操作失败:push-job删除task数据错误")
- time.Sleep(time.Second)
- continue
- }
- if deleted < _delTaskLimit {
- break
- }
- time.Sleep(time.Second)
- }
- time.Sleep(time.Hour)
- }
- }
- func (s *Service) pretreatTaskproc() {
- defer s.waiter.Done()
- for {
- if s.closed {
- return
- }
- task, err := s.pickPretreatmentTask()
- if err != nil {
- time.Sleep(5 * time.Second)
- continue
- }
- if task != nil {
- log.Info("pretreat task job(%d) id(%s)", task.Job, task.ID)
- if err = s.pretreatTask(task); err != nil {
- log.Error("pretreat task(%+v) error(%v)", task, err)
- s.cache.Save(func() { s.dao.SendWechat(fmt.Sprintf("pretreat task(%s) error", task.ID)) })
- }
- }
- time.Sleep(time.Duration(s.c.Job.LoadTaskInteval))
- }
- }
- func (s *Service) pickPretreatmentTask() (t *pushmdl.Task, err error) {
- c := context.Background()
- var tx *xsql.Tx
- if tx, err = s.dao.BeginTx(c); err != nil {
- log.Error("tx.BeginTx() error(%v)", err)
- return
- }
- if t, err = s.dao.TxTaskByStatus(tx, pushmdl.TaskStatusPretreatmentPrepared); err != nil {
- if e := tx.Rollback(); e != nil {
- dao.PromError("task:获取新任务")
- log.Error("tx.Rollback() error(%v)", e)
- }
- return
- }
- if t == nil {
- if e := tx.Rollback(); e != nil {
- dao.PromError("task:获取新任务")
- log.Error("tx.Rollback() error(%v)", e)
- }
- return
- }
- if err = s.dao.TxUpdateTaskStatus(tx, t.ID, pushmdl.TaskStatusPretreatmentDoing); err != nil {
- if e := tx.Rollback(); e != nil {
- dao.PromError("task:更新任务状态")
- log.Error("tx.Rollback() error(%v)", e)
- }
- return
- }
- if err = tx.Commit(); err != nil {
- dao.PromError("task:获取新任务commit")
- log.Error("tx.Commit() error(%v)", err)
- }
- return
- }
- func (s *Service) pretreatTask(t *pushmdl.Task) (err error) {
- id, _ := strconv.ParseInt(t.ID, 10, 64)
- switch t.Type {
- case pushmdl.TaskTypeAll:
- err = s.pretreatTaskAll(t)
- case pushmdl.TaskTypeMngToken, pushmdl.TaskTypeDataPlatformToken, pushmdl.TaskTypeDataPlatformMid:
- err = s.pretreatTaskToken(t)
- case pushmdl.TaskTypeStrategyMid, pushmdl.TaskTypeMngMid:
- err = s.pretreatTaskMid(t)
- default:
- log.Error("invalid task type, (%+v)", t)
- }
- if err != nil {
- err = s.dao.UpdateTaskStatus(context.Background(), id, pushmdl.TaskStatusPretreatmentFailed)
- return
- }
- err = s.dao.UpdateTaskStatus(context.Background(), id, pushmdl.TaskStatusPretreatmentDone)
- return
- }
- func (s *Service) pretreatTaskAll(t *pushmdl.Task) (err error) {
- log.Info("AddTaskAll start, task(%+v)", t)
- var (
- maxID int64
- group = errgroup.Group{}
- )
- maxID, err = s.dao.ReportLastID(context.Background())
- if err != nil || maxID <= 0 {
- log.Error("s.pretreatTaskAll() error(%v)", err)
- s.cache.Save(func() {
- s.dao.SendWechat(fmt.Sprintf("pretreatTaskAll(%v) ReportLastID(%d) error", t.ID, maxID))
- })
- return
- }
- log.Info("AddTaskAll get last report ID(%d)", maxID)
- buildCount := len(t.Build)
- batch := maxID / int64(s.c.Job.TaskGoroutines)
- for j := 0; j < s.c.Job.TaskGoroutines; j++ {
- begin := int64(j) * batch
- end := begin + batch
- group.Go(func() (e error) {
- var (
- path string
- rows *xsql.Rows
- tokens = make(map[int][]string)
- )
- for {
- if begin >= end {
- break
- }
- l := begin + int64(_dbBatch)
- if l >= end {
- l = end
- }
- log.Info("AddTaskAll load reports start(%d) end(%d)", begin, l)
- if rows, e = s.dao.ReportsTaskAll(context.Background(), begin, l, t.APPID); e != nil {
- log.Error("s.dao.ReportsTaskAll(%d,%d,%d) error(%v)", begin, l, t.APPID)
- s.cache.Save(func() {
- s.dao.SendWechat(fmt.Sprintf("pretreatTaskAll(%v) ReportsTaskAll(%d,%d,%d) error", t.ID, begin, l, t.APPID))
- })
- return
- }
- for rows.Next() {
- var (
- platformID int
- build int
- token string
- )
- if e = rows.Scan(&platformID, &token, &build); e != nil {
- log.Error("AddTaskAll rows.Scan() error(%v)", e)
- s.cache.Save(func() {
- s.dao.SendWechat(fmt.Sprintf("pretreatTaskAll(%v) ReportsTaskAll(%d,%d,%d) error", t.ID, begin, l, t.APPID))
- })
- return
- }
- if buildCount > 0 && !pushmdl.ValidateBuild(platformID, build, t.Build) {
- continue
- }
- tokens[platformID] = append(tokens[platformID], token)
- if len(tokens[platformID]) >= s.c.Job.LimitPerTask {
- if path, e = s.saveFile(tokens[platformID]); e != nil {
- log.Error("AddTaskAll s.saveTokens error(%v)", e)
- s.cache.Save(func() {
- s.dao.SendWechat(fmt.Sprintf("pretreatTaskAll(%v) saveTokens error(%v)", t.ID, e))
- })
- return
- }
- tokens[platformID] = []string{}
- task := *t
- task.MidFile = path
- task.PlatformID = platformID
- s.addTaskCh <- &task
- }
- }
- begin = l
- }
- for p, v := range tokens {
- if len(v) == 0 {
- continue
- }
- if path, e = s.saveFile(v); e == nil {
- task := *t
- task.MidFile = path
- task.PlatformID = p
- s.addTaskCh <- &task
- }
- }
- return
- })
- }
- if err = group.Wait(); err != nil {
- log.Error("add task all, task(%+v) error(%v)", t, err)
- s.cache.Save(func() {
- s.dao.SendWechat(fmt.Sprintf("pretreatTaskAll(%v) error(%v)", t.ID, err))
- })
- return
- }
- log.Info("AddTaskAll end, task(%+v)", t)
- s.cache.Save(func() {
- s.dao.SendWechat(fmt.Sprintf("add task all success, job(%d)", t.Job))
- })
- return
- }
- func (s *Service) pretreatTaskMid(t *pushmdl.Task) (err error) {
- f, err := os.Open(t.MidFile)
- if err != nil {
- log.Error("pretreatTaskMid(%+v) open file error(%v)", t, err)
- return
- }
- defer f.Close()
- var (
- exit bool
- line string
- path string
- mid int64
- counter int
- midTotal int64
- midValid int64
- mu sync.Mutex
- mids []int64
- tokens = make(map[int][]string)
- group = errgroup.Group{}
- reader = bufio.NewReader(f)
- )
- for {
- if exit {
- break
- }
- if line, err = reader.ReadString('\n'); err != nil {
- if err == io.EOF {
- exit = true
- } else {
- log.Error("read file error(%v)", err)
- continue
- }
- }
- if mid, err = parseMidLine(line); err != nil {
- log.Error("parse mid line(%s) error(%v)", line, err)
- continue
- }
- midTotal++
- mids = append(mids, mid)
- if len(mids) >= s.c.Job.PushPartSize {
- midsCp := make([]int64, len(mids))
- copy(midsCp, mids)
- mids = []int64{}
- group.Go(func() (e error) {
- ts, valid, e := s.tokensByMids(t, midsCp)
- if e != nil {
- log.Error("s.tokensByMids(%v) error(%v)", t.ID, e)
- return
- }
- tcopy := make(map[int][]string)
- mu.Lock()
- midValid += valid
- for p, v := range ts {
- tokens[p] = append(tokens[p], v...)
- if len(tokens[p]) >= s.c.Job.LimitPerTask {
- tcopy[p] = append(tcopy[p], tokens[p]...)
- tokens[p] = []string{}
- }
- }
- mu.Unlock()
- for p, v := range tcopy {
- if path, err = s.saveFile(v); err != nil {
- log.Error("pretreatTaskMid s.saveFild error(%v)", err)
- s.cache.Save(func() {
- s.dao.SendWechat(fmt.Sprintf("pretreatTaskMid(%v) saveTokens error(%v)", t.ID, err))
- })
- return
- }
- task := *t
- task.MidFile = path
- task.PlatformID = p
- s.addTaskCh <- &task
- }
- return
- })
- counter++
- if counter == s.c.Job.PushPartChanSize {
- group.Wait()
- counter = 0
- }
- }
- }
- if counter > 0 {
- group.Wait()
- }
- if len(mids) > 0 {
- var (
- valid int64
- ts map[int][]string
- )
- if ts, valid, err = s.tokensByMids(t, mids); err == nil {
- midValid += valid
- for p, v := range ts {
- tokens[p] = append(tokens[p], v...)
- }
- } else {
- log.Error("s.tokensByMids(%+v) error(%v)", t, err)
- }
- }
- s.cache.Save(func() {
- arg := &pb.AddMidProgressRequest{Task: t.ID, MidTotal: midTotal, MidValid: midValid}
- if _, e := s.pushRPC.AddMidProgress(context.Background(), arg); e != nil {
- log.Error("s.pushRPC.AddMidProgress(%+v) error(%v)", arg, e)
- }
- })
- for p, v := range tokens {
- if len(v) == 0 {
- continue
- }
- if path, err = s.saveFile(v); err != nil {
- log.Error("pretreatTaskMid s.saveFild error(%v)", err)
- return
- }
- task := *t
- task.MidFile = path
- task.PlatformID = p
- s.addTaskCh <- &task
- }
- log.Info("pretreatTaskMid task(%+v)", t)
- return
- }
- func (s *Service) pretreatTaskToken(t *pushmdl.Task) (err error) {
- f, err := os.Open(t.MidFile)
- if err != nil {
- log.Error("pretreatTaskToken(%+v) open file error(%v)", t, err)
- return
- }
- defer f.Close()
- var (
- exit bool
- plat int
- line string
- token string
- path string
- tokens = make(map[int][]string)
- reader = bufio.NewReader(f)
- )
- for {
- if exit {
- break
- }
- if line, err = reader.ReadString('\n'); err != nil {
- if err == io.EOF {
- exit = true // no 'continue', solve the last line whitout '\n'
- } else {
- log.Error("read file error(%v)", err)
- continue
- }
- }
- if plat, token, err = parseTokenLine(line); err != nil {
- log.Error("parse token line(%s) error(%v)", line, err)
- continue
- }
- tokens[plat] = append(tokens[plat], token)
- if len(tokens[plat]) >= s.c.Job.LimitPerTask {
- if path, err = s.saveFile(tokens[plat]); err != nil {
- log.Error("pretreatTaskToken s.saveFile error(%v)", err)
- s.cache.Save(func() {
- s.dao.SendWechat(fmt.Sprintf("pretreatTaskToken(%v) saveTokens error(%v)", t.ID, err))
- })
- return
- }
- tokens[plat] = []string{}
- task := *t
- task.MidFile = path
- task.PlatformID = plat
- s.addTaskCh <- &task
- }
- }
- for p, v := range tokens {
- if len(v) == 0 {
- continue
- }
- if path, err = s.saveFile(v); err == nil {
- task := *t
- task.MidFile = path
- task.PlatformID = p
- s.addTaskCh <- &task
- }
- }
- log.Info("pretreatTaskToken task(%+v)", t)
- return
- }
- func parseTokenLine(line string) (plat int, token string, err error) {
- line = strings.Trim(line, " \r\n")
- if line == "" {
- err = errEmptyLine
- return
- }
- res := strings.Split(line, "\t")
- if len(res) != 2 {
- err = errInvalidToken
- return
- }
- if res[0] == "" || res[1] == "" {
- err = errInvalidToken
- return
- }
- if plat, err = strconv.Atoi(res[0]); err != nil || plat <= 0 {
- err = errInvalidToken
- return
- }
- token = res[1]
- return
- }
- func parseMidLine(line string) (mid int64, err error) {
- line = strings.Trim(line, " \r\t\n")
- if line == "" {
- err = errEmptyLine
- return
- }
- if mid, err = strconv.ParseInt(line, 10, 64); err != nil || mid <= 0 {
- err = errInvalidMid
- }
- return
- }
- func (s *Service) saveFile(tokens []string) (path string, err error) {
- name := strconv.FormatInt(time.Now().UnixNano(), 10) + tokens[0]
- data := []byte(strings.Join(tokens, "\n"))
- for i := 0; i < _retry; i++ {
- if path, err = s.saveNASFile(name, data); err == nil {
- break
- }
- time.Sleep(100 * time.Millisecond)
- }
- return
- }
- // saveNASFile writes data into NAS.
- func (s *Service) saveNASFile(name string, data []byte) (path string, err error) {
- name = fmt.Sprintf("%x", md5.Sum([]byte(name)))
- dir := fmt.Sprintf("%s/%s/%s", strings.TrimSuffix(s.c.Job.MountDir, "/"), time.Now().Format("20060102"), name[:2])
- if _, err = os.Stat(dir); err != nil {
- if !os.IsNotExist(err) {
- log.Error("os.IsNotExist(%s) error(%v)", dir, err)
- return
- }
- if err = os.MkdirAll(dir, 0777); err != nil {
- log.Error("os.MkdirAll(%s) error(%v)", dir, err)
- return
- }
- }
- path = fmt.Sprintf("%s/%s", dir, name)
- f, err := os.OpenFile(path, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
- if err != nil {
- log.Error("s.saveNASFile(%s) OpenFile() error(%v)", path, err)
- return
- }
- defer f.Close()
- if _, err = f.Write(data); err != nil {
- log.Error("s.saveNASFile(%s) f.Write() error(%v)", err)
- }
- return
- }
|