dao.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package like
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/interface/main/activity/conf"
  6. "go-common/library/cache/memcache"
  7. "go-common/library/cache/redis"
  8. "go-common/library/database/elastic"
  9. xsql "go-common/library/database/sql"
  10. "go-common/library/log"
  11. httpx "go-common/library/net/http/blademaster"
  12. "go-common/library/stat/prom"
  13. "go-common/library/sync/pipeline/fanout"
  14. )
  // Request paths that New appends to the configured hosts to build the
  // endpoint URLs stored on the Dao.
  const (
  	// Paths joined with the activity host.
  	_lotteryIndex    = "/matsuri/api/mission"
  	_lotteryAddTimes = "/matsuri/api/add/times"

  	// These two carry a %d verb; presumably callers format an ID into them
  	// before issuing the request — confirm against the call sites.
  	_likeItemURI   = "/activity/likes/list/%d"
  	_sourceItemURI = "/activity/web/view/data/%d"

  	// Path joined with the APICo host.
  	_tagsURI = "/x/internal/tag/archive/multi/tags"
  )
  22. // Dao struct
  23. type Dao struct {
  24. db *xsql.DB
  25. subjectStmt *xsql.Stmt
  26. voteLogStmt *xsql.Stmt
  27. mc *memcache.Pool
  28. mcLikeExpire int32
  29. mcLikeIPExpire int32
  30. mcPerpetualExpire int32
  31. mcItemExpire int32
  32. mcSubStatExpire int32
  33. mcViewRankExpire int32
  34. mcSourceItemExpire int32
  35. mcProtocolExpire int32
  36. redis *redis.Pool
  37. redisExpire int32
  38. matchExpire int32
  39. followExpire int32
  40. hotDotExpire int32
  41. randomExpire int32
  42. lotteryIndexURL string
  43. addLotteryTimesURL string
  44. likeItemURL string
  45. sourceItemURL string
  46. tagURL string
  47. client *httpx.Client
  48. cacheCh chan func()
  49. cache *fanout.Fanout
  50. es *elastic.Elastic
  51. }
  52. // New init
  53. func New(c *conf.Config) (dao *Dao) {
  54. dao = &Dao{
  55. db: xsql.NewMySQL(c.MySQL.Like),
  56. mc: memcache.NewPool(c.Memcache.Like),
  57. mcLikeExpire: int32(time.Duration(c.Memcache.LikeExpire) / time.Second),
  58. mcLikeIPExpire: int32(time.Duration(c.Memcache.LikeIPExpire) / time.Second),
  59. mcPerpetualExpire: int32(time.Duration(c.Memcache.PerpetualExpire) / time.Second),
  60. mcItemExpire: int32(time.Duration(c.Memcache.ItemExpire) / time.Second),
  61. mcSubStatExpire: int32(time.Duration(c.Memcache.SubStatExpire) / time.Second),
  62. mcViewRankExpire: int32(time.Duration(c.Memcache.ViewRankExpire) / time.Second),
  63. mcSourceItemExpire: int32(time.Duration(c.Memcache.SourceItemExpire) / time.Second),
  64. mcProtocolExpire: int32(time.Duration(c.Memcache.ProtocolExpire) / time.Second),
  65. redis: redis.NewPool(c.Redis.Config),
  66. cacheCh: make(chan func(), 1024),
  67. cache: fanout.New("cache", fanout.Worker(1), fanout.Buffer(1024)),
  68. redisExpire: int32(time.Duration(c.Redis.Expire) / time.Second),
  69. matchExpire: int32(time.Duration(c.Redis.MatchExpire) / time.Second),
  70. followExpire: int32(time.Duration(c.Redis.FollowExpire) / time.Second),
  71. hotDotExpire: int32(time.Duration(c.Redis.HotDotExpire) / time.Second),
  72. randomExpire: int32(time.Duration(c.Redis.RandomExpire) / time.Second),
  73. lotteryIndexURL: c.Host.Activity + _lotteryIndex,
  74. addLotteryTimesURL: c.Host.Activity + _lotteryAddTimes,
  75. likeItemURL: c.Host.Activity + _likeItemURI,
  76. sourceItemURL: c.Host.Activity + _sourceItemURI,
  77. tagURL: c.Host.APICo + _tagsURI,
  78. client: httpx.NewClient(c.HTTPClient),
  79. es: elastic.NewElastic(c.Elastic),
  80. }
  81. dao.subjectStmt = dao.db.Prepared(_selSubjectSQL)
  82. dao.voteLogStmt = dao.db.Prepared(_votLogSQL)
  83. go dao.cacheproc()
  84. return
  85. }
  86. // CVoteLog chan Vote Log
  87. func (dao *Dao) CVoteLog(c context.Context, sid int64, aid int64, mid int64, stage int64, vote int64) {
  88. dao.cacheCh <- func() {
  89. dao.VoteLog(c, sid, aid, mid, stage, vote)
  90. }
  91. }
  92. // Close Dao
  93. func (dao *Dao) Close() {
  94. if dao.db != nil {
  95. dao.db.Close()
  96. }
  97. if dao.redis != nil {
  98. dao.redis.Close()
  99. }
  100. if dao.mc != nil {
  101. dao.mc.Close()
  102. }
  103. close(dao.cacheCh)
  104. }
  105. // Ping Dao
  106. func (dao *Dao) Ping(c context.Context) error {
  107. return dao.db.Ping(c)
  108. }
  109. func (dao *Dao) cacheproc() {
  110. for {
  111. f, ok := <-dao.cacheCh
  112. if !ok {
  113. return
  114. }
  115. f()
  116. }
  117. }
  118. // PromError stat and log.
  119. func PromError(name string, format string, args ...interface{}) {
  120. prom.BusinessErrCount.Incr(name)
  121. log.Error(format, args...)
  122. }