callback.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. package service
  2. import (
  3. "context"
  4. "strconv"
  5. "time"
  6. "go-common/app/job/main/sms/dao/chuanglan"
  7. "go-common/app/job/main/sms/dao/mengwang"
  8. smsmdl "go-common/app/service/main/sms/model"
  9. "go-common/library/log"
  10. "go-common/library/sync/errgroup"
  11. )
  12. const (
  13. _callbackSize = 100
  14. )
  15. func (s *Service) dispatchCallback(provider int32) {
  16. switch provider {
  17. case smsmdl.ProviderChuangLan:
  18. s.waiter.Add(1)
  19. go s.callbackChuangLanproc()
  20. case smsmdl.ProviderMengWang:
  21. s.waiter.Add(1)
  22. go s.callbackMengWangproc()
  23. }
  24. }
  25. func (s *Service) callbackChuangLanproc() {
  26. defer s.waiter.Done()
  27. log.Info("callbackChuangLanproc start")
  28. group := errgroup.Group{}
  29. cli := chuanglan.NewClient(s.c)
  30. for {
  31. if s.closed {
  32. log.Info("callbackChuangLanproc exit")
  33. return
  34. }
  35. group.Go(func() error {
  36. callbacks, err := cli.Callback(context.Background(), s.c.Provider.ChuangLanSmsUser, s.c.Provider.ChuangLanSmsPwd, s.c.Provider.ChuangLanSmsCallbackURL, _callbackSize)
  37. if err != nil {
  38. time.Sleep(time.Second)
  39. return nil
  40. }
  41. s.sendChuangLanCallbacks(smsmdl.TypeSms, callbacks)
  42. return nil
  43. })
  44. group.Go(func() error {
  45. callbacks, err := cli.Callback(context.Background(), s.c.Provider.ChuangLanActUser, s.c.Provider.ChuangLanActPwd, s.c.Provider.ChuangLanActCallbackURL, _callbackSize)
  46. if err != nil {
  47. time.Sleep(time.Second)
  48. return nil
  49. }
  50. s.sendChuangLanCallbacks(smsmdl.TypeActSms, callbacks)
  51. return nil
  52. })
  53. group.Go(func() error {
  54. callbacks, err := cli.CallbackInternational(context.Background(), _callbackSize)
  55. if err != nil {
  56. time.Sleep(time.Second)
  57. return nil
  58. }
  59. s.sendChuangLanCallbacks(smsmdl.TypeSms, callbacks)
  60. return nil
  61. })
  62. group.Wait()
  63. time.Sleep(time.Second)
  64. }
  65. }
  66. func (s *Service) sendChuangLanCallbacks(typ int32, cbs []*chuanglan.Callback) (err error) {
  67. ts := time.Now().Unix()
  68. for _, cb := range cbs {
  69. if cb.NotifyTime != "" {
  70. if t, e := time.ParseInLocation("060102150405", cb.NotifyTime, time.Local); e != nil {
  71. log.Warn("sendChuangLanCallbacks(%+v) parse time error(%v)", cb, e)
  72. } else {
  73. ts = t.Unix()
  74. }
  75. }
  76. s.sendUserActionLog(&smsmdl.ModelUserActionLog{
  77. MsgID: cb.MsgID,
  78. Mobile: cb.Mobile,
  79. Status: cb.Status,
  80. Desc: cb.Desc,
  81. Provider: smsmdl.ProviderChuangLan,
  82. Type: typ,
  83. Action: smsmdl.UserActionCallback,
  84. Ts: ts,
  85. })
  86. }
  87. return
  88. }
  89. func (s *Service) callbackMengWangproc() {
  90. defer s.waiter.Done()
  91. log.Info("callbackMengWangproc start")
  92. group := errgroup.Group{}
  93. cli := mengwang.NewClient(s.c)
  94. for {
  95. if s.closed {
  96. log.Info("callbackMengWangproc exit")
  97. return
  98. }
  99. group.Go(func() error {
  100. callbacks, err := cli.Callback(context.Background(), s.c.Provider.MengWangSmsUser, s.c.Provider.MengWangSmsPwd, s.c.Provider.MengWangSmsCallbackURL, _callbackSize)
  101. if err != nil {
  102. time.Sleep(time.Second)
  103. return nil
  104. }
  105. s.sendMengWangCallbacks(smsmdl.TypeSms, callbacks)
  106. return nil
  107. })
  108. group.Go(func() error {
  109. callbacks, err := cli.Callback(context.Background(), s.c.Provider.MengWangActUser, s.c.Provider.MengWangActPwd, s.c.Provider.MengWangActCallbackURL, _callbackSize)
  110. if err != nil {
  111. time.Sleep(time.Second)
  112. return nil
  113. }
  114. s.sendMengWangCallbacks(smsmdl.TypeActSms, callbacks)
  115. return nil
  116. })
  117. group.Go(func() error {
  118. callbacks, err := cli.Callback(context.Background(), s.c.Provider.MengWangInternationUser, s.c.Provider.MengWangInternationPwd, s.c.Provider.MengWangInternationalCallbackURL, _callbackSize)
  119. if err != nil {
  120. time.Sleep(time.Second)
  121. return nil
  122. }
  123. s.sendMengWangCallbacks(smsmdl.TypeSms, callbacks)
  124. return nil
  125. })
  126. group.Wait()
  127. time.Sleep(time.Second)
  128. }
  129. }
  130. func (s *Service) sendMengWangCallbacks(typ int32, cbs []*mengwang.Callback) (err error) {
  131. ts := time.Now().Unix()
  132. for _, cb := range cbs {
  133. if cb.ReportTime != "" {
  134. if t, e := time.ParseInLocation("2006-01-02 15:04:05", cb.ReportTime, time.Local); e != nil {
  135. log.Warn("sendMengWangCallbacks(%+v) parse time error(%v)", cb, e)
  136. } else {
  137. ts = t.Unix()
  138. }
  139. }
  140. s.sendUserActionLog(&smsmdl.ModelUserActionLog{
  141. MsgID: strconv.FormatInt(cb.MsgID, 10),
  142. Mobile: cb.Mobile,
  143. Status: cb.Status,
  144. Desc: cb.Desc,
  145. Provider: smsmdl.ProviderMengWang,
  146. Type: typ,
  147. Action: smsmdl.UserActionCallback,
  148. Ts: ts,
  149. })
  150. }
  151. return
  152. }