dao.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package dao
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "net/url"
  7. "time"
  8. "go-common/app/job/main/history/conf"
  9. "go-common/app/service/main/history/model"
  10. "go-common/library/cache/redis"
  11. "go-common/library/database/tidb"
  12. "go-common/library/log"
  13. bm "go-common/library/net/http/blademaster"
  14. "go-common/library/database/hbase.v2"
  15. )
  16. var errFlushRequest = errors.New("error flush history to store")
  17. // Dao dao.
  18. type Dao struct {
  19. conf *conf.Config
  20. HTTPClient *bm.Client
  21. URL string
  22. info *hbase.Client
  23. redis *redis.Pool
  24. db *tidb.DB
  25. longDB *tidb.DB
  26. insertStmt *tidb.Stmts
  27. businessesStmt *tidb.Stmts
  28. allHisStmt *tidb.Stmts
  29. delUserStmt *tidb.Stmts
  30. BusinessesMap map[int64]*model.Business
  31. BusinessNames map[string]*model.Business
  32. }
  33. // New new history dao and return.
  34. func New(c *conf.Config) (dao *Dao) {
  35. dao = &Dao{
  36. conf: c,
  37. redis: redis.NewPool(c.Redis),
  38. HTTPClient: bm.NewClient(c.Job.Client),
  39. URL: c.Job.URL,
  40. info: hbase.NewClient(c.Info.Config),
  41. db: tidb.NewTiDB(c.TiDB),
  42. longDB: tidb.NewTiDB(c.LongTiDB),
  43. }
  44. dao.businessesStmt = dao.db.Prepared(_businessesSQL)
  45. dao.insertStmt = dao.db.Prepared(_addHistorySQL)
  46. dao.allHisStmt = dao.db.Prepared(_allHisSQL)
  47. dao.delUserStmt = dao.db.Prepared(_delUserHisSQL)
  48. dao.loadBusiness()
  49. go dao.loadBusinessproc()
  50. return
  51. }
  52. // Flush flush history to store by mids.
  53. func (d *Dao) Flush(c context.Context, mids string, stime int64) (err error) {
  54. params := url.Values{}
  55. params.Set("mids", mids)
  56. params.Set("time", fmt.Sprintf("%d", stime))
  57. var res = &struct {
  58. Code int `json:"code"`
  59. Msg string `json:"message"`
  60. }{}
  61. if err = d.HTTPClient.Post(c, d.URL, "", params, res); err != nil {
  62. log.Error("d.HTTPClient.Post(%s?%s) error(%v)", d.URL, params.Encode(), err)
  63. return
  64. }
  65. if res.Code != 0 {
  66. log.Error("d.HTTPClient.Post(%s?%s) code:%d msg:%s", d.URL, params.Encode(), res.Code, res.Msg)
  67. err = errFlushRequest
  68. return
  69. }
  70. return
  71. }
  72. // Ping check connection success.
  73. func (d *Dao) Ping(c context.Context) (err error) {
  74. return
  75. }
  76. // Close close the redis and kafka resource.
  77. func (d *Dao) Close() {
  78. d.redis.Close()
  79. d.db.Close()
  80. d.longDB.Close()
  81. }
  82. func (d *Dao) loadBusiness() {
  83. var business []*model.Business
  84. var err error
  85. businessMap := make(map[string]*model.Business)
  86. businessIDMap := make(map[int64]*model.Business)
  87. for {
  88. if business, err = d.Businesses(context.TODO()); err != nil {
  89. time.Sleep(time.Second)
  90. continue
  91. }
  92. for _, b := range business {
  93. businessMap[b.Name] = b
  94. businessIDMap[b.ID] = b
  95. }
  96. d.BusinessNames = businessMap
  97. d.BusinessesMap = businessIDMap
  98. return
  99. }
  100. }
  101. func (d *Dao) loadBusinessproc() {
  102. for {
  103. time.Sleep(time.Minute * 5)
  104. d.loadBusiness()
  105. }
  106. }