dao.go 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. package timemachine
  2. import (
  3. "time"
  4. "go-common/app/interface/main/activity/conf"
  5. "go-common/library/cache/memcache"
  6. "go-common/library/sync/pipeline/fanout"
  7. "go-common/library/database/hbase.v2"
  8. )
  9. // Dao .
  10. type Dao struct {
  11. c *conf.Config
  12. hbase *hbase.Client
  13. mc *memcache.Pool
  14. cache *fanout.Fanout
  15. //limiter *rate.Limiter
  16. mcTmExpire int32
  17. //tmProcStart int64
  18. //tmProcStop int64
  19. }
  20. // New .
  21. func New(c *conf.Config) (d *Dao) {
  22. d = &Dao{
  23. c: c,
  24. hbase: hbase.NewClient(c.Hbase),
  25. mc: memcache.NewPool(c.TimeMc.Timemachine),
  26. cache: fanout.New("timemachine", fanout.Worker(8), fanout.Buffer(10240)),
  27. }
  28. d.mcTmExpire = int32(time.Duration(c.TimeMc.TmExpire) / time.Second)
  29. //d.limiter = rate.NewLimiter(1000, 100)
  30. //go d.startTmproc(context.Background())
  31. return d
  32. }
  33. // StartTmproc start time machine proc
  34. //func (d *Dao) startTmproc(c context.Context) {
  35. // if env.DeployEnv != env.DeployEnvPre {
  36. // return
  37. // }
  38. // for {
  39. // time.Sleep(time.Second)
  40. // if d.tmProcStart != 0 {
  41. // go func() {
  42. // // scan key
  43. // max := 10000000000
  44. // step := max / 10000
  45. // prefix := step - 1
  46. // for i := 0; i < max; i += step {
  47. // time.Sleep(10 * time.Millisecond)
  48. // startRow := fmt.Sprintf("%0*d", 10, i)
  49. // endRow := fmt.Sprintf("%0*d", 10, i+prefix)
  50. // if err := d.timemachineScan(c, startRow, endRow); err != nil {
  51. // log.Error("startTmproc timemachineScan startRow(%s) endRow(%s) error(%v)", startRow, endRow, err)
  52. // continue
  53. // }
  54. // log.Info("startTmproc finish startRow(%s) endRow(%s)", startRow, endRow)
  55. // }
  56. // }()
  57. // break
  58. // }
  59. // }
  60. //}
  61. // StartTmProc start time machine proc.
  62. //func (d *Dao) StartTmProc(c context.Context) {
  63. // atomic.StoreInt64(&d.tmProcStart, 1)
  64. //}
  65. // StopTmproc stop time machine proc.
  66. //func (d *Dao) StopTmproc(c context.Context) {
  67. // atomic.StoreInt64(&d.tmProcStop, 1)
  68. //}