dao.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package dao
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/interface/main/growup/conf"
  6. article "go-common/app/interface/openplatform/article/rpc/client"
  7. account "go-common/app/service/main/account/rpc/client"
  8. vip "go-common/app/service/main/vip/rpc/client"
  9. "go-common/library/cache/redis"
  10. "go-common/library/database/hbase.v2"
  11. "go-common/library/database/sql"
  12. "go-common/library/log"
  13. bm "go-common/library/net/http/blademaster"
  14. )
  15. // Dao def dao struct
  16. type Dao struct {
  17. c *conf.Config
  18. db *sql.DB
  19. rddb *sql.DB
  20. // redis
  21. redis *redis.Pool
  22. redisExpire int64
  23. // search
  24. httpRead *bm.Client
  25. // rpc
  26. acc *account.Service3
  27. art *article.Service
  28. vip *vip.Service
  29. // hbase
  30. hbase *hbase.Client
  31. hbaseTimeOut time.Duration
  32. // chan
  33. missch chan func()
  34. }
  35. // New fn
  36. func New(c *conf.Config) (d *Dao) {
  37. d = &Dao{
  38. c: c,
  39. db: sql.NewMySQL(c.DB.Growup),
  40. rddb: sql.NewMySQL(c.DB.Allowance),
  41. // redis
  42. redis: redis.NewPool(c.Redis.Config),
  43. redisExpire: int64(time.Duration(c.Redis.Expire) / time.Second),
  44. // search
  45. httpRead: bm.NewClient(c.HTTPClient.Read),
  46. // rpc
  47. acc: account.New3(c.AccountRPC),
  48. art: article.New(c.ArticleRPC),
  49. vip: vip.New(c.VipRPC),
  50. // hbase
  51. hbase: hbase.NewClient(c.HBase.Config),
  52. hbaseTimeOut: time.Duration(time.Millisecond * 200),
  53. // chan
  54. missch: make(chan func(), 1024),
  55. }
  56. go d.cacheproc()
  57. return d
  58. }
  59. // Ping ping db
  60. func (d *Dao) Ping(c context.Context) (err error) {
  61. if err = d.db.Ping(c); err != nil {
  62. log.Error("d.db.Ping error(%v)", err)
  63. return
  64. }
  65. if err = d.pingRedis(c); err != nil {
  66. log.Error("d.pingRedis error(%v)", err)
  67. }
  68. return
  69. }
  70. func (d *Dao) pingRedis(c context.Context) (err error) {
  71. conn := d.redis.Get(c)
  72. _, err = conn.Do("SET", "PING", "PONG")
  73. conn.Close()
  74. return
  75. }
  76. // Close close db conn
  77. func (d *Dao) Close() {
  78. if d.db != nil {
  79. d.db.Close()
  80. }
  81. if d.redis != nil {
  82. d.redis.Close()
  83. }
  84. if d.hbase != nil {
  85. d.hbase.Close()
  86. }
  87. }
  88. // BeginTran begin transcation
  89. func (d *Dao) BeginTran(c context.Context) (tx *sql.Tx, err error) {
  90. return d.db.Begin(c)
  91. }
  92. // AddCache add to chan for cache
  93. func (d *Dao) AddCache(f func()) {
  94. select {
  95. case d.missch <- f:
  96. default:
  97. log.Warn("cacheproc chan full")
  98. }
  99. }
  100. // cacheproc is a routine for execute closure.
  101. func (d *Dao) cacheproc() {
  102. for {
  103. f := <-d.missch
  104. f()
  105. }
  106. }