dao.go 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package dao
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/interface/main/dm/conf"
  6. "go-common/library/cache/redis"
  7. "go-common/library/database/elastic"
  8. "go-common/library/database/sql"
  9. "go-common/library/log"
  10. bm "go-common/library/net/http/blademaster"
  11. "go-common/library/queue/databus"
  12. "go-common/library/stat/prom"
  13. )
  14. var (
  15. missedCount = prom.CacheMiss
  16. cachedCount = prom.CacheHit
  17. )
  18. // Dao dao struct
  19. type Dao struct {
  20. conf *conf.Config
  21. httpClient *bm.Client
  22. // redis
  23. redisDM *redis.Pool
  24. redisDMIDExpire int32
  25. redisLockExpire int32
  26. redisIndexExpire int32
  27. redisVideoExpire int32
  28. // database
  29. dmMetaReader *sql.DB // bilibili_dm_meta
  30. biliDM *sql.DB // blibili_dm
  31. dmWriter *sql.DB
  32. // databus
  33. databus *databus.Databus
  34. // elastic
  35. es *elastic.Elastic
  36. }
  37. // New new a dao and return.
  38. func New(c *conf.Config) (d *Dao) {
  39. d = &Dao{
  40. conf: c,
  41. httpClient: bm.NewClient(c.HTTPClient),
  42. // redis
  43. redisDM: redis.NewPool(c.Redis.DM.Config),
  44. redisDMIDExpire: int32(time.Duration(c.Redis.DM.DMIDExpire) / time.Second),
  45. redisLockExpire: int32(time.Duration(c.Redis.DM.LockExpire) / time.Second),
  46. redisIndexExpire: int32(time.Duration(c.Redis.DM.IndexExpire) / time.Second),
  47. redisVideoExpire: int32(time.Duration(c.Redis.DM.VideoExpire) / time.Second),
  48. // database
  49. dmMetaReader: sql.NewMySQL(c.DB.DMMetaReader),
  50. biliDM: sql.NewMySQL(c.DB.DM),
  51. dmWriter: sql.NewMySQL(c.DB.DMWriter),
  52. // databus
  53. databus: databus.New(c.Databus),
  54. // elastic
  55. es: elastic.NewElastic(c.ES),
  56. }
  57. return
  58. }
  59. // PromCacheHit prom cache hit
  60. func PromCacheHit(name string) {
  61. cachedCount.Incr(name)
  62. }
  63. // PromCacheMiss prom cache hit
  64. func PromCacheMiss(name string) {
  65. missedCount.Incr(name)
  66. }
  67. // Ping ping dao status
  68. func (d *Dao) Ping(c context.Context) (err error) {
  69. // dm redis
  70. rds := d.redisDM.Get(c)
  71. defer rds.Close()
  72. if _, err = rds.Do("SET", "ping", "pong"); err != nil {
  73. log.Error("rds.Do(SET) error(%v)", err)
  74. return
  75. }
  76. // database
  77. if err = d.dmMetaReader.Ping(c); err != nil {
  78. log.Error("dmMetaReader.Ping() error(%v)", err)
  79. return
  80. }
  81. if err = d.biliDM.Ping(c); err != nil {
  82. log.Error("biliDM.Ping() error(%v)", err)
  83. return
  84. }
  85. return
  86. }