auth.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package service
  2. import (
  3. "encoding/base64"
  4. "encoding/hex"
  5. "encoding/json"
  6. "strings"
  7. "go-common/app/job/main/identify/model"
  8. mdl "go-common/app/service/main/identify/model"
  9. "go-common/library/log"
  10. "go-common/library/queue/databus"
  11. )
  12. func (s *Service) new(msg *databus.Message) (interface{}, error) {
  13. bmsg := new(model.BMsg)
  14. if err := json.Unmarshal(msg.Value, bmsg); err != nil {
  15. log.Error("json.Unmarshal(%s) error(%v)", string(msg.Value), err)
  16. return nil, err
  17. }
  18. if bmsg.Action != "delete" {
  19. return bmsg, nil
  20. }
  21. if strings.HasPrefix(bmsg.Table, "user_token_") {
  22. t := new(model.AuthToken)
  23. if err := json.Unmarshal(bmsg.New, t); err != nil {
  24. log.Error("json.Unmarshal(%s) error(%v)", string(bmsg.New), err)
  25. return nil, err
  26. }
  27. log.Info("identifyconsumeproc table:%s key:%s partition:%d offset:%d", bmsg.Table, msg.Key, msg.Partition, msg.Offset)
  28. return t, nil
  29. } else if strings.HasPrefix(bmsg.Table, "user_cookie_") {
  30. t := new(model.AuthCookie)
  31. if err := json.Unmarshal(bmsg.New, t); err != nil {
  32. log.Error("json.Unmarshal(%s) error(%v)", string(bmsg.New), err)
  33. return nil, err
  34. }
  35. log.Info("identifyconsumeproc table:%s key:%s partition:%d offset:%d", bmsg.Table, msg.Key, msg.Partition, msg.Offset)
  36. return t, nil
  37. }
  38. return bmsg, nil
  39. }
  40. func (s *Service) spilt(msg *databus.Message, data interface{}) int {
  41. switch t := data.(type) {
  42. case *model.AuthToken:
  43. return int(t.Mid)
  44. case *model.AuthCookie:
  45. return int(t.Mid)
  46. default:
  47. return 0
  48. }
  49. }
  50. func (s *Service) processAuthBinlog2(bmsgs []interface{}) {
  51. for _, msg := range bmsgs {
  52. switch t := msg.(type) {
  53. case *model.AuthToken:
  54. var (
  55. bytes []byte
  56. err error
  57. )
  58. if bytes, err = base64.StdEncoding.DecodeString(t.Token); err != nil {
  59. log.Error("cleanCookieCache base64 decode err %v", err)
  60. err = nil
  61. return
  62. }
  63. info := &mdl.IdentifyInfo{
  64. Mid: t.Mid,
  65. Expires: int32(t.Expires),
  66. }
  67. if ok := isGameAppID(t.AppID); ok {
  68. continue
  69. }
  70. for {
  71. log.Info("auth service process token databus, key(%s)", hex.EncodeToString(bytes))
  72. if err := s.processIdentify("delete", hex.EncodeToString(bytes), info); err != nil {
  73. continue
  74. }
  75. break
  76. }
  77. case *model.AuthCookie:
  78. var (
  79. bytes []byte
  80. bytesCSRF []byte
  81. err error
  82. )
  83. if bytes, err = base64.StdEncoding.DecodeString(t.Session); err != nil {
  84. log.Error("cleanCookieCache base64 decode err %v", err)
  85. err = nil
  86. return
  87. }
  88. if bytesCSRF, err = base64.StdEncoding.DecodeString(t.CSRF); err != nil {
  89. log.Error("cleanCookieCache base64 decode err %v", err)
  90. err = nil
  91. return
  92. }
  93. info := &mdl.IdentifyInfo{
  94. Mid: t.Mid,
  95. Csrf: hex.EncodeToString(bytesCSRF),
  96. Expires: int32(t.Expires),
  97. }
  98. for {
  99. log.Info("auth service process cookie databus, key(%s)", string(bytes))
  100. if err := s.processIdentify("delete", string(bytes), info); err != nil {
  101. continue
  102. }
  103. break
  104. }
  105. default:
  106. return
  107. }
  108. }
  109. }