sync_sns.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "strconv"
  6. "time"
  7. "go-common/app/job/main/passport-sns/model"
  8. "go-common/library/log"
  9. "go-common/library/queue/databus"
  10. "github.com/go-sql-driver/mysql"
  11. "github.com/pkg/errors"
  12. )
  13. const (
  14. _asoAccountSnsTable = "aso_account_sns"
  15. _insertAction = "insert"
  16. _updateAction = "update"
  17. _deleteAction = "delete"
  18. _mySQLErrCodeDuplicateEntry = 1062
  19. )
  20. type asoAccountSnsBMsg struct {
  21. Action string
  22. Table string
  23. New *model.AsoAccountSns
  24. Old *model.AsoAccountSns
  25. Timestamp int64
  26. }
  27. func (s *Service) asoBinLogConsume() {
  28. s.group.New = func(msg *databus.Message) (res interface{}, err error) {
  29. bmsg := new(model.BMsg)
  30. if err = json.Unmarshal(msg.Value, bmsg); err != nil {
  31. log.Error("json.Unmarshal(%s) error(%+v)", string(msg.Value), err)
  32. return
  33. }
  34. log.Info("receive msg action(%s) table(%s) key(%s) partition(%d) offset(%d) timestamp(%d) New(%s) Old(%s)",
  35. bmsg.Action, bmsg.Table, msg.Key, msg.Partition, msg.Offset, msg.Timestamp, string(bmsg.New), string(bmsg.Old))
  36. if bmsg.Table == _asoAccountSnsTable {
  37. asoAccountSnsBMsg := &asoAccountSnsBMsg{
  38. Action: bmsg.Action,
  39. Table: bmsg.Table,
  40. Timestamp: msg.Timestamp,
  41. }
  42. newAccountSns := new(model.AsoAccountSns)
  43. if err = json.Unmarshal(bmsg.New, newAccountSns); err != nil {
  44. log.Error("json.Unmarshal(%s) error(%+v)", string(bmsg.New), err)
  45. return
  46. }
  47. asoAccountSnsBMsg.New = newAccountSns
  48. if bmsg.Action == _updateAction {
  49. oldAccountSns := new(model.AsoAccountSns)
  50. if err = json.Unmarshal(bmsg.Old, oldAccountSns); err != nil {
  51. log.Error("json.Unmarshal(%s) error(%+v)", string(bmsg.Old), err)
  52. return
  53. }
  54. asoAccountSnsBMsg.Old = oldAccountSns
  55. }
  56. return asoAccountSnsBMsg, nil
  57. }
  58. return
  59. }
  60. s.group.Split = func(msg *databus.Message, data interface{}) int {
  61. if t, ok := data.(*asoAccountSnsBMsg); ok {
  62. return int(t.New.Mid)
  63. }
  64. return 0
  65. }
  66. s.group.Do = func(msgs []interface{}) {
  67. for _, m := range msgs {
  68. if msg, ok := m.(*asoAccountSnsBMsg); ok {
  69. for {
  70. if err := s.handleAsoAccountSns(msg); err != nil {
  71. log.Error("fail to handleAsoAccountSns msg(%+v) new(%+v) error(%+v)", msg, msg.New, err)
  72. time.Sleep(100 * time.Millisecond)
  73. continue
  74. }
  75. break
  76. }
  77. }
  78. }
  79. }
  80. // start the group
  81. s.group.Start()
  82. log.Info("s.group.Start()")
  83. }
  84. func (s *Service) handleAsoAccountSns(msg *asoAccountSnsBMsg) (err error) {
  85. switch msg.Action {
  86. case _insertAction:
  87. if msg.New.QQOpenid != "" {
  88. if err = s.addSnsQQ(msg.New); err != nil {
  89. return
  90. }
  91. }
  92. if msg.New.SinaUID != 0 {
  93. if err = s.addSnsWeibo(msg.New); err != nil {
  94. return
  95. }
  96. }
  97. case _updateAction:
  98. if msg.New.QQOpenid != msg.Old.QQOpenid {
  99. if msg.New.QQOpenid == "" {
  100. if _, err = s.d.DelSnsUser(context.Background(), msg.New.Mid, model.PlatformQQ); err != nil {
  101. return
  102. }
  103. s.cache.Do(context.Background(), func(c context.Context) {
  104. s.d.DelSnsCache(c, msg.New.Mid, model.PlatformQQStr)
  105. })
  106. }
  107. if msg.New.QQOpenid != "" {
  108. var user *model.SnsUser
  109. if user, err = s.d.SnsUserByMid(context.Background(), msg.New.Mid, model.PlatformQQ); err != nil {
  110. return
  111. }
  112. if user == nil {
  113. if err = s.addSnsQQ(msg.New); err != nil {
  114. return
  115. }
  116. } else {
  117. if err = s.updateSnsQQ(msg.New); err != nil {
  118. return
  119. }
  120. }
  121. }
  122. return
  123. }
  124. if msg.New.SinaUID != msg.Old.SinaUID {
  125. if msg.New.SinaUID == 0 {
  126. if _, err = s.d.DelSnsUser(context.Background(), msg.New.Mid, model.PlatformWEIBO); err != nil {
  127. return
  128. }
  129. s.cache.Do(context.Background(), func(c context.Context) {
  130. s.d.DelSnsCache(c, msg.New.Mid, model.PlatformWEIBOStr)
  131. })
  132. }
  133. if msg.New.SinaUID != 0 {
  134. var user *model.SnsUser
  135. if user, err = s.d.SnsUserByMid(context.Background(), msg.New.Mid, model.PlatformWEIBO); err != nil {
  136. return
  137. }
  138. if user == nil {
  139. if err = s.addSnsWeibo(msg.New); err != nil {
  140. return
  141. }
  142. } else {
  143. if err = s.updateSnsWeibo(msg.New); err != nil {
  144. return
  145. }
  146. }
  147. }
  148. return
  149. }
  150. if msg.New.SinaAccessExpires != msg.Old.SinaAccessExpires {
  151. return s.updateSns(msg.New.Mid, msg.New.SinaAccessExpires, strconv.FormatInt(msg.New.SinaUID, 10), msg.New.SinaAccessToken, model.PlatformWEIBO)
  152. }
  153. if msg.New.QQAccessExpires != msg.Old.QQAccessExpires {
  154. var qqUnionID string
  155. if qqUnionID, err = s.d.QQUnionID(context.Background(), msg.New.QQOpenid); err != nil {
  156. return
  157. }
  158. if qqUnionID == "" {
  159. log.Error("update qq expires, qqUnionID is null, oldMsg(%+v) newMsg(%+v)", msg.Old, msg.New)
  160. return
  161. }
  162. return s.updateSns(msg.New.Mid, msg.New.QQAccessExpires, qqUnionID, msg.New.QQAccessToken, model.PlatformQQ)
  163. }
  164. case _deleteAction:
  165. if _, err = s.d.DelSnsUser(context.Background(), msg.New.Mid, model.PlatformQQ); err != nil {
  166. return
  167. }
  168. if _, err = s.d.DelSnsUser(context.Background(), msg.New.Mid, model.PlatformWEIBO); err != nil {
  169. return
  170. }
  171. s.cache.Do(context.Background(), func(c context.Context) {
  172. s.d.DelSnsCache(c, msg.New.Mid, model.PlatformQQStr)
  173. })
  174. s.cache.Do(context.Background(), func(c context.Context) {
  175. s.d.DelSnsCache(c, msg.New.Mid, model.PlatformWEIBOStr)
  176. })
  177. }
  178. return
  179. }
  180. func (s *Service) updateSns(mid, expires int64, unionID, token string, platform int) (err error) {
  181. tx, err := s.d.BeginSnsTran(context.Background())
  182. if err != nil {
  183. log.Error("s.d.BeginTran error(%+v)", err)
  184. return
  185. }
  186. defer func() {
  187. if err != nil {
  188. if err1 := tx.Rollback(); err1 != nil {
  189. log.Error("tx.Rollback() error(%v)", err1)
  190. }
  191. return
  192. }
  193. if err = tx.Commit(); err != nil {
  194. log.Error("tx.Commit() error(%v)", err)
  195. }
  196. }()
  197. snsUser := &model.SnsUser{
  198. Mid: mid,
  199. UnionID: unionID,
  200. Platform: platform,
  201. Expires: expires,
  202. }
  203. if _, err = s.d.TxUpdateSnsUserExpires(tx, snsUser); err != nil {
  204. return
  205. }
  206. snsToken := &model.SnsToken{
  207. Mid: mid,
  208. Platform: platform,
  209. Token: token,
  210. Expires: expires,
  211. }
  212. if _, err = s.d.TxUpdateSnsToken(tx, snsToken); err != nil {
  213. return
  214. }
  215. s.cache.Do(context.Background(), func(c context.Context) {
  216. proto := &model.SnsProto{
  217. Mid: snsUser.Mid,
  218. Platform: int32(platform),
  219. UnionID: snsUser.UnionID,
  220. Expires: snsUser.Expires,
  221. }
  222. s.d.SetSnsCache(c, mid, parsePlatformStr(platform), proto)
  223. })
  224. return
  225. }
  226. func (s *Service) addSnsQQ(sns *model.AsoAccountSns) (err error) {
  227. unionID, err := s.d.GetUnionIDCache(context.Background(), sns.QQOpenid)
  228. if err != nil || unionID == "" {
  229. unionID, err = s.d.QQUnionID(context.Background(), sns.QQOpenid)
  230. if err != nil || unionID == "" {
  231. return
  232. }
  233. }
  234. tx, err := s.d.BeginSnsTran(context.Background())
  235. if err != nil {
  236. log.Error("s.d.BeginTran error(%+v)", err)
  237. return
  238. }
  239. defer func() {
  240. if err != nil {
  241. if err1 := tx.Rollback(); err1 != nil {
  242. log.Error("tx.Rollback() error(%v)", err1)
  243. }
  244. return
  245. }
  246. if err = tx.Commit(); err != nil {
  247. log.Error("tx.Commit() error(%v)", err)
  248. }
  249. }()
  250. snsUser := &model.SnsUser{
  251. Mid: sns.Mid,
  252. UnionID: unionID,
  253. Platform: model.PlatformQQ,
  254. Expires: sns.QQAccessExpires,
  255. }
  256. if _, err = s.d.TxAddSnsUser(tx, snsUser); err != nil {
  257. switch nErr := errors.Cause(err).(type) {
  258. case *mysql.MySQLError:
  259. if nErr.Number == _mySQLErrCodeDuplicateEntry {
  260. u, e := s.d.SnsUserByMid(context.Background(), snsUser.Mid, snsUser.Platform)
  261. if e != nil {
  262. return e
  263. }
  264. err = nil
  265. if u != nil && u.Mid == snsUser.Mid {
  266. return
  267. }
  268. log.Error("add sns qq duplicate (%+v)", snsUser)
  269. return
  270. }
  271. }
  272. return
  273. }
  274. snsOpenID := &model.SnsOpenID{
  275. Mid: sns.Mid,
  276. OpenID: sns.QQOpenid,
  277. UnionID: unionID,
  278. AppID: model.OldAppID,
  279. Platform: model.PlatformQQ,
  280. }
  281. if _, err = s.d.TxAddSnsOpenID(tx, snsOpenID); err != nil {
  282. return
  283. }
  284. snsToken := &model.SnsToken{
  285. Mid: sns.Mid,
  286. OpenID: sns.QQOpenid,
  287. UnionID: unionID,
  288. Platform: model.PlatformQQ,
  289. Token: sns.QQAccessToken,
  290. Expires: sns.QQAccessExpires,
  291. AppID: model.OldAppID,
  292. }
  293. if _, err = s.d.TxAddSnsToken(tx, snsToken); err != nil {
  294. return
  295. }
  296. proto := &model.SnsProto{
  297. Mid: snsUser.Mid,
  298. Platform: model.PlatformQQ,
  299. UnionID: snsUser.UnionID,
  300. Expires: snsUser.Expires,
  301. }
  302. s.cache.Do(context.Background(), func(c context.Context) {
  303. s.d.SetSnsCache(c, sns.Mid, model.PlatformQQStr, proto)
  304. })
  305. return
  306. }
  307. func (s *Service) addSnsWeibo(sns *model.AsoAccountSns) (err error) {
  308. tx, err := s.d.BeginSnsTran(context.Background())
  309. if err != nil {
  310. log.Error("s.d.BeginTran error(%+v)", err)
  311. return
  312. }
  313. defer func() {
  314. if err != nil {
  315. if err1 := tx.Rollback(); err1 != nil {
  316. log.Error("tx.Rollback() error(%v)", err1)
  317. }
  318. return
  319. }
  320. if err = tx.Commit(); err != nil {
  321. log.Error("tx.Commit() error(%v)", err)
  322. }
  323. }()
  324. snsUser := &model.SnsUser{
  325. Mid: sns.Mid,
  326. UnionID: strconv.FormatInt(sns.SinaUID, 10),
  327. Platform: model.PlatformWEIBO,
  328. Expires: sns.SinaAccessExpires,
  329. }
  330. if _, err = s.d.TxAddSnsUser(tx, snsUser); err != nil {
  331. switch nErr := errors.Cause(err).(type) {
  332. case *mysql.MySQLError:
  333. if nErr.Number == _mySQLErrCodeDuplicateEntry {
  334. u, e := s.d.SnsUserByMid(context.Background(), snsUser.Mid, snsUser.Platform)
  335. if e != nil {
  336. return e
  337. }
  338. err = nil
  339. if u != nil && u.Mid == snsUser.Mid {
  340. return
  341. }
  342. log.Error("add sns weibo duplicate (%+v)", snsUser)
  343. return
  344. }
  345. }
  346. return
  347. }
  348. snsOpenID := &model.SnsOpenID{
  349. Mid: sns.Mid,
  350. OpenID: strconv.FormatInt(sns.SinaUID, 10),
  351. UnionID: strconv.FormatInt(sns.SinaUID, 10),
  352. AppID: model.OldAppID,
  353. Platform: model.PlatformWEIBO,
  354. }
  355. if _, err = s.d.TxAddSnsOpenID(tx, snsOpenID); err != nil {
  356. return
  357. }
  358. snsToken := &model.SnsToken{
  359. Mid: sns.Mid,
  360. OpenID: strconv.FormatInt(sns.SinaUID, 10),
  361. UnionID: strconv.FormatInt(sns.SinaUID, 10),
  362. Platform: model.PlatformWEIBO,
  363. Token: sns.SinaAccessToken,
  364. Expires: sns.SinaAccessExpires,
  365. AppID: model.OldAppID,
  366. }
  367. if _, err = s.d.TxAddSnsToken(tx, snsToken); err != nil {
  368. return
  369. }
  370. proto := &model.SnsProto{
  371. Mid: snsUser.Mid,
  372. Platform: model.PlatformWEIBO,
  373. UnionID: snsUser.UnionID,
  374. Expires: snsUser.Expires,
  375. }
  376. s.cache.Do(context.Background(), func(c context.Context) {
  377. s.d.SetSnsCache(c, sns.Mid, model.PlatformWEIBOStr, proto)
  378. })
  379. return
  380. }
  381. func (s *Service) updateSnsQQ(sns *model.AsoAccountSns) (err error) {
  382. unionID, err := s.d.GetUnionIDCache(context.Background(), sns.QQOpenid)
  383. if err != nil || unionID == "" {
  384. unionID, err = s.d.QQUnionID(context.Background(), sns.QQOpenid)
  385. if err != nil || unionID == "" {
  386. return
  387. }
  388. }
  389. tx, err := s.d.BeginSnsTran(context.Background())
  390. if err != nil {
  391. log.Error("s.d.BeginTran error(%+v)", err)
  392. return
  393. }
  394. defer func() {
  395. if err != nil {
  396. if err1 := tx.Rollback(); err1 != nil {
  397. log.Error("tx.Rollback() error(%v)", err1)
  398. }
  399. return
  400. }
  401. if err = tx.Commit(); err != nil {
  402. log.Error("tx.Commit() error(%v)", err)
  403. }
  404. }()
  405. if _, err = s.d.TxUpdateSnsUser(tx, sns.Mid, sns.QQAccessExpires, unionID, model.PlatformQQ); err != nil {
  406. switch nErr := errors.Cause(err).(type) {
  407. case *mysql.MySQLError:
  408. if nErr.Number == _mySQLErrCodeDuplicateEntry {
  409. u, e := s.d.SnsUserByMid(context.Background(), sns.Mid, model.PlatformQQ)
  410. if e != nil {
  411. return e
  412. }
  413. err = nil
  414. if u != nil && u.Mid == sns.Mid {
  415. return
  416. }
  417. log.Error("update sns qq duplicate, mid(%d) unionID(%s)", sns.Mid, unionID)
  418. return
  419. }
  420. }
  421. return
  422. }
  423. snsToken := &model.SnsToken{
  424. Mid: sns.Mid,
  425. OpenID: sns.QQOpenid,
  426. UnionID: unionID,
  427. Platform: model.PlatformQQ,
  428. Token: sns.QQAccessToken,
  429. Expires: sns.QQAccessExpires,
  430. AppID: model.OldAppID,
  431. }
  432. if _, err = s.d.TxAddSnsToken(tx, snsToken); err != nil {
  433. return
  434. }
  435. proto := &model.SnsProto{
  436. Mid: sns.Mid,
  437. Platform: model.PlatformQQ,
  438. UnionID: unionID,
  439. Expires: sns.QQAccessExpires,
  440. }
  441. s.cache.Do(context.Background(), func(c context.Context) {
  442. s.d.SetSnsCache(c, sns.Mid, model.PlatformQQStr, proto)
  443. })
  444. return
  445. }
  446. func (s *Service) updateSnsWeibo(sns *model.AsoAccountSns) (err error) {
  447. tx, err := s.d.BeginSnsTran(context.Background())
  448. if err != nil {
  449. log.Error("s.d.BeginTran error(%+v)", err)
  450. return
  451. }
  452. defer func() {
  453. if err != nil {
  454. if err1 := tx.Rollback(); err1 != nil {
  455. log.Error("tx.Rollback() error(%v)", err1)
  456. }
  457. return
  458. }
  459. if err = tx.Commit(); err != nil {
  460. log.Error("tx.Commit() error(%v)", err)
  461. }
  462. }()
  463. if _, err = s.d.TxUpdateSnsUser(tx, sns.Mid, sns.SinaAccessExpires, strconv.FormatInt(sns.SinaUID, 10), model.PlatformWEIBO); err != nil {
  464. switch nErr := errors.Cause(err).(type) {
  465. case *mysql.MySQLError:
  466. if nErr.Number == _mySQLErrCodeDuplicateEntry {
  467. u, e := s.d.SnsUserByMid(context.Background(), sns.Mid, model.PlatformWEIBO)
  468. if e != nil {
  469. return e
  470. }
  471. err = nil
  472. if u != nil && u.Mid == sns.Mid {
  473. return
  474. }
  475. log.Error("update sns weibo duplicate, mid(%d) unionID(%s)", sns.Mid, strconv.FormatInt(sns.SinaUID, 10))
  476. return
  477. }
  478. }
  479. return
  480. }
  481. snsToken := &model.SnsToken{
  482. Mid: sns.Mid,
  483. OpenID: strconv.FormatInt(sns.SinaUID, 10),
  484. UnionID: strconv.FormatInt(sns.SinaUID, 10),
  485. Platform: model.PlatformWEIBO,
  486. Token: sns.SinaAccessToken,
  487. Expires: sns.SinaAccessExpires,
  488. AppID: model.OldAppID,
  489. }
  490. if _, err = s.d.TxAddSnsToken(tx, snsToken); err != nil {
  491. return
  492. }
  493. proto := &model.SnsProto{
  494. Mid: sns.Mid,
  495. Platform: model.PlatformWEIBO,
  496. UnionID: strconv.FormatInt(sns.SinaUID, 10),
  497. Expires: sns.SinaAccessExpires,
  498. }
  499. s.cache.Do(context.Background(), func(c context.Context) {
  500. s.d.SetSnsCache(c, sns.Mid, model.PlatformWEIBOStr, proto)
  501. })
  502. return
  503. }
  504. func (s *Service) fullSyncSns() {
  505. var (
  506. start int64
  507. chanNum = int64(s.c.SyncConf.ChanNum)
  508. )
  509. for {
  510. log.Info("fullSyncSns, start %d", start)
  511. res, err := s.d.AsoAccountSns(context.Background(), start)
  512. if err != nil {
  513. log.Error("fail to get AsoAccountSns error(%+v)", err)
  514. time.Sleep(100 * time.Millisecond)
  515. continue
  516. }
  517. for _, a := range res {
  518. s.snsChan[a.Mid%chanNum] <- a
  519. }
  520. if len(res) == 0 {
  521. log.Info("fullSyncSns finished! endID(%d)", start)
  522. break
  523. }
  524. start = res[len(res)-1].Mid
  525. }
  526. }
  527. func (s *Service) fullSyncSnsConsume(c chan *model.AsoAccountSns) {
  528. for {
  529. a, ok := <-c
  530. if !ok {
  531. log.Error("snsChan closed")
  532. return
  533. }
  534. for {
  535. if a.SinaUID != 0 {
  536. if err := s.addSnsWeibo(a); err != nil {
  537. continue
  538. }
  539. }
  540. if a.QQOpenid != "" {
  541. if err := s.addSnsQQ(a); err != nil {
  542. continue
  543. }
  544. }
  545. break
  546. }
  547. }
  548. }
  549. func (s *Service) checkAll() {
  550. ticker := time.NewTicker(time.Duration(s.c.SyncConf.CheckTicker))
  551. for {
  552. s.checkSnsUser()
  553. <-ticker.C
  554. }
  555. }
  556. func (s *Service) checkConsume(c chan *model.AsoAccountSns) {
  557. for {
  558. a, ok := <-c
  559. if !ok {
  560. log.Error("snsChan closed")
  561. return
  562. }
  563. for {
  564. if err := s.checkWeibo(a); err != nil {
  565. time.Sleep(100 * time.Millisecond)
  566. continue
  567. }
  568. if err := s.checkQQ(a); err != nil {
  569. time.Sleep(100 * time.Millisecond)
  570. continue
  571. }
  572. break
  573. }
  574. }
  575. }
  576. func (s *Service) checkSnsUser() {
  577. var (
  578. start int64
  579. chanNum = int64(s.c.SyncConf.ChanNum)
  580. )
  581. for {
  582. log.Info("checkSnsUser, start %d", start)
  583. res, err := s.d.AsoAccountSnsAll(context.Background(), start)
  584. if err != nil {
  585. log.Error("fail to get AsoAccountSns error(%+v)", err)
  586. time.Sleep(100 * time.Millisecond)
  587. continue
  588. }
  589. for _, a := range res {
  590. s.checkChan[a.Mid%chanNum] <- a
  591. }
  592. if len(res) == 0 {
  593. log.Info("checkSnsUser finished! endID(%d)", start)
  594. break
  595. }
  596. start = res[len(res)-1].Mid
  597. }
  598. }
  599. func (s *Service) checkQQ(a *model.AsoAccountSns) (err error) {
  600. sns, err := s.d.SnsUserByMid(context.Background(), a.Mid, model.PlatformQQ)
  601. if err != nil {
  602. return
  603. }
  604. if a.QQOpenid == "" {
  605. if sns != nil {
  606. log.Error("qq not match, old is nil,old(%+v) new(%+v)", a, sns)
  607. if _, err = s.d.DelSnsUser(context.Background(), a.Mid, model.PlatformQQ); err != nil {
  608. return
  609. }
  610. s.cache.Do(context.Background(), func(c context.Context) {
  611. s.d.DelSnsCache(c, a.Mid, model.PlatformQQStr)
  612. })
  613. }
  614. return
  615. }
  616. qqUnionID, err := s.d.QQUnionID(context.Background(), a.QQOpenid)
  617. if err != nil || qqUnionID == "" {
  618. return
  619. }
  620. if sns == nil {
  621. log.Error("qq not match, new not exists, old(%+v), ", a)
  622. if _, err = s.d.AddSnsUser(context.Background(), a.Mid, a.QQAccessExpires, qqUnionID, model.PlatformQQ); err != nil {
  623. switch nErr := errors.Cause(err).(type) {
  624. case *mysql.MySQLError:
  625. if nErr.Number == _mySQLErrCodeDuplicateEntry {
  626. log.Error("checkQQ, add qq duplicate, mid(%d) unionid(%s)", a.Mid, qqUnionID)
  627. err = nil
  628. //var (
  629. // u *model.SnsUser
  630. // aso *model.AsoAccountSns
  631. //)
  632. //if u, err = s.d.SnsUserByUnionID(context.Background(), qqUnionID, model.PlatformQQ); err != nil {
  633. // return
  634. //}
  635. //if _, err = s.d.DelSnsUser(context.Background(), u.Mid, model.PlatformQQ); err != nil {
  636. // return
  637. //}
  638. //if _, err = s.d.AddSnsUser(context.Background(), a.Mid, a.QQAccessExpires, qqUnionID, model.PlatformQQ); err != nil {
  639. // return
  640. //}
  641. //if aso, err = s.d.AsoAccountSnsByMid(context.Background(), u.Mid); err != nil {
  642. // return
  643. //}
  644. //if qqUnionID, err = s.d.QQUnionID(context.Background(), a.QQOpenid); err != nil {
  645. // return
  646. //}
  647. //if _, err = s.d.AddSnsUser(context.Background(), aso.Mid, aso.QQAccessExpires, qqUnionID, model.PlatformQQ); err != nil {
  648. // return
  649. //}
  650. }
  651. }
  652. return
  653. }
  654. } else if sns.UnionID != qqUnionID {
  655. log.Error("qq not match, new(%s), mid(%d) oldOpenID(%s) oldUnionID(%s) ", sns.UnionID, a.Mid, a.QQOpenid, qqUnionID)
  656. if _, err = s.d.UpdateSnsUser(context.Background(), a.Mid, a.QQAccessExpires, qqUnionID, model.PlatformQQ); err != nil {
  657. switch nErr := errors.Cause(err).(type) {
  658. case *mysql.MySQLError:
  659. if nErr.Number == _mySQLErrCodeDuplicateEntry {
  660. log.Error("checkQQ, update qq duplicate, mid(%d) unionid(%s)", a.Mid, qqUnionID)
  661. err = nil
  662. //var (
  663. // u *model.SnsUser
  664. // aso *model.AsoAccountSns
  665. //)
  666. //if u, err = s.d.SnsUserByUnionID(context.Background(), qqUnionID, model.PlatformQQ); err != nil {
  667. // return
  668. //}
  669. //if _, err = s.d.DelSnsUser(context.Background(), u.Mid, model.PlatformQQ); err != nil {
  670. // return
  671. //}
  672. //if _, err = s.d.UpdateSnsUser(context.Background(), a.Mid, a.QQAccessExpires, qqUnionID, model.PlatformQQ); err != nil {
  673. // return
  674. //}
  675. //if aso, err = s.d.AsoAccountSnsByMid(context.Background(), u.Mid); err != nil {
  676. // return
  677. //}
  678. //if qqUnionID, err = s.d.QQUnionID(context.Background(), a.QQOpenid); err != nil {
  679. // return
  680. //}
  681. //if _, err = s.d.AddSnsUser(context.Background(), aso.Mid, aso.QQAccessExpires, qqUnionID, model.PlatformQQ); err != nil {
  682. // return
  683. //}
  684. }
  685. }
  686. return
  687. }
  688. } else if sns.Expires != a.QQAccessExpires {
  689. log.Error("qq expires not match, new(%+v), old(%+v)", sns, a)
  690. if err = s.updateSns(a.Mid, a.QQAccessExpires, a.QQOpenid, a.QQAccessToken, model.PlatformQQ); err != nil {
  691. return
  692. }
  693. return
  694. }
  695. s.cache.Do(context.Background(), func(c context.Context) {
  696. proto := &model.SnsProto{
  697. Mid: a.Mid,
  698. Platform: int32(model.PlatformQQ),
  699. UnionID: qqUnionID,
  700. Expires: a.QQAccessExpires,
  701. }
  702. s.d.SetSnsCache(c, a.Mid, model.PlatformQQStr, proto)
  703. })
  704. return
  705. }
  706. func (s *Service) checkWeibo(a *model.AsoAccountSns) (err error) {
  707. sns, err := s.d.SnsUserByMid(context.Background(), a.Mid, model.PlatformWEIBO)
  708. if err != nil {
  709. return
  710. }
  711. if a.SinaUID == 0 {
  712. if sns != nil {
  713. log.Error("weibo not match, old is nil,old(%+v) new(%+v)", a, sns)
  714. if _, err = s.d.DelSnsUser(context.Background(), a.Mid, model.PlatformWEIBO); err != nil {
  715. return
  716. }
  717. s.cache.Do(context.Background(), func(c context.Context) {
  718. s.d.DelSnsCache(c, a.Mid, model.PlatformWEIBOStr)
  719. })
  720. }
  721. return
  722. }
  723. if sns == nil {
  724. log.Error("weibo not match, new not exists, old(%+v)", a)
  725. if _, err = s.d.AddSnsUser(context.Background(), a.Mid, a.SinaAccessExpires, strconv.FormatInt(a.SinaUID, 10), model.PlatformWEIBO); err != nil {
  726. switch nErr := errors.Cause(err).(type) {
  727. case *mysql.MySQLError:
  728. if nErr.Number == _mySQLErrCodeDuplicateEntry {
  729. log.Error("checkWeibo, add weibo duplicate, mid(%d) unionid(%s)", a.Mid, strconv.FormatInt(a.SinaUID, 10))
  730. err = nil
  731. //var (
  732. // u *model.SnsUser
  733. // aso *model.AsoAccountSns
  734. //)
  735. //if u, err = s.d.SnsUserByUnionID(context.Background(), strconv.FormatInt(a.SinaUID, 10), model.PlatformWEIBO); err != nil {
  736. // return
  737. //}
  738. //if _, err = s.d.DelSnsUser(context.Background(), u.Mid, model.PlatformWEIBO); err != nil {
  739. // return
  740. //}
  741. //if _, err = s.d.AddSnsUser(context.Background(), a.Mid, a.SinaAccessExpires, strconv.FormatInt(a.SinaUID, 10), model.PlatformWEIBO); err != nil {
  742. // return
  743. //}
  744. //if aso, err = s.d.AsoAccountSnsByMid(context.Background(), u.Mid); err != nil {
  745. // return
  746. //}
  747. //if _, err = s.d.AddSnsUser(context.Background(), aso.Mid, aso.SinaAccessExpires, strconv.FormatInt(aso.SinaUID, 10), model.PlatformWEIBO); err != nil {
  748. // return
  749. //}
  750. }
  751. }
  752. return
  753. }
  754. } else if sns.UnionID != strconv.FormatInt(a.SinaUID, 10) {
  755. log.Error("weibo not match, new(%s), mid(%d) old(%d)", sns.UnionID, a.Mid, a.SinaUID)
  756. if _, err = s.d.UpdateSnsUser(context.Background(), a.Mid, a.SinaAccessExpires, strconv.FormatInt(a.SinaUID, 10), model.PlatformWEIBO); err != nil {
  757. switch nErr := errors.Cause(err).(type) {
  758. case *mysql.MySQLError:
  759. if nErr.Number == _mySQLErrCodeDuplicateEntry {
  760. log.Error("checkWeibo, update weibo duplicate, mid(%d) unionid(%s)", a.Mid, strconv.FormatInt(a.SinaUID, 10))
  761. err = nil
  762. //var (
  763. // u *model.SnsUser
  764. // aso *model.AsoAccountSns
  765. //)
  766. //if u, err = s.d.SnsUserByUnionID(context.Background(), strconv.FormatInt(a.SinaUID, 10), model.PlatformWEIBO); err != nil {
  767. // return
  768. //}
  769. //if _, err = s.d.DelSnsUser(context.Background(), u.Mid, model.PlatformWEIBO); err != nil {
  770. // return
  771. //}
  772. //if _, err = s.d.UpdateSnsUser(context.Background(), a.Mid, a.SinaAccessExpires, strconv.FormatInt(a.SinaUID, 10), model.PlatformWEIBO); err != nil {
  773. // return
  774. //}
  775. //if aso, err = s.d.AsoAccountSnsByMid(context.Background(), u.Mid); err != nil {
  776. // return
  777. //}
  778. //if _, err = s.d.AddSnsUser(context.Background(), aso.Mid, aso.SinaAccessExpires, strconv.FormatInt(aso.SinaUID, 10), model.PlatformWEIBO); err != nil {
  779. // return
  780. //}
  781. }
  782. }
  783. return
  784. }
  785. } else if sns.Expires != a.SinaAccessExpires {
  786. log.Error("weibo expires not match, new(%+v), old(%+v)", sns, a)
  787. if err = s.updateSns(a.Mid, a.SinaAccessExpires, strconv.FormatInt(a.SinaUID, 10), a.SinaAccessToken, model.PlatformWEIBO); err != nil {
  788. return
  789. }
  790. return
  791. }
  792. s.cache.Do(context.Background(), func(c context.Context) {
  793. proto := &model.SnsProto{
  794. Mid: a.Mid,
  795. Platform: int32(model.PlatformWEIBO),
  796. UnionID: strconv.FormatInt(a.SinaUID, 10),
  797. Expires: a.SinaAccessExpires,
  798. }
  799. s.d.SetSnsCache(c, a.Mid, model.PlatformWEIBOStr, proto)
  800. })
  801. return
  802. }