service.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "sync"
  6. "time"
  7. artmdl "go-common/app/interface/openplatform/article/model"
  8. artrpc "go-common/app/interface/openplatform/article/rpc/client"
  9. "go-common/app/job/main/favorite/conf"
  10. favDao "go-common/app/job/main/favorite/dao/fav"
  11. musicDao "go-common/app/job/main/favorite/dao/music"
  12. pubDao "go-common/app/job/main/favorite/dao/pub"
  13. statDao "go-common/app/job/main/favorite/dao/stat"
  14. "go-common/app/service/main/archive/api"
  15. arcrpc "go-common/app/service/main/archive/api/gorpc"
  16. arcmdl "go-common/app/service/main/archive/model/archive"
  17. coinrpc "go-common/app/service/main/coin/api/gorpc"
  18. coinmdl "go-common/app/service/main/coin/model"
  19. favmdl "go-common/app/service/main/favorite/model"
  20. "go-common/library/log"
  21. "go-common/library/queue/databus"
  22. "go-common/library/sync/pipeline/fanout"
  23. )
  24. // Service favorite service.
  25. type Service struct {
  26. c *conf.Config
  27. waiter *sync.WaitGroup
  28. // fav
  29. cleanCDTime int64
  30. // dao
  31. pubDao *pubDao.Dao
  32. statDao *statDao.Dao
  33. favDao *favDao.Dao
  34. // databus
  35. consumer *databus.Databus
  36. playStatSub *databus.Databus
  37. favStatSub *databus.Databus
  38. shareStatSub *databus.Databus
  39. procChan []chan *favmdl.Message
  40. // rpc
  41. coinRPC *coinrpc.Service
  42. arcRPC *arcrpc.Service2
  43. artRPC *artrpc.Service
  44. // cache chan
  45. cache *fanout.Fanout
  46. statMerge *statMerge
  47. musicDao *musicDao.Dao
  48. }
  49. type statMerge struct {
  50. Business int
  51. Target int64
  52. Sources map[int64]bool
  53. }
  54. // New new a service and return.
  55. func New(c *conf.Config) (s *Service) {
  56. if c.Fav.Proc <= 0 {
  57. c.Fav.Proc = 32
  58. }
  59. s = &Service{
  60. c: c,
  61. waiter: new(sync.WaitGroup),
  62. // fav
  63. cleanCDTime: int64(time.Duration(c.Fav.CleanCDTime) / time.Second),
  64. // dao
  65. favDao: favDao.New(c),
  66. pubDao: pubDao.New(c),
  67. statDao: statDao.New(c),
  68. musicDao: musicDao.New(c),
  69. // databus
  70. consumer: databus.New(c.JobDatabus),
  71. procChan: make([]chan *favmdl.Message, c.Fav.Proc),
  72. // stat databus
  73. playStatSub: databus.New(c.MediaListCntDatabus),
  74. favStatSub: databus.New(c.FavStatDatabus),
  75. shareStatSub: databus.New(c.ShareStatDatabus),
  76. // rpc
  77. coinRPC: coinrpc.New(c.RPCClient2.Coin),
  78. artRPC: artrpc.New(c.RPCClient2.Article),
  79. arcRPC: arcrpc.New2(c.RPCClient2.Archive),
  80. // cache chan
  81. cache: fanout.New("cache"),
  82. }
  83. if c.StatMerge != nil {
  84. s.statMerge = &statMerge{
  85. Business: c.StatMerge.Business,
  86. Target: c.StatMerge.Target,
  87. Sources: make(map[int64]bool),
  88. }
  89. for _, id := range c.StatMerge.Sources {
  90. s.statMerge.Sources[id] = true
  91. }
  92. }
  93. for i := int64(0); i < c.Fav.Proc; i++ {
  94. ch := make(chan *favmdl.Message, 128)
  95. s.procChan[i] = ch
  96. s.waiter.Add(1)
  97. go s.jobproc(ch)
  98. }
  99. s.waiter.Add(1)
  100. go s.consumeStat()
  101. s.waiter.Add(1)
  102. go s.consumeproc()
  103. return
  104. }
  105. func (s *Service) consumeproc() {
  106. offsets := make(map[int32]int64, 9)
  107. defer func() {
  108. log.Info("end databus msg offsets:%v", offsets)
  109. s.waiter.Done()
  110. }()
  111. for {
  112. msg, ok := <-s.consumer.Messages()
  113. if !ok {
  114. log.Info("consumeproc exit")
  115. for _, c := range s.procChan {
  116. close(c)
  117. }
  118. return
  119. }
  120. if _, ok := offsets[msg.Partition]; !ok {
  121. log.Info("begin databus msg offsets:%v", offsets)
  122. }
  123. offsets[msg.Partition] = msg.Offset
  124. msg.Commit()
  125. m := &favmdl.Message{}
  126. if err := json.Unmarshal(msg.Value, m); err != nil {
  127. log.Error("json.Unmarshal() error(%v)", err)
  128. continue
  129. }
  130. if m.Mid <= 0 {
  131. log.Warn("m.Mid shuld not be equal or lesser than zero,m:%+v", m)
  132. continue
  133. }
  134. log.Info("consumer topic:%s, partitionId:%d, offset:%d, Key:%s, Value:%s Mid:%d Proc:%d", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value, m.Mid, s.c.Fav.Proc)
  135. s.procChan[m.Mid%s.c.Fav.Proc] <- m
  136. }
  137. }
  138. func (s *Service) jobproc(ch chan *favmdl.Message) {
  139. defer s.waiter.Done()
  140. for {
  141. m, ok := <-ch
  142. if !ok {
  143. log.Info("jobproc exit")
  144. return
  145. }
  146. switch m.Field {
  147. case favmdl.FieldResource:
  148. if err := s.upResource(context.Background(), m); err != nil {
  149. log.Error("upResource(%v) error(%v)", m, err)
  150. continue
  151. }
  152. default:
  153. }
  154. }
  155. }
  156. // Close close.
  157. func (s *Service) Close() (err error) {
  158. if err = s.consumer.Close(); err != nil {
  159. log.Error("s.consumer.Close() error(%v)", err)
  160. return
  161. }
  162. return s.favDao.Close()
  163. }
  164. // Wait wait.
  165. func (s *Service) Wait() {
  166. s.waiter.Wait()
  167. }
  168. // Ping ping method for server check
  169. func (s *Service) Ping(c context.Context) (err error) {
  170. if err = s.favDao.Ping(c); err != nil {
  171. log.Error("s.favDao.Ping error(%v)", err)
  172. return
  173. }
  174. return
  175. }
  176. // ArcRPC find archive by rpc
  177. func (s *Service) archiveRPC(c context.Context, aid int64) (a *api.Arc, err error) {
  178. argAid := &arcmdl.ArgAid2{
  179. Aid: aid,
  180. }
  181. if a, err = s.arcRPC.Archive3(c, argAid); err != nil {
  182. log.Error("arcRPC.Archive3(%v, archive), err(%v)", argAid, err)
  183. }
  184. return
  185. }
  186. // AddCoinRpc check user whether or not banned to post
  187. func (s *Service) addCoinRPC(c context.Context, mid int64, coin float64, reason string) (err error) {
  188. if _, err = s.coinRPC.ModifyCoin(c, &coinmdl.ArgModifyCoin{Mid: mid, Count: coin, Reason: reason}); err != nil {
  189. log.Error("coinRPC.ModifyCoin(%v, %v), err(%v)", mid, coin, err)
  190. }
  191. return
  192. }
  193. // articleRPC find aritile by rpc
  194. func (s *Service) articleRPC(c context.Context, aid int64) (a map[int64]*artmdl.Meta, err error) {
  195. argAid := &artmdl.ArgAids{
  196. Aids: []int64{aid},
  197. }
  198. if a, err = s.artRPC.ArticleMetas(c, argAid); err != nil {
  199. log.Error("d.artRPC.ArticleMetas(%+v), error(%v)", argAid, err)
  200. }
  201. return
  202. }
  203. // ArcsRPC find archives by rpc.
  204. func (s *Service) ArcsRPC(c context.Context, aids []int64) (as map[int64]*api.Arc, err error) {
  205. if len(aids) == 0 {
  206. return
  207. }
  208. argAids := &arcmdl.ArgAids2{
  209. Aids: aids,
  210. }
  211. if as, err = s.arcRPC.Archives3(c, argAids); err != nil {
  212. log.Error("s.arcRPC.Archives3(%v), error(%v)", argAids, err)
  213. }
  214. return
  215. }
  216. func (s *Service) mergeTarget(business int, aid int64) int64 {
  217. if s.statMerge != nil && s.statMerge.Business == business && s.statMerge.Sources[aid] {
  218. return s.statMerge.Target
  219. }
  220. return 0
  221. }