service.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "runtime"
  6. "strconv"
  7. "time"
  8. "go-common/app/interface/main/reply/conf"
  9. "go-common/app/interface/main/reply/dao/bigdata"
  10. "go-common/app/interface/main/reply/dao/drawyoo"
  11. "go-common/app/interface/main/reply/dao/fans"
  12. "go-common/app/interface/main/reply/dao/reply"
  13. "go-common/app/interface/main/reply/dao/search"
  14. "go-common/app/interface/main/reply/dao/vip"
  15. "go-common/app/interface/main/reply/dao/workflow"
  16. model "go-common/app/interface/main/reply/model/reply"
  17. artrpc "go-common/app/interface/openplatform/article/rpc/client"
  18. accrpc "go-common/app/service/main/account/api"
  19. arcrpc "go-common/app/service/main/archive/api/gorpc"
  20. assrpc "go-common/app/service/main/assist/rpc/client"
  21. figrpc "go-common/app/service/main/figure/rpc/client"
  22. filgrpc "go-common/app/service/main/filter/api/grpc/v1"
  23. replyfeed "go-common/app/service/main/reply-feed/api"
  24. locrpc "go-common/app/service/main/location/rpc/client"
  25. seqmdl "go-common/app/service/main/seq-server/model"
  26. seqrpc "go-common/app/service/main/seq-server/rpc/client"
  27. thumbup "go-common/app/service/main/thumbup/rpc/client"
  28. ugcpay "go-common/app/service/main/ugcpay/api/grpc/v1"
  29. "go-common/library/log"
  30. "go-common/library/log/infoc"
  31. "go-common/library/sync/pipeline/fanout"
  32. )
