123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263 |
- package service
- import (
- "context"
- "go-common/app/job/main/passport-user/model"
- "go-common/library/log"
- "time"
- )
- func (s *Service) getAsoAccount(start, end, count int64) {
- chanNum := int64(s.c.FullSync.AsoAccount.ChanNum)
- for {
- log.Info("getAsoAccount, start %d, end %d, count %d", start, end, count)
- var (
- res []*model.OriginAccount
- err error
- )
- for {
- if res, err = s.d.AsoAccount(context.Background(), start, count); err != nil {
- log.Error("fail to get AsoAccount error(%+v)", err)
- time.Sleep(100 * time.Millisecond)
- continue
- }
- break
- }
- for _, a := range res {
- s.asoAccountChan[a.Mid%chanNum] <- a
- }
- if start > end || len(res) == 0 {
- log.Info("sync asoAccount finished! endID(%d)", start)
- break
- }
- start = res[len(res)-1].Mid
- }
- }
- func (s *Service) asoAccountConsume(c chan *model.OriginAccount) {
- for {
- a, ok := <-c
- if !ok {
- log.Error("asoAccountChan closed")
- return
- }
- for i := 0; i < _retry; i++ {
- if err := s.syncAsoAccount(a); err != nil {
- log.Error("fail to sync asoAccount(%+v) error(%+v)", a, err)
- time.Sleep(100 * time.Millisecond)
- continue
- }
- break
- }
- }
- }
- func (s *Service) getAsoAccountInfo(start, end, count int64) {
- chanNum := int64(s.c.FullSync.AsoAccountInfo.ChanNum)
- initStart, initEnd, initCount := start, end, count
- for i := 0; i < _asoAccountInfoSharding; i++ {
- start, end, count = initStart, initEnd, initCount
- for {
- log.Info("getAsoAccountInfo, start %d, end %d, count %d, suffix %d", start, end, count, i)
- var (
- res []*model.OriginAccountInfo
- err error
- )
- for {
- if res, err = s.d.AsoAccountInfo(context.Background(), start, count, int64(i)); err != nil {
- log.Error("fail to get AsoAccountInfo error(%+v)", err)
- time.Sleep(100 * time.Millisecond)
- continue
- }
- break
- }
- for _, a := range res {
- s.asoAccountInfoChan[a.ID%chanNum] <- a
- }
- if start > end || len(res) == 0 {
- log.Info("sync asoAccountInfo(%d) finished! endID(%d)", i, start)
- break
- }
- start = res[len(res)-1].ID
- }
- }
- }
- func (s *Service) asoAccountInfoConsume(c chan *model.OriginAccountInfo) {
- for {
- a, ok := <-c
- if !ok {
- log.Error("asoAccountInfoChan closed")
- return
- }
- for {
- if err := s.syncAsoAccountInfo(a); err != nil {
- log.Error("fail to sync asoAccountInfo(%+v) error(%+v)", a, err)
- time.Sleep(100 * time.Millisecond)
- continue
- }
- break
- }
- }
- }
- func (s *Service) getAsoAccountReg(start, end, count int64) {
- chanNum := int64(s.c.FullSync.AsoAccountReg.ChanNum)
- initStart, initEnd, initCount := start, end, count
- for i := 0; i < _asoAccountRegOriginSharding; i++ {
- start, end, count = initStart, initEnd, initCount
- for {
- log.Info("getAsoAccountReg, start %d, end %d, count %d, suffix %d", start, end, count, i)
- var (
- res []*model.OriginAccountReg
- err error
- )
- for {
- if res, err = s.d.AsoAccountReg(context.Background(), start, count, int64(i)); err != nil {
- log.Error("fail to get AsoAccountReg error(%+v)", err)
- time.Sleep(100 * time.Millisecond)
- continue
- }
- break
- }
- for _, a := range res {
- s.asoAccountRegChan[a.ID%chanNum] <- a
- }
- if start > end || len(res) == 0 {
- log.Info("sync asoAccountReg(%d) finished! endID(%d)", i, start)
- break
- }
- start = res[len(res)-1].ID
- }
- }
- }
- func (s *Service) asoAccountRegConsume(c chan *model.OriginAccountReg) {
- for {
- a, ok := <-c
- if !ok {
- log.Error("asoAccountRegChan closed")
- return
- }
- for {
- if err := s.syncAsoAccountReg(a); err != nil {
- log.Error("fail to sync asoAccountReg(%+v) error(%+v)", a, err)
- time.Sleep(100 * time.Millisecond)
- continue
- }
- break
- }
- }
- }
- func (s *Service) getAsoAccountSns(start, end, count int64) {
- chanNum := int64(s.c.FullSync.AsoAccountReg.ChanNum)
- for {
- log.Info("getAsoAccountSns, start %d, end %d, count %d", start, end, count)
- var (
- res []*model.OriginAccountSns
- err error
- )
- for {
- if res, err = s.d.AsoAccountSns(context.Background(), start, count); err != nil {
- log.Error("fail to get AsoAccountSns error(%+v)", err)
- time.Sleep(100 * time.Millisecond)
- continue
- }
- break
- }
- for _, a := range res {
- s.asoAccountSnsChan[a.Mid%chanNum] <- a
- }
- if start > end || len(res) == 0 {
- log.Info("sync asoAccountSns finished! endID(%d)", start)
- break
- }
- start = res[len(res)-1].Mid
- }
- }
- func (s *Service) asoAccountSnsConsume(c chan *model.OriginAccountSns) {
- for {
- a, ok := <-c
- if !ok {
- log.Error("asoAccountSnsChan closed")
- return
- }
- for {
- if err := s.syncAsoAccountSns(a); err != nil {
- log.Error("fail to sync asoAccountSns(%+v) error(%+v)", a, err)
- time.Sleep(100 * time.Millisecond)
- continue
- }
- break
- }
- }
- }
- func (s *Service) getAsoTelBindLog(start, end, count int64) {
- chanNum := int64(s.c.FullSync.AsoAccountReg.ChanNum)
- for {
- log.Info("getAsoTelBindLog, start %d, end %d, count %d", start, end, count)
- var (
- res []*model.UserTel
- err error
- )
- for {
- if res, err = s.d.UserTel(context.Background(), start, count); err != nil {
- log.Error("fail to get UserTel error(%+v)", err)
- time.Sleep(100 * time.Millisecond)
- continue
- }
- break
- }
- for _, a := range res {
- s.asoTelBindLogChan[a.Mid%chanNum] <- a
- }
- if start > end || len(res) == 0 {
- log.Info("sync asoTelBindLog finished! endID(%d)", start)
- break
- }
- start = res[len(res)-1].Mid
- }
- }
- func (s *Service) asoTelBindLogConsume(c chan *model.UserTel) {
- filterStart := 1536572121
- filterEnd := 1536616436
- for {
- a, ok := <-c
- if !ok {
- log.Error("asoTelBindLogChan closed")
- return
- }
- for {
- var (
- err error
- telBindTime int64
- )
- if telBindTime, err = s.d.AsoTelBindLog(context.Background(), a.Mid); err != nil {
- log.Error("fail to get AsoTelBindLog error(%+v)", err)
- time.Sleep(100 * time.Millisecond)
- continue
- }
- if telBindTime == 0 {
- break
- }
- if telBindTime > int64(filterStart) && telBindTime < int64(filterEnd) {
- break
- }
- userTel := &model.UserTel{
- Mid: a.Mid,
- TelBindTime: telBindTime,
- }
- if _, err = s.d.UpdateUserTelBindTime(context.Background(), userTel); err != nil {
- log.Error("fail to update tel bind log userTel(%+v) error(%+v)", userTel, err)
- time.Sleep(100 * time.Millisecond)
- continue
- }
- break
- }
- }
- }
|