123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 |
- package service
- import (
- "encoding/base64"
- "encoding/hex"
- "encoding/json"
- "strings"
- "go-common/app/job/main/identify/model"
- mdl "go-common/app/service/main/identify/model"
- "go-common/library/log"
- "go-common/library/queue/databus"
- )
- func (s *Service) new(msg *databus.Message) (interface{}, error) {
- bmsg := new(model.BMsg)
- if err := json.Unmarshal(msg.Value, bmsg); err != nil {
- log.Error("json.Unmarshal(%s) error(%v)", string(msg.Value), err)
- return nil, err
- }
- if bmsg.Action != "delete" {
- return bmsg, nil
- }
- if strings.HasPrefix(bmsg.Table, "user_token_") {
- t := new(model.AuthToken)
- if err := json.Unmarshal(bmsg.New, t); err != nil {
- log.Error("json.Unmarshal(%s) error(%v)", string(bmsg.New), err)
- return nil, err
- }
- log.Info("identifyconsumeproc table:%s key:%s partition:%d offset:%d", bmsg.Table, msg.Key, msg.Partition, msg.Offset)
- return t, nil
- } else if strings.HasPrefix(bmsg.Table, "user_cookie_") {
- t := new(model.AuthCookie)
- if err := json.Unmarshal(bmsg.New, t); err != nil {
- log.Error("json.Unmarshal(%s) error(%v)", string(bmsg.New), err)
- return nil, err
- }
- log.Info("identifyconsumeproc table:%s key:%s partition:%d offset:%d", bmsg.Table, msg.Key, msg.Partition, msg.Offset)
- return t, nil
- }
- return bmsg, nil
- }
- func (s *Service) spilt(msg *databus.Message, data interface{}) int {
- switch t := data.(type) {
- case *model.AuthToken:
- return int(t.Mid)
- case *model.AuthCookie:
- return int(t.Mid)
- default:
- return 0
- }
- }
- func (s *Service) processAuthBinlog2(bmsgs []interface{}) {
- for _, msg := range bmsgs {
- switch t := msg.(type) {
- case *model.AuthToken:
- var (
- bytes []byte
- err error
- )
- if bytes, err = base64.StdEncoding.DecodeString(t.Token); err != nil {
- log.Error("cleanCookieCache base64 decode err %v", err)
- err = nil
- return
- }
- info := &mdl.IdentifyInfo{
- Mid: t.Mid,
- Expires: int32(t.Expires),
- }
- if ok := isGameAppID(t.AppID); ok {
- continue
- }
- for {
- log.Info("auth service process token databus, key(%s)", hex.EncodeToString(bytes))
- if err := s.processIdentify("delete", hex.EncodeToString(bytes), info); err != nil {
- continue
- }
- break
- }
- case *model.AuthCookie:
- var (
- bytes []byte
- bytesCSRF []byte
- err error
- )
- if bytes, err = base64.StdEncoding.DecodeString(t.Session); err != nil {
- log.Error("cleanCookieCache base64 decode err %v", err)
- err = nil
- return
- }
- if bytesCSRF, err = base64.StdEncoding.DecodeString(t.CSRF); err != nil {
- log.Error("cleanCookieCache base64 decode err %v", err)
- err = nil
- return
- }
- info := &mdl.IdentifyInfo{
- Mid: t.Mid,
- Csrf: hex.EncodeToString(bytesCSRF),
- Expires: int32(t.Expires),
- }
- for {
- log.Info("auth service process cookie databus, key(%s)", string(bytes))
- if err := s.processIdentify("delete", string(bytes), info); err != nil {
- continue
- }
- break
- }
- default:
- return
- }
- }
- }
|