// _defMemberJSON is the JSON document that New decodes into Service.defMember,
// i.e. the placeholder member profile (mid "0", empty name, default avatar).
var _defMemberJSON = []byte(`{"mid":"0","uname":"","sex":"保密","sign":"没签名","avatar":"http://static.hdslb.com/images/member/noface.gif","rank":"10000","DisplayRank":"0","level_info":{"current_level":0,"current_min":0,"current_exp":0,"next_exp":0},"pendant":{"pid":0,"name":"","image":"","expire":0},"nameplate":{"nid":0,"name":"","image":"","image_small":"","level":"","condition":""},"official_verify":{"type":-1,"desc":""},"vip":{"vipType":0,"vipDueDate":0,"dueRemark":"","accessStatus":1,"vipStatus":0,"vipStatusWarn":""}}`)
// Service is the reply service. It bundles the daos, RPC/gRPC clients and
// in-memory lookup tables that New wires up from the configuration.
type Service struct {
	// sndDefCnt is taken from the Reply.SecondDefSize config value in New.
	sndDefCnt int
	// oneDaySec is one day in seconds (New sets it to 24*60*60). If the block
	// time from the word filter is greater than one day, the reply is banned
	// and moral is added to the account.
	oneDaySec int
	// useBigData is taken from the Reply.BigdataFilter config value in New.
	useBigData bool
	// content filter
	bigdata *bigdata.Dao
	drawyoo *drawyoo.Dao
	// fans http get method
	fans *fans.Dao
	// http request
	search *search.Dao
	// rpc client
	acc        accrpc.AccountClient
	feedClient replyfeed.ReplyFeedClient
	filcli     filgrpc.FilterClient
	location   *locrpc.Service
	assist     *assrpc.Service
	figure     *figrpc.Service
	thumbup    thumbup.ThumbupRPC
	// seq rpc: seqArg carries the token and business id from the Seq config.
	seqArg *seqmdl.ArgBusiness
	seqSrv *seqrpc.Service2
	// workflow dao
	workflow   *workflow.Dao
	arcSrv     *arcrpc.Service2
	articleSrv *artrpc.Service
	ugcpay     ugcpay.UGCPayClient
	// reply cache dao; also used by Ping and Close.
	dao *reply.Dao
	// channels used to add cache entries asynchronously
	replyChan chan replyChan
	topRpChan chan topRpChan
	// vip
	vip *vip.Dao
	// emojisM maps emoji name to emoji ID; replaced wholesale by loadEmoji.
	emojisM map[string]int64
	// emojis holds the emoji packages that contain at least one emoji;
	// replaced wholesale by loadEmoji.
	emojis []*model.EmojiPackage
	// reply notice
	notice map[int8][]*model.Notice
	// default member, decoded from _defMemberJSON in New.
	defMember *model.Info
	cache     *fanout.Fanout
	// typeMapping and aliasMapping back TypeToAlias and AliasToType.
	// NOTE(review): presumably filled by loadBusiness, which is not visible here.
	typeMapping  map[int32]string
	aliasMapping map[string]int32
	aidWhiteList []int64
	infoc        *infoc.Infoc
	// bnjAid is the set of oids from Reply.BnjAidList; consulted by IsBnj.
	bnjAid map[int64]struct{}
	// sortByHot, sortByTime and hideFloor map an oid (parsed from the config
	// keys in New) to the int8 value configured for it.
	sortByHot  map[int64]int8
	sortByTime map[int64]int8
	hideFloor  map[int64]int8
	// hotReplyConfig maps type -> oid -> number returned by hotNum/hotNumWeb
	// instead of the package defaults.
	hotReplyConfig map[int8]map[int64]int
}
  86. // New new a service
  87. func New(c *conf.Config) (s *Service) {
  88. s = &Service{
  89. cache: fanout.New("cache", fanout.Worker(1), fanout.Buffer(1024)),
  90. sndDefCnt: conf.Conf.Reply.SecondDefSize,
  91. oneDaySec: 24 * 60 * 60,
  92. useBigData: conf.Conf.Reply.BigdataFilter,
  93. replyChan: make(chan replyChan, _replyChanBuf),
  94. topRpChan: make(chan topRpChan, _topRpChanBuf),
  95. aidWhiteList: c.Reply.AidWhiteList,
  96. arcSrv: arcrpc.New2(c.RPCClient2.Archive),
  97. articleSrv: artrpc.New(c.RPCClient2.Article),
  98. infoc: infoc.New(c.Infoc),
  99. bnjAid: make(map[int64]struct{}),
  100. sortByTime: make(map[int64]int8),
  101. sortByHot: make(map[int64]int8),
  102. hideFloor: make(map[int64]int8),
  103. hotReplyConfig: make(map[int8]map[int64]int),
  104. }
  105. for oidStr, tp := range c.Reply.SortByHotOids {
  106. var (
  107. oid int64
  108. err error
  109. )
  110. if oid, err = strconv.ParseInt(oidStr, 10, 64); err != nil {
  111. panic(err)
  112. }
  113. s.sortByHot[oid] = tp
  114. }
  115. for oidStr, tp := range c.Reply.SortByTimeOids {
  116. var (
  117. oid int64
  118. err error
  119. )
  120. if oid, err = strconv.ParseInt(oidStr, 10, 64); err != nil {
  121. panic(err)
  122. }
  123. s.sortByTime[oid] = tp
  124. }
  125. for oidStr, tp := range c.Reply.HideFloorOids {
  126. var (
  127. oid int64
  128. err error
  129. )
  130. if oid, err = strconv.ParseInt(oidStr, 10, 64); err != nil {
  131. panic(err)
  132. }
  133. s.hideFloor[oid] = tp
  134. }
  135. for tpStr, mapping := range c.Reply.HotReplyConfig {
  136. var (
  137. tp int64
  138. oid int64
  139. err error
  140. )
  141. if tp, err = strconv.ParseInt(tpStr, 10, 64); err != nil {
  142. panic(err)
  143. }
  144. businessConfig := make(map[int64]int)
  145. for oidStr, num := range mapping {
  146. if oid, err = strconv.ParseInt(oidStr, 10, 64); err != nil {
  147. panic(err)
  148. }
  149. businessConfig[oid] = num
  150. }
  151. s.hotReplyConfig[int8(tp)] = businessConfig
  152. }
  153. for _, oid := range c.Reply.BnjAidList {
  154. s.bnjAid[oid] = struct{}{}
  155. }
  156. acc, err := accrpc.NewClient(c.AccountGRPCClient)
  157. if err != nil {
  158. panic(err)
  159. }
  160. s.acc = acc
  161. filcli, err := filgrpc.NewClient(c.FilterGRPCClient)
  162. if err != nil {
  163. panic(err)
  164. }
  165. s.filcli = filcli
  166. feedClient, err := replyfeed.NewClient(c.FeedGRPCClient)
  167. if err != nil {
  168. panic(err)
  169. }
  170. s.feedClient = feedClient
  171. // init depend dao
  172. s.fans = fans.New(c)
  173. s.search = search.New(c)
  174. s.drawyoo = drawyoo.New(c)
  175. s.bigdata = bigdata.New(c)
  176. s.vip = vip.New(c)
  177. s.emojisM = make(map[string]int64)
  178. s.emojis = make([]*model.EmojiPackage, 0)
  179. // init rpc client
  180. s.location = locrpc.New(c.RPCClient2.Location)
  181. s.assist = assrpc.New(c.RPCClient2.Assist)
  182. s.figure = figrpc.New(c.RPCClient2.Figure)
  183. s.seqSrv = seqrpc.New2(c.RPCClient2.Seq)
  184. s.seqArg = &seqmdl.ArgBusiness{Token: c.Seq.Token, BusinessID: c.Seq.BusinessID}
  185. s.thumbup = thumbup.New(c.RPCClient2.Thumbup)
  186. s.workflow = workflow.New(c)
  187. s.ugcpay, err = ugcpay.NewClient(nil)
  188. if err != nil {
  189. panic(err)
  190. }
  191. // init reply cache dao
  192. s.dao = reply.New(c)
  193. // default member
  194. s.defMember = &model.Info{}
  195. s.typeMapping = make(map[int32]string)
  196. s.aliasMapping = make(map[string]int32)
  197. if err := json.Unmarshal(_defMemberJSON, s.defMember); err != nil {
  198. panic(err)
  199. }
  200. for i := 0; i < runtime.NumCPU(); i++ {
  201. go s.cacheproc()
  202. }
  203. s.loadEmoji()
  204. s.loadRplNotice()
  205. s.loadBusiness()
  206. go s.loadproc()
  207. return
  208. }
  209. // TypeToAlias map type to alias
  210. func (s *Service) TypeToAlias(t int32) (alias string, exists bool) {
  211. alias, exists = s.typeMapping[t]
  212. return
  213. }
  214. // AliasToType map alias to type
  215. func (s *Service) AliasToType(alias string) (t int32, exists bool) {
  216. t, exists = s.aliasMapping[alias]
  217. return
  218. }
  219. func (s *Service) loadproc() {
  220. for {
  221. s.loadEmoji()
  222. s.loadRplNotice()
  223. s.loadBusiness()
  224. time.Sleep(time.Duration(conf.Conf.Reply.EmojiExpire))
  225. }
  226. }
  227. // Ping check service health
  228. func (s *Service) Ping(c context.Context) (err error) {
  229. return s.dao.Ping(c)
  230. }
  231. // Close close service connection
  232. func (s *Service) Close() {
  233. s.dao.Close()
  234. }
  235. // ResetFloor ...
  236. func (s *Service) ResetFloor(rps ...*model.Reply) {
  237. for _, rp := range rps {
  238. if rp == nil {
  239. continue
  240. }
  241. rp.Floor = 0
  242. if rp.Replies == nil {
  243. continue
  244. }
  245. for _, childRp := range rp.Replies {
  246. if childRp == nil {
  247. continue
  248. }
  249. childRp.Floor = 0
  250. }
  251. }
  252. }
  253. // hotNum ...
  254. func (s *Service) hotNumWeb(oid int64, tp int8) int {
  255. if businessConfig, ok := s.hotReplyConfig[tp]; ok {
  256. if num, exists := businessConfig[oid]; exists {
  257. return num
  258. }
  259. }
  260. return _hotSizeWeb
  261. }
  262. // hotNum ...
  263. func (s *Service) hotNum(oid int64, tp int8) int {
  264. if businessConfig, ok := s.hotReplyConfig[tp]; ok {
  265. if num, exists := businessConfig[oid]; exists {
  266. return num
  267. }
  268. }
  269. return _hotSize
  270. }
  271. // IsBnj avid...
  272. func (s *Service) IsBnj(oid int64, tp int8) bool {
  273. if _, ok := s.bnjAid[oid]; ok && tp == model.SubTypeArchive {
  274. return true
  275. }
  276. return false
  277. }
  278. // ShowFloor ...
  279. func (s *Service) ShowFloor(oid int64, tp int8) bool {
  280. if typ, ok := s.hideFloor[oid]; ok && typ == tp {
  281. return false
  282. }
  283. return true
  284. }
  285. // loadEmoji load emoji
  286. func (s *Service) loadEmoji() (err error) {
  287. var (
  288. emojis = make([]*model.EmojiPackage, 0)
  289. c = context.Background()
  290. emjM = make(map[string]int64)
  291. )
  292. emjs, err := s.dao.Emoji.ListEmojiPack(c)
  293. if err != nil {
  294. log.Error("service.ListEmojiPack error (%v)", err)
  295. return err
  296. }
  297. for _, d := range emjs {
  298. d.Emojis, err = s.dao.Emoji.EmojiListByPid(c, d.ID)
  299. if err != nil {
  300. log.Error("service.EmojiListByPid err (%v)", err)
  301. return err
  302. }
  303. for _, emo := range d.Emojis {
  304. emjM[emo.Name] = emo.ID
  305. }
  306. if len(d.Emojis) > 0 {
  307. emojis = append(emojis, d)
  308. }
  309. }
  310. s.emojis = emojis
  311. s.emojisM = emjM
  312. return
  313. }