handler_task.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. package service
  2. import (
  3. "context"
  4. "go-common/app/job/main/aegis/model"
  5. "go-common/library/queue/databus"
  6. "go-common/library/xstr"
  7. )
  8. type baseTaskHandler struct {
  9. *Service
  10. }
  11. type dynamicTaskHandler struct {
  12. baseTaskHandler
  13. }
  14. func (h baseTaskHandler) CheckMessage(msg *databus.Message) (taskObj interface{}, err error) {
  15. return h.checkTaskMsg(msg)
  16. }
  17. func (h baseTaskHandler) HandleMessage(c context.Context, taskObj interface{}) error {
  18. return h.writeTaskToDB(c, taskObj.(*model.Task))
  19. }
  20. func (h dynamicTaskHandler) CheckMessage(msg *databus.Message) (taskObj interface{}, err error) {
  21. var c = context.Background()
  22. if taskObj, err = h.baseTaskHandler.CheckMessage(msg); err != nil {
  23. return
  24. }
  25. //补充mid相关信息
  26. task := taskObj.(*model.Task)
  27. res, err := h.dao.Resource(c, task.RID)
  28. if err != nil || res == nil {
  29. return nil, ErrTaskResourceInvalid
  30. }
  31. task.MID = res.MID
  32. if task.MID > 0 {
  33. groupids, _ := h.dao.UpSpecial(c, task.MID)
  34. task.Group = xstr.JoinInts(groupids)
  35. task.Fans, _ = h.dao.FansCount(c, task.MID)
  36. }
  37. taskObj = task
  38. return
  39. }
  40. func (h dynamicTaskHandler) HandleMessage(c context.Context, obj interface{}) error {
  41. return h.baseTaskHandler.HandleMessage(c, obj.(*model.Task))
  42. }