sms.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "regexp"
  7. "strconv"
  8. "strings"
  9. "time"
  10. "go-common/app/job/main/sms/dao"
  11. "go-common/app/job/main/sms/model"
  12. smsmdl "go-common/app/service/main/sms/model"
  13. "go-common/library/conf/env"
  14. "go-common/library/log"
  15. )
  16. const _retry = 3
  17. var _contentRe = regexp.MustCompile(`\d`)
  18. func (s *Service) subproc() {
  19. defer s.waiter.Done()
  20. for {
  21. item, ok := <-s.databus.Messages()
  22. if !ok {
  23. close(s.sms)
  24. close(s.actSms)
  25. close(s.batchSms)
  26. log.Info("databus: sms-job subproc consumer exit!")
  27. return
  28. }
  29. s.smsCount++
  30. msg := new(smsmdl.ModelSend)
  31. if err := json.Unmarshal(item.Value, &msg); err != nil {
  32. log.Error("json.Unmarshal (%v) error(%v)", string(item.Value), err)
  33. continue
  34. }
  35. log.Info("subproc topic(%s) key(%s) partition(%v) offset(%d) message(%s)", item.Topic, item.Key, item.Partition, item.Offset, item.Value)
  36. // 黑名单,用于压测
  37. if _, ok := s.blacklist[msg.Country+msg.Mobile]; ok {
  38. log.Info("country(%s) mobile(%s) in blacklist", msg.Country, msg.Mobile)
  39. item.Commit()
  40. continue
  41. }
  42. switch msg.Type {
  43. case smsmdl.TypeSms:
  44. s.sms <- msg
  45. case smsmdl.TypeActSms:
  46. s.actSms <- msg
  47. case smsmdl.TypeActBatch:
  48. s.batchSms <- msg
  49. }
  50. item.Commit()
  51. }
  52. }
  53. func (s *Service) smsproc() {
  54. defer s.waiter.Done()
  55. var (
  56. err error
  57. msgid string
  58. )
  59. for {
  60. m, ok := <-s.sms
  61. if !ok {
  62. log.Info("smsproc exit!")
  63. return
  64. }
  65. if m.Mobile == "" {
  66. if m.Country, m.Mobile, err = s.userMobile(m.Mid); err != nil {
  67. continue
  68. }
  69. }
  70. if m.Country == "" || m.Mobile == "" {
  71. log.Error("invalid country or mobile, info(%+v)", m)
  72. continue
  73. }
  74. content := _contentRe.ReplaceAllString(m.Content, "*")
  75. l := &smsmdl.ModelUserActionLog{Mobile: m.Mobile, Content: content, Type: smsmdl.TypeSms, Action: smsmdl.UserActionTypeSend}
  76. if m.Country == smsmdl.CountryChina {
  77. for i := 0; i < s.providers; i++ {
  78. s.smsp.Lock()
  79. p := s.smsp.Value.(model.Provider)
  80. s.smsp.Ring = s.smsp.Next()
  81. s.smsp.Unlock()
  82. l.Provider = p.GetPid()
  83. if msgid, err = p.SendSms(context.Background(), m); err == nil {
  84. break
  85. }
  86. dao.PromInfo(fmt.Sprintf("service:retry %d", l.Provider))
  87. log.Error("retry send sms(%v) platform(%d) error(%v)", m, l.Provider, err)
  88. }
  89. } else {
  90. for i := 0; i < s.providers; i++ {
  91. s.intep.Lock()
  92. p := s.intep.Value.(model.Provider)
  93. s.intep.Ring = s.intep.Next()
  94. s.intep.Unlock()
  95. l.Provider = p.GetPid()
  96. if msgid, err = p.SendInternationalSms(context.Background(), m); err == nil {
  97. break
  98. }
  99. dao.PromInfo(fmt.Sprintf("service:retry international %d", l.Provider))
  100. log.Error("retry send international sms(%v) platform(%d) error(%v)", m, l.Provider, err)
  101. }
  102. }
  103. if err == nil {
  104. l.Status = smsmdl.UserActionSendSuccessStatus
  105. l.Desc = smsmdl.UserActionSendSuccessDesc
  106. dao.PromInfo(fmt.Sprintf("service:success %d", l.Provider))
  107. log.Info("send sms(%v) platform(%d) success", m, l.Provider)
  108. } else {
  109. l.Status = smsmdl.UserActionSendFailedStatus
  110. l.Desc = smsmdl.UserActionSendFailedDesc
  111. dao.PromError("service:sms")
  112. log.Error("send sms(%v) error(%v)", m, err)
  113. s.cache.Do(context.Background(), func(ctx context.Context) {
  114. s.dao.SendWechat(fmt.Sprintf("sms-job send msg(%d) error(%v)", m.ID, err))
  115. })
  116. }
  117. l.MsgID = msgid
  118. l.Ts = time.Now().Unix()
  119. s.sendUserActionLog(l)
  120. }
  121. }
  122. func (s *Service) actsmsproc() {
  123. defer s.waiter.Done()
  124. var (
  125. err error
  126. msgid string
  127. )
  128. for {
  129. m, ok := <-s.actSms
  130. if !ok {
  131. log.Info("actsmsproc exit!")
  132. return
  133. }
  134. if m.Mobile == "" {
  135. if m.Country, m.Mobile, err = s.userMobile(m.Mid); err != nil {
  136. continue
  137. }
  138. }
  139. if m.Country == "" || m.Mobile == "" {
  140. log.Error("invalid country or mobile, info(%+v)", m)
  141. continue
  142. }
  143. content := _contentRe.ReplaceAllString(m.Content, "*")
  144. l := &smsmdl.ModelUserActionLog{Mobile: m.Mobile, Content: content, Type: smsmdl.TypeActSms, Action: smsmdl.UserActionTypeSend}
  145. if m.Country == smsmdl.CountryChina {
  146. for i := 0; i < s.providers; i++ {
  147. s.actp.Lock()
  148. p := s.actp.Value.(model.Provider)
  149. s.actp.Ring = s.actp.Next()
  150. s.actp.Unlock()
  151. l.Provider = p.GetPid()
  152. if msgid, err = p.SendActSms(context.Background(), m); err == nil {
  153. break
  154. }
  155. dao.PromInfo(fmt.Sprintf("service:retry act china %d", l.Provider))
  156. log.Error("retry send act sms(%v) platform(%d) error(%v)", m, l.Provider, err)
  157. }
  158. } else {
  159. for i := 0; i < s.providers; i++ {
  160. s.intep.Lock()
  161. p := s.intep.Value.(model.Provider)
  162. s.intep.Ring = s.intep.Next()
  163. s.intep.Unlock()
  164. l.Provider = p.GetPid()
  165. if msgid, err = p.SendInternationalSms(context.Background(), m); err == nil {
  166. break
  167. }
  168. dao.PromInfo(fmt.Sprintf("service:retry act international %d", l.Provider))
  169. log.Error("retry send act international sms(%v) platform(%d)", m, l.Provider, err)
  170. }
  171. }
  172. if err == nil {
  173. l.Status = smsmdl.UserActionSendSuccessStatus
  174. l.Desc = smsmdl.UserActionSendSuccessDesc
  175. dao.PromInfo(fmt.Sprintf("service:act china success %d", l.Provider))
  176. log.Info("send act sms(%v) platform(%d) success", m, l.Provider)
  177. } else {
  178. l.Status = smsmdl.UserActionSendFailedStatus
  179. l.Desc = smsmdl.UserActionSendFailedDesc
  180. dao.PromError("service:actSms")
  181. log.Error("send act sms(%v) error(%v)", m, err)
  182. s.cache.Do(context.Background(), func(ctx context.Context) {
  183. s.dao.SendWechat(fmt.Sprintf("sms-job send msg(%d) error(%v)", m.ID, err))
  184. })
  185. }
  186. l.MsgID = msgid
  187. l.Ts = time.Now().Unix()
  188. s.sendUserActionLog(l)
  189. }
  190. }
  191. func (s *Service) actbatchproc() {
  192. defer s.waiter.Done()
  193. var (
  194. err error
  195. mids []string
  196. country string
  197. mobile string
  198. msgid string
  199. )
  200. for {
  201. m, ok := <-s.batchSms
  202. if !ok {
  203. log.Info("actbatchproc exit!")
  204. return
  205. }
  206. if m.Mobile == "" && m.Mid != "" {
  207. mids = strings.Split(m.Mid, ",")
  208. var mobiles []string
  209. for _, midStr := range mids {
  210. if country, mobile, err = s.userMobile(midStr); err != nil {
  211. continue
  212. }
  213. if country == "" || mobile == "" {
  214. log.Error("invalid country or mobile, code(%s) mid(%s) country(%s) mobile(%s)", m.Code, midStr, country, mobile)
  215. continue
  216. }
  217. if country != smsmdl.CountryChina {
  218. continue
  219. }
  220. mobiles = append(mobiles, mobile)
  221. }
  222. m.Mobile = strings.Join(mobiles, ",")
  223. }
  224. if m.Mobile == "" {
  225. continue
  226. }
  227. content := _contentRe.ReplaceAllString(m.Content, "*")
  228. l := &smsmdl.ModelUserActionLog{Mobile: m.Mobile, Content: content, Type: smsmdl.TypeActSms, Action: smsmdl.UserActionTypeSend, Ts: time.Now().Unix()}
  229. send := &smsmdl.ModelSend{Mobile: m.Mobile, Content: m.Content, Type: smsmdl.TypeActSms}
  230. for i := 0; i < s.providers; i++ {
  231. s.batchp.Lock()
  232. p := s.batchp.Value.(model.Provider)
  233. s.batchp.Ring = s.batchp.Next()
  234. s.batchp.Unlock()
  235. l.Provider = p.GetPid()
  236. if msgid, err = p.SendBatchActSms(context.Background(), send); err == nil {
  237. break
  238. }
  239. dao.PromInfo(fmt.Sprintf("service:retry batch %d", l.Provider))
  240. log.Error("retry send act batch sms(%v) platform(%d)", m, l.Provider, err)
  241. }
  242. if err == nil {
  243. dao.PromInfo(fmt.Sprintf("service:batch success %d", l.Provider))
  244. log.Info("send act batch sms(%v) platform(%d) success", m, l.Provider)
  245. l.Status = smsmdl.UserActionSendSuccessStatus
  246. l.Desc = smsmdl.UserActionSendSuccessDesc
  247. } else {
  248. dao.PromError("service:actBatchSms")
  249. log.Error("send act batch sms(%v) error(%v)", m, err)
  250. s.cache.Do(context.Background(), func(ctx context.Context) {
  251. s.dao.SendWechat(fmt.Sprintf("sms-job send msg(%d) error(%v)", m.ID, err))
  252. })
  253. l.Status = smsmdl.UserActionSendFailedStatus
  254. l.Desc = smsmdl.UserActionSendFailedDesc
  255. }
  256. l.MsgID = msgid
  257. l.Ts = time.Now().Unix()
  258. s.sendUserActionLog(l)
  259. }
  260. }
  261. func (s *Service) userMobile(midStr string) (country, mobile string, err error) {
  262. mid, err := strconv.ParseInt(midStr, 10, 64)
  263. if err != nil {
  264. log.Error("userMobile parse mid(%s) error(%v)", midStr, err)
  265. return
  266. }
  267. if mid <= 0 {
  268. log.Error("userMobile invalid mid(%s)", midStr)
  269. return
  270. }
  271. var um *model.UserMobile
  272. for i := 0; i < _retry; i++ {
  273. if um, err = s.dao.UserMobile(context.Background(), mid); err == nil {
  274. break
  275. }
  276. time.Sleep(10 * time.Millisecond)
  277. }
  278. if err != nil {
  279. log.Error("UserMobile mid(%d) error(%v)", mid, err)
  280. return
  281. }
  282. country = um.CountryCode
  283. mobile = um.Mobile
  284. return
  285. }
  286. func (s *Service) monitorproc() {
  287. if env.DeployEnv != env.DeployEnvProd {
  288. return
  289. }
  290. var smsCount int64
  291. for {
  292. time.Sleep(time.Duration(s.c.Sms.MonitorProcDuration))
  293. if s.smsCount-smsCount == 0 {
  294. msg := fmt.Sprintf("sms-job sms did not consume within %s seconds", time.Duration(s.c.Sms.MonitorProcDuration).String())
  295. s.dao.SendWechat(msg)
  296. log.Warn(msg)
  297. }
  298. smsCount = s.smsCount
  299. }
  300. }