fixer.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "sync/atomic"
  6. "time"
  7. "go-common/app/job/main/member/model"
  8. "go-common/library/log"
  9. "github.com/pkg/errors"
  10. )
  // Package-level state shared by the fixer routines in this file.
  var (
  	// csclice holds the mid queues; makeChan allocates them and
  	// dataCheckMids sends mids into them.
  	csclice []chan int64
  	// maxmid is the upper bound dataCheckMids applies to SyncRange.End.
  	maxmid = int64(310000000)
  	// scanned counts mids pulled off a queue by dataFixer (updated via sync/atomic).
  	scanned int64
  	// errCount counts mids that fix found to be inconsistent (updated via sync/atomic).
  	errCount int64
  )
  17. func (s *Service) makeChan(num int) {
  18. csclice = make([]chan int64, num)
  19. for i := 0; i < num; i++ {
  20. csclice[i] = make(chan int64, 10000)
  21. }
  22. }
  23. // dataCheckMids check mid
  24. func (s *Service) dataCheckMids() {
  25. var (
  26. i int64
  27. )
  28. if s.c.SyncRange.End > maxmid {
  29. s.c.SyncRange.End = maxmid
  30. }
  31. if s.c.SyncRange.Start < 0 {
  32. s.c.SyncRange.Start = 0
  33. }
  34. for i = s.c.SyncRange.Start; i < s.c.SyncRange.End; i++ {
  35. csclice[i%30] <- i
  36. }
  37. }
  38. // dataFixer
  39. func (s *Service) dataFixer(cs chan int64) {
  40. for {
  41. mids := make([]int64, 0, 10)
  42. for mid := range cs {
  43. mids = append(mids, mid)
  44. if len(mids) >= 5 {
  45. break
  46. }
  47. atomic.AddInt64(&scanned, 1)
  48. }
  49. s.fix(mids)
  50. }
  51. }
  52. func (s *Service) fix(mids []int64) {
  53. var (
  54. err error
  55. accs = make(map[int64]*model.AccountInfo)
  56. errs = make(map[int64]map[string]bool)
  57. c = context.TODO()
  58. base *model.BaseInfo
  59. )
  60. func() {
  61. defer func() {
  62. if r := recover(); r != nil {
  63. r = errors.WithStack(r.(error))
  64. log.Error("fixer: wocao jingran recover le error(%+v)", r)
  65. time.Sleep(10 * time.Second)
  66. }
  67. time.Sleep(10 * time.Millisecond)
  68. }()
  69. if accs, errs, err = s.dao.Accounts(c, mids); err != nil {
  70. log.Error("fixer: dao.AccountInfo mid(%v) res(%v) error(%v)", mids, accs, err)
  71. return
  72. }
  73. for mid, res := range accs {
  74. log.Error("fixer: mid(%d) res(%+v)", mid, res)
  75. if base, err = s.dao.BaseInfo(c, mid); err != nil {
  76. log.Error("fixer: s.dao.BaseInfo mid(%d) err(%v)", mid, err)
  77. continue
  78. }
  79. if base == nil {
  80. log.Error("fixer: dataCheckErr mid(%d) res(%v),base(%v),detail(%v)", mid, res, base)
  81. continue
  82. }
  83. // all fields are same
  84. if sameAccInfo(base, res) {
  85. log.Info("fixer: sameAccInfo mid(%d) result true continue", mid)
  86. continue
  87. }
  88. // increase errCount and logging
  89. bs, _ := json.Marshal(base)
  90. jres, _ := json.Marshal(res)
  91. atomic.AddInt64(&errCount, 1)
  92. log.Error("fixer: dataCheckFail mid(%d) base(%s),res(%s),errCount(%d)", mid, bs, jres, atomic.LoadInt64(&errCount))
  93. if _, ok := errs[mid]; !ok {
  94. log.Error("fixer,errs[%v] is not ok", mid)
  95. continue
  96. }
  97. if asoOK := errs[mid]["asoOK"]; asoOK && !sameName(base, res) && len(res.Name) > 0 {
  98. s.dao.SetName(c, mid, res.Name)
  99. }
  100. }
  101. log.Info("fixer: dataCheckRight mids(%v) scanned(%d) errCount(%d)", mids, scanned, atomic.LoadInt64(&errCount))
  102. }()
  103. }