service.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strconv"
  7. "sync"
  8. "time"
  9. dm2rpc "go-common/app/interface/main/dm2/rpc/client"
  10. "go-common/app/job/main/archive/conf"
  11. "go-common/app/job/main/archive/dao/archive"
  12. "go-common/app/job/main/archive/dao/email"
  13. "go-common/app/job/main/archive/dao/monitor"
  14. "go-common/app/job/main/archive/dao/reply"
  15. "go-common/app/job/main/archive/dao/result"
  16. dbusmdl "go-common/app/job/main/archive/model/databus"
  17. resmdl "go-common/app/job/main/archive/model/result"
  18. "go-common/app/job/main/archive/model/retry"
  19. accgrpc "go-common/app/service/main/account/api"
  20. arcrpc "go-common/app/service/main/archive/api/gorpc"
  21. xredis "go-common/library/cache/redis"
  22. "go-common/library/log"
  23. "go-common/library/queue/databus"
  24. )
  25. // Service service
  26. type Service struct {
  27. c *conf.Config
  28. closeRetry bool
  29. closeSub bool
  30. archiveDao *archive.Dao
  31. emailDao *email.Dao
  32. monitorDao *monitor.Dao
  33. replyDao *reply.Dao
  34. resultDao *result.Dao
  35. redis *xredis.Pool
  36. waiter sync.WaitGroup
  37. videoupSub *databus.Databus
  38. archiveResultPub *databus.Databus
  39. dmPub *databus.Databus
  40. dmSub *databus.Databus
  41. cacheSub *databus.Databus
  42. accountNotifySub *databus.Databus
  43. sfTpsCache map[int16]int16
  44. adtTpsCache map[int16]struct{}
  45. arcServices []*arcrpc.Service2
  46. accGRPC accgrpc.AccountClient
  47. dm2RPC *dm2rpc.Service
  48. // databus channel
  49. videoupAids []chan int64
  50. pgcAids chan int64
  51. // dm count
  52. dmCids map[int64]struct{}
  53. dmMu sync.Mutex
  54. notifyMid map[int64]struct{}
  55. notifyMu sync.Mutex
  56. }
  57. // New is archive service implementation.
  58. func New(c *conf.Config) (s *Service) {
  59. s = &Service{
  60. c: c,
  61. archiveDao: archive.New(c),
  62. emailDao: email.New(c),
  63. monitorDao: monitor.New(c),
  64. replyDao: reply.New(c),
  65. resultDao: result.New(c),
  66. dm2RPC: dm2rpc.New(c.Dm2RPC),
  67. videoupSub: databus.New(c.VideoupSub),
  68. dmSub: databus.New(c.DmSub),
  69. dmPub: databus.New(c.DmPub),
  70. archiveResultPub: databus.New(c.ArchiveResultPub),
  71. cacheSub: databus.New(c.CacheSub),
  72. accountNotifySub: databus.New(c.AccountNotifySub),
  73. redis: xredis.NewPool(c.Redis),
  74. pgcAids: make(chan int64, 1024),
  75. dmCids: make(map[int64]struct{}),
  76. notifyMid: make(map[int64]struct{}),
  77. arcServices: make([]*arcrpc.Service2, 0),
  78. }
  79. var err error
  80. if s.accGRPC, err = accgrpc.NewClient(nil); err != nil {
  81. panic(fmt.Sprintf("account.service grpc not found!!!!!!!!!!!! error(%v)", err))
  82. }
  83. for _, sc := range s.c.ArchiveServices {
  84. s.arcServices = append(s.arcServices, arcrpc.New2(sc))
  85. }
  86. for i := 0; i < s.c.ChanSize; i++ {
  87. s.videoupAids = append(s.videoupAids, make(chan int64, 1024))
  88. s.waiter.Add(1)
  89. go s.consumerVideoup(i)
  90. s.waiter.Add(1)
  91. go s.pgcConsumer()
  92. }
  93. s.loadType()
  94. go s.cacheproc()
  95. // sync archive_result db!!!!!!!
  96. s.waiter.Add(1)
  97. go s.videoupConsumer()
  98. s.waiter.Add(1)
  99. go s.dmConsumer()
  100. // check consumer
  101. go s.checkConsume()
  102. s.waiter.Add(1)
  103. go s.retryproc()
  104. s.waiter.Add(1)
  105. go s.dmCounter()
  106. s.waiter.Add(1)
  107. go s.cachesubproc()
  108. s.waiter.Add(1)
  109. go s.accountNotifyproc()
  110. s.waiter.Add(1)
  111. go s.clearMidCache()
  112. return s
  113. }
  114. func (s *Service) sendNotify(upInfo *resmdl.ArchiveUpInfo) {
  115. var (
  116. nw []byte
  117. old []byte
  118. err error
  119. msg *dbusmdl.Message
  120. c = context.TODO()
  121. rt = &retry.Info{}
  122. )
  123. if nw, err = json.Marshal(upInfo.Nw); err != nil {
  124. log.Error("json.Marshal(%+v) error(%v)", upInfo.Nw, err)
  125. return
  126. }
  127. if old, err = json.Marshal(upInfo.Old); err != nil {
  128. log.Error("json.Marshal(%+v) error(%v)", upInfo.Old, err)
  129. return
  130. }
  131. msg = &dbusmdl.Message{Action: upInfo.Action, Table: upInfo.Table, New: nw, Old: old}
  132. if err = s.archiveResultPub.Send(c, strconv.FormatInt(upInfo.Nw.AID, 10), msg); err != nil {
  133. log.Error("s.archiveResultPub.Send(%+v) error(%v)", msg, err)
  134. rt.Action = retry.FailDatabus
  135. rt.Data.Aid = upInfo.Nw.AID
  136. rt.Data.DatabusMsg = upInfo
  137. s.PushFail(c, rt)
  138. return
  139. }
  140. msgStr, _ := json.Marshal(msg)
  141. log.Info("sendNotify(%s) successed", msgStr)
  142. }
  143. func (s *Service) loadType() {
  144. tpm, err := s.archiveDao.TypeMapping(context.TODO())
  145. if err != nil {
  146. log.Error("s.dede.TypeMapping error(%v)", err)
  147. return
  148. }
  149. s.sfTpsCache = tpm
  150. // audit types
  151. adt, err := s.archiveDao.AuditTypesConf(context.TODO())
  152. if err != nil {
  153. log.Error("s.dede.AuditTypesConf error(%v)", err)
  154. return
  155. }
  156. s.adtTpsCache = adt
  157. }
  158. func (s *Service) cacheproc() {
  159. for {
  160. time.Sleep(1 * time.Minute)
  161. s.loadType()
  162. }
  163. }
  164. // check consumer stat
  165. func (s *Service) checkConsume() {
  166. if s.c.Env != "pro" {
  167. return
  168. }
  169. for {
  170. time.Sleep(1 * time.Minute)
  171. for i := 0; i < s.c.ChanSize; i++ {
  172. if l := len(s.videoupAids[i]); l > s.c.MonitorSize {
  173. s.monitorDao.Send(context.TODO(), s.c.WeChantUsers, fmt.Sprintf("archive-job报警了啊\n UGC的chan太大了!!!\n s.videoupAids[%d] size(%d) is too large\n 是不是有人在刷数据!!!!", i, l), s.c.WeChatToken, s.c.WeChatSecret)
  174. }
  175. }
  176. if l := len(s.pgcAids); l > s.c.MonitorSize {
  177. s.monitorDao.Send(context.TODO(), s.c.WeChantUsers, fmt.Sprintf("archive-job报警了啊\n PGC的chan太大了!!!\n chan size(%d) is too large \n 是不是有人在刷数据!!!!", l), s.c.WeChatToken, s.c.WeChatSecret)
  178. }
  179. }
  180. }
  181. // Close kafaka consumer close.
  182. func (s *Service) Close() (err error) {
  183. s.closeSub = true
  184. time.Sleep(2 * time.Second)
  185. s.videoupSub.Close()
  186. s.closeRetry = true
  187. s.waiter.Wait()
  188. return
  189. }