123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002 |
- package service
- import (
- "context"
- "encoding/json"
- "strings"
- "time"
- "go-common/app/job/main/passport-user/model"
- "go-common/library/database/sql"
- "go-common/library/log"
- "go-common/library/queue/databus"
- "github.com/go-sql-driver/mysql"
- "github.com/pkg/errors"
- )
- type asoAccountBMsg struct {
- Action string
- Table string
- New *model.OriginAccount
- Timestamp int64
- }
- type asoAccountInfoBMsg struct {
- Action string
- Table string
- New *model.OriginAccountInfo
- Timestamp int64
- }
- type asoAccountRegBMsg struct {
- Action string
- Table string
- New *model.OriginAccountReg
- Timestamp int64
- }
- type asoAccountSnsBMsg struct {
- Action string
- Table string
- New *model.OriginAccountSns
- Timestamp int64
- }
- func (s *Service) consumeproc() {
- s.group.New = func(msg *databus.Message) (res interface{}, err error) {
- bmsg := new(model.BMsg)
- if err = json.Unmarshal(msg.Value, bmsg); err != nil {
- log.Error("json.Unmarshal(%s) error(%+v)", string(msg.Value), err)
- return
- }
- log.Info("receive msg action(%s) table(%s) key(%s) partition(%d) offset(%d) timestamp(%d) New(%s) Old(%s)",
- bmsg.Action, bmsg.Table, msg.Key, msg.Partition, msg.Offset, msg.Timestamp, string(bmsg.New), string(bmsg.Old))
- if bmsg.Table == _asoAccountTable {
- asoAccountBMsg := &asoAccountBMsg{
- Action: bmsg.Action,
- Table: bmsg.Table,
- Timestamp: msg.Timestamp,
- }
- newAccount := new(model.OriginAccount)
- if err = json.Unmarshal(bmsg.New, newAccount); err != nil {
- log.Error("json.Unmarshal(%s) error(%+v)", string(bmsg.New), err)
- return
- }
- asoAccountBMsg.New = newAccount
- return asoAccountBMsg, nil
- } else if bmsg.Table == _asoAccountSnsTable {
- asoAccountSnsBMsg := &asoAccountSnsBMsg{
- Action: bmsg.Action,
- Table: bmsg.Table,
- Timestamp: msg.Timestamp,
- }
- newAccountSns := new(model.OriginAccountSns)
- if err = json.Unmarshal(bmsg.New, newAccountSns); err != nil {
- log.Error("json.Unmarshal(%s) error(%+v)", string(bmsg.New), err)
- return
- }
- asoAccountSnsBMsg.New = newAccountSns
- return asoAccountSnsBMsg, nil
- } else if strings.HasPrefix(bmsg.Table, _asoAccountInfoTable) {
- asoAccountInfoBMsg := &asoAccountInfoBMsg{
- Action: bmsg.Action,
- Table: bmsg.Table,
- Timestamp: msg.Timestamp,
- }
- newAccountInfo := new(model.OriginAccountInfo)
- if err = json.Unmarshal(bmsg.New, newAccountInfo); err != nil {
- log.Error("json.Unmarshal(%s) error(%+v)", string(bmsg.New), err)
- return
- }
- asoAccountInfoBMsg.New = newAccountInfo
- return asoAccountInfoBMsg, nil
- } else if strings.HasPrefix(bmsg.Table, _asoAccountRegOriginTable) {
- asoAccountRegBMsg := &asoAccountRegBMsg{
- Action: bmsg.Action,
- Table: bmsg.Table,
- Timestamp: msg.Timestamp,
- }
- newAccountReg := new(model.OriginAccountReg)
- if err = json.Unmarshal(bmsg.New, newAccountReg); err != nil {
- log.Error("json.Unmarshal(%s) error(%+v)", string(bmsg.New), err)
- return
- }
- asoAccountRegBMsg.New = newAccountReg
- return asoAccountRegBMsg, nil
- }
- return
- }
- s.group.Split = func(msg *databus.Message, data interface{}) int {
- if t, ok := data.(*asoAccountBMsg); ok {
- return int(t.New.Mid)
- } else if t, ok := data.(*asoAccountSnsBMsg); ok {
- return int(t.New.Mid)
- } else if t, ok := data.(*asoAccountInfoBMsg); ok {
- return int(t.New.Mid)
- } else if t, ok := data.(*asoAccountRegBMsg); ok {
- return int(t.New.Mid)
- }
- return 0
- }
- s.group.Do = func(msgs []interface{}) {
- for _, m := range msgs {
- if msg, ok := m.(*asoAccountBMsg); ok {
- for {
- if err := s.handleAsoAccount(msg); err != nil {
- log.Error("fail to handleAsoAccount msg(%+v) new(%+v) error(%+v)", msg, msg.New, err)
- time.Sleep(100 * time.Millisecond)
- continue
- }
- break
- }
- } else if msg, ok := m.(*asoAccountSnsBMsg); ok {
- for {
- if err := s.handleAsoAccountSns(msg); err != nil {
- log.Error("fail to handleAsoAccountSns msg(%+v) new(%+v) error(%+v)", msg, msg.New, err)
- time.Sleep(100 * time.Millisecond)
- continue
- }
- break
- }
- } else if msg, ok := m.(*asoAccountInfoBMsg); ok {
- for {
- if err := s.handleAsoAccountInfo(msg); err != nil {
- log.Error("fail to handleAsoAccountInfo msg(%+v) new(%+v) error(%+v)", msg, msg.New, err)
- time.Sleep(100 * time.Millisecond)
- continue
- }
- break
- }
- } else if msg, ok := m.(*asoAccountRegBMsg); ok {
- for {
- if err := s.handleAsoAccountReg(msg); err != nil {
- log.Error("fail to handleAsoAccountReg msg(%+v) new(%+v) error(%+v)", msg, msg.New, err)
- time.Sleep(100 * time.Millisecond)
- continue
- }
- break
- }
- }
- }
- }
- // start the group
- s.group.Start()
- log.Info("s.group.Start()")
- }
- func (s *Service) handleAsoAccount(msg *asoAccountBMsg) (err error) {
- switch msg.Action {
- case _insertAction:
- var (
- a *model.OriginAccount
- userBase *model.UserBase
- userEmail *model.UserEmail
- userTel *model.UserTel
- currentUserBase *model.UserBase
- currentUserTel *model.UserTel
- currentUserEmail *model.UserEmail
- )
- if a, err = s.d.GetAsoAccountByMid(context.Background(), msg.New.Mid); err != nil {
- log.Error("fail to get asoAccount by mid(%d) error(%+v)", msg.New.Mid, err)
- return
- }
- if userBase, err = s.convertAccountToUserBase(a); err != nil {
- log.Error("fail to convert AsoAccount(%+v) to UserBase error(%+v)", a, err)
- return
- }
- if currentUserBase, err = s.d.GetUserBaseByMid(context.Background(), a.Mid); err != nil {
- return
- }
- if currentUserBase == nil {
- if _, err = s.d.AddUserBase(context.Background(), userBase); err != nil {
- return
- }
- }
- if a.Tel != "" {
- if currentUserTel, err = s.d.GetUserTelByMid(context.Background(), a.Mid); err != nil {
- return
- }
- if currentUserTel == nil {
- userTel = s.convertAccountToUserTel(a)
- if err = s.addUserTel(userTel, msg.Timestamp); err != nil {
- return
- }
- }
- }
- if a.Email != "" {
- if currentUserEmail, err = s.d.GetUserEmailByMid(context.Background(), a.Mid); err != nil {
- return
- }
- if currentUserEmail == nil {
- userEmail = s.convertAccountToUserEmail(a)
- if err = s.addUserEmail(userEmail, msg.Timestamp); err != nil {
- return
- }
- }
- if err = s.updateUserEmailVerified(msg.New.Mid); err != nil {
- return
- }
- }
- s.addCache(func() {
- ub, e := s.d.GetUserBaseByMid(context.Background(), msg.New.Mid)
- if e == nil && ub != nil {
- s.d.SetUserBaseCache(context.Background(), ub)
- }
- })
- s.addCache(func() {
- ut, e := s.d.GetUserTelByMid(context.Background(), msg.New.Mid)
- if e == nil && ut != nil {
- s.d.SetUserTelCache(context.Background(), ut)
- }
- })
- s.addCache(func() {
- ue, e := s.d.GetUserEmailByMid(context.Background(), msg.New.Mid)
- if e == nil && ue != nil {
- s.d.SetUserEmailCache(context.Background(), ue)
- }
- })
- case _deleteAction:
- var tx *sql.Tx
- tx, err = s.d.BeginTran(context.Background())
- if err != nil {
- log.Error("s.dao.Begin error(%+v)", err)
- return
- }
- if _, err = s.d.TxDelUserBase(tx, msg.New.Mid); err != nil {
- tx.Rollback()
- return
- }
- if _, err = s.d.TxDelUserTel(tx, msg.New.Mid); err != nil {
- tx.Rollback()
- return
- }
- if _, err = s.d.TxDelUserEmail(tx, msg.New.Mid); err != nil {
- tx.Rollback()
- return
- }
- if err = tx.Commit(); err != nil {
- log.Error("tx.Commit(), error(%v)", err)
- return
- }
- s.addCache(func() {
- s.d.DelUserBaseCache(context.Background(), msg.New.Mid)
- })
- s.addCache(func() {
- s.d.DelUserTelCache(context.Background(), msg.New.Mid)
- })
- s.addCache(func() {
- s.d.DelUserEmailCache(context.Background(), msg.New.Mid)
- })
- case _updateAction:
- var (
- a *model.OriginAccount
- userBase *model.UserBase
- userEmail *model.UserEmail
- userTel *model.UserTel
- currentUserBase *model.UserBase
- currentUserEmail *model.UserEmail
- currentUserTel *model.UserTel
- )
- if a, err = s.d.GetAsoAccountByMid(context.Background(), msg.New.Mid); err != nil {
- log.Error("fail to get asoAccount by mid(%d) error(%+v)", msg.New.Mid, err)
- return
- }
- if userBase, err = s.convertAccountToUserBase(a); err != nil {
- log.Error("fail to convert AsoAccount(%+v) to UserBase error(%+v)", a, err)
- return
- }
- userTel = s.convertAccountToUserTel(a)
- userEmail = s.convertAccountToUserEmail(a)
- if currentUserEmail, err = s.d.GetUserEmailByMid(context.Background(), a.Mid); err != nil {
- return
- }
- if currentUserTel, err = s.d.GetUserTelByMid(context.Background(), a.Mid); err != nil {
- return
- }
- if currentUserBase, err = s.d.GetUserBaseByMid(context.Background(), a.Mid); err != nil {
- return
- }
- if currentUserBase == nil {
- if _, err = s.d.AddUserBase(context.Background(), userBase); err != nil {
- return
- }
- } else {
- if _, err = s.d.UpdateUserBase(context.Background(), userBase); err != nil {
- return
- }
- }
- if currentUserTel != nil {
- if currentUserTel.TelBindTime == 0 {
- if err = s.updateUserTelAndBindTime(userTel, msg.Timestamp); err != nil {
- return
- }
- } else {
- if err = s.updateUserTel(userTel, msg.Timestamp); err != nil {
- return
- }
- }
- } else {
- if a.Tel != "" {
- if err = s.addUserTel(userTel, msg.Timestamp); err != nil {
- return
- }
- }
- }
- if currentUserEmail != nil {
- if currentUserEmail.EmailBindTime == 0 {
- if err = s.updateUserEmailAndBindTime(userEmail, msg.Timestamp); err != nil {
- return
- }
- } else {
- if err = s.updateUserEmail(userEmail, msg.Timestamp); err != nil {
- return
- }
- }
- if err = s.updateUserEmailVerified(msg.New.Mid); err != nil {
- return
- }
- } else {
- if a.Email != "" {
- if err = s.addUserEmail(userEmail, msg.Timestamp); err != nil {
- return
- }
- if err = s.updateUserEmailVerified(msg.New.Mid); err != nil {
- return
- }
- }
- }
- s.addCache(func() {
- ub, e := s.d.GetUserBaseByMid(context.Background(), msg.New.Mid)
- if e == nil && ub != nil {
- s.d.SetUserBaseCache(context.Background(), ub)
- }
- })
- s.addCache(func() {
- ut, e := s.d.GetUserTelByMid(context.Background(), msg.New.Mid)
- if e == nil && ut != nil {
- s.d.SetUserTelCache(context.Background(), ut)
- }
- })
- s.addCache(func() {
- ue, e := s.d.GetUserEmailByMid(context.Background(), msg.New.Mid)
- if e == nil && ue != nil {
- s.d.SetUserEmailCache(context.Background(), ue)
- }
- })
- }
- return
- }
- func (s *Service) handleAsoAccountSns(msg *asoAccountSnsBMsg) (err error) {
- var (
- a *model.OriginAccountSns
- currentSina *model.UserThirdBind
- currentQQ *model.UserThirdBind
- )
- switch msg.Action {
- case _insertAction:
- if a, err = s.d.GetAsoAccountSnsByMid(context.Background(), msg.New.Mid); err != nil {
- log.Error("fail to get asoAccountSns by mid(%d) error(%+v)", msg.New.Mid, err)
- return
- }
- if err = s.syncAsoAccountSns(a); err != nil {
- log.Error("fail to sync asoAccountSns(%+v) error(%+v)", a, err)
- return
- }
- case _updateAction:
- if a, err = s.d.GetAsoAccountSnsByMid(context.Background(), msg.New.Mid); err != nil {
- log.Error("fail to get asoAccountSns by mid(%d) error(%+v)", msg.New.Mid, err)
- return
- }
- qq := s.convertAccountSnsToUserThirdBindQQ(a)
- sina := s.convertAccountSnsToUserThirdBindSina(a)
- if currentSina, err = s.d.GetUserThirdBindByMidAndPlatform(context.Background(), a.Mid, _platformSINA); err != nil {
- return
- }
- if currentQQ, err = s.d.GetUserThirdBindByMidAndPlatform(context.Background(), a.Mid, _platformQQ); err != nil {
- return
- }
- if currentSina != nil {
- if _, err = s.d.UpdateUserThirdBind(context.Background(), sina); err != nil {
- return
- }
- } else {
- if a.SinaUID != 0 {
- if _, err = s.d.AddUserThirdBind(context.Background(), sina); err != nil {
- return
- }
- }
- }
- if currentQQ != nil {
- if _, err = s.d.UpdateUserThirdBind(context.Background(), qq); err != nil {
- return
- }
- } else {
- if a.QQOpenid != "" {
- if _, err = s.d.AddUserThirdBind(context.Background(), qq); err != nil {
- return
- }
- }
- }
- case _deleteAction:
- if _, err = s.d.DelUserThirdBind(context.Background(), msg.New.Mid); err != nil {
- return
- }
- s.addCache(func() {
- s.d.DelUserThirdBindQQCache(context.Background(), msg.New.Mid)
- })
- s.addCache(func() {
- s.d.DelUserThirdBindSinaCache(context.Background(), msg.New.Mid)
- })
- return
- }
- s.addCache(func() {
- resQQ, e := s.d.GetUserThirdBindByMidAndPlatform(context.Background(), msg.New.Mid, _platformQQ)
- if e == nil && resQQ != nil {
- s.d.SetUserThirdBindQQCache(context.Background(), resQQ)
- }
- })
- s.addCache(func() {
- resSina, e := s.d.GetUserThirdBindByMidAndPlatform(context.Background(), msg.New.Mid, _platformSINA)
- if e == nil && resSina != nil {
- s.d.SetUserThirdBindSinaCache(context.Background(), resSina)
- }
- })
- return
- }
- func (s *Service) handleAsoAccountInfo(msg *asoAccountInfoBMsg) (err error) {
- switch msg.Action {
- case _insertAction:
- var (
- a *model.OriginAccountInfo
- tx *sql.Tx
- userRegOrigin *model.UserRegOrigin
- )
- if a, err = s.d.GetAsoAccountInfoByMid(context.Background(), msg.New.Mid); err != nil {
- log.Error("fail to get asoAccountInfo by mid(%d) error(%+v)", msg.New.Mid, err)
- return
- }
- userRegOrigin = s.convertAccountInfoToUserRegOrigin(a)
- tx, err = s.d.BeginTran(context.Background())
- if err != nil {
- log.Error("s.dao.Begin error(%v)", err)
- return
- }
- if _, err = s.d.TxInsertUpdateUserRegOrigin(tx, userRegOrigin); err != nil {
- log.Error("fail to insert update userRegOrigin(%+v) error(%+v)", userRegOrigin, err)
- tx.Rollback()
- return
- }
- if err = tx.Commit(); err != nil {
- log.Error("tx.Commit(), error(%v)", err)
- return
- }
- s.addCache(func() {
- uro, e := s.d.GetUserRegOriginByMid(context.Background(), msg.New.Mid)
- if e == nil && uro != nil {
- s.d.SetUserRegOriginCache(context.Background(), uro)
- }
- })
- case _updateAction:
- var (
- a *model.OriginAccountInfo
- tx *sql.Tx
- userSafeQuestion *model.UserSafeQuestion
- userEmail *model.UserEmail
- currentUserSafeQuestion *model.UserSafeQuestion
- )
- if a, err = s.d.GetAsoAccountInfoByMid(context.Background(), msg.New.Mid); err != nil {
- log.Error("fail to get asoAccountInfo by mid(%d) error(%+v)", msg.New.Mid, err)
- return
- }
- userEmail = s.convertAccountInfoToUserEmail(a)
- userSafeQuestion = s.convertAccountInfoToUserSafeQuestion(a)
- if currentUserSafeQuestion, err = s.d.GetUserSafeQuestionByMid(context.Background(), msg.New.Mid); err != nil {
- log.Error("fail to get userSafeQuestion by mid(%d) error(%+v)", msg.New.Mid, err)
- return
- }
- tx, err = s.d.BeginTran(context.Background())
- if err != nil {
- log.Error("s.dao.Begin error(%v)", err)
- return
- }
- if currentUserSafeQuestion != nil {
- if _, err = s.d.TxUpdateUserSafeQuesion(tx, userSafeQuestion); err != nil {
- log.Error("fail to update user safe question userSafeQuestion(%+v) error(%+v)", userSafeQuestion, err)
- tx.Rollback()
- return
- }
- } else {
- if a.SafeQuestion != 0 || a.SafeAnswer != "" {
- if _, err = s.d.TxAddUserSafeQuestion(tx, userSafeQuestion); err != nil {
- log.Error("fail to add user safe question userSafeQuestion(%+v) error(%+v)", userSafeQuestion, err)
- tx.Rollback()
- return
- }
- }
- }
- if _, err = s.d.TxUpdateUserEmailVerified(tx, userEmail); err != nil {
- log.Error("fail to update user email userEmail(%+v) error(%+v)", userEmail, err)
- tx.Rollback()
- return
- }
- if err = tx.Commit(); err != nil {
- log.Error("tx.Commit(), error(%v)", err)
- return
- }
- }
- s.addCache(func() {
- usq, e := s.d.GetUserSafeQuestionByMid(context.Background(), msg.New.Mid)
- if e == nil && usq != nil {
- s.d.SetUserSafeQuestionCache(context.Background(), usq)
- }
- })
- s.addCache(func() {
- ue, e := s.d.GetUserEmailByMid(context.Background(), msg.New.Mid)
- if e == nil && ue != nil {
- s.d.SetUserEmailCache(context.Background(), ue)
- }
- })
- return
- }
- func (s *Service) handleAsoAccountReg(msg *asoAccountRegBMsg) (err error) {
- var a *model.OriginAccountReg
- if a, err = s.d.GetAsoAccountRegByMid(context.Background(), msg.New.Mid); err != nil {
- log.Error("fail to get asoAccountReg by mid(%d) error(%+v)", msg.New.Mid, err)
- return
- }
- switch msg.Action {
- case _insertAction:
- if err = s.syncAsoAccountReg(a); err != nil {
- log.Error("fail to sync asoAccountReg(%+v) error(%+v)", a, err)
- return
- }
- s.addCache(func() {
- uro, e := s.d.GetUserRegOriginByMid(context.Background(), msg.New.Mid)
- if e == nil && uro != nil {
- s.d.SetUserRegOriginCache(context.Background(), uro)
- }
- })
- }
- return
- }
- func (s *Service) handlerUpdateEmailDuplicate(userEmail *model.UserEmail, timestamp int64) (err error) {
- var (
- duplicateMid int64
- asoAccount *model.OriginAccount
- duplicateUserEmail *model.UserEmail
- )
- // 记录冲突日志
- userEmailDuplicate := &model.UserEmailDuplicate{
- Mid: userEmail.Mid,
- Email: userEmail.Email,
- Verified: userEmail.Verified,
- EmailBindTime: userEmail.EmailBindTime,
- Timestamp: timestamp,
- }
- if _, err = s.d.AddUserEmailDuplicate(context.Background(), userEmailDuplicate); err != nil {
- log.Error("fail to add user email duplicate userEmailDuplicate(%+v) error(%+v)", userEmailDuplicate, err)
- return
- }
- //1. 获取冲突mid
- if duplicateMid, err = s.d.GetMidByEmail(context.Background(), userEmail); err != nil {
- log.Error("fail to get mid by email userEmail(%+v) error(%+v)", userEmail, err)
- return
- }
- //2. 根据冲突mid查询老库最新数据
- if asoAccount, err = s.d.GetAsoAccountByMid(context.Background(), duplicateMid); err != nil {
- log.Error("fail to get asoAccount by mid(%d) error(%+v)", duplicateMid, err)
- return
- }
- duplicateUserEmail = s.convertAccountToUserEmail(asoAccount)
- // 3. 将冲突的Email设置为NULL
- duplicateUserMailToNil := &model.UserEmail{
- Mid: duplicateMid,
- }
- log.Info("handle update email duplicate, mid(%d) duplicateMid(%d)", userEmail.Mid, duplicateMid)
- if _, err = s.d.UpdateUserEmail(context.Background(), duplicateUserMailToNil); err != nil {
- log.Error("fail to update user email duplicateUserMailToNil(%+v) error(%+v)", duplicateUserMailToNil, err)
- return
- }
- // 4. 更新email
- if _, err = s.d.UpdateUserEmail(context.Background(), userEmail); err != nil {
- log.Error("fail to update user email userEmail(%+v) error(%+v)", userEmail, err)
- return
- }
- // 5. 更新冲突的email
- if _, err = s.d.UpdateUserEmail(context.Background(), duplicateUserEmail); err != nil {
- log.Error("fail to update user email duplicateUserEmail(%+v) error(%+v)", duplicateUserEmail, err)
- return
- }
- return
- }
- func (s *Service) handlerUpdateEmailAndBindTimeDuplicate(userEmail *model.UserEmail, timestamp int64) (err error) {
- var (
- duplicateMid int64
- asoAccount *model.OriginAccount
- duplicateUserEmail *model.UserEmail
- )
- // 记录冲突日志
- userEmailDuplicate := &model.UserEmailDuplicate{
- Mid: userEmail.Mid,
- Email: userEmail.Email,
- Verified: userEmail.Verified,
- EmailBindTime: userEmail.EmailBindTime,
- Timestamp: timestamp,
- }
- if _, err = s.d.AddUserEmailDuplicate(context.Background(), userEmailDuplicate); err != nil {
- log.Error("fail to add user email duplicate userEmailDuplicate(%+v) error(%+v)", userEmailDuplicate, err)
- return
- }
- //1. 获取冲突mid
- if duplicateMid, err = s.d.GetMidByEmail(context.Background(), userEmail); err != nil {
- log.Error("fail to get mid by email userEmail(%+v) error(%+v)", userEmail, err)
- return
- }
- //2. 根据冲突mid查询老库最新数据
- if asoAccount, err = s.d.GetAsoAccountByMid(context.Background(), duplicateMid); err != nil {
- log.Error("fail to get asoAccount by mid(%d) error(%+v)", duplicateMid, err)
- return
- }
- duplicateUserEmail = s.convertAccountToUserEmail(asoAccount)
- // 3. 将冲突的Email设置为NULL
- duplicateUserMailToNil := &model.UserEmail{
- Mid: duplicateMid,
- }
- log.Info("handle update email and bind time duplicate, mid(%d) duplicateMid(%d)", userEmail.Mid, duplicateMid)
- if _, err = s.d.UpdateUserEmail(context.Background(), duplicateUserMailToNil); err != nil {
- log.Error("fail to update user email duplicateUserMailToNil(%+v) error(%+v)", duplicateUserMailToNil, err)
- return
- }
- // 4. 更新email and bind time
- if _, err = s.d.UpdateUserEmailAndBindTime(context.Background(), userEmail); err != nil {
- log.Error("fail to update user email userEmail(%+v) error(%+v)", userEmail, err)
- return
- }
- // 5. 更新冲突的email
- if _, err = s.d.UpdateUserEmail(context.Background(), duplicateUserEmail); err != nil {
- log.Error("fail to update user email duplicateUserEmail(%+v) error(%+v)", duplicateUserEmail, err)
- return
- }
- return
- }
- func (s *Service) handlerInsertEmailDuplicate(userEmail *model.UserEmail, timestamp int64) (err error) {
- var (
- duplicateMid int64
- asoAccount *model.OriginAccount
- duplicateUserEmail *model.UserEmail
- )
- // 记录冲突日志
- userEmailDuplicate := &model.UserEmailDuplicate{
- Mid: userEmail.Mid,
- Email: userEmail.Email,
- Verified: userEmail.Verified,
- EmailBindTime: userEmail.EmailBindTime,
- Timestamp: timestamp,
- }
- if _, err = s.d.AddUserEmailDuplicate(context.Background(), userEmailDuplicate); err != nil {
- log.Error("fail to add user email duplicate userEmailDuplicate(%+v) error(%+v)", userEmailDuplicate, err)
- return
- }
- //1. 获取冲突mid
- if duplicateMid, err = s.d.GetMidByEmail(context.Background(), userEmail); err != nil {
- log.Error("fail to get mid by email userEmail(%+v) error(%+v)", userEmail, err)
- return
- }
- //2. 根据冲突mid查询老库最新数据
- if asoAccount, err = s.d.GetAsoAccountByMid(context.Background(), duplicateMid); err != nil {
- log.Error("fail to get asoAccount by mid(%d) error(%+v)", duplicateMid, err)
- return
- }
- duplicateUserEmail = s.convertAccountToUserEmail(asoAccount)
- // 3. 将冲突的Email设置为NULL
- duplicateUserMailToNil := &model.UserEmail{
- Mid: duplicateMid,
- }
- log.Info("handle insert email duplicate, mid(%d) duplicateMid(%d)", userEmail.Mid, duplicateMid)
- if _, err = s.d.UpdateUserEmail(context.Background(), duplicateUserMailToNil); err != nil {
- log.Error("fail to update user email duplicateUserMailToNil(%+v) error(%+v)", duplicateUserMailToNil, err)
- return
- }
- // 4. 新增email
- if _, err = s.d.AddUserEmail(context.Background(), userEmail); err != nil {
- log.Error("fail to add user email userEmail(%+v) error(%+v)", userEmail, err)
- return
- }
- // 5. 更新冲突的email
- if _, err = s.d.UpdateUserEmail(context.Background(), duplicateUserEmail); err != nil {
- log.Error("fail to update user email duplicateUserEmail(%+v) error(%+v)", duplicateUserEmail, err)
- return
- }
- return
- }
- func (s *Service) handlerUpdateTelDuplicate(userTel *model.UserTel, timestamp int64) (err error) {
- var (
- duplicateMid int64
- asoAccount *model.OriginAccount
- duplicateUserTel *model.UserTel
- )
- // 记录冲突日志
- userTelDuplicate := &model.UserTelDuplicate{
- Mid: userTel.Mid,
- Tel: userTel.Tel,
- Cid: userTel.Cid,
- TelBindTime: userTel.TelBindTime,
- Timestamp: timestamp,
- }
- if _, err = s.d.AddUserTelDuplicate(context.Background(), userTelDuplicate); err != nil {
- log.Error("fail to add user tel duplicate userTelDuplicate(%+v) error(%+v)", userTelDuplicate, err)
- return
- }
- //1. 获取冲突mid
- if duplicateMid, err = s.d.GetMidByTel(context.Background(), userTel); err != nil {
- log.Error("fail to get mid by tel userTel(%+v) error(%+v)", userTel, err)
- return
- }
- //2. 根据冲突mid查询老库最新数据
- if asoAccount, err = s.d.GetAsoAccountByMid(context.Background(), duplicateMid); err != nil {
- log.Error("fail to get asoAccount by mid(%d) error(%+v)", duplicateMid, err)
- return
- }
- duplicateUserTel = s.convertAccountToUserTel(asoAccount)
- // 3. 将冲突的Tel设置为NULL
- duplicateUserTelToNil := &model.UserTel{
- Mid: duplicateMid,
- }
- log.Info("handle update tel duplicate, mid(%d) duplicateMid(%d)", userTel.Mid, duplicateMid)
- if _, err = s.d.UpdateUserTel(context.Background(), duplicateUserTelToNil); err != nil {
- log.Error("fail to update user tel duplicateUserTelToNil(%+v) error(%+v)", duplicateUserTelToNil, err)
- return
- }
- // 4. 更新tel
- if _, err = s.d.UpdateUserTel(context.Background(), userTel); err != nil {
- log.Error("fail to update user tel userTel(%+v) error(%+v)", userTel, err)
- return
- }
- // 5. 更新冲突tel
- if _, err = s.d.UpdateUserTel(context.Background(), duplicateUserTel); err != nil {
- log.Error("fail to update user tel duplicateUserTel(%+v) error(%+v)", duplicateUserTel, err)
- return
- }
- return
- }
- func (s *Service) handlerUpdateTelAndBindTimeDuplicate(userTel *model.UserTel, timestamp int64) (err error) {
- var (
- duplicateMid int64
- asoAccount *model.OriginAccount
- duplicateUserTel *model.UserTel
- )
- // 记录冲突日志
- userTelDuplicate := &model.UserTelDuplicate{
- Mid: userTel.Mid,
- Tel: userTel.Tel,
- Cid: userTel.Cid,
- TelBindTime: userTel.TelBindTime,
- Timestamp: timestamp,
- }
- if _, err = s.d.AddUserTelDuplicate(context.Background(), userTelDuplicate); err != nil {
- log.Error("fail to add user tel duplicate userTelDuplicate(%+v) error(%+v)", userTelDuplicate, err)
- return
- }
- //1. 获取冲突mid
- if duplicateMid, err = s.d.GetMidByTel(context.Background(), userTel); err != nil {
- log.Error("fail to get mid by tel userTel(%+v) error(%+v)", userTel, err)
- return
- }
- //2. 根据冲突mid查询老库最新数据
- if asoAccount, err = s.d.GetAsoAccountByMid(context.Background(), duplicateMid); err != nil {
- log.Error("fail to get asoAccount by mid(%d) error(%+v)", duplicateMid, err)
- return
- }
- duplicateUserTel = s.convertAccountToUserTel(asoAccount)
- // 3. 将冲突的Tel设置为NULL
- duplicateUserTelToNil := &model.UserTel{
- Mid: duplicateMid,
- }
- log.Info("handle update tel and bind time duplicate, mid(%d) duplicateMid(%d)", userTel.Mid, duplicateMid)
- if _, err = s.d.UpdateUserTel(context.Background(), duplicateUserTelToNil); err != nil {
- log.Error("fail to update user tel duplicateUserTelToNil(%+v) error(%+v)", duplicateUserTelToNil, err)
- return
- }
- // 4. 更新tel
- if _, err = s.d.UpdateUserTelAndBindTime(context.Background(), userTel); err != nil {
- log.Error("fail to update user tel userTel(%+v) error(%+v)", userTel, err)
- return
- }
- // 5. 更新冲突tel
- if _, err = s.d.UpdateUserTel(context.Background(), duplicateUserTel); err != nil {
- log.Error("fail to update user tel duplicateUserTel(%+v) error(%+v)", duplicateUserTel, err)
- return
- }
- return
- }
- func (s *Service) handlerInsertTelDuplicate(userTel *model.UserTel, timestamp int64) (err error) {
- var (
- duplicateMid int64
- asoAccount *model.OriginAccount
- duplicateUserTel *model.UserTel
- )
- // 记录冲突日志
- userTelDuplicate := &model.UserTelDuplicate{
- Mid: userTel.Mid,
- Tel: userTel.Tel,
- Cid: userTel.Cid,
- TelBindTime: userTel.TelBindTime,
- Timestamp: timestamp,
- }
- if _, err = s.d.AddUserTelDuplicate(context.Background(), userTelDuplicate); err != nil {
- log.Error("fail to add user tel duplicate userTelDuplicate(%+v) error(%+v)", userTelDuplicate, err)
- return
- }
- //1. 获取冲突mid
- if duplicateMid, err = s.d.GetMidByTel(context.Background(), userTel); err != nil {
- log.Error("fail to get mid by tel userTel(%+v) error(%+v)", userTel, err)
- return
- }
- //2. 根据冲突mid查询老库最新数据
- if asoAccount, err = s.d.GetAsoAccountByMid(context.Background(), duplicateMid); err != nil {
- log.Error("fail to get asoAccount by mid(%d) error(%+v)", duplicateMid, err)
- return
- }
- duplicateUserTel = s.convertAccountToUserTel(asoAccount)
- // 3. 将冲突的Tel设置为NULL
- duplicateUserTelToNil := &model.UserTel{
- Mid: duplicateMid,
- }
- log.Info("handle insert tel duplicate, mid(%d) duplicateMid(%d)", userTel.Mid, duplicateMid)
- if _, err = s.d.UpdateUserTel(context.Background(), duplicateUserTelToNil); err != nil {
- log.Error("fail to update user tel duplicateUserTelToNil(%+v) error(%+v)", duplicateUserTelToNil, err)
- return
- }
- // 4. 更新tel
- if _, err = s.d.AddUserTel(context.Background(), userTel); err != nil {
- log.Error("fail to update user tel userTel(%+v) error(%+v)", userTel, err)
- return
- }
- // 5. 更新冲突tel
- if _, err = s.d.UpdateUserTel(context.Background(), duplicateUserTel); err != nil {
- log.Error("fail to update user tel duplicateUserTel(%+v) error(%+v)", duplicateUserTel, err)
- return
- }
- return
- }
- func (s *Service) updateUserEmailVerified(mid int64) (err error) {
- var (
- info *model.OriginAccountInfo
- userEmail *model.UserEmail
- )
- if info, err = s.d.GetAsoAccountInfoByMid(context.Background(), mid); err != nil {
- log.Error("fail to get asoAccountInfo by mid(%d) error(%+v)", mid, err)
- return
- }
- userEmail = s.convertAccountInfoToUserEmail(info)
- if _, err = s.d.UpdateUserEmailVerified(context.Background(), userEmail); err != nil {
- log.Error("fail to update user email userEmail(%+v) error(%+v)", userEmail, err)
- return
- }
- return
- }
- func (s *Service) addUserEmail(userEmail *model.UserEmail, timestamp int64) (err error) {
- if _, err = s.d.AddUserEmail(context.Background(), userEmail); err != nil {
- log.Error("fail to add user email userEmail(%+v) error(%+v)", userEmail, err)
- switch nErr := errors.Cause(err).(type) {
- case *mysql.MySQLError:
- if nErr.Number == _mySQLErrCodeDuplicateEntry {
- if err = s.handlerInsertEmailDuplicate(userEmail, timestamp); err != nil {
- log.Error("fail to handlerInsertEmailDuplicate userEmail(%+v) error(%+v)", userEmail, err)
- return
- }
- err = nil
- return
- }
- }
- return
- }
- return
- }
- func (s *Service) updateUserEmail(userEmail *model.UserEmail, timestamp int64) (err error) {
- if _, err = s.d.UpdateUserEmail(context.Background(), userEmail); err != nil {
- log.Error("fail to update user email userEmail(%+v) error(%+v)", userEmail, err)
- switch nErr := errors.Cause(err).(type) {
- case *mysql.MySQLError:
- if nErr.Number == _mySQLErrCodeDuplicateEntry {
- if err = s.handlerUpdateEmailDuplicate(userEmail, timestamp); err != nil {
- log.Error("fail to handlerUpdateEmailDuplicate userEmail(%+v) error(%+v)", userEmail, err)
- return
- }
- err = nil
- return
- }
- }
- return
- }
- return
- }
- func (s *Service) updateUserEmailAndBindTime(userEmail *model.UserEmail, timestamp int64) (err error) {
- if _, err = s.d.UpdateUserEmailAndBindTime(context.Background(), userEmail); err != nil {
- log.Error("fail to update user email and bind time userEmail(%+v) error(%+v)", userEmail, err)
- switch nErr := errors.Cause(err).(type) {
- case *mysql.MySQLError:
- if nErr.Number == _mySQLErrCodeDuplicateEntry {
- if err = s.handlerUpdateEmailAndBindTimeDuplicate(userEmail, timestamp); err != nil {
- log.Error("fail to handlerUpdateEmailAndBindTimeDuplicate userEmail(%+v) error(%+v)", userEmail, err)
- return
- }
- err = nil
- return
- }
- }
- return
- }
- return
- }
- func (s *Service) addUserTel(userTel *model.UserTel, timestamp int64) (err error) {
- if _, err = s.d.AddUserTel(context.Background(), userTel); err != nil {
- log.Error("fail to add user tel userTel(%+v) error(%+v)", userTel, err)
- switch nErr := errors.Cause(err).(type) {
- case *mysql.MySQLError:
- if nErr.Number == _mySQLErrCodeDuplicateEntry {
- if err = s.handlerInsertTelDuplicate(userTel, timestamp); err != nil {
- log.Error("fail to handlerTelDuplicate userTel(%+v) error(%+v)", userTel, err)
- return
- }
- err = nil
- return
- }
- }
- return
- }
- return
- }
- func (s *Service) updateUserTel(userTel *model.UserTel, timestamp int64) (err error) {
- if _, err = s.d.UpdateUserTel(context.Background(), userTel); err != nil {
- log.Error("fail to update user tel userTel(%+v) error(%+v)", userTel, err)
- switch nErr := errors.Cause(err).(type) {
- case *mysql.MySQLError:
- if nErr.Number == _mySQLErrCodeDuplicateEntry {
- if err = s.handlerUpdateTelDuplicate(userTel, timestamp); err != nil {
- log.Error("fail to handlerTelDuplicate userTel(%+v) error(%+v)", userTel, err)
- return
- }
- err = nil
- return
- }
- }
- return
- }
- return
- }
- func (s *Service) updateUserTelAndBindTime(userTel *model.UserTel, timestamp int64) (err error) {
- if _, err = s.d.UpdateUserTelAndBindTime(context.Background(), userTel); err != nil {
- log.Error("fail to update user tel and bind time userTel(%+v) error(%+v)", userTel, err)
- switch nErr := errors.Cause(err).(type) {
- case *mysql.MySQLError:
- if nErr.Number == _mySQLErrCodeDuplicateEntry {
- if err = s.handlerUpdateTelAndBindTimeDuplicate(userTel, timestamp); err != nil {
- log.Error("fail to handlerTelDuplicate userTel(%+v) error(%+v)", userTel, err)
- return
- }
- err = nil
- return
- }
- }
- return
- }
- return
- }
|