queue.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package service
  2. import (
  3. "context"
  4. "strings"
  5. "time"
  6. "go-common/app/job/main/workflow/model"
  7. "go-common/library/log"
  8. )
const (
	// _wfKeyPrefix is the literal prefix "wf_".
	// NOTE(review): presumably prepended to list keys by genKey, which is
	// not visible in this file section — confirm.
	_wfKeyPrefix = "wf_"
	// _feedbackDealType is the numeric value 1.
	// NOTE(review): not referenced by any code visible here — confirm it is
	// still used elsewhere in the package.
	_feedbackDealType = 1
)
  13. // queueProc .
  14. func (s *Service) queueproc(c context.Context, dealType int) {
  15. for {
  16. var (
  17. key string
  18. listMap = make(map[string][]int64)
  19. )
  20. sParams := searchParams(c, dealType, model.ListBefore, s.businessAttr)
  21. cLists, err := s.challByIDs(c, sParams)
  22. if err != nil {
  23. log.Error("s.challByIDs error(%v)", err)
  24. time.Sleep(time.Second * 3)
  25. continue
  26. }
  27. if len(cLists) > 0 {
  28. for _, cList := range cLists {
  29. if cList.DispatchState != model.QueueState {
  30. continue
  31. }
  32. now := time.Now().Format("2006-01-02 15:04:05")
  33. log.Info("current cid(%d) dispatch_state is (%d) time is (%s)", cList.ID, cList.DispatchState, now)
  34. key = genKey(c, cList.Business, dealType)
  35. listMap[key] = append(listMap[key], cList.ID)
  36. }
  37. for key, list := range listMap {
  38. newDispatchState := s.dispatchState(c, dealType, model.ListBefore, cLists[list[0]].DispatchState)
  39. err := s.dao.UpDispatchStateByIDs(c, list, newDispatchState)
  40. if err != nil {
  41. log.Error("s.dao.UpDispatchStateByIDs error(%v)", err)
  42. time.Sleep(time.Second * 3)
  43. continue
  44. }
  45. now := time.Now().Format("2006-01-02 15:04:05")
  46. log.Info("this cids(%v) change dispatch_state to (%d) time is (%s)", list, newDispatchState, now)
  47. err = s.dao.SetList(c, key, list)
  48. if err != nil {
  49. log.Error("s.dao.SetList error(%v)", err)
  50. time.Sleep(time.Second * 3)
  51. continue
  52. }
  53. }
  54. }
  55. time.Sleep(time.Second * 15)
  56. }
  57. }
  58. // repairListProc .
  59. func (s *Service) repairQueueproc(c context.Context, dealType int) {
  60. s.setCrash(c)
  61. for {
  62. time.Sleep(time.Second * 30)
  63. exist, err := s.dao.IsCrash(c)
  64. if err != nil {
  65. log.Error("s.dao.ExistKey error(%v)", err)
  66. time.Sleep(time.Second * 3)
  67. continue
  68. }
  69. if exist {
  70. continue
  71. }
  72. var keySlice []string
  73. for _, attr := range s.businessAttr {
  74. var key string
  75. if attr.AssignType == model.SysAssignType {
  76. continue
  77. }
  78. if dealType == model.FDealType {
  79. if dealType == attr.DealType {
  80. key = genKey(c, attr.ID, dealType)
  81. }
  82. } else if dealType == model.ADealType {
  83. key = genKey(c, attr.ID, dealType)
  84. }
  85. keySlice = append(keySlice, key)
  86. }
  87. sParams := searchParams(c, dealType, model.ListIng, s.businessAttr)
  88. for _, key := range keySlice {
  89. var cids []int64
  90. sParams.Business = strings.Split(key, "_")[1]
  91. searchRes, err := s.dao.SearchChall(c, sParams)
  92. if err != nil {
  93. log.Error("s.dao.SearchChall error(%v)", err)
  94. time.Sleep(time.Second * 3)
  95. continue
  96. }
  97. searchDataRes := searchRes.Result
  98. if len(searchDataRes) > 0 {
  99. for _, r := range searchDataRes {
  100. cids = append(cids, r.ID)
  101. }
  102. err := s.dao.SetList(c, key, cids)
  103. if err != nil {
  104. log.Error("s.dao.SetList error(%v)", err)
  105. time.Sleep(time.Second * 3)
  106. continue
  107. }
  108. }
  109. }
  110. s.setCrash(c)
  111. }
  112. }
  113. // SetConstKey .
  114. func (s *Service) setCrash(c context.Context) {
  115. for {
  116. if err := s.dao.SetCrash(c); err != nil {
  117. log.Error("s.dao.SetString error(%v)", err)
  118. time.Sleep(time.Second * 3)
  119. continue
  120. }
  121. break
  122. }
  123. }