local2cloud.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package service
  2. import (
  3. "context"
  4. "io/ioutil"
  5. "os"
  6. "time"
  7. "go-common/app/job/main/passport-game-data/model"
  8. "go-common/library/log"
  9. )
  10. // local2cloudcompareproc compare aso accounts between local and cloud, delay for the given duration.
  11. // select last modified from cloud
  12. // load batchSize from origin
  13. // compare:
  14. // if cloud_mtime >= local_mtime, directly compare
  15. // if cloud_mtime < local_mtime, sleep and reload from cloud, then do compare again
  16. func (s *Service) local2cloudcompareproc() {
  17. var (
  18. err error
  19. localRes []*model.OriginAsoAccount
  20. ack = false
  21. )
  22. cc := s.l2cC
  23. delay := cc.DelayDuration
  24. cc.st = cc.StartTime
  25. cc.ed = cc.st.Add(cc.StepDuration)
  26. offsetFile, err := os.Create(cc.OffsetFilePath)
  27. if err != nil {
  28. log.Error("failed to create offset file, os.Create(%s) error(%v)", cc.OffsetFilePath, err)
  29. return
  30. }
  31. defer offsetFile.Close()
  32. log.Info("created offset file %s", cc.OffsetFilePath)
  33. for {
  34. time.Sleep(cc.LoopDuration)
  35. cc.sleeping = false
  36. if ack {
  37. cc.st = cc.st.Add(cc.StepDuration)
  38. cc.ed = cc.ed.Add(cc.StepDuration)
  39. }
  40. st, ed := cc.st, cc.ed
  41. if err = ioutil.WriteFile(cc.OffsetFilePath, []byte(st.Format(_timeFormat)), os.ModeAppend); err != nil {
  42. log.Error("failed to write offset, ioutil.WriteFile(%s, %s, os.ModeAppend), error(%v)", cc.OffsetFilePath, st.Format(_timeFormat), err)
  43. continue
  44. }
  45. if cc.Debug {
  46. log.Info("st: %s, ed: %s", st.Format(_timeFormat), ed.Format(_timeFormat))
  47. }
  48. if cc.End && st.After(cc.EndTime) {
  49. log.Info("st:%s is after endTime:%s, all data compares ok, local2cloudcompareproc exit", st.Format(_timeFormat), cc.EndTime.Format(_timeFormat))
  50. return
  51. }
  52. now := time.Now()
  53. if now.Sub(st) <= delay {
  54. delta := int64(delay/time.Second) - (now.Unix() - st.Unix())
  55. log.Info("now time is just after st by %d seconds, not greater than delay duration: %v, will sleep %d seconds", int64(delay/time.Second)-delta, delay, delta)
  56. cc.sleeping = true
  57. cc.sleepingSeconds = delta
  58. cc.sleepFromTs = now.Unix()
  59. time.Sleep(time.Duration(int64(time.Second) * delta))
  60. continue
  61. }
  62. if localRes, err = s.d.AsoAccountRangeLocal(context.TODO(), st, ed); err != nil {
  63. continue
  64. }
  65. cc.rangeCount = len(localRes)
  66. cc.totalCount += len(localRes)
  67. if err = s.local2CloudCompare(context.TODO(), localRes); err != nil {
  68. continue
  69. }
  70. ack = true
  71. }
  72. }
  73. func (s *Service) local2CloudCompare(c context.Context, lRes []*model.OriginAsoAccount) (err error) {
  74. mids := make([]int64, 0)
  75. for _, item := range lRes {
  76. mids = append(mids, item.Mid)
  77. }
  78. cc := s.l2cC
  79. // query from cloud
  80. cRes := s.batchQueryCloudNonMiss(context.TODO(), mids, cc.BatchSize, cc.BatchMissRetryCount)
  81. cM := make(map[int64]*model.AsoAccount)
  82. for _, item := range cRes {
  83. cM[item.Mid] = item
  84. }
  85. // compare cloud with local
  86. pendingMids := make([]int64, 0)
  87. for _, item := range lRes {
  88. local := item
  89. cloud := cM[item.Mid]
  90. status := doCompare(cloud, local, true)
  91. switch status {
  92. case _statusOK:
  93. // do nothing
  94. case _statusNo:
  95. cc.diffCount++
  96. s.doLog(cloud, local, false)
  97. if cc.Fix {
  98. s.fixCloudRecord(context.TODO(), model.Default(local), cloud)
  99. }
  100. case _statusPending:
  101. pendingMids = append(pendingMids, item.Mid)
  102. }
  103. }
  104. if len(pendingMids) == 0 {
  105. return
  106. }
  107. // reload pending mids from cloud
  108. var pendingRes []*model.AsoAccount
  109. if pendingRes, err = s.d.AsoAccountsCloud(context.TODO(), pendingMids); err != nil {
  110. return
  111. }
  112. lM := make(map[int64]*model.OriginAsoAccount)
  113. for _, item := range lRes {
  114. lM[item.Mid] = item
  115. }
  116. // compare
  117. for _, item := range pendingRes {
  118. cloud := item
  119. local := lM[item.Mid]
  120. status := doCompare(item, local, false)
  121. switch status {
  122. case _statusOK:
  123. case _statusNo:
  124. cc.diffCount++
  125. s.doLog(cloud, local, true)
  126. if cc.Fix {
  127. s.fixCloudRecord(context.TODO(), model.Default(local), cloud)
  128. }
  129. }
  130. }
  131. return
  132. }