task.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "hash/crc32"
  7. "time"
  8. "go-common/app/job/main/creative/model"
  9. "go-common/library/ecode"
  10. "go-common/library/log"
  11. "go-common/library/queue/databus"
  12. )
  13. //commitTask for commit task
  14. func (s *Service) commitTask() {
  15. for i := 0; i < s.c.Task.TableJobNum; i++ {
  16. go func(i int) {
  17. s.dispatchData(fmt.Sprintf("%02d", i))
  18. }(i)
  19. }
  20. }
  21. //shardingQueueIndex sharding queue index
  22. func (s *Service) shardingQueueIndex(name string, ql int) (i int) { ////注:使用校验和取模的原因是:使得获取的消息均匀散入不同的worker队列中
  23. ch := crc32.ChecksumIEEE([]byte(name))
  24. i = int(ch) % ql
  25. return
  26. }
  27. func (s *Service) dispatchData(index string) { //index 分表后缀名
  28. var id int64
  29. limit := s.c.Task.RowLimit
  30. for {
  31. res, err := s.newc.UserTasks(context.Background(), index, id, limit)
  32. if err != nil {
  33. log.Error("s.newc.UserTasks table index(%s)|id(%d)|limit(%d)|err(%v)", index, id, limit, err)
  34. return
  35. }
  36. if len(res) == 0 {
  37. time.Sleep(600 * time.Second)
  38. id = 0 //reset id
  39. continue
  40. }
  41. id = res[len(res)-1].ID
  42. tks := make([]*model.UserTask, 0, len(res))
  43. for _, v := range res {
  44. target, ok := s.TaskMapCache[v.TaskID]
  45. if !ok || target == nil {
  46. continue
  47. }
  48. //过滤 非T+1的任务
  49. if target.TargetType != model.TargetType004 &&
  50. target.TargetType != model.TargetType005 &&
  51. target.TargetType != model.TargetType006 &&
  52. target.TargetType != model.TargetType007 &&
  53. target.TargetType != model.TargetType008 &&
  54. target.TargetType != model.TargetType009 {
  55. continue
  56. }
  57. tks = append(tks, v)
  58. }
  59. if len(tks) > 0 {
  60. s.taskQueue[s.shardingQueueIndex(index, s.c.Task.TableConsumeNum)] <- tks
  61. }
  62. time.Sleep(5 * time.Second)
  63. }
  64. }
  65. func (s *Service) initTaskQueue() {
  66. for i := 0; i < s.c.Task.TableConsumeNum; i++ {
  67. ut := make(chan []*model.UserTask, s.chanSize)
  68. s.taskQueue[i] = ut
  69. go func(ch chan []*model.UserTask) {
  70. s.updateTaskStateByGRPC(ch)
  71. }(ut)
  72. }
  73. }
  74. //updateTaskStateByGRPC for check task by call grpc.
  75. func (s *Service) updateTaskStateByGRPC(c chan []*model.UserTask) {
  76. for msg := range c {
  77. for _, v := range msg {
  78. mid, tid := v.MID, v.TaskID
  79. reply, err := s.newc.CheckTaskState(context.Background(), mid, tid)
  80. if err != nil {
  81. if ec := ecode.Cause(err); ec.Code() == ecode.ServiceUnavailable.Code() {
  82. log.Error("s.newc.CheckTaskState mid(%d)|task id(%d)|err(%v)", mid, tid, err)
  83. return
  84. }
  85. log.Warn("s.newc.CheckTaskState mid(%d)|task id(%d)|err(%v)", mid, tid, err)
  86. continue
  87. }
  88. log.Info("updateTaskStateByGRPC mid(%d)|task id(%d)", mid, tid)
  89. if reply != nil && reply.FinishState {
  90. _, err := s.newc.UpUserTask(context.Background(), mid, tid)
  91. if err != nil {
  92. log.Error("DriveStateByUser s.newc.UpUserTask mid(%d)|task id(%d)|err(%v)", mid, tid, err)
  93. return
  94. }
  95. }
  96. time.Sleep(50 * time.Millisecond)
  97. }
  98. }
  99. }
  100. func (s *Service) initStatViewQueue() {
  101. for i := 0; i < s.statViewQueueLen; i++ {
  102. view := make(chan *model.StatView, s.chanSize)
  103. s.statViewSubQueue[i] = view
  104. go func(m chan *model.StatView) { //播放
  105. for v := range m {
  106. log.Info("StatView v(%+v)|指标:该UID下任意avid的获得-点击量(%d)", v, v.Count)
  107. s.completeUserTask(s.getMIDByAID(v.ID), v.ID, model.TargetType015, v.Count)
  108. time.Sleep(time.Millisecond * 10)
  109. }
  110. }(view)
  111. }
  112. }
  113. func (s *Service) initStatLikeQueue() {
  114. for i := 0; i < s.statLikeQueueLen; i++ {
  115. li := make(chan *model.StatLike, s.chanSize)
  116. s.statLikeSubQueue[i] = li
  117. go func(m chan *model.StatLike) { //点赞
  118. for v := range m {
  119. log.Info("StatLike v(%+v)|指标:该UID下任意avid的获得-点赞量(%d)", v, v.Count)
  120. s.completeUserTask(s.getMIDByAID(v.ID), v.ID, model.TargetType020, v.Count)
  121. time.Sleep(time.Millisecond * 10)
  122. }
  123. }(li)
  124. }
  125. }
  126. func (s *Service) initDatabusQueue() {
  127. for i := 0; i < s.databusQueueLen; i++ {
  128. tk := make(chan *databus.Message, s.chanSize)
  129. sh := make(chan *model.ShareMsg, s.chanSize)
  130. fl := make(chan *model.Stat, s.chanSize) //粉丝数
  131. rela := make(chan *model.Relation, s.chanSize) //关注
  132. np := make(chan *model.Up, s.chanSize) //最新投稿
  133. op := make(chan *model.Up, s.chanSize) //最新投稿
  134. mp := make(chan *model.Up, s.chanSize) //手机投稿
  135. //单个稿件计数
  136. stsh := make(chan *model.StatShare, s.chanSize)
  137. coin := make(chan *model.StatCoin, s.chanSize)
  138. fav := make(chan *model.StatFav, s.chanSize)
  139. rep := make(chan *model.StatReply, s.chanSize)
  140. dm := make(chan *model.StatDM, s.chanSize)
  141. s.taskSubQueue[i] = tk
  142. s.shareSubQueue[i] = sh
  143. s.followerQueue[i] = fl //粉丝
  144. s.relationQueue[i] = rela //关注
  145. s.newUpQueue[i] = np //新投稿
  146. s.oldUpQueue[i] = op //投下5个稿
  147. s.mobileUpQueue[i] = mp //手机投稿
  148. //单个稿件计数
  149. s.statShareSubQueue[i] = stsh
  150. s.statCoinSubQueue[i] = coin
  151. s.statFavSubQueue[i] = fav
  152. s.statReplySubQueue[i] = rep
  153. s.statDMSubQueue[i] = dm
  154. //水印设置、观看创作学院视频、参加激励计划
  155. go func(m chan *databus.Message) {
  156. s.startByTask(m)
  157. }(tk)
  158. //分享自己的稿件
  159. go func(m chan *model.ShareMsg) {
  160. for v := range m {
  161. log.Info("startByShare mid(%d)|v(%+v)|指标:该UID分享自己视频的次数≥1", v.MID, v)
  162. s.completeUserTask(v.MID, v.OID, model.TargetType002, 1)
  163. time.Sleep(time.Millisecond * 10)
  164. }
  165. }(sh)
  166. //关注哔哩哔哩创作中心和粉丝数判断
  167. go func(m chan *model.Stat) {
  168. for v := range m {
  169. log.Info("followerStat mid(%d)|v(%+v)|指标:该UID的粉丝数≥10 或者 1000", v.MID, v)
  170. s.completeUserTask(v.MID, 0, model.TargetType010, v.Follower) //新手任务粉丝数
  171. s.completeUserTask(v.MID, 0, model.TargetType022, v.Follower) //进阶任务粉丝数
  172. }
  173. }(fl)
  174. go func(m chan *model.Relation) {
  175. for v := range m {
  176. log.Info("relationMID mid(%d)|v(%+v)|指标:该UID的关注列表含有“哔哩哔哩创作中心", v.MID, v)
  177. s.completeUserTask(v.MID, 0, model.TargetType012, 1)
  178. }
  179. }(rela)
  180. // 该UID下开放浏览的稿件≥1
  181. go func(m chan *model.Up) {
  182. for v := range m {
  183. log.Info("newUP mid(%d)|v(%+v)|指标:该UID下开放浏览的稿件≥1", v.MID, v)
  184. s.completeUserTask(v.MID, v.AID, model.TargetType001, 1)
  185. }
  186. }(np)
  187. // 该UID下开放浏览的稿件≥5
  188. go func(m chan *model.Up) {
  189. for v := range m {
  190. log.Info("oldUP mid(%d)|v(%+v)|指标:该UID下开放浏览的稿件≥5", v.MID, v)
  191. s.completeUserTask(v.MID, v.AID, model.TargetType014, 1)
  192. }
  193. }(op)
  194. //单个稿件计数
  195. go func(m chan *model.StatReply) {
  196. for v := range m {
  197. log.Info("StatReply v(%+v)|指标:该UID下任意avid的获得-评论量(%d)", v, v.Count)
  198. s.completeUserTask(s.getMIDByAID(v.ID), v.ID, model.TargetType016, v.Count)
  199. time.Sleep(time.Millisecond * 10)
  200. }
  201. }(rep)
  202. go func(m chan *model.StatShare) {
  203. for v := range m {
  204. log.Info("StatShare v(%+v)|指标:该UID下任意avid的获得-分享量(%d)", v, v.Count)
  205. s.completeUserTask(s.getMIDByAID(v.ID), v.ID, model.TargetType017, v.Count)
  206. time.Sleep(time.Millisecond * 10)
  207. }
  208. }(stsh)
  209. go func(m chan *model.StatFav) {
  210. for v := range m {
  211. log.Info("StatFav v(%+v)|指标:该UID下任意avid的获得-收藏量(%d)", v, v.Count)
  212. s.completeUserTask(s.getMIDByAID(v.ID), v.ID, model.TargetType018, v.Count)
  213. time.Sleep(time.Millisecond * 10)
  214. }
  215. }(fav)
  216. go func(m chan *model.StatCoin) {
  217. for v := range m {
  218. log.Info("StatCoin v(%+v)|指标:该UID下任意avid的获得-硬币量(%d)", v, v.Count)
  219. s.completeUserTask(s.getMIDByAID(v.ID), v.ID, model.TargetType019, v.Count)
  220. time.Sleep(time.Millisecond * 10)
  221. }
  222. }(coin)
  223. go func(m chan *model.StatDM) {
  224. for v := range m {
  225. log.Info("StatDM v(%+v)|指标:该UID下任意avid的获得-弹幕量(%d)", v, v.Count)
  226. s.completeUserTask(s.getMIDByAID(v.ID), v.ID, model.TargetType021, v.Count)
  227. time.Sleep(time.Millisecond * 10)
  228. }
  229. }(dm)
  230. go func(m chan *model.Up) {
  231. for v := range m {
  232. log.Info("Mobile mid(%d)|v(%+v)|指标:该UID通过手机投稿的稿件≥1", v.MID, v)
  233. s.completeUserTask(v.MID, v.AID, model.TargetType013, 1)
  234. time.Sleep(time.Millisecond * 10)
  235. }
  236. }(mp)
  237. }
  238. }
  239. func (s *Service) startByTask(c chan *databus.Message) {
  240. for msg := range c {
  241. v := &model.TaskMsg{}
  242. if err := json.Unmarshal(msg.Value, v); err != nil {
  243. log.Error("startByTask json.Unmarshal(%v) error(%v)", string(msg.Value), err)
  244. continue
  245. }
  246. switch v.From {
  247. case model.MsgForWaterMark:
  248. log.Info("startByTask WaterMark mid(%d)|v(%+v)|指标:任务完成期间该UID的水印开关为打开状态", v.MID, v)
  249. s.completeUserTask(v.MID, 0, model.TargetType011, v.Count)
  250. case model.MsgForAcademyFavVideo:
  251. log.Info("startByTask AcademyFavVideo mid(%d)|v(%+v)|指标:该UID在创作学院的观看记录≥1", v.MID, v)
  252. s.completeUserTask(v.MID, 0, model.TargetType003, v.Count)
  253. case model.MsgForGrowAccount:
  254. log.Info("startByTask GrowAccount mid(%d)|v(%+v)|指标:该UID的激励计划状态为已开通", v.MID, v)
  255. s.completeUserTask(v.MID, 0, model.TargetType023, v.Count)
  256. case model.MsgForOpenFansMedal:
  257. log.Info("startByTask OpenFansMedal mid(%d)|v(%+v)|指标:该UID粉丝勋章为开启状态", v.MID, v)
  258. s.completeUserTask(v.MID, 0, model.TargetType024, v.Count)
  259. }
  260. time.Sleep(time.Millisecond * 10)
  261. }
  262. }
  263. func (s *Service) getMIDByAID(aid int64) (mid int64) {
  264. arc, err := s.arc.Archive(context.Background(), aid)
  265. if err != nil || arc == nil {
  266. log.Error("getMIDByAID s.arc.Archive aid(%d)|err(%v)", aid, err)
  267. return
  268. }
  269. mid = arc.Author.Mid
  270. return
  271. }
  272. func (s *Service) getTaskIDByTargetType(mid int64, ty int8) (tid int64) {
  273. userTasks, err := s.newc.UserTasksByMIDAndState(context.Background(), mid, model.TaskIncomplete)
  274. if err != nil {
  275. log.Error("s.newc.UserTasksByMIDAndState mid(%d)|err(%v)", mid, err)
  276. return
  277. }
  278. if len(userTasks) == 0 {
  279. return
  280. }
  281. for _, v := range userTasks {
  282. t, ok := s.TaskMapCache[v.TaskID]
  283. if ok && t.TargetType == ty {
  284. tid = v.TaskID
  285. break
  286. }
  287. }
  288. return
  289. }
  290. // completeUserTask update user task to complete
  291. func (s *Service) completeUserTask(mid, aid int64, ty int8, count int64) {
  292. tid := s.getTaskIDByTargetType(mid, ty)
  293. if tid == 0 {
  294. return
  295. }
  296. target, ok := s.TaskMapCache[tid]
  297. if !ok || target == nil {
  298. return
  299. }
  300. if count >= target.TargetValue {
  301. if _, err := s.newc.UpUserTask(context.Background(), mid, tid); err != nil {
  302. log.Error("s.newc.UpUserTask mid(%d)|tid(%d)|err(%v)", mid, tid, err)
  303. return
  304. }
  305. log.Info("completeUserTask mid(%d)|aid(%d)|count(%d)|taskID(%d)|targetType(%d)|targetValue(%d)", mid, aid, count, tid, ty, target.TargetValue)
  306. }
  307. }
  308. // 对第30天未完成新手任务的UP主,发送消息通知;记录时间点为用户加入任务成就的时间;该消息有且仅发送一次。
  309. func (s *Service) expireTaskNotify() {
  310. for i := 0; i < s.c.Task.TaskTableJobNum; i++ {
  311. go func(i int) {
  312. s.dispatchTasksNotify(fmt.Sprintf("%02d", i))
  313. }(i)
  314. }
  315. }
  316. func (s *Service) dispatchTasksNotify(index string) {
  317. var id int64
  318. ext := s.c.Task.TaskExpireTime
  319. th := s.c.Task.TaskSendHour
  320. tm := s.c.Task.TaskSendMiniute
  321. ts := s.c.Task.TaskSendSecond
  322. limit := s.c.Task.TaskRowLimitNum
  323. batchSize := s.c.Task.TaskBatchMidNum //每次发送mid数量
  324. for {
  325. now := time.Now()
  326. if now.Hour() != th || now.Minute() != tm || now.Second() != ts {
  327. // log.Info("dispatchTasksNotify minuts(%d) second(%d)", now.Minute(), now.Second())
  328. time.Sleep(1 * time.Second)
  329. continue
  330. }
  331. ctime := now.Unix() - ext //检查任务是否超过30天未完成
  332. year, month, day := time.Unix(ctime, 0).Date()
  333. start := time.Date(year, month, day, 0, 0, 0, 0, time.Local).Format("2006-01-02 15:04:05")
  334. end := time.Date(year, month, day, 23, 59, 59, 999, time.Local).Format("2006-01-02 15:04:05")
  335. log.Info("dispatchTasksNotify now(%s)|start(%s)|end(%s)", now.Format("2006-01-02 15:04:05"), start, end)
  336. midMap := make(map[int64]*model.UserTask)
  337. for {
  338. res, err := s.newc.UserTasksNotify(context.Background(), index, id, start, end, limit)
  339. if err != nil {
  340. log.Error("s.newc.UserTasksNotify table index(%s)|start(%s)|end(%s)|limit(%d)|err(%v)", index, start, end, limit, err)
  341. return
  342. }
  343. if len(res) == 0 {
  344. id = 0
  345. break
  346. }
  347. for _, v := range res {
  348. midMap[v.MID] = v
  349. }
  350. id = res[len(res)-1].ID //next limit
  351. time.Sleep(1 * time.Second)
  352. }
  353. if len(midMap) == 0 {
  354. continue
  355. }
  356. mids := make([]int64, 0, len(midMap))
  357. for mid := range midMap {
  358. mids = append(mids, mid)
  359. }
  360. var tmids []int64
  361. count := len(mids)/batchSize + 1
  362. for i := 0; i < count; i++ {
  363. if i == count-1 {
  364. tmids = mids[i*batchSize:]
  365. } else {
  366. tmids = mids[i*batchSize : (i+1)*batchSize]
  367. }
  368. if len(tmids) > 0 {
  369. s.taskNotifyQueue[s.shardingQueueIndex(index, s.c.Task.TaskTableConsumeNum)] <- tmids
  370. }
  371. }
  372. }
  373. }
  374. func (s *Service) initTaskNotifyQueue() {
  375. for i := 0; i < s.c.Task.TaskTableConsumeNum; i++ {
  376. ut := make(chan []int64, s.chanSize)
  377. s.taskNotifyQueue[i] = ut
  378. go func(ch chan []int64) {
  379. s.sendTaskNotify(ch)
  380. }(ut)
  381. }
  382. }
  383. func (s *Service) sendTaskNotify(c chan []int64) {
  384. for mids := range c {
  385. if len(mids) == 0 {
  386. time.Sleep(time.Second * 60)
  387. continue
  388. }
  389. for i := 1; i < 3; i++ {
  390. if err := s.newc.SendNotify(context.Background(), mids, s.c.Task.TaskMsgCode, s.c.Task.TaskTitle, s.c.Task.TaskContent); err != nil {
  391. log.Error("sendTaskNotify s.newc.SendNotify(%v) error(%v)", mids, err)
  392. time.Sleep(time.Millisecond * 10)
  393. continue
  394. } else {
  395. log.Info("sendTaskNotify s.newc.SendNotify mids(%+v)", mids)
  396. break
  397. }
  398. }
  399. time.Sleep(time.Millisecond * 10)
  400. }
  401. }
  402. // loadproc
  403. func (s *Service) loadProc() {
  404. for {
  405. time.Sleep(3 * time.Minute)
  406. s.loadTasks()
  407. s.loadGiftRewards()
  408. }
  409. }
  410. //load tags
  411. func (s *Service) loadTasks() {
  412. res, err := s.newc.Tasks(context.Background())
  413. if err != nil {
  414. log.Error("s.newc.Tasks error(%v)", err)
  415. return
  416. }
  417. if len(res) == 0 {
  418. return
  419. }
  420. s.TaskCache = res
  421. temp := make(map[int64]*model.Task)
  422. for _, v := range s.TaskCache {
  423. temp[v.ID] = v
  424. }
  425. s.TaskMapCache = temp
  426. }
  427. //load gift-reward
  428. func (s *Service) loadGiftRewards() {
  429. res, err := s.newc.AllGiftRewards(context.Background())
  430. if err != nil {
  431. log.Error("s.newc.AllGiftRewards error(%v)", err)
  432. return
  433. }
  434. if len(res) == 0 {
  435. return
  436. }
  437. s.GiftRewardCache = res
  438. }
  439. // 检查用户是否有奖励可领取
  440. func (s *Service) checkRewardReceive(c context.Context, mid int64) (is bool) {
  441. tasks, err := s.newc.UserTasksByMID(c, mid)
  442. if err != nil {
  443. log.Error("s.newc.UserTasksByMID mid(%v)|error(%v)", mid, err)
  444. return
  445. }
  446. groupMap := make(map[int64][]*model.UserTask)
  447. giftMap := make(map[int8][]*model.UserTask)
  448. for _, v := range tasks {
  449. groupMap[v.TaskGroupID] = append(groupMap[v.TaskGroupID], v)
  450. if _, ok := s.GiftRewardCache[v.TaskType]; ok {
  451. giftMap[v.TaskType] = append(giftMap[v.TaskType], v)
  452. }
  453. }
  454. groupNum := 0 // 组奖励 已完成个数
  455. giftNum := make(map[int8]bool) // 礼包奖励 已完成个数
  456. for _, ts := range groupMap {
  457. for _, t := range ts {
  458. if t.State == model.TaskIncomplete {
  459. groupNum++
  460. if _, ok := s.GiftRewardCache[t.TaskType]; ok {
  461. giftNum[t.TaskType] = true
  462. }
  463. break
  464. }
  465. }
  466. }
  467. r1, err := s.newc.BaseRewardCount(c, mid) // 组奖励 已领取个数
  468. if err != nil {
  469. log.Error("s.newc.BaseRewardCount mid(%v)|error(%v)", mid, err)
  470. return
  471. }
  472. r2, err := s.newc.GiftRewardCount(c, mid) // 礼包奖励 已领取个数
  473. if err != nil {
  474. log.Error("s.newc.GiftRewardCount mid(%v)|error(%v)", mid, err)
  475. return
  476. }
  477. total := len(groupMap) + len(giftMap) //奖励总数
  478. untotal := groupNum + len(giftNum) //未完成的奖励
  479. receive := r1 + r2 //已领取奖励
  480. // 可领取的奖励 = 奖励总数 -未完成的奖励 - 已领取奖励
  481. count := total - untotal - receive
  482. log.Info("checkRewardReceive mid(%d)|奖励总数(%d)|未完成奖励总数(%d)|已领取奖励总数(%d)|可领取奖励总数(%d)", mid, total, untotal, receive, count)
  483. if count > 0 {
  484. is = true
  485. }
  486. return
  487. }
  488. // 该消息每周最多发送 1 条,发送时间为每周六的20:00,用户为上周周六18:00 - 本周周六17:59所有达到领取奖励且 未领取 的用户。
  489. // 通知仅限用户有未领取的奖励时发送:若在该时间段,用户已领取全部可领取的奖励,
  490. // 则不发送通知,如果用户已领取部分可领取的奖励,仍有部分奖励未领取,则仍然发送通知
  491. func (s *Service) rewardReceiveNotify() {
  492. for i := 0; i < s.c.Task.RewardTableJobNum; i++ {
  493. go func(i int) {
  494. s.dispatchRewardNotify(fmt.Sprintf("%02d", i))
  495. }(i)
  496. }
  497. }
  498. func (s *Service) dispatchRewardNotify(index string) {
  499. var id int64
  500. week := s.c.Task.RewardWeek //星期几
  501. ld := s.c.Task.RewardLastDay //从过去多少天开始查询
  502. lh := s.c.Task.RewardLastHour //几点开始查询
  503. lm := s.c.Task.RewardLastMiniute //几分开始查询
  504. ls := s.c.Task.RewardLastSecond //几秒开始查询
  505. nh := s.c.Task.RewardNowHour //从当前时间几点开始
  506. nm := s.c.Task.RewardNowMiniute //从当前时间几分开始
  507. ns := s.c.Task.RewardNowSecond //从当前时间几秒开始
  508. limit := s.c.Task.RewardRowLimitNum
  509. batchSize := s.c.Task.RewardBatchMidNum //每次发送mid数量
  510. for {
  511. now := time.Now()
  512. if int(now.Weekday()) != week || now.Hour() != nh || now.Minute() != nm || now.Second() != ns {
  513. // log.Info("dispatchRewardNotify Weekday(%d) Hour(%d) Minute(%d) Second(%d)", now.Weekday(), now.Hour(), now.Minute(), now.Second())
  514. time.Sleep(1 * time.Second)
  515. continue
  516. }
  517. last := now.AddDate(0, 0, ld).Add(time.Hour * time.Duration(lh)).Add(time.Minute * time.Duration(lm)).Add(time.Second * time.Duration(ls))
  518. log.Info("dispatchRewardNotify last(%s) now(%s)\n", last.Format("2006-01-02 15:04:05"), now.Format("2006-01-02 15:04:05"))
  519. midMap := make(map[int64]*model.UserTask)
  520. for {
  521. res, err := s.newc.CheckTasksForRewardNotify(context.Background(), index, id, last, now, limit)
  522. if err != nil {
  523. log.Error("s.newc.CheckTasksForRewardNotify table index(%s)|id(%d)|limit(%d)|err(%v)", index, id, limit, err)
  524. return
  525. }
  526. if len(res) == 0 {
  527. id = 0
  528. break
  529. }
  530. for _, v := range res {
  531. midMap[v.MID] = v
  532. }
  533. id = res[len(res)-1].ID //next limit
  534. time.Sleep(1 * time.Second)
  535. }
  536. if len(midMap) == 0 {
  537. continue
  538. }
  539. mids := make([]int64, 0, len(midMap))
  540. for mid := range midMap {
  541. if s.checkRewardReceive(context.Background(), mid) {
  542. mids = append(mids, mid)
  543. }
  544. }
  545. var tmids []int64
  546. count := len(mids)/batchSize + 1
  547. for i := 0; i < count; i++ {
  548. if i == count-1 {
  549. tmids = mids[i*batchSize:]
  550. } else {
  551. tmids = mids[i*batchSize : (i+1)*batchSize]
  552. }
  553. if len(tmids) > 0 {
  554. s.rewardNotifyQueue[s.shardingQueueIndex(index, s.c.Task.RewardTableConsumeNum)] <- tmids
  555. }
  556. }
  557. }
  558. }
  559. func (s *Service) initRewardNotifyQueue() {
  560. for i := 0; i < s.c.Task.RewardTableConsumeNum; i++ {
  561. ut := make(chan []int64, s.chanSize)
  562. s.rewardNotifyQueue[i] = ut
  563. go func(ch chan []int64) {
  564. s.sendRewardNotify(ch)
  565. }(ut)
  566. }
  567. }
  568. func (s *Service) sendRewardNotify(c chan []int64) {
  569. for mids := range c {
  570. if len(mids) == 0 {
  571. time.Sleep(time.Second * 1)
  572. continue
  573. }
  574. for i := 1; i < 3; i++ {
  575. if err := s.newc.SendNotify(context.Background(), mids, s.c.Task.RewardMsgCode, s.c.Task.RewardTitle, s.c.Task.RewardContent); err != nil {
  576. log.Error("sendRewardNotify s.newc.SendNotify mids(%+v) error(%v)", mids, err)
  577. time.Sleep(time.Millisecond * 100)
  578. continue
  579. } else {
  580. log.Info("sendRewardNotify s.newc.SendNotify mids(%+v)", mids)
  581. break
  582. }
  583. }
  584. time.Sleep(time.Millisecond * 10)
  585. }
  586. }