report.go 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "go-common/app/job/main/aegis/model"
  7. "go-common/library/log"
  8. )
  9. func (s *Service) reportSubmit(c context.Context, old, new *model.Task) {
  10. s.reportTaskFinish(c, new)
  11. stfield := fmt.Sprintf(model.Submit, new.State, old.UID)
  12. s.dao.IncresByField(c, new.BusinessID, new.FlowID, new.UID, stfield, 1)
  13. s.dao.IncresByField(c, new.BusinessID, new.FlowID, new.UID, model.UseTime, new.Utime)
  14. //统计资源的通过,打回什么的,只统计任务列表操作; 异步统计,免得干扰缓存的同步速度
  15. if old.UID == new.UID && new.State == model.TaskStateSubmit {
  16. select {
  17. case s.chanReport <- &model.RIR{
  18. BizID: new.BusinessID,
  19. FlowID: new.FlowID,
  20. UID: new.UID,
  21. RID: new.RID,
  22. }:
  23. case <-time.NewTimer(time.Millisecond * 10).C:
  24. log.Error("reportSubmit chanfull")
  25. }
  26. }
  27. }
  28. func (s *Service) reportResource(c context.Context, bizid, flowid, rid, uid int64) {
  29. st, err := s.dao.RscState(c, rid)
  30. if err != nil {
  31. log.Error("reportResource RscState(%d) error(%v)", rid, err)
  32. return
  33. }
  34. field := fmt.Sprintf(model.RscState, st)
  35. s.dao.IncresByField(c, bizid, flowid, uid, field, 1)
  36. }
  37. func (s *Service) syncReport(c context.Context) {
  38. datas, err := s.dao.FlushReport(c)
  39. if err != nil {
  40. log.Error("FlushReport error(%v)", err)
  41. return
  42. }
  43. if len(datas) == 0 {
  44. return
  45. }
  46. for key, val := range datas {
  47. tp, bizid, flowid, uid, err := model.ParseKey(key)
  48. if err != nil {
  49. log.Error("syncReport ParseKey(%s)", key)
  50. continue
  51. }
  52. rt := &model.Report{
  53. BusinessID: int64(bizid),
  54. FlowID: int64(flowid),
  55. UID: int64(uid),
  56. TYPE: tp,
  57. Content: val,
  58. }
  59. s.dao.Report(c, rt)
  60. }
  61. }
  62. func (s *Service) reportTaskCreate(c context.Context, new *model.Task) {
  63. s.dao.IncresTaskInOut(c, new.BusinessID, new.FlowID, "in")
  64. }
  65. func (s *Service) reportTaskFinish(c context.Context, new *model.Task) {
  66. s.dao.IncresTaskInOut(c, new.BusinessID, new.FlowID, "out")
  67. }