aso_full_migration.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. package service
  2. import (
  3. "context"
  4. "go-common/app/job/main/passport-user/model"
  5. "go-common/library/log"
  6. "time"
  7. )
  8. func (s *Service) getAsoAccount(start, end, count int64) {
  9. chanNum := int64(s.c.FullSync.AsoAccount.ChanNum)
  10. for {
  11. log.Info("getAsoAccount, start %d, end %d, count %d", start, end, count)
  12. var (
  13. res []*model.OriginAccount
  14. err error
  15. )
  16. for {
  17. if res, err = s.d.AsoAccount(context.Background(), start, count); err != nil {
  18. log.Error("fail to get AsoAccount error(%+v)", err)
  19. time.Sleep(100 * time.Millisecond)
  20. continue
  21. }
  22. break
  23. }
  24. for _, a := range res {
  25. s.asoAccountChan[a.Mid%chanNum] <- a
  26. }
  27. if start > end || len(res) == 0 {
  28. log.Info("sync asoAccount finished! endID(%d)", start)
  29. break
  30. }
  31. start = res[len(res)-1].Mid
  32. }
  33. }
  34. func (s *Service) asoAccountConsume(c chan *model.OriginAccount) {
  35. for {
  36. a, ok := <-c
  37. if !ok {
  38. log.Error("asoAccountChan closed")
  39. return
  40. }
  41. for i := 0; i < _retry; i++ {
  42. if err := s.syncAsoAccount(a); err != nil {
  43. log.Error("fail to sync asoAccount(%+v) error(%+v)", a, err)
  44. time.Sleep(100 * time.Millisecond)
  45. continue
  46. }
  47. break
  48. }
  49. }
  50. }
  51. func (s *Service) getAsoAccountInfo(start, end, count int64) {
  52. chanNum := int64(s.c.FullSync.AsoAccountInfo.ChanNum)
  53. initStart, initEnd, initCount := start, end, count
  54. for i := 0; i < _asoAccountInfoSharding; i++ {
  55. start, end, count = initStart, initEnd, initCount
  56. for {
  57. log.Info("getAsoAccountInfo, start %d, end %d, count %d, suffix %d", start, end, count, i)
  58. var (
  59. res []*model.OriginAccountInfo
  60. err error
  61. )
  62. for {
  63. if res, err = s.d.AsoAccountInfo(context.Background(), start, count, int64(i)); err != nil {
  64. log.Error("fail to get AsoAccountInfo error(%+v)", err)
  65. time.Sleep(100 * time.Millisecond)
  66. continue
  67. }
  68. break
  69. }
  70. for _, a := range res {
  71. s.asoAccountInfoChan[a.ID%chanNum] <- a
  72. }
  73. if start > end || len(res) == 0 {
  74. log.Info("sync asoAccountInfo(%d) finished! endID(%d)", i, start)
  75. break
  76. }
  77. start = res[len(res)-1].ID
  78. }
  79. }
  80. }
  81. func (s *Service) asoAccountInfoConsume(c chan *model.OriginAccountInfo) {
  82. for {
  83. a, ok := <-c
  84. if !ok {
  85. log.Error("asoAccountInfoChan closed")
  86. return
  87. }
  88. for {
  89. if err := s.syncAsoAccountInfo(a); err != nil {
  90. log.Error("fail to sync asoAccountInfo(%+v) error(%+v)", a, err)
  91. time.Sleep(100 * time.Millisecond)
  92. continue
  93. }
  94. break
  95. }
  96. }
  97. }
  98. func (s *Service) getAsoAccountReg(start, end, count int64) {
  99. chanNum := int64(s.c.FullSync.AsoAccountReg.ChanNum)
  100. initStart, initEnd, initCount := start, end, count
  101. for i := 0; i < _asoAccountRegOriginSharding; i++ {
  102. start, end, count = initStart, initEnd, initCount
  103. for {
  104. log.Info("getAsoAccountReg, start %d, end %d, count %d, suffix %d", start, end, count, i)
  105. var (
  106. res []*model.OriginAccountReg
  107. err error
  108. )
  109. for {
  110. if res, err = s.d.AsoAccountReg(context.Background(), start, count, int64(i)); err != nil {
  111. log.Error("fail to get AsoAccountReg error(%+v)", err)
  112. time.Sleep(100 * time.Millisecond)
  113. continue
  114. }
  115. break
  116. }
  117. for _, a := range res {
  118. s.asoAccountRegChan[a.ID%chanNum] <- a
  119. }
  120. if start > end || len(res) == 0 {
  121. log.Info("sync asoAccountReg(%d) finished! endID(%d)", i, start)
  122. break
  123. }
  124. start = res[len(res)-1].ID
  125. }
  126. }
  127. }
  128. func (s *Service) asoAccountRegConsume(c chan *model.OriginAccountReg) {
  129. for {
  130. a, ok := <-c
  131. if !ok {
  132. log.Error("asoAccountRegChan closed")
  133. return
  134. }
  135. for {
  136. if err := s.syncAsoAccountReg(a); err != nil {
  137. log.Error("fail to sync asoAccountReg(%+v) error(%+v)", a, err)
  138. time.Sleep(100 * time.Millisecond)
  139. continue
  140. }
  141. break
  142. }
  143. }
  144. }
  145. func (s *Service) getAsoAccountSns(start, end, count int64) {
  146. chanNum := int64(s.c.FullSync.AsoAccountReg.ChanNum)
  147. for {
  148. log.Info("getAsoAccountSns, start %d, end %d, count %d", start, end, count)
  149. var (
  150. res []*model.OriginAccountSns
  151. err error
  152. )
  153. for {
  154. if res, err = s.d.AsoAccountSns(context.Background(), start, count); err != nil {
  155. log.Error("fail to get AsoAccountSns error(%+v)", err)
  156. time.Sleep(100 * time.Millisecond)
  157. continue
  158. }
  159. break
  160. }
  161. for _, a := range res {
  162. s.asoAccountSnsChan[a.Mid%chanNum] <- a
  163. }
  164. if start > end || len(res) == 0 {
  165. log.Info("sync asoAccountSns finished! endID(%d)", start)
  166. break
  167. }
  168. start = res[len(res)-1].Mid
  169. }
  170. }
  171. func (s *Service) asoAccountSnsConsume(c chan *model.OriginAccountSns) {
  172. for {
  173. a, ok := <-c
  174. if !ok {
  175. log.Error("asoAccountSnsChan closed")
  176. return
  177. }
  178. for {
  179. if err := s.syncAsoAccountSns(a); err != nil {
  180. log.Error("fail to sync asoAccountSns(%+v) error(%+v)", a, err)
  181. time.Sleep(100 * time.Millisecond)
  182. continue
  183. }
  184. break
  185. }
  186. }
  187. }
  188. func (s *Service) getAsoTelBindLog(start, end, count int64) {
  189. chanNum := int64(s.c.FullSync.AsoAccountReg.ChanNum)
  190. for {
  191. log.Info("getAsoTelBindLog, start %d, end %d, count %d", start, end, count)
  192. var (
  193. res []*model.UserTel
  194. err error
  195. )
  196. for {
  197. if res, err = s.d.UserTel(context.Background(), start, count); err != nil {
  198. log.Error("fail to get UserTel error(%+v)", err)
  199. time.Sleep(100 * time.Millisecond)
  200. continue
  201. }
  202. break
  203. }
  204. for _, a := range res {
  205. s.asoTelBindLogChan[a.Mid%chanNum] <- a
  206. }
  207. if start > end || len(res) == 0 {
  208. log.Info("sync asoTelBindLog finished! endID(%d)", start)
  209. break
  210. }
  211. start = res[len(res)-1].Mid
  212. }
  213. }
  214. func (s *Service) asoTelBindLogConsume(c chan *model.UserTel) {
  215. filterStart := 1536572121
  216. filterEnd := 1536616436
  217. for {
  218. a, ok := <-c
  219. if !ok {
  220. log.Error("asoTelBindLogChan closed")
  221. return
  222. }
  223. for {
  224. var (
  225. err error
  226. telBindTime int64
  227. )
  228. if telBindTime, err = s.d.AsoTelBindLog(context.Background(), a.Mid); err != nil {
  229. log.Error("fail to get AsoTelBindLog error(%+v)", err)
  230. time.Sleep(100 * time.Millisecond)
  231. continue
  232. }
  233. if telBindTime == 0 {
  234. break
  235. }
  236. if telBindTime > int64(filterStart) && telBindTime < int64(filterEnd) {
  237. break
  238. }
  239. userTel := &model.UserTel{
  240. Mid: a.Mid,
  241. TelBindTime: telBindTime,
  242. }
  243. if _, err = s.d.UpdateUserTelBindTime(context.Background(), userTel); err != nil {
  244. log.Error("fail to update tel bind log userTel(%+v) error(%+v)", userTel, err)
  245. time.Sleep(100 * time.Millisecond)
  246. continue
  247. }
  248. break
  249. }
  250. }
  251. }