123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277 |
- package service
- import (
- "bytes"
- "context"
- "fmt"
- "io/ioutil"
- "net/http"
- "strconv"
- "time"
- "go-common/app/job/main/dm2/model"
- "go-common/app/job/main/dm2/model/oplog"
- "go-common/library/log"
- )
- func (s *Service) taskResProc() {
- var (
- c = context.Background()
- tasks []*model.TaskInfo
- err error
- )
- ticker := time.NewTicker(time.Duration(s.conf.TaskConf.ResInterval))
- defer ticker.Stop()
- for range ticker.C {
- if tasks, err = s.dao.TaskInfos(c, model.TaskStateSearch); err != nil {
- log.Error("s.dao.TaskInfos error(%v)", err)
- continue
- }
- for _, task := range tasks {
- count, url, state, err := s.dao.TaskSearchRes(c, task)
- if err != nil {
- log.Error("s.dao.TaskSearchRes(%+v) error(%v)", task, err)
- continue
- }
- if state == model.TaskSearchFail {
- task.State = model.TaskStateFail
- } else if state == model.TaskSearchSuc {
- task.Result = url
- task.Count = count
- if task.Sub > 0 {
- task.State = model.TaskStateWait
- } else {
- task.State = model.TaskStateSuc
- }
- }
- s.dao.UpdateTask(c, task)
- }
- }
- }
- func (s *Service) taskDelProc() {
- var (
- c = context.Background()
- err error
- )
- ticker := time.NewTicker(time.Duration(s.conf.TaskConf.DelInterval))
- defer ticker.Stop()
- for range ticker.C {
- if err = s.taskSchedule(c); err != nil {
- log.Error("taskDelProc error(%v)", err)
- continue
- }
- }
- }
- func (s *Service) taskSchedule(c context.Context) (err error) {
- var (
- ok bool
- now = time.Now()
- expire = now.Add(time.Duration(s.conf.TaskConf.DelInterval))
- expireStr = expire.Format(time.RFC3339)
- oldExpireStr, oldExpireGetSetStr string
- oldExpire time.Time
- )
- if ok, err = s.dao.SetnxTaskJob(c, expireStr); err != nil {
- return
- }
- // redis中不存在
- if ok {
- if err = s.taskDelJob(c); err != nil {
- s.dao.DelTaskJob(c)
- log.Error("taskDelJob,error(%v)", err)
- return
- }
- return
- }
- // redis中已经存在
- // 判断是否过期了
- if oldExpireStr, err = s.dao.GetTaskJob(c); err != nil {
- return
- }
- if oldExpire, err = time.Parse(time.RFC3339, oldExpireStr); err != nil {
- return
- }
- if oldExpire.Sub(now) > 0 {
- return
- }
- if oldExpireGetSetStr, err = s.dao.GetSetTaskJob(c, expireStr); err != nil {
- return
- }
- if oldExpireGetSetStr != oldExpireStr {
- return
- }
- if err = s.taskDelJob(c); err != nil {
- s.dao.DelTaskJob(c)
- log.Error("taskDelJob,error(%v)", err)
- return
- }
- return
- }
- // TODO: operation_time && operation_rate
- func (s *Service) taskDelJob(c context.Context) (err error) {
- var (
- task *model.TaskInfo
- )
- if task, err = s.dao.OneTask(c); err != nil || task == nil {
- return
- }
- task.State = model.TaskStateDelDM
- s.dao.UpdateTask(c, task)
- var delCount int64
- if delCount, task.LastIndex, task.State, err = s.taskDelDM(c, task); err != nil {
- return
- }
- if task.State == model.TaskStateDelDM {
- task.State = model.TaskStateSuc
- }
- if _, err = s.dao.UptSubTask(c, task.ID, delCount, time.Now()); err != nil {
- return
- }
- _, err = s.dao.UpdateTask(c, task)
- return
- }
- func (s *Service) taskDelDM(c context.Context, eTask *model.TaskInfo) (delCount int64, lastIndex, state int32, err error) {
- taskDelNum := s.conf.TaskConf.DelNum
- taskResFieldLen := s.conf.TaskConf.ResFieldLen
- res, err := http.Get(eTask.Result)
- if err != nil {
- log.Error("s.taskDelDM.HttpGet(%s) error(%v)", eTask.Result, err)
- return
- }
- resp, err := ioutil.ReadAll(res.Body)
- if err != nil {
- res.Body.Close()
- log.Error("s.taskDelDM.ioutilRead error(%v)", err)
- return
- }
- res.Body.Close()
- lines := bytes.Split(resp, []byte("\n"))
- total := len(lines)
- n := (total-1)/taskDelNum + 1
- for i := int(eTask.LastIndex); i < n; i++ {
- var (
- task *model.TaskInfo
- subTask *model.SubTask
- )
- start := i * taskDelNum
- end := (i + 1) * taskDelNum
- if end > total {
- end = total
- }
- OidDMid := make(map[int64][]int64)
- for _, line := range lines[start:end] {
- var dmid, oid int64
- fields := bytes.Split(line, []byte("\001"))
- if len(fields) < taskResFieldLen {
- log.Error("fields lenth too small:%d", len(fields))
- continue
- }
- if dmid, err = strconv.ParseInt(string(fields[0]), 10, 64); err != nil {
- log.Error("ParseInt(%s) error(%v)", string(fields[0]), err)
- continue
- }
- if oid, err = strconv.ParseInt(string(fields[1]), 10, 64); err != nil {
- log.Error("ParseInt(%s) error(%v)", string(fields[1]), err)
- continue
- }
- OidDMid[oid] = append(OidDMid[oid], dmid)
- }
- for oid, dmids := range OidDMid {
- var affected int64
- if affected, err = s.dao.DelDMs(c, oid, dmids, model.StateTaskDel); err != nil {
- log.Error("dm task(id:%d) del dm(oid:%d,dmids:%v) error(%v)", eTask.ID, oid, dmids, err)
- continue
- }
- if affected > 0 {
- s.OpLog(c, oid, 0, time.Now().Unix(), int(model.SubTypeVideo), dmids, "status", "", strconv.FormatInt(int64(model.StateTaskDel), 10), "弹幕任务删除", oplog.SourceManager, oplog.OperatorSystem)
- delCount += affected
- if _, err = s.dao.UptSubjectCount(c, model.SubTypeVideo, oid, affected); err != nil {
- log.Error("dm task update count(oid:%d,affected:%d) error(%v)", oid, affected, err)
- }
- }
- time.Sleep(50 * time.Millisecond)
- }
- if len(OidDMid) > 0 {
- log.Warn("dm task(id:%d) del dm(oid,dmids:%+v)", eTask.ID, OidDMid)
- }
- lastIndex = int32(i + 1)
- task, err = s.dao.OneTask(c)
- if err == nil && task != nil && task.ID != eTask.ID && task.Priority > eTask.Priority {
- state = model.TaskStateWait
- return
- }
- if eTask, err = s.dao.TaskInfoByID(c, eTask.ID); err != nil || task == nil {
- continue
- }
- state = eTask.State
- if state != model.TaskStateDelDM {
- return
- }
- if subTask, err = s.dao.SubTask(c, eTask.ID); err != nil || subTask == nil {
- continue
- }
- tCount := subTask.Tcount + delCount
- if tCount >= s.conf.TaskConf.DelLimit && subTask.Tcount < s.conf.TaskConf.DelLimit {
- log.Warn("task(id:%d) del dm reach limit(count:%d)", eTask.ID, tCount)
- s.sendWechatWorkMsg(c, eTask, tCount)
- state = model.TaskStatePause
- return
- }
- }
- return
- }
- func (s *Service) sendWechatWorkMsg(c context.Context, task *model.TaskInfo, count int64) (err error) {
- content := fmt.Sprintf(model.TaskNoticeContent, task.ID, task.Title, count)
- users := s.conf.TaskConf.MsgCC
- users = append(users, task.Creator, task.Reviewer)
- return s.dao.SendWechatWorkMsg(c, content, model.TaskNoticeTitle, users)
- }
- // OpLog put a new infoc format operation log into the channel
- func (s *Service) OpLog(c context.Context, cid, operator, OperationTime int64, typ int, dmids []int64, subject, originVal, currentVal, remark string, source oplog.Source, operatorType oplog.OperatorType) (err error) {
- infoLog := new(oplog.Infoc)
- infoLog.Oid = cid
- infoLog.Type = typ
- infoLog.DMIds = dmids
- infoLog.Subject = subject
- infoLog.OriginVal = originVal
- infoLog.CurrentVal = currentVal
- infoLog.OperationTime = strconv.FormatInt(OperationTime, 10)
- infoLog.Source = source
- infoLog.OperatorType = operatorType
- infoLog.Operator = operator
- infoLog.Remark = remark
- select {
- case s.opsLogCh <- infoLog:
- default:
- err = fmt.Errorf("opsLogCh full")
- log.Error("opsLogCh full (%v)", infoLog)
- }
- return
- }
- func (s *Service) oplogproc() {
- for opLog := range s.opsLogCh {
- if len(opLog.Subject) == 0 || len(opLog.CurrentVal) == 0 || opLog.Source <= 0 ||
- opLog.Operator < 0 || opLog.OperatorType <= 0 {
- log.Warn("oplogproc() it is an illegal log, warn(%v, %v, %v)", opLog.Subject, opLog.Subject, opLog.CurrentVal)
- continue
- } else {
- for _, dmid := range opLog.DMIds {
- if dmid > 0 {
- s.dmOperationLogSvc.Info(opLog.Subject, strconv.FormatInt(opLog.Oid, 10), strconv.Itoa(opLog.Type),
- strconv.FormatInt(dmid, 10), opLog.Source.String(), opLog.OriginVal,
- opLog.CurrentVal, strconv.FormatInt(opLog.Operator, 10), opLog.OperatorType.String(),
- opLog.OperationTime, opLog.Remark)
- } else {
- log.Warn("oplogproc() it is an illegal log, for dmid value, warn(%d, %+v)", dmid, opLog)
- }
- }
- }
- }
- }
|