123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- package service
- import (
- "context"
- "io/ioutil"
- "os"
- "time"
- "go-common/app/job/main/passport-game-data/model"
- "go-common/library/log"
- )
- // cloud2localcompareproc compare aso accounts between cloud and local.
- // select last modified from cloud
- // load batchSize from origin
- // compare:
- // if cloud_mtime >= local_mtime, directly compare
- // if cloud_mtime < local_mtime, sleep and reload from cloud, then do compare again
- func (s *Service) cloud2localcompareproc() {
- var (
- err error
- cloudRes []*model.AsoAccount
- ack = false
- )
- cc := s.c2lC
- delay := cc.DelayDuration
- cc.st = cc.StartTime
- cc.ed = cc.st.Add(cc.StepDuration)
- offsetFile, err := os.Create(cc.OffsetFilePath)
- if err != nil {
- log.Error("failed to create offset file, os.Create(%s) error(%v)", cc.OffsetFilePath, err)
- return
- }
- defer offsetFile.Close()
- log.Info("created offset file %s", cc.OffsetFilePath)
- for {
- time.Sleep(cc.LoopDuration)
- cc.sleeping = false
- if ack {
- cc.st = cc.st.Add(cc.StepDuration)
- cc.ed = cc.ed.Add(cc.StepDuration)
- }
- st, ed := cc.st, cc.ed
- if err = ioutil.WriteFile(cc.OffsetFilePath, []byte(st.Format(_timeFormat)), os.ModeAppend); err != nil {
- log.Error("failed to write offset, ioutil.WriteFile(%s, %s, os.ModeAppend), error(%v)", cc.OffsetFilePath, st.Format(_timeFormat), err)
- continue
- }
- if cc.Debug {
- log.Info("st: %s, ed: %s", st.Format(_timeFormat), ed.Format(_timeFormat))
- }
- if cc.End && st.After(cc.EndTime) {
- log.Info("st:%s is after endTime:%s, all data compares ok, cloud2localcompareproc exit", st.Format(_timeFormat), cc.EndTime.Format(_timeFormat))
- return
- }
- now := time.Now()
- if now.Sub(st) <= delay {
- delta := int64(delay/time.Second) - (now.Unix() - st.Unix())
- log.Info("now time is just after st by %d seconds, not greater than delay duration: %v, will sleep %d seconds", int64(delay/time.Second)-delta, delay, delta)
- cc.sleeping = true
- cc.sleepingSeconds = delta
- cc.sleepFromTs = now.Unix()
- time.Sleep(time.Duration(int64(time.Second) * delta))
- continue
- }
- if cloudRes, err = s.d.AsoAccountRangeCloud(context.TODO(), st, ed); err != nil {
- continue
- }
- cc.rangeCount = len(cloudRes)
- cc.totalCount += len(cloudRes)
- if err = s.cloud2LocalCompare(context.TODO(), cloudRes); err != nil {
- continue
- }
- ack = true
- }
- }
- func (s *Service) cloud2LocalCompare(c context.Context, cloudRes []*model.AsoAccount) (err error) {
- mids := make([]int64, 0)
- for _, item := range cloudRes {
- mids = append(mids, item.Mid)
- }
- cc := s.c2lC
- localRes := s.batchQueryLocalNonMiss(context.TODO(), mids, cc.BatchSize, cc.BatchMissRetryCount)
- m := make(map[int64]*model.OriginAsoAccount)
- for _, item := range localRes {
- m[item.Mid] = item
- }
- // compare
- pendingMids := make([]int64, 0)
- for _, item := range cloudRes {
- cloud := item
- local := m[item.Mid]
- status := doCompare(cloud, local, true)
- switch status {
- case _statusOK:
- // do nothing
- case _statusNo:
- cc.diffCount++
- s.doLog(cloud, local, false)
- if cc.Fix {
- s.fixCloudRecord(context.TODO(), model.Default(local), cloud)
- }
- case _statusPending:
- pendingMids = append(pendingMids, item.Mid)
- }
- }
- if len(pendingMids) == 0 {
- return
- }
- // reload pending mids from cloud
- var pendingRes []*model.AsoAccount
- if pendingRes, err = s.d.AsoAccountsCloud(context.TODO(), pendingMids); err != nil {
- return
- }
- // compare
- for _, item := range pendingRes {
- cloud := item
- local := m[item.Mid]
- status := doCompare(item, m[item.Mid], false)
- switch status {
- case _statusOK:
- case _statusNo:
- cc.diffCount++
- s.doLog(cloud, local, true)
- if cc.Fix {
- s.fixCloudRecord(context.TODO(), model.Default(local), cloud)
- }
- }
- }
- return
- }
|