appeal.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. package service
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/job/main/workflow/model"
  6. "go-common/library/log"
  7. "go-common/library/sync/errgroup.v2"
  8. )
  9. // 工作台单条过期
  10. func (s *Service) singleExpireproc() {
  11. errGroup := errgroup.Group{}
  12. for {
  13. for _, attr := range s.businessAttr {
  14. if attr.DealType != model.PDealType {
  15. continue
  16. }
  17. time.Sleep(1 * time.Second)
  18. errGroup.Go(func(ctx context.Context) error {
  19. return s.singleExpire(ctx, attr.Bid)
  20. })
  21. }
  22. errGroup.Wait()
  23. }
  24. }
  25. func (s *Service) singleExpire(c context.Context, bid int) (err error) {
  26. var delIDs []int64
  27. if delIDs, err = s.dao.SingleExpire(c, bid); err != nil {
  28. log.Error("s.dao.SingleExpire() bid:%d error: %v", bid, err)
  29. return
  30. }
  31. if len(delIDs) == 0 {
  32. return
  33. }
  34. log.Info("bid(%d) apid in (%v) are single expire", bid, delIDs)
  35. // set db state
  36. if err = s.dao.SetAppealAssignState(c, delIDs, model.AssignStateNotDispatch); err != nil {
  37. log.Error("s.dao.SetAppealAssignState() error: %v", err)
  38. return
  39. }
  40. if err = s.dao.DelSingleExpire(c, bid, delIDs); err != nil {
  41. log.Error("s.dao.SetSingleExpire() bid:%d error: %v", bid, err)
  42. return
  43. }
  44. if err = s.delRelatedMissions(c, bid, delIDs); err != nil {
  45. log.Error("s.delRelatedMissions() error: %v", err)
  46. }
  47. return
  48. }
  49. // 反馈整体过期
  50. func (s *Service) overallExpireproc() {
  51. errGroup := errgroup.Group{}
  52. for {
  53. for _, attr := range s.businessAttr {
  54. if attr.DealType != model.PDealType {
  55. continue
  56. }
  57. time.Sleep(1 * time.Second)
  58. errGroup.Go(func(ctx context.Context) error {
  59. return s.overallExpire(ctx, attr.Bid)
  60. })
  61. }
  62. errGroup.Wait()
  63. }
  64. }
  65. func (s *Service) overallExpire(c context.Context, bid int) (err error) {
  66. time.Sleep(10 * time.Second)
  67. cond := model.AppealSearchCond{
  68. Fields: []string{"id"},
  69. Bid: []int{bid},
  70. AssignState: []int8{model.AssignStatePoped},
  71. TransferState: []int8{model.TransferStatePendingSystemReply},
  72. DTimeTo: time.Now().Add(-time.Minute * 5).Format("2006-01-02 15:04:05"),
  73. PS: 101,
  74. PN: 1,
  75. Order: "id",
  76. Sort: "desc",
  77. }
  78. var res *model.AppealSearchRes
  79. if res, err = s.dao.SearchAppeal(c, cond); err != nil {
  80. log.Error("s.dao.SearchAppeal() error(%+v)", err)
  81. return
  82. }
  83. if len(res.Result) == 0 {
  84. log.Warn("no appeal is expire overall")
  85. return
  86. }
  87. ids := make([]int64, 0, len(res.Result))
  88. for _, r := range res.Result {
  89. ids = append(ids, r.ID)
  90. }
  91. log.Info("apids(%v) are expire overall", ids)
  92. if err = s.delRelatedMissions(c, bid, ids); err != nil {
  93. log.Error("s.delRelatedMissions() error: %v", err)
  94. }
  95. if err = s.dao.SetAppealAssignState(c, ids, model.AssignStateNotDispatch); err != nil {
  96. log.Error("s.dao.SetAppealAssignState(%v,%d) error(%v)", ids, model.AssignStateNotDispatch, err)
  97. }
  98. return
  99. }
  100. // 释放用户未评价反馈
  101. func (s *Service) releaseExpireproc() {
  102. for {
  103. for _, attr := range s.businessAttr {
  104. var (
  105. c = context.TODO()
  106. err error
  107. )
  108. if attr.DealType != model.PDealType {
  109. continue
  110. }
  111. time.Sleep(1 * time.Minute)
  112. cond := model.AppealSearchCond{
  113. Fields: []string{"id", "mid"},
  114. Bid: []int{attr.Bid},
  115. TTimeTo: time.Now().AddDate(0, 0, -3).Format("2006-01-02 15:04:05"),
  116. TransferState: []int8{model.TransferStatePendingSystemReply, model.TransferStateAdminReplyReaded, model.TransferStateAdminReplyNotReaded},
  117. PS: 50,
  118. PN: 1,
  119. Order: "id",
  120. Sort: "desc",
  121. }
  122. var res *model.AppealSearchRes
  123. if res, err = s.dao.SearchAppeal(c, cond); err != nil {
  124. log.Error("d.SearchAppeal error(%+v)", cond)
  125. continue
  126. }
  127. ids := make([]int64, 0, len(res.Result))
  128. for _, r := range res.Result {
  129. var e *model.Event
  130. if e, err = s.dao.LastEvent(r.ID); err != nil {
  131. log.Error("s.dao.LastEvent() id(%d) error:%v", r.ID, err)
  132. continue
  133. }
  134. // 关闭最后一个对话为管理员回复的申诉
  135. if e.Event == model.EventAdminReply {
  136. ids = append(ids, r.ID)
  137. }
  138. }
  139. if len(ids) == 0 {
  140. continue
  141. }
  142. // 删除权重参数
  143. if err = s.dao.DelUperInfo(c, ids); err != nil {
  144. log.Error("s.dao.DelUperInfo(%v), error(%v)", ids, err)
  145. continue
  146. }
  147. // todo delete single expire zset member
  148. if err = s.dao.DelSingleExpire(c, attr.Bid, ids); err != nil {
  149. log.Error("s.dao.DelSingleExpire(%d, %v), error(%v)", attr.Bid, ids, err)
  150. continue
  151. }
  152. if err = s.dao.SetAppealTransferState(c, ids, model.TransferStateAutoClosedExpire); err != nil {
  153. log.Error("s.dao.SetAppealTransferState() ids(%v) transferstate(%d) error(%v)", ids, model.TransferStateAutoClosedExpire, err)
  154. continue
  155. }
  156. log.Info("apids (%v) are expire user not set degree", ids)
  157. }
  158. }
  159. }
  160. // 进任务池
  161. func (s *Service) enterPoolproc() {
  162. for {
  163. errGroup := errgroup.Group{}
  164. for _, attr := range s.businessAttr {
  165. if attr.DealType != model.PDealType {
  166. continue
  167. }
  168. time.Sleep(1 * time.Second)
  169. cond := model.AppealSearchCond{
  170. Fields: []string{"id"},
  171. Bid: []int{attr.Bid},
  172. AssignState: []int8{model.AssignStateNotDispatch},
  173. AuditState: []int8{model.AuditStateInvalid},
  174. TransferState: []int8{model.TransferStatePendingSystemReply, model.TransferStatePendingSystemNotReply},
  175. PS: 99,
  176. PN: 1,
  177. Order: "id",
  178. Sort: "desc",
  179. }
  180. errGroup.Go(func(ctx context.Context) error {
  181. return s.enterPool(ctx, cond)
  182. })
  183. }
  184. errGroup.Wait()
  185. }
  186. }
  187. func (s *Service) enterPool(c context.Context, cond model.AppealSearchCond) (err error) {
  188. var res *model.AppealSearchRes
  189. if res, err = s.dao.SearchAppeal(c, cond); err != nil {
  190. log.Error("s.dao.SearchAppeal error(%+v)", cond)
  191. return
  192. }
  193. ids := make([]int64, len(res.Result))
  194. for _, r := range res.Result {
  195. ids = append(ids, r.ID)
  196. }
  197. var appeals []*model.Appeal
  198. if appeals, err = s.dao.Appeals(c, ids); err != nil {
  199. log.Error("s.dao.Appeals(%v) error(%v)", ids, err)
  200. return
  201. }
  202. ApIDMap := make(map[int64]int64) //map[ap_id]weight
  203. ApIDs := make([]int64, 0)
  204. // check state
  205. for _, ap := range appeals {
  206. if ap.AssignState == model.AssignStateNotDispatch && ap.AuditState == model.AuditStateInvalid && ap.TransferState == model.TransferStatePendingSystemReply {
  207. ApIDMap[ap.ApID] = ap.Weight
  208. ApIDs = append(ApIDs, ap.ApID)
  209. }
  210. }
  211. if len(ApIDs) == 0 {
  212. log.Warn("bid(%v) not found apids after check db should enter pool!", cond.Bid)
  213. return
  214. }
  215. log.Info("bid(%v) apids(%v) ApIDMap(%v) should set into mission pool", cond.Bid, ApIDs, ApIDMap)
  216. if err = s.dao.SetAppealAssignState(c, ApIDs, model.AssignStatePushed); err != nil {
  217. log.Error("s.dao.SetAppealAssignState(%v) err(%v)", ApIDs, err)
  218. return
  219. }
  220. //set sorted set
  221. if err = s.dao.SetWeightSortedSet(c, cond.Bid[0], ApIDMap); err != nil {
  222. log.Error("s.dao.SetWeightSortedSet() error(%v)", err)
  223. }
  224. return
  225. }
  226. // 刷新权重值 每3分钟刷新一次
  227. func (s *Service) refreshWeightproc() {
  228. errGroup := errgroup.Group{}
  229. for {
  230. time.Sleep(3 * time.Second) // todo 3 min
  231. for _, attr := range s.businessAttr {
  232. if attr.DealType != model.PDealType {
  233. continue
  234. }
  235. time.Sleep(1 * time.Second)
  236. cond := model.AppealSearchCond{
  237. Bid: []int{attr.Bid},
  238. Fields: []string{"id", "mid", "weight"},
  239. AuditState: []int8{model.AuditStateInvalid},
  240. TransferState: []int8{model.TransferStatePendingSystemReply},
  241. PS: 100,
  242. PN: 1,
  243. Order: "id",
  244. Sort: "desc",
  245. }
  246. errGroup.Go(func(ctx context.Context) error {
  247. time.Sleep(3 * time.Second)
  248. return s.refreshWeight(ctx, cond)
  249. })
  250. }
  251. errGroup.Wait()
  252. }
  253. }
  254. func (s *Service) refreshWeight(c context.Context, cond model.AppealSearchCond) (err error) {
  255. var res *model.AppealSearchRes
  256. if res, err = s.dao.SearchAppeal(c, cond); err != nil {
  257. log.Error("d.SearchAppeal error(%+v)", cond)
  258. return
  259. }
  260. appeals := make([]*model.Appeal, 0, len(res.Result))
  261. apIDs := make([]int64, 0, len(res.Result))
  262. newWeight := make(map[int64]int64, len(res.Result))
  263. newWeightInSortedSet := make(map[int64]int64, len(res.Result))
  264. for _, ap := range res.Result {
  265. appeals = append(appeals, &model.Appeal{
  266. ApID: ap.ID,
  267. Mid: ap.Mid,
  268. Weight: ap.Weight,
  269. })
  270. apIDs = append(apIDs, ap.ID)
  271. }
  272. if len(appeals) == 0 {
  273. log.Warn("no appeal is in feedback")
  274. return
  275. }
  276. var params []int64
  277. if params, err = s.dao.UperInfoCache(c, apIDs); err != nil { // 读用户维度的权重参数
  278. log.Error("s.dao.UperInfoCache(%v) error(%v)", apIDs, err)
  279. return
  280. }
  281. // fixme cache miss
  282. if len(params) != len(apIDs) {
  283. log.Warn("len params not equre len mids")
  284. return
  285. }
  286. for i, ap := range appeals {
  287. p := params[i]
  288. incr := s.calcWeight(ap, p)
  289. newWeight[ap.ApID] = ap.Weight + incr
  290. if ap.AssignState == model.AssignStatePushed { // should rewrite weight in db
  291. newWeightInSortedSet[ap.ApID] = newWeight[ap.ApID]
  292. }
  293. }
  294. log.Info("appeals in feedback weight(%+v) mids(%v)", newWeight, apIDs)
  295. tx := s.dao.WriteORM.Begin()
  296. if err = tx.Error; err != nil {
  297. log.Error("s.dao.WriteORM.Begin() error(%v)", err)
  298. return
  299. }
  300. if err = s.dao.TxSetWeight(tx, newWeight); err != nil {
  301. log.Error("s.dao.SetWeight(%v) error(%v)", newWeight, err)
  302. tx.Rollback()
  303. return
  304. }
  305. if err = tx.Commit().Error; err != nil {
  306. log.Error("tx.Commit() error:%v", err)
  307. tx.Rollback()
  308. return
  309. }
  310. // async write sorted set
  311. if len(newWeightInSortedSet) == 0 {
  312. return
  313. }
  314. _ = s.cache.Do(c, func(c context.Context) {
  315. if err = s.dao.SetWeightSortedSet(c, cond.Bid[0], newWeightInSortedSet); err != nil {
  316. log.Error("s.dao.SetWeightSortedSet() error(%v)", err)
  317. }
  318. log.Info("appeals in feedback sorted set bid(%d) weight(%+v)", cond.Bid[0], newWeightInSortedSet)
  319. })
  320. return
  321. }
  322. func (s *Service) delRelatedMissions(c context.Context, bid int, delIDs []int64) (err error) {
  323. cond := model.AppealSearchCond{
  324. Fields: []string{"id", "transfer_admin"},
  325. IDs: delIDs,
  326. Order: "id",
  327. Sort: "asc",
  328. PS: 100,
  329. PN: 1,
  330. }
  331. var res *model.AppealSearchRes
  332. if res, err = s.dao.SearchAppeal(c, cond); err != nil {
  333. log.Error("s.dao.SearchAppeal() error:%v")
  334. return
  335. }
  336. tadmin := make(map[int][]int64)
  337. for _, r := range res.Result {
  338. if _, ok := tadmin[r.TransferAdmin]; !ok {
  339. tadmin[r.TransferAdmin] = make([]int64, 0)
  340. }
  341. tadmin[r.TransferAdmin] = append(tadmin[r.TransferAdmin], r.ID)
  342. }
  343. for uid, ids := range tadmin {
  344. if err = s.dao.DelRelatedMissions(c, bid, uid, ids); err != nil {
  345. log.Error("s.dao.DelRelatedMissions() bid(%d) uid(%d) ap_ids(%v) error:%v", bid, uid, ids, err)
  346. }
  347. time.Sleep(100 * time.Millisecond)
  348. }
  349. return
  350. }
  351. // calcWeight 计算权重值
  352. func (s *Service) calcWeight(ap *model.Appeal, p int64) (incr int64) {
  353. switch p { // 粉丝/特殊用户组维度
  354. case 1:
  355. incr += 8
  356. case 2:
  357. incr += 10
  358. case 3:
  359. incr += 12
  360. case 4:
  361. incr += 15
  362. case 5:
  363. incr += 18
  364. default:
  365. incr += 8
  366. }
  367. switch { // 时间维度
  368. case time.Since(ap.CTime.Time()) < 3*time.Minute:
  369. incr += 0
  370. case time.Since(ap.CTime.Time()) >= 3*time.Minute && time.Since(ap.CTime.Time()) < 6*time.Minute:
  371. incr += 3
  372. case time.Since(ap.CTime.Time()) >= 6*time.Minute && time.Since(ap.CTime.Time()) < 9*time.Minute:
  373. incr += 6
  374. case time.Since(ap.CTime.Time()) >= 9*time.Minute && time.Since(ap.CTime.Time()) < 15*time.Minute:
  375. incr += 9
  376. case time.Since(ap.CTime.Time()) >= 15*time.Minute:
  377. incr += 12
  378. default:
  379. incr += 0
  380. }
  381. return incr
  382. }