cloud2local.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. package service
  2. import (
  3. "context"
  4. "io/ioutil"
  5. "os"
  6. "time"
  7. "go-common/app/job/main/passport-game-data/model"
  8. "go-common/library/log"
  9. )
  10. // cloud2localcompareproc compare aso accounts between cloud and local.
  11. // select last modified from cloud
  12. // load batchSize from origin
  13. // compare:
  14. // if cloud_mtime >= local_mtime, directly compare
  15. // if cloud_mtime < local_mtime, sleep and reload from cloud, then do compare again
  16. func (s *Service) cloud2localcompareproc() {
  17. var (
  18. err error
  19. cloudRes []*model.AsoAccount
  20. ack = false
  21. )
  22. cc := s.c2lC
  23. delay := cc.DelayDuration
  24. cc.st = cc.StartTime
  25. cc.ed = cc.st.Add(cc.StepDuration)
  26. offsetFile, err := os.Create(cc.OffsetFilePath)
  27. if err != nil {
  28. log.Error("failed to create offset file, os.Create(%s) error(%v)", cc.OffsetFilePath, err)
  29. return
  30. }
  31. defer offsetFile.Close()
  32. log.Info("created offset file %s", cc.OffsetFilePath)
  33. for {
  34. time.Sleep(cc.LoopDuration)
  35. cc.sleeping = false
  36. if ack {
  37. cc.st = cc.st.Add(cc.StepDuration)
  38. cc.ed = cc.ed.Add(cc.StepDuration)
  39. }
  40. st, ed := cc.st, cc.ed
  41. if err = ioutil.WriteFile(cc.OffsetFilePath, []byte(st.Format(_timeFormat)), os.ModeAppend); err != nil {
  42. log.Error("failed to write offset, ioutil.WriteFile(%s, %s, os.ModeAppend), error(%v)", cc.OffsetFilePath, st.Format(_timeFormat), err)
  43. continue
  44. }
  45. if cc.Debug {
  46. log.Info("st: %s, ed: %s", st.Format(_timeFormat), ed.Format(_timeFormat))
  47. }
  48. if cc.End && st.After(cc.EndTime) {
  49. log.Info("st:%s is after endTime:%s, all data compares ok, cloud2localcompareproc exit", st.Format(_timeFormat), cc.EndTime.Format(_timeFormat))
  50. return
  51. }
  52. now := time.Now()
  53. if now.Sub(st) <= delay {
  54. delta := int64(delay/time.Second) - (now.Unix() - st.Unix())
  55. log.Info("now time is just after st by %d seconds, not greater than delay duration: %v, will sleep %d seconds", int64(delay/time.Second)-delta, delay, delta)
  56. cc.sleeping = true
  57. cc.sleepingSeconds = delta
  58. cc.sleepFromTs = now.Unix()
  59. time.Sleep(time.Duration(int64(time.Second) * delta))
  60. continue
  61. }
  62. if cloudRes, err = s.d.AsoAccountRangeCloud(context.TODO(), st, ed); err != nil {
  63. continue
  64. }
  65. cc.rangeCount = len(cloudRes)
  66. cc.totalCount += len(cloudRes)
  67. if err = s.cloud2LocalCompare(context.TODO(), cloudRes); err != nil {
  68. continue
  69. }
  70. ack = true
  71. }
  72. }
  73. func (s *Service) cloud2LocalCompare(c context.Context, cloudRes []*model.AsoAccount) (err error) {
  74. mids := make([]int64, 0)
  75. for _, item := range cloudRes {
  76. mids = append(mids, item.Mid)
  77. }
  78. cc := s.c2lC
  79. localRes := s.batchQueryLocalNonMiss(context.TODO(), mids, cc.BatchSize, cc.BatchMissRetryCount)
  80. m := make(map[int64]*model.OriginAsoAccount)
  81. for _, item := range localRes {
  82. m[item.Mid] = item
  83. }
  84. // compare
  85. pendingMids := make([]int64, 0)
  86. for _, item := range cloudRes {
  87. cloud := item
  88. local := m[item.Mid]
  89. status := doCompare(cloud, local, true)
  90. switch status {
  91. case _statusOK:
  92. // do nothing
  93. case _statusNo:
  94. cc.diffCount++
  95. s.doLog(cloud, local, false)
  96. if cc.Fix {
  97. s.fixCloudRecord(context.TODO(), model.Default(local), cloud)
  98. }
  99. case _statusPending:
  100. pendingMids = append(pendingMids, item.Mid)
  101. }
  102. }
  103. if len(pendingMids) == 0 {
  104. return
  105. }
  106. // reload pending mids from cloud
  107. var pendingRes []*model.AsoAccount
  108. if pendingRes, err = s.d.AsoAccountsCloud(context.TODO(), pendingMids); err != nil {
  109. return
  110. }
  111. // compare
  112. for _, item := range pendingRes {
  113. cloud := item
  114. local := m[item.Mid]
  115. status := doCompare(item, m[item.Mid], false)
  116. switch status {
  117. case _statusOK:
  118. case _statusNo:
  119. cc.diffCount++
  120. s.doLog(cloud, local, true)
  121. if cc.Fix {
  122. s.fixCloudRecord(context.TODO(), model.Default(local), cloud)
  123. }
  124. }
  125. }
  126. return
  127. }