active.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package offer
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "go-common/app/job/main/app-wall/model"
  8. "go-common/app/job/main/app-wall/model/offer"
  9. "go-common/library/log"
  10. )
  11. func (s *Service) activeConsumer() {
  12. defer s.waiter.Done()
  13. LOOP:
  14. for {
  15. select {
  16. case err := <-s.consumer.Errors():
  17. log.Error("group(%s) topic(%s) addr(%s) catch error(%+v)", s.c.Consumer.Group, s.c.Consumer.Topic, s.c.Consumer.Brokers, err)
  18. continue
  19. case notify := <-s.consumer.Notifications():
  20. log.Info("notification(%v)", notify)
  21. continue
  22. case msg, ok := <-s.consumer.Messages():
  23. if !ok {
  24. log.Error("active consumer exit!")
  25. break LOOP
  26. }
  27. s.consumer.MarkOffset(msg, "")
  28. active, err := s.checkMsgIllegal(msg.Value)
  29. if err != nil {
  30. log.Error("s.checkMsgIllegal(%s) error(%v)", msg.Value, err)
  31. continue
  32. }
  33. if active == nil {
  34. continue
  35. }
  36. s.activeChan <- active
  37. }
  38. }
  39. }
  40. func (s *Service) checkMsgIllegal(msg []byte) (active *offer.ActiveMsg, err error) {
  41. var (
  42. msgs []string
  43. pid int64
  44. os string
  45. androidid string
  46. imei string
  47. )
  48. msgs = strings.Split(string(msg), "|")
  49. if len(msgs) < 9 {
  50. err = fmt.Errorf("active msg(%s) split len(%d)<9", msg, len(msgs))
  51. return
  52. }
  53. if pid, err = strconv.ParseInt(msgs[8], 10, 64); err != nil {
  54. return
  55. }
  56. if pid%10 == 3 {
  57. os = model.TypeAndriod
  58. if len(msgs) > 22 {
  59. androidid = msgs[22]
  60. }
  61. if len(msgs) > 23 {
  62. imei = msgs[23]
  63. }
  64. if imei == "" {
  65. log.Warn("active msg(%s) imei(%s) is illegal", msg, imei)
  66. } else {
  67. log.Warn("active msg(%s) imei(%s) is legal", msg, imei)
  68. }
  69. if androidid == "" {
  70. log.Warn("active msg(%s) androidid(%s) is illegal", msg, androidid)
  71. }
  72. if androidid == "" && imei == "" {
  73. err = fmt.Errorf("active msg(%s) androidid(%s) and imei(%s) is illegal", msg, androidid, imei)
  74. return
  75. }
  76. } else {
  77. err = fmt.Errorf("active msg(%s) pid(%d) platform not android", msg, pid)
  78. return
  79. }
  80. active = &offer.ActiveMsg{OS: os, IMEI: imei, Androidid: androidid, Mac: ""}
  81. return
  82. }
  83. func (s *Service) activeproc() {
  84. defer s.waiter.Done()
  85. for {
  86. msg, ok := <-s.activeChan
  87. if !ok {
  88. log.Error("active chan id closed")
  89. break
  90. }
  91. s.active(msg)
  92. }
  93. }
  94. func (s *Service) active(msg *offer.ActiveMsg) {
  95. var err error
  96. c := context.TODO()
  97. if err = retry(func() (err error) {
  98. return s.dao.Active(c, msg.OS, msg.IMEI, msg.Androidid, msg.Mac, "")
  99. }, _upActiveRetry, _sleep); err != nil {
  100. log.Error("%+v", err)
  101. if err = s.syncRetry(c, offer.ActionActive, msg.OS, msg.IMEI, msg.Androidid, msg.Mac); err != nil {
  102. log.Error("%+v", err)
  103. }
  104. return
  105. }
  106. log.Info("active device os(%s) imei(%s) androidid(%s) mac(%s) success", msg.OS, msg.IMEI, msg.Androidid, msg.Mac)
  107. }