// service.go
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "sync"
  7. "time"
  8. "go-common/app/job/main/stat/conf"
  9. "go-common/app/job/main/stat/dao"
  10. "go-common/app/job/main/stat/model"
  11. arcmdl "go-common/app/service/main/archive/api"
  12. archive "go-common/app/service/main/archive/api/gorpc"
  13. "go-common/library/cache/memcache"
  14. "go-common/library/conf/env"
  15. "go-common/library/log"
  16. "go-common/library/queue/databus"
  17. )
const (
	// _sharding is the number of shards used to partition incoming stat
	// messages: channels and stat maps are indexed by aid % _sharding.
	_sharding = 100
)
// lastTmStat pairs an archive's stat counters with a timestamp.
type lastTmStat struct {
	// last update time — presumably unix seconds; confirm against statDealproc.
	last int64
	// latest stat counters for the archive.
	stat *arcmdl.Stat
}
// Service is stat job service.
type Service struct {
	c *conf.Config
	// dao
	dao *dao.Dao
	// wait
	waiter sync.WaitGroup
	// closed is set by Close so consumer loops exit.
	// NOTE(review): read/written without synchronization — looks like a benign
	// shutdown-only race, but confirm (atomic.Bool would make it race-free).
	closed bool
	// databus
	subMap     map[string]*databus.Databus // topic type -> databus consumer
	subMonitor map[string]*model.Monitor   // topic type -> consumption counter; guarded by mu
	subStatCh  []chan *model.StatMsg       // _sharding channels; message routed by aid % _sharding
	mu         sync.Mutex                  // guards subMonitor counters
	// stat map — one per shard, keyed by aid
	statSM []map[int64]*lastTmStat
	// rpc
	arcRPC  *archive.Service2
	arcRPC2 *archive.Service2
	// max aid — refreshed periodically by loadproc
	maxAid    int64
	memcaches []*memcache.Pool
}
  47. // New is stat-job service implementation.
  48. func New(c *conf.Config) (s *Service) {
  49. s = &Service{
  50. c: c,
  51. // dao
  52. dao: dao.New(c),
  53. // rpc
  54. arcRPC: archive.New2(c.ArchiveRPC),
  55. arcRPC2: archive.New2(c.ArchiveRPC2),
  56. subMap: make(map[string]*databus.Databus),
  57. subMonitor: make(map[string]*model.Monitor),
  58. }
  59. for _, mc := range s.c.Memcaches {
  60. s.memcaches = append(s.memcaches, memcache.NewPool(mc))
  61. }
  62. // view
  63. s.subMap[model.TypeForView] = databus.New(c.ViewSub)
  64. s.subMonitor[model.TypeForView] = &model.Monitor{Topic: c.ViewSub.Topic, Count: 0}
  65. // dm
  66. s.subMap[model.TypeForDm] = databus.New(c.DmSub)
  67. s.subMonitor[model.TypeForDm] = &model.Monitor{Topic: c.DmSub.Topic, Count: 0}
  68. // reply
  69. s.subMap[model.TypeForReply] = databus.New(c.ReplySub)
  70. s.subMonitor[model.TypeForReply] = &model.Monitor{Topic: c.ReplySub.Topic, Count: 0}
  71. // fav
  72. s.subMap[model.TypeForFav] = databus.New(c.FavSub)
  73. s.subMonitor[model.TypeForFav] = &model.Monitor{Topic: c.FavSub.Topic, Count: 0}
  74. // coin
  75. s.subMap[model.TypeForCoin] = databus.New(c.CoinSub)
  76. s.subMonitor[model.TypeForCoin] = &model.Monitor{Topic: c.CoinSub.Topic, Count: 0}
  77. // share
  78. s.subMap[model.TypeForShare] = databus.New(c.ShareSub)
  79. s.subMonitor[model.TypeForShare] = &model.Monitor{Topic: c.ShareSub.Topic, Count: 0}
  80. // rank
  81. s.subMap[model.TypeForRank] = databus.New(c.RankSub)
  82. // like
  83. s.subMap[model.TypeForLike] = databus.New(c.LikeSub)
  84. s.subMonitor[model.TypeForLike] = &model.Monitor{Topic: c.LikeSub.Topic, Count: 0}
  85. for i := int64(0); i < _sharding; i++ {
  86. s.subStatCh = append(s.subStatCh, make(chan *model.StatMsg, 10240))
  87. s.statSM = append(s.statSM, map[int64]*lastTmStat{})
  88. s.waiter.Add(1)
  89. go s.statDealproc(i)
  90. }
  91. go s.loadproc()
  92. if env.DeployEnv == env.DeployEnvProd {
  93. go s.monitorproc()
  94. }
  95. for k, d := range s.subMap {
  96. s.waiter.Add(1)
  97. go s.consumerproc(k, d)
  98. }
  99. return
  100. }
  101. func (s *Service) loadproc() {
  102. for {
  103. time.Sleep(1 * time.Minute)
  104. id, err := s.dao.MaxAID(context.TODO())
  105. if err != nil {
  106. s.maxAid = 0
  107. log.Error("s.dao.MaxAid error(%+v)", err)
  108. continue
  109. }
  110. s.maxAid = id
  111. }
  112. }
  113. func (s *Service) monitorproc() {
  114. for {
  115. time.Sleep(90 * time.Second)
  116. s.mu.Lock()
  117. for _, mo := range s.subMonitor {
  118. if mo.Count == 0 {
  119. s.dao.SendQiyeWX(fmt.Sprintf("日志报警:stat-job topic(%s) 没消费!!!!", mo.Topic))
  120. }
  121. mo.Count = 0
  122. }
  123. s.mu.Unlock()
  124. }
  125. }
  126. // consumerproc consumer all topic
  127. func (s *Service) consumerproc(k string, d *databus.Databus) {
  128. defer s.waiter.Done()
  129. var msgs = d.Messages()
  130. for {
  131. var (
  132. err error
  133. ok bool
  134. msg *databus.Message
  135. now = time.Now().Unix()
  136. )
  137. msg, ok = <-msgs
  138. if !ok || s.closed {
  139. log.Info("databus(%s) consumer exit", k)
  140. return
  141. }
  142. msg.Commit()
  143. var ms = &model.StatCount{}
  144. if err = json.Unmarshal(msg.Value, ms); err != nil {
  145. log.Error("json.Unmarshal(%s) error(%v)", string(msg.Value), err)
  146. continue
  147. }
  148. if ms.Aid <= 0 || (ms.Type != "archive" && ms.Type != "archive_his") {
  149. log.Warn("message(%s) error", msg.Value)
  150. continue
  151. }
  152. if now-ms.TimeStamp > 8*60*60 {
  153. log.Warn("topic(%s) message(%s) too early", msg.Topic, msg.Value)
  154. continue
  155. }
  156. stat := &model.StatMsg{Aid: ms.Aid, Type: k, Ts: ms.TimeStamp}
  157. switch k {
  158. case model.TypeForView:
  159. stat.Click = ms.Count
  160. case model.TypeForDm:
  161. stat.DM = ms.Count
  162. case model.TypeForReply:
  163. stat.Reply = ms.Count
  164. case model.TypeForFav:
  165. stat.Fav = ms.Count
  166. case model.TypeForCoin:
  167. stat.Coin = ms.Count
  168. case model.TypeForShare:
  169. stat.Share = ms.Count
  170. case model.TypeForRank:
  171. stat.HisRank = ms.Count
  172. case model.TypeForLike:
  173. stat.Like = ms.Count
  174. stat.DisLike = ms.DisLike
  175. default:
  176. log.Error("unknow type(%s) message(%s)", k, msg.Value)
  177. continue
  178. }
  179. s.mu.Lock()
  180. if _, ok := s.subMonitor[k]; ok {
  181. s.subMonitor[k].Count++
  182. }
  183. s.mu.Unlock()
  184. s.subStatCh[stat.Aid%_sharding] <- stat
  185. log.Info("got message(%+v)", stat)
  186. }
  187. }
  188. // Close Databus consumer close.
  189. func (s *Service) Close() (err error) {
  190. s.closed = true
  191. time.Sleep(2 * time.Second)
  192. log.Info("start close job")
  193. for k, d := range s.subMap {
  194. d.Close()
  195. log.Info("databus(%s) cloesed", k)
  196. }
  197. for i := int64(0); i < _sharding; i++ {
  198. close(s.subStatCh[i])
  199. }
  200. log.Info("end close job")
  201. s.waiter.Wait()
  202. return
  203. }
  204. // Ping check server ok
  205. func (s *Service) Ping(c context.Context) (err error) {
  206. return s.dao.Ping(c)
  207. }