identify.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "strings"
  6. "time"
  7. "go-common/app/job/main/identify/model"
  8. mdl "go-common/app/service/main/identify/model"
  9. "go-common/library/cache/memcache"
  10. "go-common/library/log"
  11. "go-common/library/queue/databus"
  12. )
  13. type actionToken struct {
  14. model.Token
  15. Action string
  16. }
  17. type actionCookie struct {
  18. model.Cookie
  19. Action string
  20. }
  21. func (s *Service) identifyNew(msg *databus.Message) (interface{}, error) {
  22. bmsg := new(model.BMsg)
  23. if err := json.Unmarshal(msg.Value, bmsg); err != nil {
  24. log.Error("json.Unmarshal(%s) error(%v)", string(msg.Value), err)
  25. return nil, err
  26. }
  27. log.Info("identify service process databus message, table(%s)", bmsg.Table)
  28. if strings.HasPrefix(bmsg.Table, _tokenTable) {
  29. t := new(actionToken)
  30. if err := json.Unmarshal(bmsg.New, t); err != nil {
  31. log.Error("json.Unmarshal(%s) error(%v)", string(bmsg.New), err)
  32. return nil, err
  33. }
  34. log.Info("process databus message, table(%s), action(%s)", bmsg.Table, bmsg.Action)
  35. t.Action = bmsg.Action
  36. return t, nil
  37. }
  38. if strings.HasPrefix(bmsg.Table, _cookieTable) {
  39. t := new(actionCookie)
  40. if err := json.Unmarshal(bmsg.New, t); err != nil {
  41. log.Error("json.Unmarshal(%s) error(%v)", string(bmsg.New), err)
  42. return nil, err
  43. }
  44. log.Info("process databus message, table(%s), action(%s)", bmsg.Table, bmsg.Action)
  45. t.Action = bmsg.Action
  46. return t, nil
  47. }
  48. return bmsg, nil
  49. }
  50. func (s *Service) identifySplit(msg *databus.Message, data interface{}) int {
  51. mergeNum := int64(s.c.Databusutil.Num)
  52. mid := int64(0)
  53. switch t := data.(type) {
  54. case *model.Cookie:
  55. mid = t.Mid
  56. case *model.Token:
  57. mid = t.Mid
  58. }
  59. return int(mid % mergeNum)
  60. }
  61. func (s *Service) processIdentifyInfo(bmsgs []interface{}) {
  62. for _, msg := range bmsgs {
  63. switch t := msg.(type) {
  64. case *actionToken:
  65. info := &mdl.IdentifyInfo{
  66. Mid: t.Mid,
  67. Expires: t.Expires,
  68. }
  69. if ok := isGameAppID(t.APPID); ok {
  70. continue
  71. }
  72. s.processIdentify(t.Action, t.AccessToken, info)
  73. case *actionCookie:
  74. info := &mdl.IdentifyInfo{
  75. Mid: t.Mid,
  76. Csrf: t.CSRFToken,
  77. Expires: t.ExpireTime,
  78. }
  79. s.processIdentify(t.Action, t.SessionData, info)
  80. }
  81. }
  82. }
  83. func (s *Service) processIdentify(action, key string, info *mdl.IdentifyInfo) (err error) {
  84. if err = s.processMc(action, key, info); err != nil {
  85. return
  86. }
  87. log.Info("identify process action(%s) mid(%d) csrf(%s) key(%s) success", action, info.Mid, info.Csrf, key[:8])
  88. return
  89. }
  90. // processMc .
  91. func (s *Service) processMc(action, k string, info *mdl.IdentifyInfo) (err error) {
  92. if len(s.poolm) == 0 {
  93. return
  94. }
  95. if _insertAction == action {
  96. expire := info.Expires
  97. if expire < int32(time.Now().Unix()) {
  98. log.Error("identify expire error(%d,%d)", info.Expires, time.Now().Unix())
  99. return
  100. }
  101. for name, p := range s.poolm {
  102. mcc, ok := s.c.Memcaches[name]
  103. if !ok || mcc == nil {
  104. return
  105. }
  106. key := mcc.Prefix + k
  107. conn := p.Get(context.Background())
  108. err = conn.Set(&memcache.Item{Key: key, Object: info, Flags: memcache.FlagProtobuf, Expiration: expire})
  109. conn.Close()
  110. if err != nil {
  111. log.Error("old identify set error(%s,%d,%v)", key, info.Expires, err)
  112. err = nil
  113. }
  114. }
  115. return
  116. }
  117. if _delteAction == action {
  118. for name, p := range s.poolm {
  119. mcc, ok := s.c.Memcaches[name]
  120. if !ok || mcc == nil {
  121. return
  122. }
  123. key := mcc.Prefix + k
  124. // if delete failed, retry until success
  125. for {
  126. conn := p.Get(context.Background())
  127. err := conn.Delete(key)
  128. if err == nil || err == memcache.ErrNotFound {
  129. conn.Close()
  130. break
  131. }
  132. log.Error("dao.DelCache(%s) error(%v)", key, err)
  133. conn.Close()
  134. time.Sleep(time.Second)
  135. }
  136. }
  137. }
  138. return
  139. }
  140. func isGameAppID(appid int64) (res bool) {
  141. for i := 0; i < len(_gameAppID); i++ {
  142. if _gameAppID[i] == appid {
  143. return true
  144. }
  145. }
  146. return false
  147. }