callback.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package service
  2. import (
  3. "context"
  4. "errors"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "go-common/app/interface/main/push/dao"
  9. "go-common/app/service/main/push/dao/huawei"
  10. "go-common/app/service/main/push/dao/jpush"
  11. "go-common/app/service/main/push/dao/mi"
  12. "go-common/app/service/main/push/dao/oppo"
  13. pushmdl "go-common/app/service/main/push/model"
  14. "go-common/library/log"
  15. )
  16. func (s *Service) callbackproc() {
  17. defer s.waiter.Done()
  18. var data []*pushmdl.Callback
  19. for {
  20. v, ok := <-s.callbackCh
  21. if !ok {
  22. log.Info("callbackCh has been closed.")
  23. if len(data) > 0 {
  24. s.sendCallback(data)
  25. }
  26. return
  27. }
  28. data = append(data, v)
  29. if len(data) >= s.c.Push.CallbackSize {
  30. s.sendCallback(data)
  31. data = []*pushmdl.Callback{}
  32. }
  33. }
  34. }
  35. func (s *Service) sendCallback(v []*pushmdl.Callback) (err error) {
  36. for i := 0; i < 3; i++ {
  37. if err = s.dao.PubCallback(context.TODO(), v); err == nil {
  38. break
  39. }
  40. time.Sleep(20 * time.Millisecond)
  41. }
  42. return
  43. }
  44. func (s *Service) addCallbackChan(cb *pushmdl.Callback) (err error) {
  45. if s.closed {
  46. log.Warn("addCallbackChan, channel is closed")
  47. return
  48. }
  49. select {
  50. case s.callbackCh <- cb:
  51. default:
  52. err = errors.New("callbackCh full")
  53. log.Error("callbackCh full. data(%+v)", cb)
  54. dao.PromError("callbackCh full")
  55. }
  56. return
  57. }
  58. // CallbackXiaomiRegid xiaomi regid callback.
  59. func (s *Service) CallbackXiaomiRegid(c context.Context, cb *mi.RegidCallback) (err error) {
  60. // 小米token注册回调,暂时没用
  61. log.Info("s.CallbackXiaomiRegid(%+v)", cb)
  62. return
  63. }
  64. // CallbackHuawei huawei callback.
  65. func (s *Service) CallbackHuawei(c context.Context, hcb *huawei.Callback) (err error) {
  66. for _, v := range hcb.Statuses {
  67. log.Info("huawei callback task(%s) token(%s)", v.BiTag, v.Token)
  68. appid, _ := strconv.ParseInt(v.AppID, 10, 64)
  69. cb := &pushmdl.Callback{
  70. Task: v.BiTag,
  71. APP: appid,
  72. Platform: pushmdl.PlatformHuawei,
  73. Pid: pushmdl.MobiAndroid,
  74. Token: v.Token,
  75. Extra: &pushmdl.CallbackExtra{Status: v.Status},
  76. }
  77. s.addCallbackChan(cb)
  78. }
  79. return
  80. }
  81. // CallbackXiaomi xiaomi callback.
  82. func (s *Service) CallbackXiaomi(c context.Context, m map[string]*mi.Callback) (err error) {
  83. for _, v := range m {
  84. log.Info("callback xiaomi task(%s)", v.Jobkey)
  85. barStatus := mi.CallbackBarStatusEnable
  86. if v.BarStatus == mi.CallbackBarStatusDisableStr {
  87. barStatus = mi.CallbackBarStatusDisable
  88. } else if v.BarStatus == mi.CallbackBarStatusUnknownStr {
  89. barStatus = mi.CallbackBarStatusUnknown
  90. }
  91. sp := strings.Split(v.Targets, ",")
  92. appid, _ := strconv.ParseInt(v.Param, 10, 64)
  93. for _, t := range sp {
  94. if t == "" {
  95. continue
  96. }
  97. cb := &pushmdl.Callback{
  98. Task: v.Jobkey,
  99. APP: appid,
  100. Platform: pushmdl.PlatformXiaomi,
  101. Pid: pushmdl.MobiAndroid,
  102. Token: t,
  103. Extra: &pushmdl.CallbackExtra{Status: barStatus},
  104. }
  105. log.Info("xiaomi callback task(%s) token(%s)", v.Jobkey, t)
  106. s.addCallbackChan(cb)
  107. }
  108. }
  109. return
  110. }
  111. // CallbackOppo oppo callback.
  112. func (s *Service) CallbackOppo(c context.Context, task string, cbs []*oppo.Callback) (err error) {
  113. for _, v := range cbs {
  114. for _, t := range strings.Split(v.Tokens, ",") {
  115. log.Info("oppo callback task(%s) token(%s)", task, t)
  116. cb := &pushmdl.Callback{
  117. Task: task,
  118. Platform: pushmdl.PlatformOppo,
  119. Pid: pushmdl.MobiAndroid,
  120. Token: t,
  121. }
  122. s.addCallbackChan(cb)
  123. }
  124. }
  125. return
  126. }
  127. // CallbackJpush jpush callback batch.
  128. func (s *Service) CallbackJpush(c context.Context, cbs []*jpush.CallbackReply) (err error) {
  129. for _, cb := range cbs {
  130. var (
  131. task string
  132. appid int64
  133. )
  134. if cb.Params != nil {
  135. task = cb.Params["task"]
  136. appid, _ = strconv.ParseInt(cb.Params["appid"], 10, 64)
  137. }
  138. log.Info("jpush callback task(%s) token(%s) channel(%d)", task, cb.Token, cb.Channel)
  139. status := jpush.StatusSwitchOn
  140. if !cb.Switch {
  141. status = jpush.StatusSwitchOff
  142. }
  143. s.addCallbackChan(&pushmdl.Callback{
  144. Task: task,
  145. APP: appid,
  146. Platform: pushmdl.PlatformJpush,
  147. Pid: pushmdl.MobiAndroid,
  148. Token: cb.Token,
  149. Extra: &pushmdl.CallbackExtra{Status: status, Channel: cb.Channel},
  150. })
  151. }
  152. return
  153. }
  154. // CallbackIOS ios arrived callback.
  155. func (s *Service) CallbackIOS(c context.Context, task, token string, pid int) (err error) {
  156. cb := &pushmdl.Callback{
  157. Task: task,
  158. Pid: pid,
  159. Token: token,
  160. }
  161. err = s.addCallbackChan(cb)
  162. return
  163. }
  164. // CallbackClick click callback.
  165. func (s *Service) CallbackClick(c context.Context, cb *pushmdl.Callback) error {
  166. return s.addCallbackChan(cb)
  167. }