check_data.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. package service
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/job/main/vip/model"
  6. "go-common/library/log"
  7. "github.com/pkg/errors"
  8. )
  9. // CheckUserData check vip_user_info data.
  10. func (s *Service) CheckUserData(c context.Context) (diffs map[int64]string, err error) {
  11. var (
  12. maxID int
  13. size = s.c.Property.BatchSize
  14. ids = []int64{}
  15. ousers = make(map[int64]*model.VipUserInfoOld, size)
  16. nusers = make(map[int64]*model.VipUserInfo, _ps)
  17. ou *model.VipUserInfoOld
  18. nu *model.VipUserInfo
  19. ok bool
  20. )
  21. diffs = make(map[int64]string)
  22. if maxID, err = s.dao.SelOldUserInfoMaxID(context.TODO()); err != nil {
  23. err = errors.WithStack(err)
  24. return
  25. }
  26. page := maxID / size
  27. if maxID%size != 0 {
  28. page++
  29. }
  30. log.Info("check vip_user_info total(%d)", page)
  31. for i := 0; i < page; i++ {
  32. log.Info("check vip_user_info page index(%d) total(%d)", i, page)
  33. startID := i * size
  34. endID := (i + 1) * size
  35. if endID > maxID {
  36. endID = maxID
  37. }
  38. if ousers, err = s.dao.SelOldUserInfoMaps(context.TODO(), startID, endID); err != nil {
  39. return
  40. }
  41. j := 1
  42. for _, v := range ousers {
  43. ids = append(ids, v.Mid)
  44. if j%_ps == 0 || j == len(ousers) {
  45. if nusers, err = s.dao.SelVipByIds(context.TODO(), ids); err != nil {
  46. return
  47. }
  48. for _, mid := range ids {
  49. if ou, ok = ousers[mid]; !ok {
  50. diffs[mid] = "old not found"
  51. continue
  52. }
  53. if nu, ok = nusers[mid]; !ok {
  54. diffs[mid] = "new not found"
  55. continue
  56. }
  57. if nu.Type != ou.Type {
  58. diffs[mid] = "vip_type"
  59. continue
  60. }
  61. if nu.Status != ou.Status {
  62. diffs[mid] = "vip_status"
  63. continue
  64. }
  65. if !nu.OverdueTime.Time().Equal(ou.OverdueTime.Time()) {
  66. diffs[mid] = "vip_overdue_time"
  67. continue
  68. }
  69. if !nu.AnnualVipOverdueTime.Time().Equal(ou.AnnualVipOverdueTime.Time()) {
  70. diffs[mid] = "annual_vip_overdue_time"
  71. continue
  72. }
  73. if nu.PayType != ou.IsAutoRenew {
  74. diffs[mid] = "vip_pay_type"
  75. continue
  76. }
  77. if nu.PayChannelID != ou.PayChannelID {
  78. diffs[mid] = "pay_channel_id"
  79. continue
  80. }
  81. if !nu.IosOverdueTime.Time().Equal(ou.IosOverdueTime.Time()) {
  82. diffs[mid] = "ios_overdue_time"
  83. continue
  84. }
  85. }
  86. // reset
  87. ids = []int64{}
  88. }
  89. j++
  90. }
  91. log.Info("check index (%d) vip_user_info diff len (%d)", i, len(diffs))
  92. log.Info("check index (%d) vip_user_info diff data mids(%v)", i, diffs)
  93. time.Sleep(time.Millisecond * _defsleepmsec)
  94. }
  95. return
  96. }
  97. //CheckBcoinData check bcoin data
  98. func (s *Service) CheckBcoinData(c context.Context) (mids []int64, err error) {
  99. var (
  100. maxID int
  101. size = s.c.Property.BatchSize
  102. )
  103. if maxID, err = s.dao.SelMaxID(context.TODO()); err != nil {
  104. err = errors.WithStack(err)
  105. return
  106. }
  107. page := maxID / size
  108. if maxID%size != 0 {
  109. page++
  110. }
  111. for i := 0; i < page; i++ {
  112. startID := size * i
  113. endID := (i + 1) * size
  114. var res []*model.VipUserInfo
  115. if res, err = s.dao.SelUserInfos(context.TODO(), startID, endID); err != nil {
  116. err = errors.WithStack(err)
  117. return
  118. }
  119. var (
  120. tempMids []int64
  121. bcoinMap map[int64][]*model.VipBcoinSalary
  122. oldBcoinMap map[int64][]*model.VipBcoinSalary
  123. )
  124. for _, v := range res {
  125. tempMids = append(tempMids, v.Mid)
  126. }
  127. if bcoinMap, err = s.dao.SelBcoinSalaryDataMaps(context.TODO(), tempMids); err != nil {
  128. err = errors.WithStack(err)
  129. return
  130. }
  131. if oldBcoinMap, err = s.dao.SelOldBcoinSalaryDataMaps(context.TODO(), tempMids); err != nil {
  132. err = errors.WithStack(err)
  133. return
  134. }
  135. if len(bcoinMap) > len(oldBcoinMap) {
  136. for key, val := range bcoinMap {
  137. salaries := oldBcoinMap[key]
  138. if len(salaries) != len(val) {
  139. mids = append(mids, key)
  140. }
  141. }
  142. } else {
  143. for key, val := range oldBcoinMap {
  144. salaries := bcoinMap[key]
  145. if len(salaries) != len(val) {
  146. mids = append(mids, key)
  147. }
  148. }
  149. }
  150. }
  151. log.Info("cur not sync data mid is(%+v)", mids)
  152. return
  153. }
  154. //CheckChangeHistory check change history data
  155. func (s *Service) CheckChangeHistory(c context.Context) (mids []int64, err error) {
  156. var (
  157. maxID int
  158. size = 2000
  159. )
  160. if maxID, err = s.dao.SelMaxID(context.TODO()); err != nil {
  161. err = errors.WithStack(err)
  162. return
  163. }
  164. page := maxID / size
  165. if maxID%size != 0 {
  166. page++
  167. }
  168. for i := 0; i < page; i++ {
  169. startID := size * i
  170. endID := (i + 1) * size
  171. var res []*model.VipUserInfo
  172. if res, err = s.dao.SelUserInfos(context.TODO(), startID, endID); err != nil {
  173. err = errors.WithStack(err)
  174. return
  175. }
  176. var (
  177. tempMids []int64
  178. historyMap map[int64][]*model.VipChangeHistory
  179. oldHistoryMap map[int64][]*model.VipChangeHistory
  180. )
  181. for _, v := range res {
  182. tempMids = append(tempMids, v.Mid)
  183. }
  184. if historyMap, err = s.dao.SelChangeHistoryMaps(context.TODO(), tempMids); err != nil {
  185. err = errors.WithStack(err)
  186. return
  187. }
  188. if oldHistoryMap, err = s.dao.SelOldChangeHistoryMaps(context.TODO(), tempMids); err != nil {
  189. err = errors.WithStack(err)
  190. return
  191. }
  192. if len(historyMap) > len(oldHistoryMap) {
  193. for key, val := range historyMap {
  194. histories := oldHistoryMap[key]
  195. if len(histories) != len(val) {
  196. mids = append(mids, key)
  197. }
  198. }
  199. } else {
  200. for key, val := range oldHistoryMap {
  201. histories := historyMap[key]
  202. if len(histories) != len(val) {
  203. mids = append(mids, key)
  204. }
  205. }
  206. }
  207. }
  208. log.Info("cur not sync data mid is(%+v)", mids)
  209. return
  210. }