service.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "sync"
  6. "time"
  7. "go-common/app/job/main/archive-shjd/conf"
  8. "go-common/app/job/main/archive-shjd/model"
  9. accrpc "go-common/app/service/main/account/rpc/client"
  10. "go-common/app/service/main/archive/api"
  11. arcrpc "go-common/app/service/main/archive/api/gorpc"
  12. "go-common/library/cache/redis"
  13. "go-common/library/log"
  14. "go-common/library/queue/databus"
  15. "github.com/pkg/errors"
  16. )
  17. const (
  18. _tableArchive = "archive"
  19. _tableVideo = "archive_video"
  20. _actionInsert = "insert"
  21. _actionUpdate = "update"
  22. _actionDelete = "delete"
  23. _sharding = 10
  24. )
// lastTmStat pairs an archive stat with an int64 marker. Values of this type
// are stored per shard in Service.statSM, keyed by an int64.
type lastTmStat struct {
	// last is presumably the unix time of the last update/flush for this
	// entry — TODO confirm against statDealproc.
	last int64
	// stat is the archive stat associated with the entry.
	stat *api.Stat
}
  29. // Service service
  30. type Service struct {
  31. c *conf.Config
  32. waiter sync.WaitGroup
  33. canal *databus.Databus
  34. canalChan chan *model.Message
  35. subMap map[string]*databus.Databus
  36. subView *databus.Databus
  37. subDm *databus.Databus
  38. subReply *databus.Databus
  39. subFav *databus.Databus
  40. subCoin *databus.Databus
  41. subShare *databus.Databus
  42. subRank *databus.Databus
  43. subLike *databus.Databus
  44. notifyPub *databus.Databus
  45. accountNotify *databus.Databus
  46. subStatCh []chan *model.StatMsg
  47. arcRPCs map[string]*arcrpc.Service2
  48. accRPC *accrpc.Service3
  49. notifyMid map[int64]struct{}
  50. notifyMu sync.Mutex
  51. rds *redis.Pool
  52. statSM []map[int64]*lastTmStat
  53. close bool
  54. }
  55. // New is archive service implementation.
  56. func New(c *conf.Config) (s *Service) {
  57. s = &Service{
  58. c: c,
  59. canal: databus.New(c.Databus),
  60. canalChan: make(chan *model.Message, 10240),
  61. rds: redis.NewPool(c.Redis),
  62. subMap: make(map[string]*databus.Databus),
  63. // databus
  64. subView: databus.New(c.ViewSub),
  65. subDm: databus.New(c.DmSub),
  66. subReply: databus.New(c.ReplySub),
  67. subFav: databus.New(c.FavSub),
  68. subCoin: databus.New(c.CoinSub),
  69. subShare: databus.New(c.ShareSub),
  70. subRank: databus.New(c.RankSub),
  71. subLike: databus.New(c.LikeSub),
  72. notifyPub: databus.New(c.NotifyPub),
  73. accountNotify: databus.New(c.AccountNotify),
  74. notifyMid: make(map[int64]struct{}, 10240),
  75. accRPC: accrpc.New3(nil),
  76. }
  77. s.arcRPCs = make(map[string]*arcrpc.Service2)
  78. for _, cc := range c.ArchiveRPCs {
  79. s.arcRPCs[cc.Cluster] = arcrpc.New2(cc)
  80. }
  81. s.subMap[model.TypeForView] = s.subView
  82. s.subMap[model.TypeForDm] = s.subDm
  83. s.subMap[model.TypeForReply] = s.subReply
  84. s.subMap[model.TypeForFav] = s.subFav
  85. s.subMap[model.TypeForCoin] = s.subCoin
  86. s.subMap[model.TypeForShare] = s.subShare
  87. s.subMap[model.TypeForRank] = s.subRank
  88. s.subMap[model.TypeForLike] = s.subLike
  89. for i := 0; i < _sharding; i++ {
  90. s.waiter.Add(1)
  91. go s.canalChanproc()
  92. s.subStatCh = append(s.subStatCh, make(chan *model.StatMsg, 10240))
  93. s.statSM = append(s.statSM, map[int64]*lastTmStat{})
  94. s.waiter.Add(1)
  95. go s.statDealproc(i)
  96. }
  97. for k, d := range s.subMap {
  98. s.waiter.Add(1)
  99. go s.consumerproc(k, d)
  100. }
  101. s.waiter.Add(1)
  102. go s.canalproc()
  103. s.waiter.Add(1)
  104. go s.retryconsumer()
  105. s.waiter.Add(1)
  106. go s.accountNotifyproc()
  107. s.waiter.Add(1)
  108. go s.clearMidCache()
  109. return s
  110. }
  111. func (s *Service) canalChanproc() {
  112. defer s.waiter.Done()
  113. for {
  114. m, ok := <-s.canalChan
  115. if !ok {
  116. log.Info("canalChanproc closed")
  117. return
  118. }
  119. log.Info("got canal message table(%s) action(%s) old(%s) new(%s)", m.Table, m.Action, m.Old, m.New)
  120. var err error
  121. switch m.Table {
  122. case _tableArchive:
  123. var (
  124. old *model.Archive
  125. nw *model.Archive
  126. )
  127. switch m.Action {
  128. case _actionInsert:
  129. if err = json.Unmarshal(m.New, &nw); err != nil {
  130. log.Error("json.Unmarshal(%s) error(%v)", m.New, err)
  131. continue
  132. }
  133. case _actionUpdate:
  134. if err = json.Unmarshal(m.Old, &old); err != nil {
  135. log.Error("json.Unmarshal(%s) error(%v)", m.Old, err)
  136. continue
  137. }
  138. if err = json.Unmarshal(m.New, &nw); err != nil {
  139. log.Error("json.Unmarshal(%s) error(%v)", m.New, err)
  140. continue
  141. }
  142. default:
  143. log.Warn("got unknow action(%s)", m.Action)
  144. continue
  145. }
  146. s.UpdateCache(old, nw, m.Action)
  147. case _tableVideo:
  148. var video *model.Video
  149. if err = json.Unmarshal(m.New, &video); err != nil {
  150. log.Error("json.Unmarshal(%s) error(%v)", m.New, err)
  151. continue
  152. }
  153. switch m.Action {
  154. case _actionInsert, _actionUpdate:
  155. err = s.UpdateVideoCache(video.AID, video.CID)
  156. case _actionDelete:
  157. err = s.DelteVideoCache(video.AID, video.CID)
  158. default:
  159. bs, _ := json.Marshal(m)
  160. log.Error("unknow action(%s) message(%s)", m.Action, bs)
  161. }
  162. if err != nil {
  163. log.Error("%+v", err)
  164. continue
  165. }
  166. default:
  167. log.Warn("table(%s) skiped", m.Table)
  168. }
  169. }
  170. }
  171. func (s *Service) canalproc() {
  172. defer s.waiter.Done()
  173. msgs := s.canal.Messages()
  174. for {
  175. msg, ok := <-msgs
  176. if !ok || s.close {
  177. close(s.canalChan)
  178. log.Info("s.closed databus canal")
  179. return
  180. }
  181. var (
  182. m = &model.Message{}
  183. err error
  184. )
  185. msg.Commit()
  186. log.Info("got message(%s)", msg.Value)
  187. if err = json.Unmarshal(msg.Value, m); err != nil {
  188. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  189. continue
  190. }
  191. s.canalChan <- m
  192. }
  193. }
  194. // Ping check status
  195. func (s *Service) Ping() (err error) {
  196. conn := s.rds.Get(context.TODO())
  197. defer conn.Close()
  198. if _, err = conn.Do("SET", "PING", "PONG"); err != nil {
  199. err = errors.Wrap(err, "redis ping")
  200. return
  201. }
  202. return
  203. }
  204. // Close is
  205. func (s *Service) Close() (err error) {
  206. s.close = true
  207. time.Sleep(5 * time.Second)
  208. s.canal.Close()
  209. s.waiter.Wait()
  210. return
  211. }