dao.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package dao
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/service/main/member/conf"
  6. "go-common/app/service/main/member/dao/block"
  7. "go-common/library/cache/memcache"
  8. "go-common/library/cache/redis"
  9. "go-common/library/database/sql"
  10. "go-common/library/log"
  11. bm "go-common/library/net/http/blademaster"
  12. "go-common/library/queue/databus"
  13. "go-common/library/sync/pipeline/fanout"
  14. xtime "go-common/library/time"
  15. )
  16. const (
  17. _searchLogURI = "/x/admin/search/log"
  18. _deleteLogURI = "/x/admin/search/log/delete"
  19. )
  20. // Dao struct info of Dao.
  21. type Dao struct {
  22. *cacheTTL
  23. accdb *sql.DB
  24. db *sql.DB
  25. mc *memcache.Pool
  26. c *conf.Config
  27. client *bm.Client
  28. redis *redis.Pool
  29. cache *fanout.Fanout
  30. // databus
  31. logDatabus *databus.Databus
  32. accNotify *databus.Databus
  33. block *block.Dao
  34. }
  35. type cacheTTL struct {
  36. baseTTL int32
  37. moralTTL int32
  38. captureTimesTTL int32
  39. captureCodeTTL int32
  40. captureErrTimesTTL int32
  41. applyInfoTTL int32
  42. }
  43. // New new a Dao and return.
  44. func New(c *conf.Config) (d *Dao) {
  45. d = &Dao{
  46. c: c,
  47. accdb: sql.NewMySQL(c.AccMysql),
  48. db: sql.NewMySQL(c.Mysql),
  49. mc: memcache.NewPool(c.Memcache),
  50. client: bm.NewClient(c.HTTPClient),
  51. redis: redis.NewPool(c.Redis),
  52. cache: fanout.New("memberServiceCache", fanout.Worker(1), fanout.Buffer(10240)),
  53. logDatabus: databus.New(c.Databus),
  54. accNotify: databus.New(c.AccountNotify),
  55. }
  56. d.block = block.New(
  57. c,
  58. sql.NewMySQL(c.BlockMySQL),
  59. memcache.NewPool(c.BlockMemcache),
  60. d.client,
  61. d.NotifyPurgeCache,
  62. )
  63. d.cacheTTL = newCacheTTL(c.CacheTTL)
  64. return
  65. }
  66. // Ping ping health.
  67. func (d *Dao) Ping(c context.Context) error {
  68. if err := d.db.Ping(c); err != nil {
  69. log.Error("Failed to ping database: %+v", err)
  70. return err
  71. }
  72. return d.pingMC(c)
  73. }
  74. // Close close connections of mc, redis, db.
  75. func (d *Dao) Close() {
  76. if d.mc != nil {
  77. d.mc.Close()
  78. }
  79. if d.db != nil {
  80. d.db.Close()
  81. }
  82. if d.block != nil {
  83. d.block.Close()
  84. }
  85. }
  86. func durationToSeconds(expire xtime.Duration) int32 {
  87. return int32(time.Duration(expire) / time.Second)
  88. }
  89. func newCacheTTL(c *conf.CacheTTL) *cacheTTL {
  90. return &cacheTTL{
  91. baseTTL: durationToSeconds(c.BaseTTL),
  92. moralTTL: durationToSeconds(c.MoralTTL),
  93. captureTimesTTL: durationToSeconds(c.CaptureErrTimesTTL),
  94. captureCodeTTL: durationToSeconds(c.CaptureCodeTTL),
  95. captureErrTimesTTL: durationToSeconds(c.CaptureErrTimesTTL),
  96. applyInfoTTL: durationToSeconds(c.ApplyInfoTTL),
  97. }
  98. }
  99. // BlockImpl is
  100. func (d *Dao) BlockImpl() *block.Dao {
  101. return d.block
  102. }