service.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package service
  2. import (
  3. "context"
  4. "math/rand"
  5. "sync"
  6. "time"
  7. dm2rpc "go-common/app/interface/main/dm2/rpc/client"
  8. tagrpc "go-common/app/interface/main/tag/rpc/client"
  9. "go-common/app/interface/main/web/conf"
  10. "go-common/app/interface/main/web/dao"
  11. "go-common/app/interface/main/web/model"
  12. artrpc "go-common/app/interface/openplatform/article/rpc/client"
  13. accclient "go-common/app/service/main/account/api"
  14. arcclient "go-common/app/service/main/archive/api"
  15. arcrpc "go-common/app/service/main/archive/api/gorpc"
  16. brdrpc "go-common/app/service/main/broadcast/api/grpc/v1"
  17. coinclient "go-common/app/service/main/coin/api"
  18. couprpc "go-common/app/service/main/coupon/rpc/client"
  19. dyrpc "go-common/app/service/main/dynamic/rpc/client"
  20. favrpc "go-common/app/service/main/favorite/api/gorpc"
  21. locrpc "go-common/app/service/main/location/rpc/client"
  22. relrpc "go-common/app/service/main/relation/rpc/client"
  23. resrpc "go-common/app/service/main/resource/rpc/client"
  24. shareclient "go-common/app/service/main/share/api"
  25. thumbrpc "go-common/app/service/main/thumbup/rpc/client"
  26. ugcclient "go-common/app/service/main/ugcpay/api/grpc/v1"
  27. "go-common/library/log"
  28. "go-common/library/log/anticheat"
  29. "go-common/library/sync/pipeline/fanout"
  30. )
  31. // Service service
  32. type Service struct {
  33. c *conf.Config
  34. dao *dao.Dao
  35. // rpc
  36. arc *arcrpc.Service2
  37. dy *dyrpc.Service
  38. tag *tagrpc.Service
  39. loc *locrpc.Service
  40. art *artrpc.Service
  41. res *resrpc.Service
  42. relation *relrpc.Service
  43. thumbup *thumbrpc.Service
  44. coupon *couprpc.Service
  45. dm2 *dm2rpc.Service
  46. fav *favrpc.Service
  47. // chans
  48. rids map[int32]struct{}
  49. // cache proc
  50. cache *fanout.Fanout
  51. regionCount map[int16]int
  52. onlineArcs []*model.OnlineArc
  53. allArchivesCount, playOnline, webOnline int64
  54. // elec show typeids
  55. elecShowTypeIds map[int32]struct{}
  56. // no related data aids
  57. noRelAids map[int64]struct{}
  58. // rand source
  59. r *rand.Rand
  60. indexIcon *model.IndexIcon
  61. // infoc2
  62. CheatInfoc *anticheat.AntiCheat
  63. // TypeNames
  64. typeNames map[int32]*arcclient.Tp
  65. // Broadcast grpc client
  66. broadcastClient brdrpc.ZergClient
  67. coinClient coinclient.CoinClient
  68. arcClient arcclient.ArchiveClient
  69. accClient accclient.AccountClient
  70. shareClient shareclient.ShareClient
  71. ugcPayClient ugcclient.UGCPayClient
  72. // searchEggs
  73. searchEggs map[int64]*model.SearchEggRes
  74. // bnj
  75. bnj2019View *model.Bnj2019View
  76. BnjElecInfo *model.ElecShow
  77. bnj2019List []*model.Bnj2019Related
  78. bnj2019LiveArc *arcclient.ArcReply
  79. bnjGrayUids map[int64]struct{}
  80. specialMids map[int64]struct{}
  81. }
  82. // New new
  83. func New(c *conf.Config) *Service {
  84. s := &Service{
  85. c: c,
  86. dao: dao.New(c),
  87. arc: arcrpc.New2(c.ArchiveRPC),
  88. dy: dyrpc.New(c.DynamicRPC),
  89. tag: tagrpc.New2(c.TagRPC),
  90. loc: locrpc.New(c.LocationRPC),
  91. art: artrpc.New(c.ArticleRPC),
  92. res: resrpc.New(c.ResourceRPC),
  93. relation: relrpc.New(c.RelationRPC),
  94. thumbup: thumbrpc.New(c.ThumbupRPC),
  95. coupon: couprpc.New(c.CouponRPC),
  96. dm2: dm2rpc.New(c.Dm2RPC),
  97. fav: favrpc.New2(c.FavRPC),
  98. cache: fanout.New("cache"),
  99. regionCount: make(map[int16]int),
  100. r: rand.New(rand.NewSource(time.Now().Unix())),
  101. specialMids: map[int64]struct{}{},
  102. }
  103. var err error
  104. if s.broadcastClient, err = brdrpc.NewClient(c.BroadcastClient); err != nil {
  105. panic(err)
  106. }
  107. if s.coinClient, err = coinclient.NewClient(c.CoinClient); err != nil {
  108. panic(err)
  109. }
  110. if s.arcClient, err = arcclient.NewClient(c.ArcClient); err != nil {
  111. panic(err)
  112. }
  113. if s.accClient, err = accclient.NewClient(c.AccClient); err != nil {
  114. panic(err)
  115. }
  116. if s.shareClient, err = shareclient.NewClient(c.ShareClient); err != nil {
  117. panic(err)
  118. }
  119. if s.ugcPayClient, err = ugcclient.NewClient(c.UGCClient); err != nil {
  120. panic(err)
  121. }
  122. s.initRules()
  123. go s.newCountproc()
  124. go s.onlineCountproc()
  125. go s.onlineListproc()
  126. go s.indexIconproc()
  127. go s.typeNameproc()
  128. go s.searchEggproc()
  129. go s.bnj2019proc()
  130. go s.loadManager()
  131. // init infoc
  132. if c.Infoc2 != nil {
  133. s.CheatInfoc = anticheat.New(c.Infoc2)
  134. }
  135. return s
  136. }
  137. func (s *Service) initRules() {
  138. tmpRids := make(map[int32]struct{}, len(s.c.Rule.Rids))
  139. for _, v := range s.c.Rule.Rids {
  140. tmpRids[v] = struct{}{}
  141. }
  142. s.rids = tmpRids
  143. tmpElec := make(map[int32]struct{}, len(s.c.Rule.ElecShowTypeIDs))
  144. for _, id := range s.c.Rule.ElecShowTypeIDs {
  145. tmpElec[id] = struct{}{}
  146. }
  147. s.elecShowTypeIds = tmpElec
  148. tmpNoRel := make(map[int64]struct{}, len(s.c.Rule.NoRelAids))
  149. for _, id := range s.c.Rule.NoRelAids {
  150. tmpNoRel[id] = struct{}{}
  151. }
  152. s.noRelAids = tmpNoRel
  153. }
  154. func (s *Service) typeNameproc() {
  155. for {
  156. if typesReply, err := s.arcClient.Types(context.Background(), &arcclient.NoArgRequest{}); err != nil {
  157. log.Error("s.arc.Types2 error(%v)", err)
  158. time.Sleep(time.Second)
  159. continue
  160. } else {
  161. s.typeNames = typesReply.Types
  162. }
  163. time.Sleep(time.Duration(s.c.WEB.PullOnlineInterval))
  164. }
  165. }
  166. func (s *Service) searchEggproc() {
  167. var mutex = sync.Mutex{}
  168. for {
  169. if eggs, err := s.dao.SearchEgg(context.Background()); err != nil {
  170. log.Error("s.dao.SearchEgg error(%v)", err)
  171. time.Sleep(5 * time.Second)
  172. continue
  173. } else {
  174. data := make(map[int64]*model.SearchEggRes, len(eggs))
  175. for _, v := range eggs {
  176. if source, ok := v.Plat[_searchEggWebPlat]; ok {
  177. for _, egg := range source {
  178. if _, isSet := data[egg.EggID]; !isSet {
  179. data[egg.EggID] = &model.SearchEggRes{
  180. EggID: egg.EggID,
  181. ShowCount: v.ShowCount,
  182. }
  183. }
  184. source := &model.SearchEggSource{URL: egg.URL, MD5: egg.MD5, Size: egg.Size}
  185. data[egg.EggID].Source = append(data[egg.EggID].Source, source)
  186. }
  187. }
  188. }
  189. mutex.Lock()
  190. s.searchEggs = data
  191. mutex.Unlock()
  192. }
  193. time.Sleep(time.Duration(s.c.WEB.SearchEggInterval))
  194. }
  195. }
  196. // Ping check connection success.
  197. func (s *Service) Ping(c context.Context) (err error) {
  198. err = s.dao.Ping(c)
  199. return
  200. }
  201. // Close close resource.
  202. func (s *Service) Close() {
  203. s.dao.Close()
  204. }
  205. func archivesArgLog(name string, aids []int64) {
  206. if aidLen := len(aids); aidLen >= 50 {
  207. log.Info("s.arc.Archives3 func(%s) len(%d), arg(%v)", name, aidLen, aids)
  208. }
  209. }