sync.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package service
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/job/main/vip/model"
  6. "go-common/library/log"
  7. )
  8. // SyncAllUser 同步旧user——info到新db.
  9. // FIXME 切新db后删除.
  10. func (s *Service) SyncAllUser(c context.Context) {
  11. var (
  12. err error
  13. maxID int
  14. size = s.c.Property.BatchSize
  15. ids = []int64{}
  16. ousers = make(map[int64]*model.VipUserInfoOld, size)
  17. nusers = make(map[int64]*model.VipUserInfo, _ps)
  18. updateDB = s.c.Property.UpdateDB
  19. nu *model.VipUserInfo
  20. ok bool
  21. )
  22. if maxID, err = s.dao.SelOldUserInfoMaxID(context.TODO()); err != nil {
  23. log.Error("sync job s.dao.SelOldUserInfoMaxID err(%+v)", err)
  24. return
  25. }
  26. page := maxID / size
  27. if maxID%size != 0 {
  28. page++
  29. }
  30. log.Info("sync job vip_user_info total(%d)", page)
  31. for i := 0; i < page; i++ {
  32. log.Info("sync job vip_user_info page index(%d) total(%d)", i, page)
  33. startID := i * size
  34. endID := (i + 1) * size
  35. if endID > maxID {
  36. endID = maxID
  37. }
  38. if ousers, err = s.dao.SelOldUserInfoMaps(context.TODO(), startID, endID); err != nil {
  39. log.Error("sync job s.dao.SelOldUserInfoMaps(%d, %d) err(%+v)", startID, endID, err)
  40. return
  41. }
  42. j := 1
  43. for _, v := range ousers {
  44. ids = append(ids, v.Mid)
  45. if j%_ps == 0 || j == len(ousers) {
  46. if nusers, err = s.dao.SelVipByIds(context.TODO(), ids); err != nil {
  47. return
  48. }
  49. for _, mid := range ids {
  50. var ou *model.VipUserInfoOld
  51. if ou, ok = ousers[mid]; !ok {
  52. log.Warn("sync job old not found %d", mid)
  53. continue
  54. }
  55. if nu, ok = nusers[mid]; !ok {
  56. log.Warn("sync job need insert to new %d, old(%+v), toNew(%+v)", mid, ou, ou.ToNew())
  57. if updateDB {
  58. s.dao.SyncAddUser(context.Background(), ou.ToNew())
  59. }
  60. continue
  61. }
  62. if ou.RecentTime <= 0 {
  63. ou.RecentTime = ou.Mtime
  64. }
  65. if nu.Type != ou.Type ||
  66. nu.Status != ou.Status ||
  67. !nu.StartTime.Time().Equal(ou.StartTime.Time()) ||
  68. !nu.OverdueTime.Time().Equal(ou.OverdueTime.Time()) ||
  69. !nu.AnnualVipOverdueTime.Time().Equal(ou.AnnualVipOverdueTime.Time()) ||
  70. !nu.Ctime.Time().Equal(ou.Ctime.Time()) ||
  71. !nu.Mtime.Time().Equal(ou.Mtime.Time()) ||
  72. nu.PayType != ou.IsAutoRenew ||
  73. nu.PayChannelID != ou.PayChannelID ||
  74. !nu.IosOverdueTime.Time().Equal(ou.IosOverdueTime.Time()) ||
  75. nu.Ver != ou.Ver ||
  76. !nu.RecentTime.Time().Equal(ou.RecentTime.Time()) {
  77. log.Warn("sync job need update to new %d, old(%+v), new(%+v), toNew(%+v)", mid, ou, nu, ou.ToNew())
  78. if updateDB {
  79. s.dao.SyncUpdateUser(context.Background(), ou.ToNew(), nu.Ver)
  80. }
  81. continue
  82. }
  83. }
  84. log.Info("sync job vip_user_info page index(%d) ids(%+v)", j, ids)
  85. // reset
  86. ids = []int64{}
  87. }
  88. j++
  89. }
  90. log.Info("sync job vip_user_info page index(%d) end", i)
  91. time.Sleep(time.Millisecond * _defsleepmsec)
  92. }
  93. }