dao.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. package dao
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/job/main/dm2/conf"
  6. "go-common/library/cache/memcache"
  7. "go-common/library/cache/redis"
  8. "go-common/library/database/bfs"
  9. "go-common/library/database/sql"
  10. "go-common/library/log"
  11. bm "go-common/library/net/http/blademaster"
  12. )
  13. const (
  14. _pageSize = 1000
  15. )
  16. // Dao dao struct
  17. type Dao struct {
  18. conf *conf.Config
  19. // batch query size
  20. pageSize int
  21. // database
  22. dmWriter *sql.DB
  23. dmReader *sql.DB
  24. biliDMWriter *sql.DB
  25. // redis
  26. dmRds *redis.Pool
  27. dmRdsExpire int32
  28. // recent dm redis
  29. dmRctRds *redis.Pool
  30. dmRctExpire int32
  31. // segment dm redis
  32. dmSegRds *redis.Pool
  33. dmSegExpire int32
  34. // memcache
  35. mc *memcache.Pool
  36. mcExpire int32
  37. subtitleMc *memcache.Pool
  38. subtitleMcExpire int32
  39. // memcache new
  40. dmSegMC *memcache.Pool
  41. dmSegMCExpire int32
  42. // recent dm redis
  43. rctRds *redis.Pool
  44. rctRdsExpire int32
  45. // http client
  46. httpCli *bm.Client
  47. // upload dm
  48. bfsCli *bfs.BFS
  49. }
  50. // New new a dao.
  51. func New(c *conf.Config) (d *Dao) {
  52. d = &Dao{
  53. conf: c,
  54. dmWriter: sql.NewMySQL(c.DB.DMWriter),
  55. dmReader: sql.NewMySQL(c.DB.DMReader),
  56. biliDMWriter: sql.NewMySQL(c.DB.BiliDMWriter),
  57. dmRds: redis.NewPool(c.Redis.DM.Config),
  58. dmRdsExpire: int32(time.Duration(c.Redis.DM.Expire) / time.Second),
  59. dmRctRds: redis.NewPool(c.Redis.DMRct.Config),
  60. dmRctExpire: int32(time.Duration(c.Redis.DMRct.Expire) / time.Second),
  61. dmSegRds: redis.NewPool(c.Redis.DMSeg.Config),
  62. dmSegExpire: int32(time.Duration(c.Redis.DMSeg.Expire) / time.Second),
  63. mc: memcache.NewPool(c.Memcache.Config),
  64. mcExpire: int32(time.Duration(c.Memcache.Expire) / time.Second),
  65. subtitleMc: memcache.NewPool(c.SubtitleMemcache.Config),
  66. subtitleMcExpire: int32(time.Duration(c.SubtitleMemcache.Expire) / time.Second),
  67. dmSegMC: memcache.NewPool(c.DMMemcache.Config),
  68. dmSegMCExpire: int32(time.Duration(c.DMMemcache.Expire) / time.Second),
  69. rctRds: redis.NewPool(c.Redis.DMRct.Config),
  70. rctRdsExpire: int32(time.Duration(c.Redis.DMRct.Expire) / time.Second),
  71. httpCli: bm.NewClient(c.HTTPClient),
  72. bfsCli: bfs.New(c.Bfs.Client),
  73. pageSize: int(c.DB.QueryPageSize),
  74. }
  75. if d.pageSize <= 0 {
  76. d.pageSize = _pageSize
  77. }
  78. return
  79. }
  80. // BeginTran begin mysql transaction
  81. func (d *Dao) BeginTran(c context.Context) (*sql.Tx, error) {
  82. return d.dmWriter.Begin(c)
  83. }
  84. // BeginBiliDMTran .
  85. func (d *Dao) BeginBiliDMTran(c context.Context) (*sql.Tx, error) {
  86. return d.biliDMWriter.Begin(c)
  87. }
  88. // Ping dm dao ping.
  89. func (d *Dao) Ping(c context.Context) (err error) {
  90. if err = d.dmWriter.Ping(c); err != nil {
  91. log.Error("dmWriter.Ping() error(%v)", err)
  92. return
  93. }
  94. if err = d.dmReader.Ping(c); err != nil {
  95. log.Error("dmReader.Ping() error(%v)", err)
  96. return
  97. }
  98. if err = d.biliDMWriter.Ping(c); err != nil {
  99. log.Error("biliDMWriter.Ping() error(%v)", err)
  100. return
  101. }
  102. // mc
  103. mconn := d.mc.Get(c)
  104. defer mconn.Close()
  105. if err = mconn.Set(&memcache.Item{Key: "ping", Value: []byte("pong"), Expiration: 0}); err != nil {
  106. log.Error("mc.Set error(%v)", err)
  107. return
  108. }
  109. // dm redis
  110. dmRdsConn := d.dmRds.Get(c)
  111. defer dmRdsConn.Close()
  112. if _, err = dmRdsConn.Do("SET", "ping", "pong"); err != nil {
  113. log.Error("dmRds.Set error(%v)", err)
  114. return
  115. }
  116. rctRdsConn := d.dmRctRds.Get(c)
  117. defer rctRdsConn.Close()
  118. if _, err = rctRdsConn.Do("SET", "ping", "pong"); err != nil {
  119. log.Error("rctRds.Set error(%v)", err)
  120. return
  121. }
  122. dmSegConn := d.dmSegRds.Get(c)
  123. defer dmSegConn.Close()
  124. if _, err = dmSegConn.Do("SET", "ping", "pong"); err != nil {
  125. log.Error("dmSegConn.Set error(%v)", err)
  126. return
  127. }
  128. return
  129. }