service.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. package service
  2. import (
  3. "context"
  4. "sync"
  5. "go-common/app/job/main/sms/conf"
  6. "go-common/app/job/main/sms/dao"
  7. "go-common/app/job/main/sms/dao/chuanglan"
  8. "go-common/app/job/main/sms/dao/mengwang"
  9. "go-common/app/job/main/sms/model"
  10. smsgrpc "go-common/app/service/main/sms/api"
  11. smsmdl "go-common/app/service/main/sms/model"
  12. "go-common/library/log"
  13. "go-common/library/queue/databus"
  14. "go-common/library/sync/pipeline/fanout"
  15. )
  16. // Service struct of service.
  17. type Service struct {
  18. c *conf.Config
  19. dao *dao.Dao
  20. databus *databus.Databus
  21. smsgrpc smsgrpc.SmsClient
  22. waiter *sync.WaitGroup
  23. sms chan *smsmdl.ModelSend // 验证码
  24. actSms chan *smsmdl.ModelSend // 营销
  25. batchSms chan *smsmdl.ModelSend // 批量
  26. smsp *model.ConcurrentRing
  27. intep *model.ConcurrentRing
  28. actp *model.ConcurrentRing
  29. batchp *model.ConcurrentRing
  30. cache *fanout.Fanout
  31. providers int
  32. closed bool
  33. smsCount int64
  34. blacklist map[string]struct{}
  35. }
  36. // New create service instance and return.
  37. func New(c *conf.Config) (s *Service) {
  38. proLen := len(c.Provider.Providers)
  39. if c.Speedup.Switch {
  40. proLen *= 2 // 每个短信服务商再提供一个云线路 http client
  41. }
  42. s = &Service{
  43. c: c,
  44. dao: dao.New(c),
  45. databus: databus.New(c.Databus),
  46. waiter: new(sync.WaitGroup),
  47. sms: make(chan *smsmdl.ModelSend, 10240),
  48. actSms: make(chan *smsmdl.ModelSend, 10240),
  49. batchSms: make(chan *smsmdl.ModelSend, 10240),
  50. smsp: model.NewConcurrentRing(proLen),
  51. intep: model.NewConcurrentRing(proLen),
  52. actp: model.NewConcurrentRing(proLen),
  53. batchp: model.NewConcurrentRing(proLen),
  54. cache: fanout.New("async-task", fanout.Worker(1), fanout.Buffer(10240)),
  55. providers: proLen,
  56. }
  57. s.initBlacklist()
  58. var err error
  59. if s.smsgrpc, err = smsgrpc.NewClient(c.SmsGRPC); err != nil {
  60. panic(err)
  61. }
  62. s.initProviders()
  63. s.waiter.Add(1)
  64. go s.subproc()
  65. go s.monitorproc()
  66. for i := 0; i < s.c.Sms.SingleSendProc; i++ {
  67. s.waiter.Add(1)
  68. go s.smsproc()
  69. s.waiter.Add(1)
  70. go s.actsmsproc()
  71. }
  72. for i := 0; i < s.c.Sms.BatchSendProc; i++ {
  73. s.waiter.Add(1)
  74. go s.actbatchproc()
  75. }
  76. return
  77. }
  78. func (s *Service) initBlacklist() {
  79. s.blacklist = make(map[string]struct{})
  80. for _, v := range s.c.Sms.Blacklist {
  81. s.blacklist[v] = struct{}{}
  82. }
  83. }
  84. func (s *Service) initProviders() {
  85. // 创建本地网络 http client
  86. s.newProviders(s.c)
  87. if !s.c.Speedup.Switch {
  88. return
  89. }
  90. // 替换成 云加速线路 URL 配置
  91. s.c.Provider.MengWangSmsURL = s.c.Speedup.MengWangSmsURL
  92. s.c.Provider.MengWangActURL = s.c.Speedup.MengWangActURL
  93. s.c.Provider.MengWangBatchURL = s.c.Speedup.MengWangBatchURL
  94. s.c.Provider.MengWangInternationURL = s.c.Speedup.MengWangInternationURL
  95. s.c.Provider.ChuangLanSmsURL = s.c.Speedup.ChuangLanSmsURL
  96. s.c.Provider.ChuangLanActURL = s.c.Speedup.ChuangLanActURL
  97. s.c.Provider.ChuangLanInternationURL = s.c.Speedup.ChuangLanInternationURL
  98. s.c.Provider.MengWangSmsCallbackURL = s.c.Speedup.MengWangSmsCallbackURL
  99. s.c.Provider.MengWangActCallbackURL = s.c.Speedup.MengWangActCallbackURL
  100. s.c.Provider.MengWangInternationalCallbackURL = s.c.Speedup.MengWangInternationalCallbackURL
  101. s.c.Provider.ChuangLanSmsCallbackURL = s.c.Speedup.ChuangLanSmsCallbackURL
  102. s.c.Provider.ChuangLanActCallbackURL = s.c.Speedup.ChuangLanActCallbackURL
  103. s.c.Provider.ChuangLanInternationalCallbackURL = s.c.Speedup.ChuangLanInternationalCallbackURL
  104. // 创建云加速线路 http client
  105. s.newProviders(s.c)
  106. }
  107. func (s *Service) newProviders(c *conf.Config) {
  108. var cli model.Provider
  109. for _, p := range c.Provider.Providers {
  110. switch p {
  111. case smsmdl.ProviderMengWang:
  112. cli = mengwang.NewClient(c)
  113. case smsmdl.ProviderChuangLan:
  114. cli = chuanglan.NewClient(c)
  115. default:
  116. log.Error("invalid provider(%d)", p)
  117. continue
  118. }
  119. s.smsp.Value = cli
  120. s.smsp.Ring = s.smsp.Next()
  121. s.intep.Value = cli
  122. s.intep.Ring = s.intep.Next()
  123. s.actp.Value = cli
  124. s.actp.Ring = s.actp.Next()
  125. s.batchp.Value = cli
  126. s.batchp.Ring = s.batchp.Next()
  127. for i := 0; i < s.c.Sms.CallbackProc; i++ {
  128. s.dispatchCallback(p)
  129. }
  130. }
  131. }
  132. // Ping check service health.
  133. func (s *Service) Ping(ctx context.Context) error {
  134. return s.dao.Ping(ctx)
  135. }
  136. // Close kafka consumer close.
  137. func (s *Service) Close() {
  138. s.closed = true
  139. s.databus.Close()
  140. s.waiter.Wait()
  141. }