dao.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package dao
  2. import (
  3. "context"
  4. "go-common/app/admin/main/dm/conf"
  5. "go-common/app/admin/main/dm/model"
  6. "go-common/library/cache/memcache"
  7. "go-common/library/database/bfs"
  8. "go-common/library/database/elastic"
  9. "go-common/library/database/sql"
  10. "go-common/library/log"
  11. bm "go-common/library/net/http/blademaster"
  12. "go-common/library/queue/databus"
  13. )
  14. // Dao dao layer.
  15. type Dao struct {
  16. actionPub *databus.Databus
  17. // http
  18. httpSearch *bm.Client
  19. httpCli *bm.Client
  20. // mysql
  21. dmMetaWriter *sql.DB
  22. dmMetaReader *sql.DB
  23. biliDM *sql.DB
  24. // memcache
  25. filterMC *memcache.Pool
  26. // subtitle mc
  27. subtitleMC *memcache.Pool
  28. // elastic client
  29. esCli *elastic.Elastic
  30. // bfs client
  31. bfsCli *bfs.BFS
  32. // http uri
  33. sendNotifyURI string
  34. addMoralURI string
  35. blockUserURI string
  36. blockInfoAddURI string
  37. sendJudgeURI string
  38. viewsURI string
  39. typesURI string
  40. seasonURI string
  41. maskURI string
  42. workFlowURI string
  43. berserkerURI string
  44. }
  45. // New new a dao and return.
  46. func New(c *conf.Config) (d *Dao) {
  47. d = &Dao{
  48. actionPub: databus.New(c.ActionPub),
  49. // mysql
  50. dmMetaWriter: sql.NewMySQL(c.DB.DMMetaWriter),
  51. dmMetaReader: sql.NewMySQL(c.DB.DMMetaReader),
  52. biliDM: sql.NewMySQL(c.DB.DM),
  53. // memcache
  54. filterMC: memcache.NewPool(c.Memcache.Filter.Config),
  55. subtitleMC: memcache.NewPool(c.Memcache.Subtitle.Config),
  56. // elastic client
  57. esCli: elastic.NewElastic(c.Elastic),
  58. // http client
  59. sendNotifyURI: c.Host.Message + _sendNotify,
  60. addMoralURI: c.Host.Account + _addMoral,
  61. blockUserURI: c.Host.API + _blockUser,
  62. blockInfoAddURI: c.Host.API + _blockInfoAdd,
  63. sendJudgeURI: c.Host.API + _sendJudge,
  64. viewsURI: c.Host.Videoup + _views,
  65. typesURI: c.Host.Videoup + _types,
  66. seasonURI: c.Host.Season + _season,
  67. maskURI: c.Host.Mask + _mask,
  68. berserkerURI: c.Host.Berserker,
  69. workFlowURI: c.Host.API + _workFlowAppealDelete,
  70. httpCli: bm.NewClient(c.HTTPClient.ClientConfig),
  71. httpSearch: bm.NewClient(c.HTTPSearch.ClientConfig),
  72. bfsCli: bfs.New(c.BFS),
  73. }
  74. return
  75. }
  76. // SendAction send action to job.
  77. func (d *Dao) SendAction(c context.Context, k string, action *model.Action) (err error) {
  78. if err = d.actionPub.Send(c, k, action); err != nil {
  79. log.Error("actionPub.Send(action:%s,data:%s) error(%v)", action.Action, action.Data, err)
  80. } else {
  81. log.Info("actionPub.Send(action:%s,data:%s) success", action.Action, action.Data)
  82. }
  83. return
  84. }
  85. //BeginBiliDMTrans begin a transsaction of biliDM
  86. func (d *Dao) BeginBiliDMTrans(c context.Context) (*sql.Tx, error) {
  87. return d.biliDM.Begin(c)
  88. }
  89. // Ping ping success.
  90. func (d *Dao) Ping(c context.Context) (err error) {
  91. if err = d.dmMetaWriter.Ping(c); err != nil {
  92. log.Error("d.dmMetaWriter error(%v)", err)
  93. return
  94. }
  95. if err = d.dmMetaReader.Ping(c); err != nil {
  96. log.Error("d.dmMetaReader error(%v)", err)
  97. return
  98. }
  99. if err = d.biliDM.Ping(c); err != nil {
  100. log.Error("d.biliDM error(%v)", err)
  101. }
  102. // mc
  103. filterMC := d.filterMC.Get(c)
  104. defer filterMC.Close()
  105. if err = filterMC.Set(&memcache.Item{Key: "ping", Value: []byte("pong"), Expiration: 0}); err != nil {
  106. log.Error("filterMC.Set error(%v)", err)
  107. return
  108. }
  109. return
  110. }