dao.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. package dao
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/service/main/thumbup/conf"
  6. "go-common/app/service/main/thumbup/model"
  7. "go-common/library/cache/memcache"
  8. xredis "go-common/library/cache/redis"
  9. "go-common/library/database/tidb"
  10. "go-common/library/log"
  11. "go-common/library/queue/databus"
  12. "go-common/library/stat/prom"
  13. "go-common/library/sync/pipeline/fanout"
  14. )
  15. // PromError prom error
  16. func PromError(name string) {
  17. prom.BusinessErrCount.Incr(name)
  18. }
  19. // Dao dao
  20. type Dao struct {
  21. // config
  22. c *conf.Config
  23. // db
  24. tidb *tidb.DB
  25. // memcache
  26. mc *memcache.Pool
  27. mcStatsExpire int32
  28. //redis
  29. redis *xredis.Pool
  30. redisStatsExpire int64
  31. redisUserLikesExpire int64
  32. redisItemLikesExpire int64
  33. // redisSortExpire int64
  34. // stmt
  35. businessesStmt *tidb.Stmts
  36. likeStateStmt *tidb.Stmts
  37. userLikeCountStmt *tidb.Stmts
  38. itemLikeListStmt *tidb.Stmts
  39. userLikeListStmt *tidb.Stmts
  40. statsOriginStmt *tidb.Stmts
  41. statStmt *tidb.Stmts
  42. updateLikeStmt *tidb.Stmts
  43. updateCountChangeStmt *tidb.Stmts
  44. statDbus *databus.Databus
  45. likeDbus *databus.Databus
  46. itemDbus *databus.Databus
  47. userDbus *databus.Databus
  48. cache *fanout.Fanout
  49. async *fanout.Fanout
  50. tidbAsync *fanout.Fanout
  51. BusinessMap map[string]*model.Business
  52. BusinessIDMap map[int64]*model.Business
  53. }
  54. // New dao new
  55. func New(c *conf.Config) (d *Dao) {
  56. d = &Dao{
  57. // config
  58. c: c,
  59. // mc
  60. mc: memcache.NewPool(c.Memcache.Config),
  61. mcStatsExpire: int32(time.Duration(c.Memcache.StatsExpire) / time.Second),
  62. // redis
  63. redis: xredis.NewPool(c.Redis.Config),
  64. redisStatsExpire: int64(time.Duration(c.Redis.StatsExpire) / time.Second),
  65. redisUserLikesExpire: int64(time.Duration(c.Redis.UserLikesExpire) / time.Second),
  66. redisItemLikesExpire: int64(time.Duration(c.Redis.ItemLikesExpire) / time.Second),
  67. // db
  68. tidb: tidb.NewTiDB(c.Tidb),
  69. statDbus: databus.New(c.StatDatabus),
  70. likeDbus: databus.New(c.LikeDatabus),
  71. itemDbus: databus.New(c.ItemDatabus),
  72. userDbus: databus.New(c.UserDatabus),
  73. cache: fanout.New("cache", fanout.Worker(1), fanout.Buffer(10240)),
  74. async: fanout.New("async", fanout.Worker(2), fanout.Buffer(10240)),
  75. tidbAsync: fanout.New("tidb-async", fanout.Worker(10), fanout.Buffer(10240)),
  76. }
  77. d.businessesStmt = d.tidb.Prepared(_tidbBusinessesSQL)
  78. d.likeStateStmt = d.tidb.Prepared(_tidbLikeMidSQL)
  79. d.userLikeCountStmt = d.tidb.Prepared(_tidbUserLikeCountSQL)
  80. d.itemLikeListStmt = d.tidb.Prepared(_tidbItemLikeListSQL)
  81. d.userLikeListStmt = d.tidb.Prepared(_tidbUserLikeListSQL)
  82. d.statsOriginStmt = d.tidb.Prepared(_tidbStatsOriginSQL)
  83. d.statStmt = d.tidb.Prepared(_tidbStatSQL)
  84. d.updateLikeStmt = d.tidb.Prepared(_tidbUpdateLikeSQL)
  85. d.updateCountChangeStmt = d.tidb.Prepared(_tidbupdateCountChange)
  86. d.loadBusiness()
  87. go d.loadBusinessproc()
  88. return d
  89. }
  90. // Ping check connection success.
  91. func (d *Dao) Ping(c context.Context) (err error) {
  92. if err = d.pingMC(c); err != nil {
  93. PromError("mc:Ping")
  94. log.Error("d.pingMC error(%v)", err)
  95. return
  96. }
  97. if err = d.pingRedis(c); err != nil {
  98. PromError("redis:Ping")
  99. log.Error("d.pingRedis error(%v)", err)
  100. return
  101. }
  102. return
  103. }
  104. // Close close resource.
  105. func (d *Dao) Close() {
  106. d.async.Close()
  107. d.tidbAsync.Close()
  108. d.tidb.Close()
  109. d.mc.Close()
  110. d.redis.Close()
  111. }
  112. // pingMc ping memcache
  113. func (d *Dao) pingMC(c context.Context) (err error) {
  114. conn := d.mc.Get(c)
  115. defer conn.Close()
  116. item := memcache.Item{Key: "ping", Value: []byte{1}, Expiration: 100}
  117. err = conn.Set(&item)
  118. return
  119. }
  120. // pingRedis ping redis.
  121. func (d *Dao) pingRedis(c context.Context) (err error) {
  122. conn := d.redis.Get(c)
  123. if _, err = conn.Do("SET", "PING", "PONG"); err != nil {
  124. PromError("redis: ping remote")
  125. log.Error("remote redis: conn.Do(SET,PING,PONG) error(%v)", err)
  126. }
  127. conn.Close()
  128. return
  129. }
  130. // LoadBusiness .
  131. func (d *Dao) loadBusiness() {
  132. var business []*model.Business
  133. var err error
  134. businessMap := make(map[string]*model.Business)
  135. businessIDMap := make(map[int64]*model.Business)
  136. for {
  137. if business, err = d.Businesses(context.TODO()); err != nil {
  138. time.Sleep(time.Second)
  139. continue
  140. }
  141. for _, b := range business {
  142. businessMap[b.Name] = b
  143. businessIDMap[b.ID] = b
  144. }
  145. d.BusinessMap = businessMap
  146. d.BusinessIDMap = businessIDMap
  147. return
  148. }
  149. }
  150. func (d *Dao) loadBusinessproc() {
  151. for {
  152. time.Sleep(time.Minute * 5)
  153. d.loadBusiness()
  154. }
  155. }