aso_incr_migration.go 32 KB


  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "strings"
  6. "time"
  7. "go-common/app/job/main/passport-user/model"
  8. "go-common/library/database/sql"
  9. "go-common/library/log"
  10. "go-common/library/queue/databus"
  11. "github.com/go-sql-driver/mysql"
  12. "github.com/pkg/errors"
  13. )
  14. type asoAccountBMsg struct {
  15. Action string
  16. Table string
  17. New *model.OriginAccount
  18. Timestamp int64
  19. }
  20. type asoAccountInfoBMsg struct {
  21. Action string
  22. Table string
  23. New *model.OriginAccountInfo
  24. Timestamp int64
  25. }
  26. type asoAccountRegBMsg struct {
  27. Action string
  28. Table string
  29. New *model.OriginAccountReg
  30. Timestamp int64
  31. }
  32. type asoAccountSnsBMsg struct {
  33. Action string
  34. Table string
  35. New *model.OriginAccountSns
  36. Timestamp int64
  37. }
  38. func (s *Service) consumeproc() {
  39. s.group.New = func(msg *databus.Message) (res interface{}, err error) {
  40. bmsg := new(model.BMsg)
  41. if err = json.Unmarshal(msg.Value, bmsg); err != nil {
  42. log.Error("json.Unmarshal(%s) error(%+v)", string(msg.Value), err)
  43. return
  44. }
  45. log.Info("receive msg action(%s) table(%s) key(%s) partition(%d) offset(%d) timestamp(%d) New(%s) Old(%s)",
  46. bmsg.Action, bmsg.Table, msg.Key, msg.Partition, msg.Offset, msg.Timestamp, string(bmsg.New), string(bmsg.Old))
  47. if bmsg.Table == _asoAccountTable {
  48. asoAccountBMsg := &asoAccountBMsg{
  49. Action: bmsg.Action,
  50. Table: bmsg.Table,
  51. Timestamp: msg.Timestamp,
  52. }
  53. newAccount := new(model.OriginAccount)
  54. if err = json.Unmarshal(bmsg.New, newAccount); err != nil {
  55. log.Error("json.Unmarshal(%s) error(%+v)", string(bmsg.New), err)
  56. return
  57. }
  58. asoAccountBMsg.New = newAccount
  59. return asoAccountBMsg, nil
  60. } else if bmsg.Table == _asoAccountSnsTable {
  61. asoAccountSnsBMsg := &asoAccountSnsBMsg{
  62. Action: bmsg.Action,
  63. Table: bmsg.Table,
  64. Timestamp: msg.Timestamp,
  65. }
  66. newAccountSns := new(model.OriginAccountSns)
  67. if err = json.Unmarshal(bmsg.New, newAccountSns); err != nil {
  68. log.Error("json.Unmarshal(%s) error(%+v)", string(bmsg.New), err)
  69. return
  70. }
  71. asoAccountSnsBMsg.New = newAccountSns
  72. return asoAccountSnsBMsg, nil
  73. } else if strings.HasPrefix(bmsg.Table, _asoAccountInfoTable) {
  74. asoAccountInfoBMsg := &asoAccountInfoBMsg{
  75. Action: bmsg.Action,
  76. Table: bmsg.Table,
  77. Timestamp: msg.Timestamp,
  78. }
  79. newAccountInfo := new(model.OriginAccountInfo)
  80. if err = json.Unmarshal(bmsg.New, newAccountInfo); err != nil {
  81. log.Error("json.Unmarshal(%s) error(%+v)", string(bmsg.New), err)
  82. return
  83. }
  84. asoAccountInfoBMsg.New = newAccountInfo
  85. return asoAccountInfoBMsg, nil
  86. } else if strings.HasPrefix(bmsg.Table, _asoAccountRegOriginTable) {
  87. asoAccountRegBMsg := &asoAccountRegBMsg{
  88. Action: bmsg.Action,
  89. Table: bmsg.Table,
  90. Timestamp: msg.Timestamp,
  91. }
  92. newAccountReg := new(model.OriginAccountReg)
  93. if err = json.Unmarshal(bmsg.New, newAccountReg); err != nil {
  94. log.Error("json.Unmarshal(%s) error(%+v)", string(bmsg.New), err)
  95. return
  96. }
  97. asoAccountRegBMsg.New = newAccountReg
  98. return asoAccountRegBMsg, nil
  99. }
  100. return
  101. }
  102. s.group.Split = func(msg *databus.Message, data interface{}) int {
  103. if t, ok := data.(*asoAccountBMsg); ok {
  104. return int(t.New.Mid)
  105. } else if t, ok := data.(*asoAccountSnsBMsg); ok {
  106. return int(t.New.Mid)
  107. } else if t, ok := data.(*asoAccountInfoBMsg); ok {
  108. return int(t.New.Mid)
  109. } else if t, ok := data.(*asoAccountRegBMsg); ok {
  110. return int(t.New.Mid)
  111. }
  112. return 0
  113. }
  114. s.group.Do = func(msgs []interface{}) {
  115. for _, m := range msgs {
  116. if msg, ok := m.(*asoAccountBMsg); ok {
  117. for {
  118. if err := s.handleAsoAccount(msg); err != nil {
  119. log.Error("fail to handleAsoAccount msg(%+v) new(%+v) error(%+v)", msg, msg.New, err)
  120. time.Sleep(100 * time.Millisecond)
  121. continue
  122. }
  123. break
  124. }
  125. } else if msg, ok := m.(*asoAccountSnsBMsg); ok {
  126. for {
  127. if err := s.handleAsoAccountSns(msg); err != nil {
  128. log.Error("fail to handleAsoAccountSns msg(%+v) new(%+v) error(%+v)", msg, msg.New, err)
  129. time.Sleep(100 * time.Millisecond)
  130. continue
  131. }
  132. break
  133. }
  134. } else if msg, ok := m.(*asoAccountInfoBMsg); ok {
  135. for {
  136. if err := s.handleAsoAccountInfo(msg); err != nil {
  137. log.Error("fail to handleAsoAccountInfo msg(%+v) new(%+v) error(%+v)", msg, msg.New, err)
  138. time.Sleep(100 * time.Millisecond)
  139. continue
  140. }
  141. break
  142. }
  143. } else if msg, ok := m.(*asoAccountRegBMsg); ok {
  144. for {
  145. if err := s.handleAsoAccountReg(msg); err != nil {
  146. log.Error("fail to handleAsoAccountReg msg(%+v) new(%+v) error(%+v)", msg, msg.New, err)
  147. time.Sleep(100 * time.Millisecond)
  148. continue
  149. }
  150. break
  151. }
  152. }
  153. }
  154. }
  155. // start the group
  156. s.group.Start()
  157. log.Info("s.group.Start()")
  158. }
  159. func (s *Service) handleAsoAccount(msg *asoAccountBMsg) (err error) {
  160. switch msg.Action {
  161. case _insertAction:
  162. var (
  163. a *model.OriginAccount
  164. userBase *model.UserBase
  165. userEmail *model.UserEmail
  166. userTel *model.UserTel
  167. currentUserBase *model.UserBase
  168. currentUserTel *model.UserTel
  169. currentUserEmail *model.UserEmail
  170. )
  171. if a, err = s.d.GetAsoAccountByMid(context.Background(), msg.New.Mid); err != nil {
  172. log.Error("fail to get asoAccount by mid(%d) error(%+v)", msg.New.Mid, err)
  173. return
  174. }
  175. if userBase, err = s.convertAccountToUserBase(a); err != nil {
  176. log.Error("fail to convert AsoAccount(%+v) to UserBase error(%+v)", a, err)
  177. return
  178. }
  179. if currentUserBase, err = s.d.GetUserBaseByMid(context.Background(), a.Mid); err != nil {
  180. return
  181. }
  182. if currentUserBase == nil {
  183. if _, err = s.d.AddUserBase(context.Background(), userBase); err != nil {
  184. return
  185. }
  186. }
  187. if a.Tel != "" {
  188. if currentUserTel, err = s.d.GetUserTelByMid(context.Background(), a.Mid); err != nil {
  189. return
  190. }
  191. if currentUserTel == nil {
  192. userTel = s.convertAccountToUserTel(a)
  193. if err = s.addUserTel(userTel, msg.Timestamp); err != nil {
  194. return
  195. }
  196. }
  197. }
  198. if a.Email != "" {
  199. if currentUserEmail, err = s.d.GetUserEmailByMid(context.Background(), a.Mid); err != nil {
  200. return
  201. }
  202. if currentUserEmail == nil {
  203. userEmail = s.convertAccountToUserEmail(a)
  204. if err = s.addUserEmail(userEmail, msg.Timestamp); err != nil {
  205. return
  206. }
  207. }
  208. if err = s.updateUserEmailVerified(msg.New.Mid); err != nil {
  209. return
  210. }
  211. }
  212. s.addCache(func() {
  213. ub, e := s.d.GetUserBaseByMid(context.Background(), msg.New.Mid)
  214. if e == nil && ub != nil {
  215. s.d.SetUserBaseCache(context.Background(), ub)
  216. }
  217. })
  218. s.addCache(func() {
  219. ut, e := s.d.GetUserTelByMid(context.Background(), msg.New.Mid)
  220. if e == nil && ut != nil {
  221. s.d.SetUserTelCache(context.Background(), ut)
  222. }
  223. })
  224. s.addCache(func() {
  225. ue, e := s.d.GetUserEmailByMid(context.Background(), msg.New.Mid)
  226. if e == nil && ue != nil {
  227. s.d.SetUserEmailCache(context.Background(), ue)
  228. }
  229. })
  230. case _deleteAction:
  231. var tx *sql.Tx
  232. tx, err = s.d.BeginTran(context.Background())
  233. if err != nil {
  234. log.Error("s.dao.Begin error(%+v)", err)
  235. return
  236. }
  237. if _, err = s.d.TxDelUserBase(tx, msg.New.Mid); err != nil {
  238. tx.Rollback()
  239. return
  240. }
  241. if _, err = s.d.TxDelUserTel(tx, msg.New.Mid); err != nil {
  242. tx.Rollback()
  243. return
  244. }
  245. if _, err = s.d.TxDelUserEmail(tx, msg.New.Mid); err != nil {
  246. tx.Rollback()
  247. return
  248. }
  249. if err = tx.Commit(); err != nil {
  250. log.Error("tx.Commit(), error(%v)", err)
  251. return
  252. }
  253. s.addCache(func() {
  254. s.d.DelUserBaseCache(context.Background(), msg.New.Mid)
  255. })
  256. s.addCache(func() {
  257. s.d.DelUserTelCache(context.Background(), msg.New.Mid)
  258. })
  259. s.addCache(func() {
  260. s.d.DelUserEmailCache(context.Background(), msg.New.Mid)
  261. })
  262. case _updateAction:
  263. var (
  264. a *model.OriginAccount
  265. userBase *model.UserBase
  266. userEmail *model.UserEmail
  267. userTel *model.UserTel
  268. currentUserBase *model.UserBase
  269. currentUserEmail *model.UserEmail
  270. currentUserTel *model.UserTel
  271. )
  272. if a, err = s.d.GetAsoAccountByMid(context.Background(), msg.New.Mid); err != nil {
  273. log.Error("fail to get asoAccount by mid(%d) error(%+v)", msg.New.Mid, err)
  274. return
  275. }
  276. if userBase, err = s.convertAccountToUserBase(a); err != nil {
  277. log.Error("fail to convert AsoAccount(%+v) to UserBase error(%+v)", a, err)
  278. return
  279. }
  280. userTel = s.convertAccountToUserTel(a)
  281. userEmail = s.convertAccountToUserEmail(a)
  282. if currentUserEmail, err = s.d.GetUserEmailByMid(context.Background(), a.Mid); err != nil {
  283. return
  284. }
  285. if currentUserTel, err = s.d.GetUserTelByMid(context.Background(), a.Mid); err != nil {
  286. return
  287. }
  288. if currentUserBase, err = s.d.GetUserBaseByMid(context.Background(), a.Mid); err != nil {
  289. return
  290. }
  291. if currentUserBase == nil {
  292. if _, err = s.d.AddUserBase(context.Background(), userBase); err != nil {
  293. return
  294. }
  295. } else {
  296. if _, err = s.d.UpdateUserBase(context.Background(), userBase); err != nil {
  297. return
  298. }
  299. }
  300. if currentUserTel != nil {
  301. if currentUserTel.TelBindTime == 0 {
  302. if err = s.updateUserTelAndBindTime(userTel, msg.Timestamp); err != nil {
  303. return
  304. }
  305. } else {
  306. if err = s.updateUserTel(userTel, msg.Timestamp); err != nil {
  307. return
  308. }
  309. }
  310. } else {
  311. if a.Tel != "" {
  312. if err = s.addUserTel(userTel, msg.Timestamp); err != nil {
  313. return
  314. }
  315. }
  316. }
  317. if currentUserEmail != nil {
  318. if currentUserEmail.EmailBindTime == 0 {
  319. if err = s.updateUserEmailAndBindTime(userEmail, msg.Timestamp); err != nil {
  320. return
  321. }
  322. } else {
  323. if err = s.updateUserEmail(userEmail, msg.Timestamp); err != nil {
  324. return
  325. }
  326. }
  327. if err = s.updateUserEmailVerified(msg.New.Mid); err != nil {
  328. return
  329. }
  330. } else {
  331. if a.Email != "" {
  332. if err = s.addUserEmail(userEmail, msg.Timestamp); err != nil {
  333. return
  334. }
  335. if err = s.updateUserEmailVerified(msg.New.Mid); err != nil {
  336. return
  337. }
  338. }
  339. }
  340. s.addCache(func() {
  341. ub, e := s.d.GetUserBaseByMid(context.Background(), msg.New.Mid)
  342. if e == nil && ub != nil {
  343. s.d.SetUserBaseCache(context.Background(), ub)
  344. }
  345. })
  346. s.addCache(func() {
  347. ut, e := s.d.GetUserTelByMid(context.Background(), msg.New.Mid)
  348. if e == nil && ut != nil {
  349. s.d.SetUserTelCache(context.Background(), ut)
  350. }
  351. })
  352. s.addCache(func() {
  353. ue, e := s.d.GetUserEmailByMid(context.Background(), msg.New.Mid)
  354. if e == nil && ue != nil {
  355. s.d.SetUserEmailCache(context.Background(), ue)
  356. }
  357. })
  358. }
  359. return
  360. }
  361. func (s *Service) handleAsoAccountSns(msg *asoAccountSnsBMsg) (err error) {
  362. var (
  363. a *model.OriginAccountSns
  364. currentSina *model.UserThirdBind
  365. currentQQ *model.UserThirdBind
  366. )
  367. switch msg.Action {
  368. case _insertAction:
  369. if a, err = s.d.GetAsoAccountSnsByMid(context.Background(), msg.New.Mid); err != nil {
  370. log.Error("fail to get asoAccountSns by mid(%d) error(%+v)", msg.New.Mid, err)
  371. return
  372. }
  373. if err = s.syncAsoAccountSns(a); err != nil {
  374. log.Error("fail to sync asoAccountSns(%+v) error(%+v)", a, err)
  375. return
  376. }
  377. case _updateAction:
  378. if a, err = s.d.GetAsoAccountSnsByMid(context.Background(), msg.New.Mid); err != nil {
  379. log.Error("fail to get asoAccountSns by mid(%d) error(%+v)", msg.New.Mid, err)
  380. return
  381. }
  382. qq := s.convertAccountSnsToUserThirdBindQQ(a)
  383. sina := s.convertAccountSnsToUserThirdBindSina(a)
  384. if currentSina, err = s.d.GetUserThirdBindByMidAndPlatform(context.Background(), a.Mid, _platformSINA); err != nil {
  385. return
  386. }
  387. if currentQQ, err = s.d.GetUserThirdBindByMidAndPlatform(context.Background(), a.Mid, _platformQQ); err != nil {
  388. return
  389. }
  390. if currentSina != nil {
  391. if _, err = s.d.UpdateUserThirdBind(context.Background(), sina); err != nil {
  392. return
  393. }
  394. } else {
  395. if a.SinaUID != 0 {
  396. if _, err = s.d.AddUserThirdBind(context.Background(), sina); err != nil {
  397. return
  398. }
  399. }
  400. }
  401. if currentQQ != nil {
  402. if _, err = s.d.UpdateUserThirdBind(context.Background(), qq); err != nil {
  403. return
  404. }
  405. } else {
  406. if a.QQOpenid != "" {
  407. if _, err = s.d.AddUserThirdBind(context.Background(), qq); err != nil {
  408. return
  409. }
  410. }
  411. }
  412. case _deleteAction:
  413. if _, err = s.d.DelUserThirdBind(context.Background(), msg.New.Mid); err != nil {
  414. return
  415. }
  416. s.addCache(func() {
  417. s.d.DelUserThirdBindQQCache(context.Background(), msg.New.Mid)
  418. })
  419. s.addCache(func() {
  420. s.d.DelUserThirdBindSinaCache(context.Background(), msg.New.Mid)
  421. })
  422. return
  423. }
  424. s.addCache(func() {
  425. resQQ, e := s.d.GetUserThirdBindByMidAndPlatform(context.Background(), msg.New.Mid, _platformQQ)
  426. if e == nil && resQQ != nil {
  427. s.d.SetUserThirdBindQQCache(context.Background(), resQQ)
  428. }
  429. })
  430. s.addCache(func() {
  431. resSina, e := s.d.GetUserThirdBindByMidAndPlatform(context.Background(), msg.New.Mid, _platformSINA)
  432. if e == nil && resSina != nil {
  433. s.d.SetUserThirdBindSinaCache(context.Background(), resSina)
  434. }
  435. })
  436. return
  437. }
  438. func (s *Service) handleAsoAccountInfo(msg *asoAccountInfoBMsg) (err error) {
  439. switch msg.Action {
  440. case _insertAction:
  441. var (
  442. a *model.OriginAccountInfo
  443. tx *sql.Tx
  444. userRegOrigin *model.UserRegOrigin
  445. )
  446. if a, err = s.d.GetAsoAccountInfoByMid(context.Background(), msg.New.Mid); err != nil {
  447. log.Error("fail to get asoAccountInfo by mid(%d) error(%+v)", msg.New.Mid, err)
  448. return
  449. }
  450. userRegOrigin = s.convertAccountInfoToUserRegOrigin(a)
  451. tx, err = s.d.BeginTran(context.Background())
  452. if err != nil {
  453. log.Error("s.dao.Begin error(%v)", err)
  454. return
  455. }
  456. if _, err = s.d.TxInsertUpdateUserRegOrigin(tx, userRegOrigin); err != nil {
  457. log.Error("fail to insert update userRegOrigin(%+v) error(%+v)", userRegOrigin, err)
  458. tx.Rollback()
  459. return
  460. }
  461. if err = tx.Commit(); err != nil {
  462. log.Error("tx.Commit(), error(%v)", err)
  463. return
  464. }
  465. s.addCache(func() {
  466. uro, e := s.d.GetUserRegOriginByMid(context.Background(), msg.New.Mid)
  467. if e == nil && uro != nil {
  468. s.d.SetUserRegOriginCache(context.Background(), uro)
  469. }
  470. })
  471. case _updateAction:
  472. var (
  473. a *model.OriginAccountInfo
  474. tx *sql.Tx
  475. userSafeQuestion *model.UserSafeQuestion
  476. userEmail *model.UserEmail
  477. currentUserSafeQuestion *model.UserSafeQuestion
  478. )
  479. if a, err = s.d.GetAsoAccountInfoByMid(context.Background(), msg.New.Mid); err != nil {
  480. log.Error("fail to get asoAccountInfo by mid(%d) error(%+v)", msg.New.Mid, err)
  481. return
  482. }
  483. userEmail = s.convertAccountInfoToUserEmail(a)
  484. userSafeQuestion = s.convertAccountInfoToUserSafeQuestion(a)
  485. if currentUserSafeQuestion, err = s.d.GetUserSafeQuestionByMid(context.Background(), msg.New.Mid); err != nil {
  486. log.Error("fail to get userSafeQuestion by mid(%d) error(%+v)", msg.New.Mid, err)
  487. return
  488. }
  489. tx, err = s.d.BeginTran(context.Background())
  490. if err != nil {
  491. log.Error("s.dao.Begin error(%v)", err)
  492. return
  493. }
  494. if currentUserSafeQuestion != nil {
  495. if _, err = s.d.TxUpdateUserSafeQuesion(tx, userSafeQuestion); err != nil {
  496. log.Error("fail to update user safe question userSafeQuestion(%+v) error(%+v)", userSafeQuestion, err)
  497. tx.Rollback()
  498. return
  499. }
  500. } else {
  501. if a.SafeQuestion != 0 || a.SafeAnswer != "" {
  502. if _, err = s.d.TxAddUserSafeQuestion(tx, userSafeQuestion); err != nil {
  503. log.Error("fail to add user safe question userSafeQuestion(%+v) error(%+v)", userSafeQuestion, err)
  504. tx.Rollback()
  505. return
  506. }
  507. }
  508. }
  509. if _, err = s.d.TxUpdateUserEmailVerified(tx, userEmail); err != nil {
  510. log.Error("fail to update user email userEmail(%+v) error(%+v)", userEmail, err)
  511. tx.Rollback()
  512. return
  513. }
  514. if err = tx.Commit(); err != nil {
  515. log.Error("tx.Commit(), error(%v)", err)
  516. return
  517. }
  518. }
  519. s.addCache(func() {
  520. usq, e := s.d.GetUserSafeQuestionByMid(context.Background(), msg.New.Mid)
  521. if e == nil && usq != nil {
  522. s.d.SetUserSafeQuestionCache(context.Background(), usq)
  523. }
  524. })
  525. s.addCache(func() {
  526. ue, e := s.d.GetUserEmailByMid(context.Background(), msg.New.Mid)
  527. if e == nil && ue != nil {
  528. s.d.SetUserEmailCache(context.Background(), ue)
  529. }
  530. })
  531. return
  532. }
  533. func (s *Service) handleAsoAccountReg(msg *asoAccountRegBMsg) (err error) {
  534. var a *model.OriginAccountReg
  535. if a, err = s.d.GetAsoAccountRegByMid(context.Background(), msg.New.Mid); err != nil {
  536. log.Error("fail to get asoAccountReg by mid(%d) error(%+v)", msg.New.Mid, err)
  537. return
  538. }
  539. switch msg.Action {
  540. case _insertAction:
  541. if err = s.syncAsoAccountReg(a); err != nil {
  542. log.Error("fail to sync asoAccountReg(%+v) error(%+v)", a, err)
  543. return
  544. }
  545. s.addCache(func() {
  546. uro, e := s.d.GetUserRegOriginByMid(context.Background(), msg.New.Mid)
  547. if e == nil && uro != nil {
  548. s.d.SetUserRegOriginCache(context.Background(), uro)
  549. }
  550. })
  551. }
  552. return
  553. }
  554. func (s *Service) handlerUpdateEmailDuplicate(userEmail *model.UserEmail, timestamp int64) (err error) {
  555. var (
  556. duplicateMid int64
  557. asoAccount *model.OriginAccount
  558. duplicateUserEmail *model.UserEmail
  559. )
  560. // 记录冲突日志
  561. userEmailDuplicate := &model.UserEmailDuplicate{
  562. Mid: userEmail.Mid,
  563. Email: userEmail.Email,
  564. Verified: userEmail.Verified,
  565. EmailBindTime: userEmail.EmailBindTime,
  566. Timestamp: timestamp,
  567. }
  568. if _, err = s.d.AddUserEmailDuplicate(context.Background(), userEmailDuplicate); err != nil {
  569. log.Error("fail to add user email duplicate userEmailDuplicate(%+v) error(%+v)", userEmailDuplicate, err)
  570. return
  571. }
  572. //1. 获取冲突mid
  573. if duplicateMid, err = s.d.GetMidByEmail(context.Background(), userEmail); err != nil {
  574. log.Error("fail to get mid by email userEmail(%+v) error(%+v)", userEmail, err)
  575. return
  576. }
  577. //2. 根据冲突mid查询老库最新数据
  578. if asoAccount, err = s.d.GetAsoAccountByMid(context.Background(), duplicateMid); err != nil {
  579. log.Error("fail to get asoAccount by mid(%d) error(%+v)", duplicateMid, err)
  580. return
  581. }
  582. duplicateUserEmail = s.convertAccountToUserEmail(asoAccount)
  583. // 3. 将冲突的Email设置为NULL
  584. duplicateUserMailToNil := &model.UserEmail{
  585. Mid: duplicateMid,
  586. }
  587. log.Info("handle update email duplicate, mid(%d) duplicateMid(%d)", userEmail.Mid, duplicateMid)
  588. if _, err = s.d.UpdateUserEmail(context.Background(), duplicateUserMailToNil); err != nil {
  589. log.Error("fail to update user email duplicateUserMailToNil(%+v) error(%+v)", duplicateUserMailToNil, err)
  590. return
  591. }
  592. // 4. 更新email
  593. if _, err = s.d.UpdateUserEmail(context.Background(), userEmail); err != nil {
  594. log.Error("fail to update user email userEmail(%+v) error(%+v)", userEmail, err)
  595. return
  596. }
  597. // 5. 更新冲突的email
  598. if _, err = s.d.UpdateUserEmail(context.Background(), duplicateUserEmail); err != nil {
  599. log.Error("fail to update user email duplicateUserEmail(%+v) error(%+v)", duplicateUserEmail, err)
  600. return
  601. }
  602. return
  603. }
  604. func (s *Service) handlerUpdateEmailAndBindTimeDuplicate(userEmail *model.UserEmail, timestamp int64) (err error) {
  605. var (
  606. duplicateMid int64
  607. asoAccount *model.OriginAccount
  608. duplicateUserEmail *model.UserEmail
  609. )
  610. // 记录冲突日志
  611. userEmailDuplicate := &model.UserEmailDuplicate{
  612. Mid: userEmail.Mid,
  613. Email: userEmail.Email,
  614. Verified: userEmail.Verified,
  615. EmailBindTime: userEmail.EmailBindTime,
  616. Timestamp: timestamp,
  617. }
  618. if _, err = s.d.AddUserEmailDuplicate(context.Background(), userEmailDuplicate); err != nil {
  619. log.Error("fail to add user email duplicate userEmailDuplicate(%+v) error(%+v)", userEmailDuplicate, err)
  620. return
  621. }
  622. //1. 获取冲突mid
  623. if duplicateMid, err = s.d.GetMidByEmail(context.Background(), userEmail); err != nil {
  624. log.Error("fail to get mid by email userEmail(%+v) error(%+v)", userEmail, err)
  625. return
  626. }
  627. //2. 根据冲突mid查询老库最新数据
  628. if asoAccount, err = s.d.GetAsoAccountByMid(context.Background(), duplicateMid); err != nil {
  629. log.Error("fail to get asoAccount by mid(%d) error(%+v)", duplicateMid, err)
  630. return
  631. }
  632. duplicateUserEmail = s.convertAccountToUserEmail(asoAccount)
  633. // 3. 将冲突的Email设置为NULL
  634. duplicateUserMailToNil := &model.UserEmail{
  635. Mid: duplicateMid,
  636. }
  637. log.Info("handle update email and bind time duplicate, mid(%d) duplicateMid(%d)", userEmail.Mid, duplicateMid)
  638. if _, err = s.d.UpdateUserEmail(context.Background(), duplicateUserMailToNil); err != nil {
  639. log.Error("fail to update user email duplicateUserMailToNil(%+v) error(%+v)", duplicateUserMailToNil, err)
  640. return
  641. }
  642. // 4. 更新email and bind time
  643. if _, err = s.d.UpdateUserEmailAndBindTime(context.Background(), userEmail); err != nil {
  644. log.Error("fail to update user email userEmail(%+v) error(%+v)", userEmail, err)
  645. return
  646. }
  647. // 5. 更新冲突的email
  648. if _, err = s.d.UpdateUserEmail(context.Background(), duplicateUserEmail); err != nil {
  649. log.Error("fail to update user email duplicateUserEmail(%+v) error(%+v)", duplicateUserEmail, err)
  650. return
  651. }
  652. return
  653. }
  654. func (s *Service) handlerInsertEmailDuplicate(userEmail *model.UserEmail, timestamp int64) (err error) {
  655. var (
  656. duplicateMid int64
  657. asoAccount *model.OriginAccount
  658. duplicateUserEmail *model.UserEmail
  659. )
  660. // 记录冲突日志
  661. userEmailDuplicate := &model.UserEmailDuplicate{
  662. Mid: userEmail.Mid,
  663. Email: userEmail.Email,
  664. Verified: userEmail.Verified,
  665. EmailBindTime: userEmail.EmailBindTime,
  666. Timestamp: timestamp,
  667. }
  668. if _, err = s.d.AddUserEmailDuplicate(context.Background(), userEmailDuplicate); err != nil {
  669. log.Error("fail to add user email duplicate userEmailDuplicate(%+v) error(%+v)", userEmailDuplicate, err)
  670. return
  671. }
  672. //1. 获取冲突mid
  673. if duplicateMid, err = s.d.GetMidByEmail(context.Background(), userEmail); err != nil {
  674. log.Error("fail to get mid by email userEmail(%+v) error(%+v)", userEmail, err)
  675. return
  676. }
  677. //2. 根据冲突mid查询老库最新数据
  678. if asoAccount, err = s.d.GetAsoAccountByMid(context.Background(), duplicateMid); err != nil {
  679. log.Error("fail to get asoAccount by mid(%d) error(%+v)", duplicateMid, err)
  680. return
  681. }
  682. duplicateUserEmail = s.convertAccountToUserEmail(asoAccount)
  683. // 3. 将冲突的Email设置为NULL
  684. duplicateUserMailToNil := &model.UserEmail{
  685. Mid: duplicateMid,
  686. }
  687. log.Info("handle insert email duplicate, mid(%d) duplicateMid(%d)", userEmail.Mid, duplicateMid)
  688. if _, err = s.d.UpdateUserEmail(context.Background(), duplicateUserMailToNil); err != nil {
  689. log.Error("fail to update user email duplicateUserMailToNil(%+v) error(%+v)", duplicateUserMailToNil, err)
  690. return
  691. }
  692. // 4. 新增email
  693. if _, err = s.d.AddUserEmail(context.Background(), userEmail); err != nil {
  694. log.Error("fail to add user email userEmail(%+v) error(%+v)", userEmail, err)
  695. return
  696. }
  697. // 5. 更新冲突的email
  698. if _, err = s.d.UpdateUserEmail(context.Background(), duplicateUserEmail); err != nil {
  699. log.Error("fail to update user email duplicateUserEmail(%+v) error(%+v)", duplicateUserEmail, err)
  700. return
  701. }
  702. return
  703. }
  704. func (s *Service) handlerUpdateTelDuplicate(userTel *model.UserTel, timestamp int64) (err error) {
  705. var (
  706. duplicateMid int64
  707. asoAccount *model.OriginAccount
  708. duplicateUserTel *model.UserTel
  709. )
  710. // 记录冲突日志
  711. userTelDuplicate := &model.UserTelDuplicate{
  712. Mid: userTel.Mid,
  713. Tel: userTel.Tel,
  714. Cid: userTel.Cid,
  715. TelBindTime: userTel.TelBindTime,
  716. Timestamp: timestamp,
  717. }
  718. if _, err = s.d.AddUserTelDuplicate(context.Background(), userTelDuplicate); err != nil {
  719. log.Error("fail to add user tel duplicate userTelDuplicate(%+v) error(%+v)", userTelDuplicate, err)
  720. return
  721. }
  722. //1. 获取冲突mid
  723. if duplicateMid, err = s.d.GetMidByTel(context.Background(), userTel); err != nil {
  724. log.Error("fail to get mid by tel userTel(%+v) error(%+v)", userTel, err)
  725. return
  726. }
  727. //2. 根据冲突mid查询老库最新数据
  728. if asoAccount, err = s.d.GetAsoAccountByMid(context.Background(), duplicateMid); err != nil {
  729. log.Error("fail to get asoAccount by mid(%d) error(%+v)", duplicateMid, err)
  730. return
  731. }
  732. duplicateUserTel = s.convertAccountToUserTel(asoAccount)
  733. // 3. 将冲突的Tel设置为NULL
  734. duplicateUserTelToNil := &model.UserTel{
  735. Mid: duplicateMid,
  736. }
  737. log.Info("handle update tel duplicate, mid(%d) duplicateMid(%d)", userTel.Mid, duplicateMid)
  738. if _, err = s.d.UpdateUserTel(context.Background(), duplicateUserTelToNil); err != nil {
  739. log.Error("fail to update user tel duplicateUserTelToNil(%+v) error(%+v)", duplicateUserTelToNil, err)
  740. return
  741. }
  742. // 4. 更新tel
  743. if _, err = s.d.UpdateUserTel(context.Background(), userTel); err != nil {
  744. log.Error("fail to update user tel userTel(%+v) error(%+v)", userTel, err)
  745. return
  746. }
  747. // 5. 更新冲突tel
  748. if _, err = s.d.UpdateUserTel(context.Background(), duplicateUserTel); err != nil {
  749. log.Error("fail to update user tel duplicateUserTel(%+v) error(%+v)", duplicateUserTel, err)
  750. return
  751. }
  752. return
  753. }
  754. func (s *Service) handlerUpdateTelAndBindTimeDuplicate(userTel *model.UserTel, timestamp int64) (err error) {
  755. var (
  756. duplicateMid int64
  757. asoAccount *model.OriginAccount
  758. duplicateUserTel *model.UserTel
  759. )
  760. // 记录冲突日志
  761. userTelDuplicate := &model.UserTelDuplicate{
  762. Mid: userTel.Mid,
  763. Tel: userTel.Tel,
  764. Cid: userTel.Cid,
  765. TelBindTime: userTel.TelBindTime,
  766. Timestamp: timestamp,
  767. }
  768. if _, err = s.d.AddUserTelDuplicate(context.Background(), userTelDuplicate); err != nil {
  769. log.Error("fail to add user tel duplicate userTelDuplicate(%+v) error(%+v)", userTelDuplicate, err)
  770. return
  771. }
  772. //1. 获取冲突mid
  773. if duplicateMid, err = s.d.GetMidByTel(context.Background(), userTel); err != nil {
  774. log.Error("fail to get mid by tel userTel(%+v) error(%+v)", userTel, err)
  775. return
  776. }
  777. //2. 根据冲突mid查询老库最新数据
  778. if asoAccount, err = s.d.GetAsoAccountByMid(context.Background(), duplicateMid); err != nil {
  779. log.Error("fail to get asoAccount by mid(%d) error(%+v)", duplicateMid, err)
  780. return
  781. }
  782. duplicateUserTel = s.convertAccountToUserTel(asoAccount)
  783. // 3. 将冲突的Tel设置为NULL
  784. duplicateUserTelToNil := &model.UserTel{
  785. Mid: duplicateMid,
  786. }
  787. log.Info("handle update tel and bind time duplicate, mid(%d) duplicateMid(%d)", userTel.Mid, duplicateMid)
  788. if _, err = s.d.UpdateUserTel(context.Background(), duplicateUserTelToNil); err != nil {
  789. log.Error("fail to update user tel duplicateUserTelToNil(%+v) error(%+v)", duplicateUserTelToNil, err)
  790. return
  791. }
  792. // 4. 更新tel
  793. if _, err = s.d.UpdateUserTelAndBindTime(context.Background(), userTel); err != nil {
  794. log.Error("fail to update user tel userTel(%+v) error(%+v)", userTel, err)
  795. return
  796. }
  797. // 5. 更新冲突tel
  798. if _, err = s.d.UpdateUserTel(context.Background(), duplicateUserTel); err != nil {
  799. log.Error("fail to update user tel duplicateUserTel(%+v) error(%+v)", duplicateUserTel, err)
  800. return
  801. }
  802. return
  803. }
  804. func (s *Service) handlerInsertTelDuplicate(userTel *model.UserTel, timestamp int64) (err error) {
  805. var (
  806. duplicateMid int64
  807. asoAccount *model.OriginAccount
  808. duplicateUserTel *model.UserTel
  809. )
  810. // 记录冲突日志
  811. userTelDuplicate := &model.UserTelDuplicate{
  812. Mid: userTel.Mid,
  813. Tel: userTel.Tel,
  814. Cid: userTel.Cid,
  815. TelBindTime: userTel.TelBindTime,
  816. Timestamp: timestamp,
  817. }
  818. if _, err = s.d.AddUserTelDuplicate(context.Background(), userTelDuplicate); err != nil {
  819. log.Error("fail to add user tel duplicate userTelDuplicate(%+v) error(%+v)", userTelDuplicate, err)
  820. return
  821. }
  822. //1. 获取冲突mid
  823. if duplicateMid, err = s.d.GetMidByTel(context.Background(), userTel); err != nil {
  824. log.Error("fail to get mid by tel userTel(%+v) error(%+v)", userTel, err)
  825. return
  826. }
  827. //2. 根据冲突mid查询老库最新数据
  828. if asoAccount, err = s.d.GetAsoAccountByMid(context.Background(), duplicateMid); err != nil {
  829. log.Error("fail to get asoAccount by mid(%d) error(%+v)", duplicateMid, err)
  830. return
  831. }
  832. duplicateUserTel = s.convertAccountToUserTel(asoAccount)
  833. // 3. 将冲突的Tel设置为NULL
  834. duplicateUserTelToNil := &model.UserTel{
  835. Mid: duplicateMid,
  836. }
  837. log.Info("handle insert tel duplicate, mid(%d) duplicateMid(%d)", userTel.Mid, duplicateMid)
  838. if _, err = s.d.UpdateUserTel(context.Background(), duplicateUserTelToNil); err != nil {
  839. log.Error("fail to update user tel duplicateUserTelToNil(%+v) error(%+v)", duplicateUserTelToNil, err)
  840. return
  841. }
  842. // 4. 更新tel
  843. if _, err = s.d.AddUserTel(context.Background(), userTel); err != nil {
  844. log.Error("fail to update user tel userTel(%+v) error(%+v)", userTel, err)
  845. return
  846. }
  847. // 5. 更新冲突tel
  848. if _, err = s.d.UpdateUserTel(context.Background(), duplicateUserTel); err != nil {
  849. log.Error("fail to update user tel duplicateUserTel(%+v) error(%+v)", duplicateUserTel, err)
  850. return
  851. }
  852. return
  853. }
  854. func (s *Service) updateUserEmailVerified(mid int64) (err error) {
  855. var (
  856. info *model.OriginAccountInfo
  857. userEmail *model.UserEmail
  858. )
  859. if info, err = s.d.GetAsoAccountInfoByMid(context.Background(), mid); err != nil {
  860. log.Error("fail to get asoAccountInfo by mid(%d) error(%+v)", mid, err)
  861. return
  862. }
  863. userEmail = s.convertAccountInfoToUserEmail(info)
  864. if _, err = s.d.UpdateUserEmailVerified(context.Background(), userEmail); err != nil {
  865. log.Error("fail to update user email userEmail(%+v) error(%+v)", userEmail, err)
  866. return
  867. }
  868. return
  869. }
  870. func (s *Service) addUserEmail(userEmail *model.UserEmail, timestamp int64) (err error) {
  871. if _, err = s.d.AddUserEmail(context.Background(), userEmail); err != nil {
  872. log.Error("fail to add user email userEmail(%+v) error(%+v)", userEmail, err)
  873. switch nErr := errors.Cause(err).(type) {
  874. case *mysql.MySQLError:
  875. if nErr.Number == _mySQLErrCodeDuplicateEntry {
  876. if err = s.handlerInsertEmailDuplicate(userEmail, timestamp); err != nil {
  877. log.Error("fail to handlerInsertEmailDuplicate userEmail(%+v) error(%+v)", userEmail, err)
  878. return
  879. }
  880. err = nil
  881. return
  882. }
  883. }
  884. return
  885. }
  886. return
  887. }
  888. func (s *Service) updateUserEmail(userEmail *model.UserEmail, timestamp int64) (err error) {
  889. if _, err = s.d.UpdateUserEmail(context.Background(), userEmail); err != nil {
  890. log.Error("fail to update user email userEmail(%+v) error(%+v)", userEmail, err)
  891. switch nErr := errors.Cause(err).(type) {
  892. case *mysql.MySQLError:
  893. if nErr.Number == _mySQLErrCodeDuplicateEntry {
  894. if err = s.handlerUpdateEmailDuplicate(userEmail, timestamp); err != nil {
  895. log.Error("fail to handlerUpdateEmailDuplicate userEmail(%+v) error(%+v)", userEmail, err)
  896. return
  897. }
  898. err = nil
  899. return
  900. }
  901. }
  902. return
  903. }
  904. return
  905. }
  906. func (s *Service) updateUserEmailAndBindTime(userEmail *model.UserEmail, timestamp int64) (err error) {
  907. if _, err = s.d.UpdateUserEmailAndBindTime(context.Background(), userEmail); err != nil {
  908. log.Error("fail to update user email and bind time userEmail(%+v) error(%+v)", userEmail, err)
  909. switch nErr := errors.Cause(err).(type) {
  910. case *mysql.MySQLError:
  911. if nErr.Number == _mySQLErrCodeDuplicateEntry {
  912. if err = s.handlerUpdateEmailAndBindTimeDuplicate(userEmail, timestamp); err != nil {
  913. log.Error("fail to handlerUpdateEmailAndBindTimeDuplicate userEmail(%+v) error(%+v)", userEmail, err)
  914. return
  915. }
  916. err = nil
  917. return
  918. }
  919. }
  920. return
  921. }
  922. return
  923. }
  924. func (s *Service) addUserTel(userTel *model.UserTel, timestamp int64) (err error) {
  925. if _, err = s.d.AddUserTel(context.Background(), userTel); err != nil {
  926. log.Error("fail to add user tel userTel(%+v) error(%+v)", userTel, err)
  927. switch nErr := errors.Cause(err).(type) {
  928. case *mysql.MySQLError:
  929. if nErr.Number == _mySQLErrCodeDuplicateEntry {
  930. if err = s.handlerInsertTelDuplicate(userTel, timestamp); err != nil {
  931. log.Error("fail to handlerTelDuplicate userTel(%+v) error(%+v)", userTel, err)
  932. return
  933. }
  934. err = nil
  935. return
  936. }
  937. }
  938. return
  939. }
  940. return
  941. }
  942. func (s *Service) updateUserTel(userTel *model.UserTel, timestamp int64) (err error) {
  943. if _, err = s.d.UpdateUserTel(context.Background(), userTel); err != nil {
  944. log.Error("fail to update user tel userTel(%+v) error(%+v)", userTel, err)
  945. switch nErr := errors.Cause(err).(type) {
  946. case *mysql.MySQLError:
  947. if nErr.Number == _mySQLErrCodeDuplicateEntry {
  948. if err = s.handlerUpdateTelDuplicate(userTel, timestamp); err != nil {
  949. log.Error("fail to handlerTelDuplicate userTel(%+v) error(%+v)", userTel, err)
  950. return
  951. }
  952. err = nil
  953. return
  954. }
  955. }
  956. return
  957. }
  958. return
  959. }
  960. func (s *Service) updateUserTelAndBindTime(userTel *model.UserTel, timestamp int64) (err error) {
  961. if _, err = s.d.UpdateUserTelAndBindTime(context.Background(), userTel); err != nil {
  962. log.Error("fail to update user tel and bind time userTel(%+v) error(%+v)", userTel, err)
  963. switch nErr := errors.Cause(err).(type) {
  964. case *mysql.MySQLError:
  965. if nErr.Number == _mySQLErrCodeDuplicateEntry {
  966. if err = s.handlerUpdateTelAndBindTimeDuplicate(userTel, timestamp); err != nil {
  967. log.Error("fail to handlerTelDuplicate userTel(%+v) error(%+v)", userTel, err)
  968. return
  969. }
  970. err = nil
  971. return
  972. }
  973. }
  974. return
  975. }
  976. return
  977. }