syn_auth.go 7.1 KB


  1. package service
  2. import (
  3. "context"
  4. "encoding/hex"
  5. "encoding/json"
  6. "strings"
  7. "time"
  8. "go-common/app/job/main/passport-auth/model"
  9. "go-common/library/log"
  10. "go-common/library/queue/databus"
  11. xtime "go-common/library/time"
  12. )
  // local is the process-local time zone. It is the location in which the
  // "2006-01-02 15:04:05" timestamps of incoming rows are parsed.
  //
  // time.LoadLocation("Local") is documented to return time.Local, so the
  // value is taken directly instead of discarding an error that cannot occur.
  var local = time.Local
  16. type tokenBMsg struct {
  17. Action string
  18. Table string
  19. New *model.OldToken
  20. }
  21. type cookieBMsg struct {
  22. Action string
  23. Table string
  24. New *model.OldCookie
  25. }
  26. func (s *Service) consumeproc() {
  27. // fill callbacks
  28. s.g.New = func(msg *databus.Message) (res interface{}, err error) {
  29. bmsg := new(model.BMsg)
  30. if err = json.Unmarshal(msg.Value, &bmsg); err != nil {
  31. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  32. return
  33. }
  34. log.Info("receive aso msg action(%s) table(%s) key(%s) partition(%d) offset(%d) timestamp(%d) New(%s) Old(%s)",
  35. bmsg.Action, bmsg.Table, msg.Key, msg.Partition, msg.Offset, msg.Timestamp, string(bmsg.New), string(bmsg.Old))
  36. if strings.HasPrefix(bmsg.Table, "aso_app_perm") {
  37. tokenBMsg := &tokenBMsg{
  38. Action: bmsg.Action,
  39. Table: bmsg.Table,
  40. }
  41. newToken := new(model.OldToken)
  42. if err = json.Unmarshal(bmsg.New, &newToken); err != nil {
  43. log.Error("json.Unmarshal(%s) error(%v)", bmsg.New, err)
  44. return
  45. }
  46. tokenBMsg.New = newToken
  47. return tokenBMsg, nil
  48. } else if strings.HasPrefix(bmsg.Table, "aso_cookie_token") {
  49. cookieBMsg := &cookieBMsg{
  50. Action: bmsg.Action,
  51. Table: bmsg.Table,
  52. }
  53. newCookie := new(model.OldCookie)
  54. if err = json.Unmarshal(bmsg.New, newCookie); err != nil {
  55. log.Error("json.Unmarshal(%s) error(%v)", bmsg.New, err)
  56. return
  57. }
  58. cookieBMsg.New = newCookie
  59. return cookieBMsg, nil
  60. }
  61. return
  62. }
  63. s.g.Split = func(msg *databus.Message, data interface{}) int {
  64. if t, ok := data.(*tokenBMsg); ok {
  65. return int(t.New.Mid)
  66. } else if t, ok := data.(*cookieBMsg); ok {
  67. return int(t.New.Mid)
  68. }
  69. return 0
  70. }
  71. s.g.Do = func(msgs []interface{}) {
  72. for _, m := range msgs {
  73. if msg, ok := m.(*tokenBMsg); ok {
  74. for {
  75. if err := s.handleToken(msg); err != nil {
  76. log.Error("do handleToken error", err)
  77. time.Sleep(100 * time.Millisecond)
  78. continue
  79. }
  80. break
  81. }
  82. } else if msg, ok := m.(*cookieBMsg); ok {
  83. for {
  84. if err := s.handleCookie(msg); err != nil {
  85. log.Error("do handleCookie error", err)
  86. time.Sleep(100 * time.Millisecond)
  87. continue
  88. }
  89. break
  90. }
  91. }
  92. }
  93. }
  94. // start the group
  95. s.g.Start()
  96. }
  97. func (s *Service) handleCookie(cookie *cookieBMsg) (err error) {
  98. newCookie := &model.Cookie{
  99. Mid: cookie.New.Mid,
  100. Session: cookie.New.Session,
  101. CSRF: cookie.New.CSRFToken,
  102. Expires: cookie.New.Expires,
  103. }
  104. csrfByte, _ := hex.DecodeString(cookie.New.CSRFToken)
  105. if strings.ToLower(cookie.Action) == "insert" {
  106. if _, err = s.dao.AddCookie(context.Background(), newCookie, []byte(newCookie.Session), csrfByte, time.Now()); err != nil {
  107. return
  108. }
  109. return
  110. } else if strings.ToLower(cookie.Action) == "delete" {
  111. now := time.Now()
  112. if _, err = s.dao.DelCookie(context.Background(), []byte(cookie.New.Session), now); err != nil {
  113. log.Error("del db cookie(%s) error(%+v)", cookie.New.Session, err)
  114. return
  115. }
  116. if _, err = s.dao.DelCookie(context.Background(), []byte(cookie.New.Session), previousMonth(now, -1)); err != nil {
  117. log.Error("del db cookie(%s) error(%+v)", cookie.New.Session, err)
  118. return
  119. }
  120. if newCookie.Expires > now.Unix() {
  121. var t time.Time
  122. if cookie.New.ModifyTime == "0000-00-00 00:00:00" {
  123. t = time.Now()
  124. } else {
  125. t, err = time.ParseInLocation("2006-01-02 15:04:05", cookie.New.ModifyTime, local)
  126. if err != nil {
  127. log.Error("handleCookie error: ctime parse error(%+v) cookie(%+v)", err, cookie.New)
  128. return
  129. }
  130. }
  131. timestamp := xtime.Time(t.Unix())
  132. newCookie.Ctime = timestamp
  133. if _, err = s.dao.AddCookieDeleted(context.Background(), newCookie, []byte(newCookie.Session), csrfByte, t); err != nil {
  134. return
  135. }
  136. }
  137. if err = s.authRPC.DelCookieCookie(context.Background(), cookie.New.Session); err != nil {
  138. log.Error("del cache cookie(%s) error(%+v)", cookie.New.Session, err)
  139. }
  140. if err = s.dao.AsoCleanCache(context.Background(), "", cookie.New.Session, cookie.New.Mid); err != nil {
  141. return
  142. }
  143. }
  144. return
  145. }
  146. func (s *Service) handleToken(token *tokenBMsg) (err error) {
  147. var t time.Time
  148. t, err = time.ParseInLocation("2006-01-02 15:04:05", token.New.CTime, local)
  149. if err != nil {
  150. log.Error("handleToken error: ctime parse err. (%+v)", token.New)
  151. return
  152. }
  153. timestamp := xtime.Time(t.Unix())
  154. newToken := &model.Token{
  155. Mid: token.New.Mid,
  156. Token: token.New.AccessToken,
  157. AppID: token.New.AppID,
  158. Expires: token.New.Expires,
  159. Type: token.New.Type,
  160. Ctime: timestamp,
  161. }
  162. tokenByte, _ := hex.DecodeString(newToken.Token)
  163. if strings.ToLower(token.Action) == "insert" {
  164. if _, err = s.dao.AddToken(context.Background(), newToken, tokenByte, t); err != nil {
  165. return
  166. }
  167. if token.New.RefreshToken == "" {
  168. return
  169. }
  170. newRefresh := &model.Refresh{
  171. Mid: token.New.Mid,
  172. Refresh: token.New.RefreshToken,
  173. Token: token.New.AccessToken,
  174. AppID: token.New.AppID,
  175. Expires: token.New.Expires, // 需要考虑+30天
  176. }
  177. tokenByteRefresh, _ := hex.DecodeString(newRefresh.Token)
  178. refreshByte, _ := hex.DecodeString(newRefresh.Refresh)
  179. if _, err = s.dao.AddRefresh(context.Background(), newRefresh, refreshByte, tokenByteRefresh, t); err != nil {
  180. return
  181. }
  182. } else if strings.ToLower(token.Action) == "delete" {
  183. now := time.Now()
  184. if _, err = s.dao.DelToken(context.Background(), tokenByte, now); err != nil {
  185. log.Error("del db token(%s) error(%+v)", token.New.AccessToken, err)
  186. return
  187. }
  188. if _, err = s.dao.DelToken(context.Background(), tokenByte, previousMonth(now, -1)); err != nil {
  189. log.Error("del db token(%s) error(%+v)", token.New.AccessToken, err)
  190. return
  191. }
  192. if _, err = s.dao.DelToken(context.Background(), tokenByte, previousMonth(now, -2)); err != nil {
  193. log.Error("del db token(%s) error(%+v)", token.New.AccessToken, err)
  194. return
  195. }
  196. if _, err = s.dao.AddTokenDeleted(context.Background(), newToken, tokenByte, t); err != nil {
  197. return
  198. }
  199. if err = s.authRPC.DelTokenCache(context.Background(), token.New.AccessToken); err != nil {
  200. log.Error("del cache token(%s) error(%+v)", token.New.AccessToken, err)
  201. }
  202. if err = s.dao.AsoCleanCache(context.Background(), token.New.AccessToken, "", token.New.Mid); err != nil {
  203. return
  204. }
  205. if token.New.RefreshToken == "" {
  206. return
  207. }
  208. refreshByte, _ := hex.DecodeString(token.New.RefreshToken)
  209. if _, err = s.dao.DelRefresh(context.Background(), refreshByte, t); err != nil {
  210. log.Error("del db refresh(%s) error(%+v)", token.New.RefreshToken, err)
  211. return
  212. }
  213. if _, err = s.dao.DelRefresh(context.Background(), refreshByte, previousMonth(t, -2)); err != nil {
  214. log.Error("del db refresh(%s) error(%+v)", token.New.RefreshToken, err)
  215. return
  216. }
  217. }
  218. return
  219. }
  // previousMonth moves t by delta months, anchored to the start of t's month.
  //
  // A zero delta returns t unchanged. Otherwise the result is day 1 of t's
  // month at 01:01:01.000000001 in t's location, shifted by delta months
  // (callers pass a negative delta to go back in time). Anchoring on day 1
  // means the shift never overflows into a neighbouring month.
  func previousMonth(t time.Time, delta int) time.Time {
  	if delta != 0 {
  		y, m, _ := t.Date()
  		anchor := time.Date(y, m, 1, 1, 1, 1, 1, t.Location())
  		return anchor.AddDate(0, delta, 0)
  	}
  	return t
  }
  227. }