dao.go 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package dao
  2. import (
  3. "context"
  4. "go-common/app/service/video/stream-mng/conf"
  5. "go-common/library/cache"
  6. "go-common/library/cache/memcache"
  7. "go-common/library/cache/redis"
  8. xsql "go-common/library/database/sql"
  9. bm "go-common/library/net/http/blademaster"
  10. "go-common/library/sync/pipeline/fanout"
  11. "github.com/bluele/gcache"
  12. )
  13. // Dao dao
  14. type Dao struct {
  15. c *conf.Config
  16. mc *memcache.Pool
  17. redis *redis.Pool
  18. db *xsql.DB
  19. tidb *xsql.DB
  20. httpClient *bm.Client
  21. cache *cache.Cache
  22. liveAside *fanout.Fanout
  23. localCache gcache.Cache
  24. // DB
  25. // 新版本主流
  26. stmtMainStreamCreate *xsql.Stmt
  27. stmtMainStreamChangeDefaultVendor *xsql.Stmt
  28. stmtMainStreamChangeOptions *xsql.Stmt
  29. stmtMainStreamClearAllStreaming *xsql.Stmt
  30. // 备用流
  31. stmtBackupStreamCreate *xsql.Stmt
  32. // 旧版本流
  33. stmtLegacyStreamCreate *xsql.Stmt
  34. stmtLegacyStreamEnableNewUpRank *xsql.Stmt
  35. stmtLegacyStreamDisableUpRank *xsql.Stmt
  36. stmtLegacyStreamClearStreamFoward *xsql.Stmt
  37. stmtLegacyStreamNotify *xsql.Stmt
  38. // tidb
  39. stmtUpStreamDispatch *xsql.Stmt
  40. }
  41. // New init mysql db
  42. func New(c *conf.Config) (dao *Dao) {
  43. dao = &Dao{
  44. c: c,
  45. mc: memcache.NewPool(c.Memcache),
  46. redis: redis.NewPool(c.Redis),
  47. db: xsql.NewMySQL(c.MySQL),
  48. tidb: xsql.NewMySQL(c.TIDB),
  49. httpClient: bm.NewClient(c.HTTPClient),
  50. cache: cache.New(1, 10240),
  51. liveAside: fanout.New("stream-mng"),
  52. localCache: gcache.New(10240).Simple().Build(),
  53. }
  54. // 新版本主流
  55. dao.stmtMainStreamCreate = dao.db.Prepared(_insertMainStream)
  56. dao.stmtMainStreamChangeDefaultVendor = dao.db.Prepared(_changeDefaultVendor)
  57. dao.stmtMainStreamChangeOptions = dao.db.Prepared(_changeOptions)
  58. dao.stmtMainStreamClearAllStreaming = dao.db.Prepared(_clearAllStreaming)
  59. // 备用流
  60. dao.stmtBackupStreamCreate = dao.db.Prepared(_insertBackupStream)
  61. // 旧版本流
  62. dao.stmtLegacyStreamCreate = dao.db.Prepared(_insertOfficialStream)
  63. dao.stmtLegacyStreamEnableNewUpRank = dao.db.Prepared(_updateUpOfficialStreamStatus)
  64. dao.stmtLegacyStreamDisableUpRank = dao.db.Prepared(_updateForwardOfficialStreamStatus)
  65. dao.stmtLegacyStreamClearStreamFoward = dao.db.Prepared(_updateOfficalStreamUpRankStatus)
  66. dao.stmtLegacyStreamNotify = dao.db.Prepared(_setOriginStreamingStatus)
  67. // tidb
  68. dao.stmtUpStreamDispatch = dao.tidb.Prepared(_insertUpStreamInfo)
  69. return
  70. }
  71. // Close close the resource.
  72. func (d *Dao) Close() {
  73. d.mc.Close()
  74. d.redis.Close()
  75. d.db.Close()
  76. d.liveAside.Close()
  77. return
  78. }
  79. // Ping dao ping
  80. func (d *Dao) Ping(c context.Context) error {
  81. // TODO: if you need use mc,redis, please add
  82. return d.db.Ping(c)
  83. }