123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- package service
import (
	"context"
	"io/ioutil"
	"strings"
	"time"

	"go-common/app/job/main/passport-game-data/conf"
	"go-common/app/job/main/passport-game-data/dao"
	"go-common/library/log"
)
// Fallback values that (*compareConfig).fix substitutes for out-of-range
// settings, plus the layout used to parse configured timestamps.
const (
	// _defaultDelayDuration is the fallback for DelayDuration: no delay.
	_defaultDelayDuration = 0 * time.Minute
	// _defaultStepDuration is the fallback for StepDuration.
	_defaultStepDuration = 15 * time.Minute
	// _defaultLoopDuration is the fallback for LoopDuration.
	_defaultLoopDuration = 3 * time.Second
	// _defaultBatchSize is the fallback for BatchSize.
	_defaultBatchSize = 1000
	// _defaultBatchMissRetryCount is the fallback for BatchMissRetryCount.
	_defaultBatchMissRetryCount = 3
	// _timeFormat is the layout handed to time.ParseInLocation for the
	// configured start/end times and for the stored offset.
	_timeFormat = "2006-01-02 15:04:05"
)
// _loc is the location reported by the process clock when the package is
// initialised. Configured timestamps and the stored offset are parsed in it.
var _loc = time.Now().Location()
- // Service service.
- type Service struct {
- c *conf.Config
- d *dao.Dao
- // init cloud
- ic *initCloudConfig
- // c2l
- c2lC *compareConfig
- // l2c
- l2cC *compareConfig
- }
// compareConfig carries the settings of one compare direction together with
// runtime state that is not read from configuration.
type compareConfig struct {
	// Settings populated by newCompareConfigFrom. StartTime is replaced by the
	// stored offset when UseOldOffset is set and the offset file parses.
	On                  bool
	OffsetFilePath      string
	UseOldOffset        bool
	End                 bool
	StartTime           time.Time
	EndTime             time.Time
	DelayDuration       time.Duration
	StepDuration        time.Duration
	LoopDuration        time.Duration
	BatchSize           int
	BatchMissRetryCount int
	Debug               bool
	Fix                 bool

	// Runtime state; newCompareConfigFrom leaves all of it at zero values.
	st              time.Time
	ed              time.Time
	rangeCount      int
	totalCount      int
	diffCount       int
	sleeping        bool
	sleepingSeconds int64
	sleepFromTs     int64
}
- func newCompareConfigFrom(c *conf.CompareConfig) (cc *compareConfig) {
- st, err := time.ParseInLocation(_timeFormat, c.StartTime, _loc)
- if err != nil {
- log.Error("failed to parse end time, time.ParseInLocation(%s, %s, %v), error(%v)", _timeFormat, c.StartTime, _loc, err)
- return
- }
- ed, err := time.ParseInLocation(_timeFormat, c.EndTime, _loc)
- if err != nil {
- log.Error("failed to parse end time, time.ParseInLocation(%s, %s, %v), error(%v)", _timeFormat, c.EndTime, _loc, err)
- return
- }
- cc = &compareConfig{
- On: c.On,
- Debug: c.Debug,
- OffsetFilePath: c.OffsetFilePath,
- UseOldOffset: c.UseOldOffset,
- End: c.End,
- StartTime: st,
- EndTime: ed,
- DelayDuration: time.Duration(c.DelayDuration),
- StepDuration: time.Duration(c.StepDuration),
- LoopDuration: time.Duration(c.LoopDuration),
- BatchSize: c.BatchSize,
- BatchMissRetryCount: c.BatchMissRetryCount,
- Fix: c.Fix,
- }
- if cc.UseOldOffset {
- if oldOffset, err := parseOldOffset(cc.OffsetFilePath); err == nil {
- cc.StartTime = oldOffset
- }
- }
- cc.fix()
- return
- }
- func parseOldOffset(path string) (oldOffset time.Time, err error) {
- data, err := ioutil.ReadFile(path)
- if err != nil {
- log.Error("failed to read old offset, ioutil.ReadFile(%s) error(%v) skip", path, err)
- return
- }
- if oldOffset, err = time.ParseInLocation(_timeFormat, string(data), _loc); err != nil {
- log.Error("failed to parse offset, time.ParseInLocation(%s, %s, %v) error(%v)", _timeFormat, string(data), _loc, err)
- }
- return
- }
- func (cc *compareConfig) fix() {
- if int64(cc.DelayDuration) < 0 {
- cc.DelayDuration = _defaultDelayDuration
- }
- if int64(cc.StepDuration) < 0 {
- cc.StepDuration = _defaultStepDuration
- }
- if int64(cc.LoopDuration) < 0 {
- cc.LoopDuration = _defaultLoopDuration
- }
- if cc.BatchSize <= 0 {
- cc.BatchSize = _defaultBatchSize
- }
- if cc.BatchMissRetryCount < 0 {
- cc.BatchMissRetryCount = _defaultBatchMissRetryCount
- }
- }
- // New new a service instance.
- func New(c *conf.Config) (s *Service) {
- s = &Service{
- c: c,
- d: dao.New(c),
- }
- if c.Compare.Cloud2Local.On {
- s.c2lC = newCompareConfigFrom(c.Compare.Cloud2Local)
- go s.cloud2localcompareproc()
- }
- if c.Compare.Local2Cloud.On {
- s.l2cC = newCompareConfigFrom(c.Compare.Local2Cloud)
- go s.local2cloudcompareproc()
- }
- return
- }
- // Ping check server ok.
- func (s *Service) Ping(c context.Context) (err error) {
- err = s.d.Ping(c)
- return
- }
- // Close close service.
- func (s *Service) Close() (err error) {
- s.d.Close()
- return
- }
|