service.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package service
  2. import (
  3. "context"
  4. "io/ioutil"
  5. "time"
  6. "go-common/app/job/main/passport-game-data/conf"
  7. "go-common/app/job/main/passport-game-data/dao"
  8. "go-common/library/log"
  9. )
  10. const (
  11. _defaultDelayDuration = time.Minute * 0
  12. _defaultStepDuration = time.Minute * 15
  13. _defaultLoopDuration = time.Second * 3
  14. _defaultBatchSize = 1000
  15. _defaultBatchMissRetryCount = 3
  16. _timeFormat = "2006-01-02 15:04:05"
  17. )
  18. var (
  19. _loc = time.Now().Location()
  20. )
  21. // Service service.
  22. type Service struct {
  23. c *conf.Config
  24. d *dao.Dao
  25. // init cloud
  26. ic *initCloudConfig
  27. // c2l
  28. c2lC *compareConfig
  29. // l2c
  30. l2cC *compareConfig
  31. }
  32. type compareConfig struct {
  33. On bool
  34. OffsetFilePath string
  35. UseOldOffset bool
  36. End bool
  37. StartTime time.Time
  38. EndTime time.Time
  39. DelayDuration time.Duration
  40. StepDuration time.Duration
  41. LoopDuration time.Duration
  42. BatchSize int
  43. BatchMissRetryCount int
  44. Debug bool
  45. Fix bool
  46. // runtime
  47. st, ed time.Time
  48. rangeCount int
  49. totalCount int
  50. diffCount int
  51. sleeping bool
  52. sleepingSeconds int64
  53. sleepFromTs int64
  54. }
  55. func newCompareConfigFrom(c *conf.CompareConfig) (cc *compareConfig) {
  56. st, err := time.ParseInLocation(_timeFormat, c.StartTime, _loc)
  57. if err != nil {
  58. log.Error("failed to parse end time, time.ParseInLocation(%s, %s, %v), error(%v)", _timeFormat, c.StartTime, _loc, err)
  59. return
  60. }
  61. ed, err := time.ParseInLocation(_timeFormat, c.EndTime, _loc)
  62. if err != nil {
  63. log.Error("failed to parse end time, time.ParseInLocation(%s, %s, %v), error(%v)", _timeFormat, c.EndTime, _loc, err)
  64. return
  65. }
  66. cc = &compareConfig{
  67. On: c.On,
  68. Debug: c.Debug,
  69. OffsetFilePath: c.OffsetFilePath,
  70. UseOldOffset: c.UseOldOffset,
  71. End: c.End,
  72. StartTime: st,
  73. EndTime: ed,
  74. DelayDuration: time.Duration(c.DelayDuration),
  75. StepDuration: time.Duration(c.StepDuration),
  76. LoopDuration: time.Duration(c.LoopDuration),
  77. BatchSize: c.BatchSize,
  78. BatchMissRetryCount: c.BatchMissRetryCount,
  79. Fix: c.Fix,
  80. }
  81. if cc.UseOldOffset {
  82. if oldOffset, err := parseOldOffset(cc.OffsetFilePath); err == nil {
  83. cc.StartTime = oldOffset
  84. }
  85. }
  86. cc.fix()
  87. return
  88. }
  89. func parseOldOffset(path string) (oldOffset time.Time, err error) {
  90. data, err := ioutil.ReadFile(path)
  91. if err != nil {
  92. log.Error("failed to read old offset, ioutil.ReadFile(%s) error(%v) skip", path, err)
  93. return
  94. }
  95. if oldOffset, err = time.ParseInLocation(_timeFormat, string(data), _loc); err != nil {
  96. log.Error("failed to parse offset, time.ParseInLocation(%s, %s, %v) error(%v)", _timeFormat, string(data), _loc, err)
  97. }
  98. return
  99. }
  100. func (cc *compareConfig) fix() {
  101. if int64(cc.DelayDuration) < 0 {
  102. cc.DelayDuration = _defaultDelayDuration
  103. }
  104. if int64(cc.StepDuration) < 0 {
  105. cc.StepDuration = _defaultStepDuration
  106. }
  107. if int64(cc.LoopDuration) < 0 {
  108. cc.LoopDuration = _defaultLoopDuration
  109. }
  110. if cc.BatchSize <= 0 {
  111. cc.BatchSize = _defaultBatchSize
  112. }
  113. if cc.BatchMissRetryCount < 0 {
  114. cc.BatchMissRetryCount = _defaultBatchMissRetryCount
  115. }
  116. }
  117. // New new a service instance.
  118. func New(c *conf.Config) (s *Service) {
  119. s = &Service{
  120. c: c,
  121. d: dao.New(c),
  122. }
  123. if c.Compare.Cloud2Local.On {
  124. s.c2lC = newCompareConfigFrom(c.Compare.Cloud2Local)
  125. go s.cloud2localcompareproc()
  126. }
  127. if c.Compare.Local2Cloud.On {
  128. s.l2cC = newCompareConfigFrom(c.Compare.Local2Cloud)
  129. go s.local2cloudcompareproc()
  130. }
  131. return
  132. }
  133. // Ping check server ok.
  134. func (s *Service) Ping(c context.Context) (err error) {
  135. err = s.d.Ping(c)
  136. return
  137. }
  138. // Close close service.
  139. func (s *Service) Close() (err error) {
  140. s.d.Close()
  141. return
  142. }