123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
- package service
- import (
- "context"
- "time"
- "go-common/app/job/main/vip/model"
- "go-common/library/log"
- )
- // SyncAllUser 同步旧user——info到新db.
- // FIXME 切新db后删除.
- func (s *Service) SyncAllUser(c context.Context) {
- var (
- err error
- maxID int
- size = s.c.Property.BatchSize
- ids = []int64{}
- ousers = make(map[int64]*model.VipUserInfoOld, size)
- nusers = make(map[int64]*model.VipUserInfo, _ps)
- updateDB = s.c.Property.UpdateDB
- nu *model.VipUserInfo
- ok bool
- )
- if maxID, err = s.dao.SelOldUserInfoMaxID(context.TODO()); err != nil {
- log.Error("sync job s.dao.SelOldUserInfoMaxID err(%+v)", err)
- return
- }
- page := maxID / size
- if maxID%size != 0 {
- page++
- }
- log.Info("sync job vip_user_info total(%d)", page)
- for i := 0; i < page; i++ {
- log.Info("sync job vip_user_info page index(%d) total(%d)", i, page)
- startID := i * size
- endID := (i + 1) * size
- if endID > maxID {
- endID = maxID
- }
- if ousers, err = s.dao.SelOldUserInfoMaps(context.TODO(), startID, endID); err != nil {
- log.Error("sync job s.dao.SelOldUserInfoMaps(%d, %d) err(%+v)", startID, endID, err)
- return
- }
- j := 1
- for _, v := range ousers {
- ids = append(ids, v.Mid)
- if j%_ps == 0 || j == len(ousers) {
- if nusers, err = s.dao.SelVipByIds(context.TODO(), ids); err != nil {
- return
- }
- for _, mid := range ids {
- var ou *model.VipUserInfoOld
- if ou, ok = ousers[mid]; !ok {
- log.Warn("sync job old not found %d", mid)
- continue
- }
- if nu, ok = nusers[mid]; !ok {
- log.Warn("sync job need insert to new %d, old(%+v), toNew(%+v)", mid, ou, ou.ToNew())
- if updateDB {
- s.dao.SyncAddUser(context.Background(), ou.ToNew())
- }
- continue
- }
- if ou.RecentTime <= 0 {
- ou.RecentTime = ou.Mtime
- }
- if nu.Type != ou.Type ||
- nu.Status != ou.Status ||
- !nu.StartTime.Time().Equal(ou.StartTime.Time()) ||
- !nu.OverdueTime.Time().Equal(ou.OverdueTime.Time()) ||
- !nu.AnnualVipOverdueTime.Time().Equal(ou.AnnualVipOverdueTime.Time()) ||
- !nu.Ctime.Time().Equal(ou.Ctime.Time()) ||
- !nu.Mtime.Time().Equal(ou.Mtime.Time()) ||
- nu.PayType != ou.IsAutoRenew ||
- nu.PayChannelID != ou.PayChannelID ||
- !nu.IosOverdueTime.Time().Equal(ou.IosOverdueTime.Time()) ||
- nu.Ver != ou.Ver ||
- !nu.RecentTime.Time().Equal(ou.RecentTime.Time()) {
- log.Warn("sync job need update to new %d, old(%+v), new(%+v), toNew(%+v)", mid, ou, nu, ou.ToNew())
- if updateDB {
- s.dao.SyncUpdateUser(context.Background(), ou.ToNew(), nu.Ver)
- }
- continue
- }
- }
- log.Info("sync job vip_user_info page index(%d) ids(%+v)", j, ids)
- // reset
- ids = []int64{}
- }
- j++
- }
- log.Info("sync job vip_user_info page index(%d) end", i)
- time.Sleep(time.Millisecond * _defsleepmsec)
- }
- }
|