dao.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package dao
  2. import (
  3. "context"
  4. "go-common/app/service/main/share/conf"
  5. "go-common/library/cache"
  6. "go-common/library/cache/redis"
  7. "go-common/library/database/sql"
  8. "go-common/library/log"
  9. "go-common/library/queue/databus"
  10. )
// Dao groups the handles the data-access layer works with: the service
// configuration, a MySQL handle, a Redis pool, two databus clients, an
// in-memory set of sources and an asynchronous cache worker.
type Dao struct {
	// c is the configuration the Dao was built from.
	c *conf.Config
	// db is the MySQL handle created from c.DB.
	db *sql.DB
	// rds is the Redis connection pool created from c.Redis.
	rds *redis.Pool
	// cacheCh is a buffered channel of closures.
	// NOTE(review): no consumer of this channel is visible in this part of
	// the file — confirm where (or whether) it is drained.
	cacheCh chan func()
	// databus is the databus client created from c.Databus.
	databus *databus.Databus
	// archiveDatabus is the databus client created from c.ArchiveDatabus.
	archiveDatabus *databus.Databus
	// sources is a set holding every value listed in c.Sources.
	// NOTE(review): presumably these are allowed source IDs — confirm with callers.
	sources map[int64]struct{}
	// asyncCache is the cache worker created from c.Worker.
	asyncCache *cache.Cache
}
  27. // New is new redis dao.
  28. func New(c *conf.Config) (d *Dao) {
  29. d = &Dao{
  30. c: c,
  31. db: sql.NewMySQL(c.DB),
  32. rds: redis.NewPool(c.Redis),
  33. cacheCh: make(chan func(), 1024),
  34. databus: databus.New(c.Databus),
  35. archiveDatabus: databus.New(c.ArchiveDatabus),
  36. sources: make(map[int64]struct{}, len(c.Sources)),
  37. asyncCache: cache.New(c.Worker, 1024),
  38. }
  39. for _, s := range c.Sources {
  40. d.sources[s] = struct{}{}
  41. }
  42. return d
  43. }
  44. // BeginTran begin transcation.
  45. func (d *Dao) BeginTran(c context.Context) (tx *sql.Tx, err error) {
  46. return d.db.Begin(c)
  47. }
  48. // Close close
  49. func (d *Dao) Close() {
  50. d.db.Close()
  51. d.rds.Close()
  52. }
  53. // Ping ping
  54. func (d *Dao) Ping() (err error) {
  55. if err = d.db.Ping(context.Background()); err != nil {
  56. log.Error("d.db.Ping error(%v)", err)
  57. return
  58. }
  59. conn := d.rds.Get(context.Background())
  60. defer conn.Close()
  61. if _, err = conn.Do("SET", "ping", "pong"); err != nil {
  62. log.Error("redis.Set error(%v)", err)
  63. return
  64. }
  65. return
  66. }