dao.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package data
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/interface/main/creative/conf"
  6. "go-common/app/interface/main/creative/model/data"
  7. "go-common/library/cache/memcache"
  8. "go-common/library/log"
  9. bm "go-common/library/net/http/blademaster"
  10. "go-common/library/database/hbase.v2"
  11. )
  12. // Dao is data dao.
  13. type Dao struct {
  14. c *conf.Config
  15. // http
  16. client *bm.Client
  17. statClient *bm.Client
  18. // uri
  19. statURI string
  20. tagV2URI string
  21. coverBFSURI string
  22. // mc
  23. mc *memcache.Pool
  24. mcExpire int32
  25. mcIdxExpire int32
  26. statCacheOn bool
  27. // hbase
  28. hbase *hbase.Client
  29. hbaseTimeOut time.Duration
  30. // chan
  31. missch chan func()
  32. }
  33. // New init dao
  34. func New(c *conf.Config) (d *Dao) {
  35. d = &Dao{
  36. c: c,
  37. // http
  38. client: bm.NewClient(c.HTTPClient.Slow),
  39. statClient: bm.NewClient(c.HTTPClient.Slow),
  40. statURI: c.Host.Data + _statURL,
  41. tagV2URI: c.Host.Tag + _tagv2URL,
  42. coverBFSURI: c.Host.Coverrec + _coverURL,
  43. // memcache
  44. mc: memcache.NewPool(c.Memcache.Data.Config),
  45. mcExpire: int32(time.Duration(c.Memcache.Data.DataExpire) / time.Second),
  46. mcIdxExpire: int32(time.Duration(c.Memcache.Data.IndexExpire) / time.Second),
  47. statCacheOn: c.StatCacheOn,
  48. // hbase
  49. hbase: hbase.NewClient(c.HBase.Config),
  50. // chan
  51. missch: make(chan func(), 1024),
  52. hbaseTimeOut: time.Duration(time.Millisecond * 200),
  53. }
  54. go d.cacheproc()
  55. return
  56. }
  57. // Stat get user stat play/fans/...
  58. func (d *Dao) Stat(c context.Context, ip string, mid int64) (st *data.Stat, err error) {
  59. // try cache
  60. if st, _ = d.statCache(c, mid); st != nil {
  61. return
  62. }
  63. // from api
  64. if st, err = d.stat(c, mid, ip); st != nil {
  65. d.AddCache(func() {
  66. d.addStatCache(context.TODO(), mid, st)
  67. })
  68. }
  69. return
  70. }
  71. // UpStat get up stat from hbase
  72. func (d *Dao) UpStat(c context.Context, mid int64, dt string) (st *data.UpBaseStat, err error) {
  73. // try cache
  74. if d.statCacheOn {
  75. if st, _ = d.upBaseStatCache(c, mid, dt); st != nil {
  76. log.Info("upBaseStatCache cache found mid(%d)", mid)
  77. return
  78. }
  79. }
  80. // from hbase
  81. if st, err = d.BaseUpStat(c, mid, dt); st != nil {
  82. d.AddCache(func() {
  83. d.addUpBaseStatCache(context.TODO(), mid, dt, st)
  84. })
  85. }
  86. return
  87. }
  88. // Ping ping success.
  89. func (d *Dao) Ping(c context.Context) (err error) {
  90. conn := d.mc.Get(c)
  91. defer conn.Close()
  92. if err = conn.Set(&memcache.Item{Key: "ping", Value: []byte("pong"), Expiration: 0}); err != nil {
  93. log.Error("mc.ping.Store error(%v)", err)
  94. }
  95. return
  96. }
  97. // Close mc close
  98. func (d *Dao) Close() (err error) {
  99. if d.mc != nil {
  100. d.mc.Close()
  101. }
  102. if d.hbase != nil {
  103. d.hbase.Close()
  104. }
  105. return
  106. }
  107. // AddCache add to chan for cache
  108. func (d *Dao) AddCache(f func()) {
  109. select {
  110. case d.missch <- f:
  111. default:
  112. log.Warn("cacheproc chan full")
  113. }
  114. }
  115. // cacheproc is a routine for execute closure.
  116. func (d *Dao) cacheproc() {
  117. for {
  118. f := <-d.missch
  119. f()
  120. }
  121. }