dao.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. package dao
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/interface/main/dm2/conf"
  6. "go-common/library/cache/memcache"
  7. "go-common/library/cache/redis"
  8. "go-common/library/database/bfs"
  9. "go-common/library/database/elastic"
  10. "go-common/library/database/sql"
  11. "go-common/library/log"
  12. bm "go-common/library/net/http/blademaster"
  13. "go-common/library/queue/databus"
  14. "go-common/library/stat/prom"
  15. )
  16. var (
  17. errorsCount = prom.BusinessErrCount
  18. missedCount = prom.CacheMiss
  19. cachedCount = prom.CacheHit
  20. )
  21. // Dao dm dao.
  22. type Dao struct {
  23. conf *conf.Config
  24. // mysql
  25. dmWriter *sql.DB
  26. dmReader *sql.DB
  27. dbDM *sql.DB
  28. // redis
  29. dmRds *redis.Pool
  30. dmRdsExpire int32
  31. // recent dm redis
  32. dmRctRds *redis.Pool
  33. dmRctExpire int32
  34. // segment dm redis
  35. dmSegRds *redis.Pool
  36. dmSegExpire int32
  37. // memcache
  38. dmMC *memcache.Pool
  39. dmExpire int32
  40. subjectExpire int32
  41. historyExpire int32
  42. ajaxExpire int32
  43. dmMaskExpire int32
  44. filterMC *memcache.Pool
  45. filterMCExpire int32
  46. dmSegMC *memcache.Pool
  47. dmSegMCExpire int32
  48. dmLimiterMCExpire int32
  49. // http
  50. httpCli *bm.Client
  51. // databus
  52. databus *databus.Databus
  53. actionPub *databus.Databus
  54. // elastic
  55. elastic *elastic.Elastic
  56. // bfsCli
  57. bfsCli *bfs.BFS
  58. // subtitle mc
  59. subtitleMc *memcache.Pool
  60. subtitleMcExpire int32
  61. subtitleCheckPub *databus.Databus
  62. }
  63. // New new a dao and return.
  64. func New(c *conf.Config) (d *Dao) {
  65. d = &Dao{
  66. conf: c,
  67. // mysql
  68. dmWriter: sql.NewMySQL(c.DB.DMWriter),
  69. dmReader: sql.NewMySQL(c.DB.DMReader),
  70. dbDM: sql.NewMySQL(c.DB.DM),
  71. // redis
  72. dmRds: redis.NewPool(c.Redis.DM.Config),
  73. dmRdsExpire: int32(time.Duration(c.Redis.DM.Expire) / time.Second),
  74. // recent dm redis
  75. dmRctRds: redis.NewPool(c.Redis.DMRct.Config),
  76. dmRctExpire: int32(time.Duration(c.Redis.DMRct.Expire) / time.Second),
  77. // segment dm redis
  78. dmSegRds: redis.NewPool(c.Redis.DMSeg.Config),
  79. dmSegExpire: int32(time.Duration(c.Redis.DMSeg.Expire) / time.Second),
  80. // memcache
  81. dmMC: memcache.NewPool(c.Memcache.DM.Config),
  82. dmExpire: int32(time.Duration(c.Memcache.DM.DMExpire) / time.Second),
  83. subjectExpire: int32(time.Duration(c.Memcache.DM.SubjectExpire) / time.Second),
  84. historyExpire: int32(time.Duration(c.Memcache.DM.HistoryExpire) / time.Second),
  85. ajaxExpire: int32(time.Duration(c.Memcache.DM.AjaxExpire) / time.Second),
  86. dmMaskExpire: int32(time.Duration(c.Memcache.DM.DMMaskExpire) / time.Second),
  87. filterMC: memcache.NewPool(c.Memcache.Filter.Config),
  88. filterMCExpire: int32(time.Duration(c.Memcache.Filter.Expire) / time.Second),
  89. dmSegMC: memcache.NewPool(c.Memcache.DMSeg.Config),
  90. dmSegMCExpire: int32(time.Duration(c.Memcache.DMSeg.DMExpire) / time.Second),
  91. dmLimiterMCExpire: int32(time.Duration(c.Memcache.DMSeg.DMLimiterExpire) / time.Second),
  92. // http
  93. httpCli: bm.NewClient(c.HTTPCli),
  94. // databus
  95. databus: databus.New(c.Databus),
  96. actionPub: databus.New(c.ActionPub),
  97. // elastic
  98. elastic: elastic.NewElastic(c.Elastic),
  99. // bfscli
  100. bfsCli: bfs.New(c.Bfs.Client),
  101. // subtitle MC
  102. subtitleMc: memcache.NewPool(c.Memcache.Subtitle.Config),
  103. subtitleMcExpire: int32(time.Duration(c.Memcache.Subtitle.Expire) / time.Second),
  104. subtitleCheckPub: databus.New(c.SubtitleCheckPub),
  105. }
  106. return
  107. }
  108. func (d *Dao) hitSubject(oid int64) int64 {
  109. return oid % _subjectSharding
  110. }
  111. func (d *Dao) hitIndex(oid int64) int64 {
  112. return oid % _indexSharding
  113. }
  114. func (d *Dao) hitContent(dmid int64) int64 {
  115. return dmid % _contentSharding
  116. }
  117. // BeginBiliDMTrans begin db transaction.
  118. func (d *Dao) BeginBiliDMTrans(c context.Context) (*sql.Tx, error) {
  119. return d.dbDM.Begin(c)
  120. }
  121. // Ping ping success.
  122. func (d *Dao) Ping(c context.Context) (err error) {
  123. if err = d.dmReader.Ping(c); err != nil {
  124. log.Error("d.dmReader error(%v)", err)
  125. return
  126. }
  127. if err = d.dmWriter.Ping(c); err != nil {
  128. log.Error("d.dmWriter error(%v)", err)
  129. return
  130. }
  131. // mc
  132. dmMC := d.dmMC.Get(c)
  133. defer dmMC.Close()
  134. if err = dmMC.Set(&memcache.Item{Key: "ping", Value: []byte("pong"), Expiration: 0}); err != nil {
  135. log.Error("dmMC.Set error(%v)", err)
  136. return
  137. }
  138. filterMC := d.filterMC.Get(c)
  139. defer filterMC.Close()
  140. if err = filterMC.Set(&memcache.Item{Key: "ping", Value: []byte("pong"), Expiration: 0}); err != nil {
  141. log.Error("filterMC.Set error(%v)", err)
  142. return
  143. }
  144. dmSegMC := d.dmSegMC.Get(c)
  145. defer dmSegMC.Close()
  146. if err = dmSegMC.Set(&memcache.Item{Key: "ping", Value: []byte("pong"), Expiration: 0}); err != nil {
  147. log.Error("dmSegMC.Set error(%v)", err)
  148. return
  149. }
  150. // dm redis
  151. dmConn := d.dmRds.Get(c)
  152. defer dmConn.Close()
  153. if _, err = dmConn.Do("SET", "ping", "pong"); err != nil {
  154. log.Error("dmConn.Do(SET) error(%v)", err)
  155. return
  156. }
  157. rctRdsConn := d.dmRctRds.Get(c)
  158. defer rctRdsConn.Close()
  159. if _, err = rctRdsConn.Do("SET", "ping", "pong"); err != nil {
  160. log.Error("rctRdsConn.Do(SET) error(%v)", err)
  161. return
  162. }
  163. // segment dm redis
  164. segRdsConn := d.dmSegRds.Get(c)
  165. defer segRdsConn.Close()
  166. if _, err = segRdsConn.Do("SET", "ping", "pong"); err != nil {
  167. log.Error("segRdsConn.Do(SET) error(%v)", err)
  168. }
  169. return
  170. }
  171. // PromError prom error
  172. func PromError(name string) {
  173. errorsCount.Incr(name)
  174. }
  175. // PromCacheHit prom cache hit
  176. func PromCacheHit(name string, v int64) {
  177. cachedCount.Add(name, v)
  178. }
  179. // PromCacheMiss prom cache hit
  180. func PromCacheMiss(name string, v int64) {
  181. missedCount.Add(name, v)
  182. }