service.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "go-common/app/interface/main/history/conf"
  7. "go-common/app/interface/main/history/dao/history"
  8. "go-common/app/interface/main/history/dao/toview"
  9. "go-common/app/interface/main/history/model"
  10. arcrpc "go-common/app/service/main/archive/api/gorpc"
  11. favrpc "go-common/app/service/main/favorite/api/gorpc"
  12. hisrpc "go-common/app/service/main/history/api/grpc"
  13. "go-common/library/log"
  14. "go-common/library/queue/databus/report"
  15. "go-common/library/sync/pipeline/fanout"
  16. )
  // playPro is one play-progress report. Reports are buffered on
  // Service.msgs and handed to historyDao.PlayPro in batches; the json tags
  // define the field names used when a report is serialised.
  type playPro struct {
  	// Type and SubType classify the report.
  	// NOTE(review): the meaning of the individual values is not visible here.
  	Type    int8 `json:"type"`
  	SubType int8 `json:"sub_type"`

  	// Mid and Sid of the first report in a batch form the key that
  	// pushPlayPro passes to historyDao.PlayPro.
  	Mid int64 `json:"mid"`
  	Sid int64 `json:"sid"`

  	Epid     int64 `json:"epid"`
  	Cid      int64 `json:"cid"`
  	Progress int64 `json:"progress"`

  	// IP is carried as a string.
  	// NOTE(review): presumably the reporting client's address; confirm.
  	IP string `json:"ip"`

  	// Ts and RealTime are integer timestamps.
  	// NOTE(review): unit and exact semantics are not visible here.
  	Ts       int64 `json:"ts"`
  	RealTime int64 `json:"realtime"`
  }
  29. // Service is history service.
  30. type Service struct {
  31. conf *conf.Config
  32. historyDao *history.Dao
  33. toviewDao *toview.Dao
  34. delChan *fanout.Fanout
  35. mergeChan chan *model.Merge
  36. msgs chan *playPro
  37. proChan chan *model.History
  38. serviceChan chan func()
  39. favRPC *favrpc.Service
  40. arcRPC *arcrpc.Service2
  41. hisRPC hisrpc.HistoryClient
  42. cache *fanout.Fanout
  43. toviewCache *fanout.Fanout
  44. midMap map[int64]bool
  45. }
  46. // New new a History service.
  47. func New(c *conf.Config) (s *Service) {
  48. s = &Service{
  49. conf: c,
  50. historyDao: history.New(c),
  51. toviewDao: toview.New(c),
  52. mergeChan: make(chan *model.Merge, 1024),
  53. msgs: make(chan *playPro, 1024),
  54. proChan: make(chan *model.History, 1024),
  55. serviceChan: make(chan func(), 10240),
  56. delChan: fanout.New("cache", fanout.Worker(1), fanout.Buffer(1024)),
  57. arcRPC: arcrpc.New2(c.RPCClient2.Archive),
  58. favRPC: favrpc.New2(c.RPCClient2.Favorite),
  59. cache: fanout.New("cache", fanout.Worker(1), fanout.Buffer(1024)),
  60. toviewCache: fanout.New("cache", fanout.Worker(1), fanout.Buffer(1024)),
  61. midMap: make(map[int64]bool),
  62. }
  63. for _, v := range s.conf.History.Mids {
  64. s.midMap[v] = true
  65. }
  66. var err error
  67. if s.hisRPC, err = hisrpc.NewClient(c.RPCClient2.History); err != nil {
  68. panic(err)
  69. }
  70. go s.playProproc()
  71. go s.mergeproc()
  72. go s.proPubproc()
  73. for i := 0; i < s.conf.History.ConsumeSize; i++ {
  74. go s.serviceproc()
  75. }
  76. return
  77. }
  78. func (s *Service) addMerge(mid, now int64) {
  79. select {
  80. case s.mergeChan <- &model.Merge{Mid: mid, Now: now}:
  81. default:
  82. log.Warn("mergeChan chan is full")
  83. }
  84. }
  85. func (s *Service) addPlayPro(p *playPro) {
  86. select {
  87. case s.msgs <- p:
  88. default:
  89. log.Warn("s.msgs chan is full")
  90. }
  91. }
  92. func (s *Service) addProPub(p *model.History) {
  93. select {
  94. case s.proChan <- p:
  95. default:
  96. log.Warn("s.proChan chan is full")
  97. }
  98. }
  99. func (s *Service) mergeproc() {
  100. var (
  101. m *model.Merge
  102. ticker = time.NewTicker(time.Duration(s.conf.History.Ticker))
  103. mergeMap = make(map[int64]int64)
  104. )
  105. for {
  106. select {
  107. case m = <-s.mergeChan:
  108. if m == nil {
  109. s.merge(mergeMap)
  110. return
  111. }
  112. if _, ok := mergeMap[m.Mid]; !ok {
  113. mergeMap[m.Mid] = m.Now
  114. }
  115. if len(mergeMap) < s.conf.History.Page {
  116. continue
  117. }
  118. case <-ticker.C:
  119. }
  120. s.merge(mergeMap)
  121. mergeMap = make(map[int64]int64)
  122. }
  123. }
  124. // playProproc send history to databus.
  125. func (s *Service) playProproc() {
  126. var (
  127. msg *playPro
  128. ms []*playPro
  129. ticker = time.NewTicker(time.Second)
  130. )
  131. for {
  132. select {
  133. case msg = <-s.msgs:
  134. if msg == nil {
  135. if len(ms) > 0 {
  136. s.pushPlayPro(ms)
  137. }
  138. return
  139. }
  140. ms = append(ms, msg)
  141. if len(ms) < 100 {
  142. continue
  143. }
  144. case <-ticker.C:
  145. }
  146. if len(ms) == 0 {
  147. continue
  148. }
  149. s.pushPlayPro(ms)
  150. ms = make([]*playPro, 0, 100)
  151. }
  152. }
  153. func (s *Service) pushPlayPro(ms []*playPro) {
  154. key := fmt.Sprintf("%d%d", ms[0].Mid, ms[0].Sid)
  155. for j := 0; j < 3; j++ {
  156. if err := s.historyDao.PlayPro(context.Background(), key, ms); err == nil {
  157. return
  158. }
  159. }
  160. }
  161. // proPubroc send history to databus.
  162. func (s *Service) proPubproc() {
  163. for {
  164. msg := <-s.proChan
  165. if msg == nil {
  166. return
  167. }
  168. s.proPub(msg)
  169. }
  170. }
  171. func (s *Service) proPub(msg *model.History) {
  172. key := fmt.Sprintf("%d%d", msg.Mid, msg.Aid)
  173. for j := 0; j < 3; j++ {
  174. if err := s.historyDao.ProPub(context.Background(), key, msg); err == nil {
  175. break
  176. }
  177. }
  178. }
  179. func (s *Service) userActionLog(mid int64, action string) {
  180. report.User(&report.UserInfo{
  181. Mid: mid,
  182. Business: model.HistoryLog,
  183. Action: action,
  184. Ctime: time.Now(),
  185. })
  186. }
  187. func (s *Service) migration(mid int64) bool {
  188. if !s.conf.History.Migration || mid == 0 {
  189. return false
  190. }
  191. if _, ok := s.midMap[mid]; ok {
  192. return true
  193. }
  194. if s.conf.History.Rate != 0 && mid%s.conf.History.Rate == 0 {
  195. return true
  196. }
  197. return false
  198. }
  199. // Ping ping service.
  200. // +wd:ignore
  201. func (s *Service) Ping(c context.Context) (err error) {
  202. if s.historyDao != nil {
  203. err = s.historyDao.Ping(c)
  204. }
  205. if s.toviewDao != nil {
  206. err = s.toviewDao.Ping(c)
  207. }
  208. return
  209. }
  210. // Close close resource.
  211. // +wd:ignore
  212. func (s *Service) Close() {
  213. s.mergeChan <- nil
  214. s.msgs <- nil
  215. s.proChan <- nil
  216. if s.historyDao != nil {
  217. s.historyDao.Close()
  218. }
  219. if s.toviewDao != nil {
  220. s.toviewDao.Close()
  221. }
  222. }