dao.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "os"
  6. "time"
  7. "go-common/app/interface/main/push-archive/conf"
  8. "go-common/app/interface/main/push-archive/model"
  9. xredis "go-common/library/cache/redis"
  10. xsql "go-common/library/database/sql"
  11. "go-common/library/log"
  12. xhttp "go-common/library/net/http/blademaster"
  13. "go-common/library/stat/prom"
  14. "go-common/library/database/hbase.v2"
  15. )
  16. // Dao .
  17. type Dao struct {
  18. c *conf.Config
  19. db *xsql.DB
  20. redis *xredis.Pool
  21. relationHBase *hbase.Client
  22. relationHBaseReadTimeout time.Duration
  23. relationHBaseWriteTimeout time.Duration
  24. fanHBase *hbase.Client
  25. fanHBaseReadTimeout time.Duration
  26. httpClient *xhttp.Client
  27. settingStmt *xsql.Stmt
  28. setSettingStmt *xsql.Stmt
  29. settingsMaxIDStmt *xsql.Stmt
  30. setStatisticsStmt *xsql.Stmt
  31. UpperLimitExpire int32
  32. FanGroups map[string]*FanGroup
  33. GroupOrder []string
  34. Proportions []Proportion
  35. ActiveDefaultTime map[int]int
  36. PushBusinessID string
  37. PushAuth string
  38. }
  39. var (
  40. errorsCount = prom.BusinessErrCount
  41. infosCount = prom.BusinessInfoCount
  42. )
  43. // New creates a push-service DAO instance.
  44. func New(c *conf.Config) *Dao {
  45. d := &Dao{
  46. c: c,
  47. db: xsql.NewMySQL(c.MySQL),
  48. relationHBase: hbase.NewClient(&c.HBase.Config),
  49. relationHBaseReadTimeout: time.Duration(c.HBase.ReadTimeout),
  50. relationHBaseWriteTimeout: time.Duration(c.HBase.WriteTimeout),
  51. fanHBase: hbase.NewClient(&c.FansHBase.Config),
  52. fanHBaseReadTimeout: time.Duration(c.FansHBase.ReadTimeout),
  53. redis: xredis.NewPool(c.Redis),
  54. httpClient: xhttp.NewClient(c.HTTPClient),
  55. UpperLimitExpire: int32(time.Duration(c.ArcPush.UpperLimitExpire) / time.Second),
  56. FanGroups: NewFanGroups(c),
  57. Proportions: NewProportion(c.ArcPush.Proportions),
  58. }
  59. d.settingStmt = d.db.Prepared(_settingSQL)
  60. d.setSettingStmt = d.db.Prepared(_setSettingSQL)
  61. d.settingsMaxIDStmt = d.db.Prepared(_settingsMaxIDSQL)
  62. d.setStatisticsStmt = d.db.Prepared(_inStatisticsSQL)
  63. for _, gp := range c.ArcPush.Order {
  64. if _, exist := d.FanGroups[gp]; !exist {
  65. log.Error("order config error, group %s not exist", gp)
  66. fmt.Printf("order config error, group %s not exist\r\n\r\n", gp)
  67. os.Exit(1)
  68. }
  69. }
  70. d.GroupOrder = c.ArcPush.Order
  71. // default active time
  72. d.ActiveDefaultTime = map[int]int{}
  73. for _, one := range c.ArcPush.ActiveTime {
  74. d.ActiveDefaultTime[one] = 1
  75. }
  76. return d
  77. }
  78. // PromError prom error
  79. func PromError(name string) {
  80. errorsCount.Incr(name)
  81. }
  82. // PromInfo add prom info
  83. func PromInfo(name string) {
  84. infosCount.Incr(name)
  85. }
  86. // PromInfoAdd add prom info by value
  87. func PromInfoAdd(name string, value int64) {
  88. infosCount.Add(name, value)
  89. }
  90. // PromChanLen channel length
  91. func PromChanLen(name string, length int64) {
  92. infosCount.State(name, length)
  93. }
  94. // BeginTx begin transaction.
  95. func (d *Dao) BeginTx(c context.Context) (*xsql.Tx, error) {
  96. return d.db.Begin(c)
  97. }
  98. // Close dao.
  99. func (d *Dao) Close() (err error) {
  100. if err = d.relationHBase.Close(); err != nil {
  101. log.Error("d.relationHBase.Close() error(%v)", err)
  102. PromError("hbase:close")
  103. }
  104. if err = d.fanHBase.Close(); err != nil {
  105. log.Error("d.fanHBase.Close() error(%v)", err)
  106. PromError("fanHBase:close")
  107. }
  108. if err = d.redis.Close(); err != nil {
  109. log.Error("d.redis.Close() error(%v)", err)
  110. PromError("redis:close")
  111. }
  112. if err = d.db.Close(); err != nil {
  113. log.Error("d.db.Close() error(%v)", err)
  114. PromError("db:close")
  115. }
  116. return
  117. }
  118. // Ping check connection status.
  119. func (d *Dao) Ping(c context.Context) (err error) {
  120. if err = d.db.Ping(c); err != nil {
  121. PromError("mysql:Ping")
  122. log.Error("d.db.Ping error(%v)", err)
  123. return
  124. }
  125. if err = d.pingRedis(c); err != nil {
  126. PromError("redis:Ping")
  127. log.Error("d.redis.Ping error(%v)", err)
  128. }
  129. return
  130. }
  131. // Batch 批量处理
  132. func Batch(list *[]int64, batchSize int, retry int, params *model.BatchParam, f func(fans *[]int64, params map[string]interface{}) error) {
  133. if params == nil {
  134. log.Warn("Batch params(%+v) nil", params)
  135. return
  136. }
  137. for {
  138. var (
  139. mids []int64
  140. err error
  141. )
  142. l := len(*list)
  143. if l == 0 {
  144. break
  145. } else if l <= batchSize {
  146. mids = (*list)[:l]
  147. } else {
  148. mids = (*list)[:batchSize]
  149. l = batchSize
  150. }
  151. *list = (*list)[l:]
  152. params.Handler(&params.Params, mids)
  153. for i := 0; i < retry; i++ {
  154. if err = f(&mids, params.Params); err == nil {
  155. break
  156. }
  157. }
  158. if err != nil {
  159. log.Error("Batch error(%v), params(%+v)", err, params)
  160. }
  161. }
  162. }