bnj.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "sync/atomic"
  7. "time"
  8. "go-common/app/admin/main/activity/model"
  9. "go-common/app/job/main/activity/model/bnj"
  10. "go-common/app/service/main/account/api"
  11. "go-common/library/log"
  12. )
  13. const (
  14. _preScore = 20000
  15. _stepOne = 1
  16. _stepTwo = 2
  17. _stepThree = 3
  18. _stepFour = 4
  19. _stepFlagValue = "1"
  20. )
  21. var bnjSteps = []int{_stepOne, _stepTwo, _stepThree, _stepFour}
  22. func (s *Service) bnjproc() {
  23. defer s.waiter.Done()
  24. var (
  25. c = context.Background()
  26. lastTs int64
  27. )
  28. for {
  29. if s.closed {
  30. return
  31. }
  32. if s.bnjTimeFinish == 1 {
  33. log.Warn("bnjproc bnjTimeFinish")
  34. return
  35. }
  36. msg, ok := <-s.bnjSub.Messages()
  37. if !ok {
  38. log.Info("bnjproc: databus consumer exit!")
  39. return
  40. }
  41. msg.Commit()
  42. m := &bnj.ResetMsg{}
  43. if err := json.Unmarshal(msg.Value, m); err != nil {
  44. log.Error("bnjproc json.Unmarshal(%s) error(%+v)", msg.Value, err)
  45. continue
  46. }
  47. if m.Mid <= 0 {
  48. continue
  49. }
  50. // broadcast max every 1s
  51. if m.Ts-lastTs < 1 {
  52. continue
  53. }
  54. lastTs = m.Ts
  55. atomic.StoreInt64(&s.bnjLessSecond, s.bnjMaxSecond)
  56. // default name
  57. pushMsg := &bnj.Push{Second: s.bnjLessSecond, Name: ""}
  58. if info, err := s.accClient.Info3(c, &api.MidReq{Mid: m.Mid}); err != nil || info == nil {
  59. log.Error("bnjproc s.accClient.Info3(%d) error(%v)", m.Mid, err)
  60. } else {
  61. var name []rune
  62. runes := []rune(info.Info.Name)
  63. nameLen := len(runes)
  64. if nameLen == 2 {
  65. name = append(runes[0:1], []rune("*")...)
  66. } else if nameLen > 2 {
  67. for i, v := range runes {
  68. if i == 0 {
  69. name = append(name, v)
  70. } else if i == nameLen-1 {
  71. name = append(name, runes[nameLen-1:]...)
  72. } else if i == 1 {
  73. name = append(name, []rune("*")...)
  74. }
  75. }
  76. } else {
  77. name = runes
  78. }
  79. pushMsg.Name = string(name)
  80. if pushStr, err := json.Marshal(pushMsg); err != nil {
  81. log.Error("bnjproc json.Marshal(%+v) error(%v)", pushMsg, err)
  82. } else {
  83. atomic.StoreInt64(&s.bnjLessSecond, s.bnjMaxSecond)
  84. log.Info("bnjproc mid(%d) reset lessTime(%d) maxTime(%d)", m.Mid, s.bnjLessSecond, s.bnjMaxSecond)
  85. if err := s.retryPushAll(context.Background(), string(pushStr), _retryTimes); err != nil {
  86. log.Error("bnjproc s.bnj.PushAll(%s) error(%v)", string(pushStr), err)
  87. }
  88. }
  89. }
  90. log.Info("bnjproc key:%s partition:%d offset:%d", msg.Key, msg.Partition, msg.Offset)
  91. }
  92. }
  93. func bnjWxFlagKey(lid int64, step int) string {
  94. return fmt.Sprintf("bnj_wx_%d_%d", lid, step)
  95. }
  96. func bnjMsgFlagKey(lid int64, step int) string {
  97. return fmt.Sprintf("bnj_msg_%d_%d", lid, step)
  98. }
  99. func (s *Service) initBnjSecond() {
  100. for {
  101. time.Sleep(time.Second)
  102. if time.Now().Unix() < s.c.Bnj2019.StartTime.Unix() {
  103. continue
  104. }
  105. break
  106. }
  107. if value, err := s.retryCacheTimeFinish(context.Background(), _retryTimes); err != nil {
  108. log.Error("initBnjSecond s.dao.retryCacheTimeFinish error(%v)", err)
  109. return
  110. } else if value > 0 {
  111. log.Warn("initBnjSecond time finish")
  112. atomic.StoreInt64(&s.bnjTimeFinish, value)
  113. return
  114. }
  115. // init step flag
  116. for _, v := range s.c.Bnj2019.Time {
  117. if v.Step > 0 {
  118. if value, err := s.retryRsGet(context.Background(), bnjMsgFlagKey(s.c.Bnj2019.LID, v.Step), _retryTimes); err != nil {
  119. log.Error("initBnjSecond msg s.dao.retryRsGet error(%v)")
  120. } else if value != "" {
  121. log.Info("initBnjSecond msg bnjMsgFlagMap[step:%d]", v.Step)
  122. s.bnjMsgFlagMap[v.Step] = 1
  123. }
  124. if value, err := s.retryRsGet(context.Background(), bnjWxFlagKey(s.c.Bnj2019.LID, v.Step), _retryTimes); err != nil {
  125. log.Error("initBnjSecond wx s.dao.retryRsGet error(%v)")
  126. } else if value != "" {
  127. log.Info("initBnjSecond wx bnjWxMsgFlagMap[step:%d]", v.Step)
  128. s.bnjWxMsgFlagMap[v.Step] = 1
  129. }
  130. }
  131. }
  132. scores, err := s.retryBatchLikeActSum(context.Background(), []int64{s.c.Bnj2019.LID}, _retryTimes)
  133. if err != nil {
  134. // TODO need to restart
  135. log.Error("initBnjSecond failed s.dao.BatchLikeActSum(%d) error(%v)", s.c.Bnj2019.LID, err)
  136. return
  137. }
  138. if score, ok := scores[s.c.Bnj2019.LID]; ok {
  139. for i, v := range s.c.Bnj2019.Time {
  140. if score >= v.Score {
  141. atomic.StoreInt64(&s.bnjMaxSecond, v.Second)
  142. atomic.StoreInt64(&s.bnjLessSecond, v.Second)
  143. break
  144. }
  145. if i == len(s.c.Bnj2019.Time)-1 {
  146. atomic.StoreInt64(&s.bnjMaxSecond, v.Second)
  147. atomic.StoreInt64(&s.bnjLessSecond, v.Second)
  148. }
  149. }
  150. } else {
  151. // max second
  152. atomic.StoreInt64(&s.bnjMaxSecond, s.c.Bnj2019.Time[len(s.c.Bnj2019.Time)-1].Second)
  153. atomic.StoreInt64(&s.bnjLessSecond, s.c.Bnj2019.Time[len(s.c.Bnj2019.Time)-1].Second)
  154. }
  155. if lessSecond, err := s.bnj.CacheLessTime(context.Background()); err != nil {
  156. log.Error("initBnjSecond s.dao.CacheLessTime error(%v)", err)
  157. } else if lessSecond > 0 {
  158. atomic.StoreInt64(&s.bnjLessSecond, lessSecond)
  159. }
  160. log.Warn("initBnjSecond maxSecond(%d) lessSecond(%d)", s.bnjMaxSecond, s.bnjLessSecond)
  161. go s.maxSecondproc()
  162. go s.lessSecondproc()
  163. s.waiter.Add(1)
  164. go s.bnjproc()
  165. }
  166. func (s *Service) maxSecondproc() {
  167. ctx := context.Background()
  168. for {
  169. if s.closed {
  170. return
  171. }
  172. time.Sleep(time.Second)
  173. if scores, err := s.dao.BatchLikeActSum(context.Background(), []int64{s.c.Bnj2019.LID}); err != nil {
  174. log.Error("maxSecondproc s.dao.BatchLikeActSum(%d) error(%v)", s.c.Bnj2019.LID, err)
  175. } else {
  176. if score, ok := scores[s.c.Bnj2019.LID]; ok {
  177. for _, v := range s.c.Bnj2019.Time {
  178. if score >= v.Score {
  179. atomic.StoreInt64(&s.bnjMaxSecond, v.Second)
  180. if s.bnjLessSecond > s.bnjMaxSecond {
  181. atomic.StoreInt64(&s.bnjLessSecond, s.bnjMaxSecond)
  182. }
  183. msg := v.Msg
  184. mc := v.MsgMc
  185. msgTitle := v.MsgTitle
  186. step := v.Step
  187. if step > 0 && s.bnjMsgFlagMap[step] == 0 {
  188. if err = s.retryRsSet(ctx, bnjMsgFlagKey(s.c.Bnj2019.LID, step), _stepFlagValue, _retryTimes); err != nil {
  189. log.Error("s.retryRsSet(%d,%d) error(%v)", s.c.Bnj2019.LID, step, err)
  190. break
  191. }
  192. if msg != "" && msgTitle != "" && mc != "" {
  193. go s.sendMessageToSubs(ctx, s.c.Bnj2019.LID, mc, msgTitle, msg, _retryTimes)
  194. } else {
  195. log.Error("bnj msg conf step(%d) error", step)
  196. break
  197. }
  198. log.Info("bnj send msg step:%d finish", step)
  199. s.bnjMsgFlagMu.Lock()
  200. s.bnjMsgFlagMap[step] = 1
  201. s.bnjMsgFlagMu.Unlock()
  202. }
  203. break
  204. }
  205. }
  206. for _, v := range s.c.Bnj2019.Time {
  207. if score+_preScore >= v.Score {
  208. wxMsg := v.WxMsg
  209. step := v.Step
  210. if step > 0 && s.bnjWxMsgFlagMap[step] == 0 {
  211. if err = s.retryRsSet(ctx, bnjWxFlagKey(s.c.Bnj2019.LID, step), _stepFlagValue, _retryTimes); err != nil {
  212. log.Error("s.retryRsSet(%d,%d) error(%v)", s.c.Bnj2019.LID, step, err)
  213. break
  214. }
  215. if wxMsg != "" && s.c.Bnj2019.WxUser != "" {
  216. if err = s.retrySendWechat(ctx, s.c.Bnj2019.WxTitle, wxMsg, s.c.Bnj2019.WxUser, _retryTimes); err != nil {
  217. log.Error("s.retrySendWechat(%s,%s) error(%v)", s.c.Bnj2019.WxTitle, wxMsg, err)
  218. break
  219. }
  220. } else {
  221. log.Error("bnj wx msg conf step(%d) error", step)
  222. break
  223. }
  224. log.Info("bnj send wx step:%d finish", step)
  225. s.bnjWxMsgFlagMu.Lock()
  226. s.bnjWxMsgFlagMap[step] = 1
  227. s.bnjWxMsgFlagMu.Unlock()
  228. }
  229. break
  230. }
  231. }
  232. } else {
  233. log.Warn("maxSecondproc lid not found")
  234. }
  235. }
  236. }
  237. }
  238. func (s *Service) lessSecondproc() {
  239. for {
  240. if s.closed {
  241. return
  242. }
  243. time.Sleep(time.Second)
  244. atomic.AddInt64(&s.bnjLessSecond, -1)
  245. if s.c.Bnj2019.GameCancel != 0 {
  246. log.Warn("lessSecondproc bnj game cancel")
  247. atomic.StoreInt64(&s.bnjLessSecond, 0)
  248. }
  249. if s.bnjLessSecond <= 0 {
  250. if err := s.retryAddCacheTimeFinish(context.Background(), 1, _retryTimes); err != nil {
  251. log.Error("lessSecondproc s.bnj.AddCacheTimeFinish error(%v)", err)
  252. continue
  253. }
  254. log.Warn("lessSecondproc bnj time Finish")
  255. atomic.StoreInt64(&s.bnjTimeFinish, 1)
  256. pushMsg := &bnj.Push{Second: 0, Name: "", TimelinePic: s.c.Bnj2019.TimelinePic, H5TimelinePic: s.c.Bnj2019.H5TimelinePic}
  257. if pushStr, err := json.Marshal(pushMsg); err != nil {
  258. log.Error("lessSecondproc json.Marshal(%+v) error(%v)", pushMsg, err)
  259. } else {
  260. atomic.StoreInt64(&s.bnjLessSecond, s.bnjMaxSecond)
  261. if err := s.retryPushAll(context.Background(), string(pushStr), _retryTimes); err != nil {
  262. log.Error("lessSecondproc s.bnj.PushAll error(%v)", err)
  263. }
  264. }
  265. return
  266. }
  267. pushMsg := &bnj.Push{Second: s.bnjLessSecond, Name: ""}
  268. if pushStr, err := json.Marshal(pushMsg); err != nil {
  269. log.Error("lessSecondproc json.Marshal(%+v) error(%v)", pushMsg, err)
  270. } else {
  271. if err := s.retryPushAll(context.Background(), string(pushStr), 1); err != nil {
  272. log.Error("lessSecondproc s.bnj.PushAll error(%v)", err)
  273. }
  274. }
  275. }
  276. }
  277. func (s *Service) retryBatchLikeActSum(c context.Context, lids []int64, retryCnt int) (res map[int64]int64, err error) {
  278. for i := 0; i < retryCnt; i++ {
  279. if res, err = s.dao.BatchLikeActSum(c, lids); err == nil {
  280. break
  281. }
  282. time.Sleep(100 * time.Millisecond)
  283. }
  284. return
  285. }
  286. func (s *Service) retryAddCacheTimeFinish(c context.Context, value int64, retryCnt int) (err error) {
  287. for i := 0; i < retryCnt; i++ {
  288. if err = s.bnj.AddCacheTimeFinish(c, value); err == nil {
  289. break
  290. }
  291. time.Sleep(100 * time.Millisecond)
  292. }
  293. return
  294. }
  295. func (s *Service) retryCacheTimeFinish(c context.Context, retryCnt int) (value int64, err error) {
  296. for i := 0; i < retryCnt; i++ {
  297. if value, err = s.bnj.CacheTimeFinish(c); err == nil {
  298. break
  299. }
  300. time.Sleep(100 * time.Millisecond)
  301. }
  302. return
  303. }
  304. func (s *Service) retryPushAll(c context.Context, msg string, retryCnt int) (err error) {
  305. for i := 0; i < retryCnt; i++ {
  306. if err = s.bnj.PushAll(c, msg); err == nil {
  307. break
  308. }
  309. time.Sleep(10 * time.Millisecond)
  310. }
  311. return
  312. }
  313. func (s *Service) cronInformationMessage() {
  314. log.Info("cronInformationMessage start cron")
  315. if s.c.Bnj2019.LID == 0 || s.c.Bnj2019.MidLimit == 0 {
  316. log.Error("cronInformationMessage conf error")
  317. return
  318. }
  319. var (
  320. c = context.Background()
  321. title, msg, mc string
  322. )
  323. for _, v := range s.c.Bnj2019.Message {
  324. if time.Now().Unix() >= v.Start.Unix() {
  325. title = v.Title
  326. msg = v.Content
  327. mc = v.Mc
  328. break
  329. }
  330. }
  331. if title == "" || msg == "" || mc == "" {
  332. log.Error("cronInformationMessage message conf error")
  333. return
  334. }
  335. s.sendMessageToSubs(c, s.c.Bnj2019.LID, mc, title, msg, _retryTimes)
  336. log.Info("cronInformationMessage finish title(%s)", title)
  337. }
  338. func (s *Service) sendMessageToSubs(c context.Context, lid int64, mc, title, msg string, retryCnt int) {
  339. var minID int64
  340. log.Info("sendMessageToSubs mc:%s title:%s start", mc, title)
  341. for {
  342. time.Sleep(100 * time.Millisecond)
  343. actions, err := s.retryLikeActList(c, lid, minID, s.c.Bnj2019.MidLimit, retryCnt)
  344. if err != nil {
  345. log.Error("sendMessageToSubs s.dao.LikeActList(%d,%d,%d) error(%v)", lid, minID, s.c.Bnj2019.MidLimit, err)
  346. continue
  347. }
  348. if len(actions) == 0 {
  349. log.Info("sendMessageToSubs finish")
  350. break
  351. }
  352. var mids []int64
  353. for i, v := range actions {
  354. if v.Mid > 0 {
  355. mids = append(mids, v.Mid)
  356. }
  357. if i == len(actions)-1 {
  358. minID = v.ID
  359. }
  360. }
  361. if len(mids) == 0 {
  362. continue
  363. }
  364. if err = s.retrySendMessage(c, mids, mc, title, msg, _retryTimes); err != nil {
  365. log.Error("sendMessageToSubs s.dao.SendMessage(mids:%v) error(%v)", mids, err)
  366. }
  367. }
  368. }
  369. func (s *Service) retryLikeActList(c context.Context, lid, minID, limit int64, retryCnt int) (list []*model.LikeAction, err error) {
  370. for i := 0; i < retryCnt; i++ {
  371. if list, err = s.dao.LikeActList(c, lid, minID, limit); err == nil {
  372. break
  373. }
  374. time.Sleep(10 * time.Millisecond)
  375. }
  376. return
  377. }
  378. func (s *Service) retrySendMessage(c context.Context, mids []int64, mc, title, msg string, retryCnt int) (err error) {
  379. for i := 0; i < retryCnt; i++ {
  380. if err = s.bnj.SendMessage(c, mids, mc, title, msg); err == nil {
  381. break
  382. }
  383. time.Sleep(10 * time.Millisecond)
  384. }
  385. return
  386. }
  387. func (s *Service) retryRsSet(c context.Context, key, value string, retryCnt int) (err error) {
  388. for i := 0; i < retryCnt; i++ {
  389. if err = s.dao.RsSet(c, key, value); err == nil {
  390. break
  391. }
  392. time.Sleep(10 * time.Millisecond)
  393. }
  394. return
  395. }
  396. func (s *Service) retryRsGet(c context.Context, key string, retryCnt int) (value string, err error) {
  397. for i := 0; i < retryCnt; i++ {
  398. if value, err = s.dao.RsGet(c, key); err == nil {
  399. break
  400. }
  401. time.Sleep(10 * time.Millisecond)
  402. }
  403. return
  404. }
  405. func (s *Service) retrySendWechat(c context.Context, title, msg, user string, retryCnt int) (err error) {
  406. for i := 0; i < retryCnt; i++ {
  407. if err = s.bnj.SendWechat(c, title, msg, user); err == nil {
  408. break
  409. }
  410. time.Sleep(10 * time.Millisecond)
  411. }
  412. return
  413. }