123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- package service
- import (
- "context"
- "encoding/json"
- "strconv"
- "strings"
- "time"
- "go-common/app/job/main/passport/model"
- igmdl "go-common/app/service/main/identify-game/model"
- "go-common/library/log"
- "go-common/library/queue/databus"
- )
- const (
- _changePwd = "changePwd"
- _retryCount = 3
- _retryDuration = time.Second
- )
- func (s *Service) tokenconsumeproc() {
- mergeNum := s.c.Group.AsoBinLog.Num
- var (
- err error
- n int
- msgs = s.dsToken.Messages()
- )
- for {
- msg, ok := <-msgs
- if !ok {
- log.Error("s.tokenconsumeproc closed")
- return
- }
- // marked head to first commit
- m := &message{data: msg}
- if n, err = strconv.Atoi(msg.Key); err != nil {
- log.Error("strconv.Atoi(%s) error(%v)", msg.Key, err)
- continue
- }
- s.mu.Lock()
- if s.head == nil {
- s.head = m
- s.last = m
- } else {
- s.last.next = m
- s.last = m
- }
- s.mu.Unlock()
- // use specify goroutine to merge messages
- s.tokenMergeChans[n%mergeNum] <- m
- log.Info("tokenconsumeproc key:%s partition:%d offset:%d", msg.Key, msg.Partition, msg.Offset)
- }
- }
- func (s *Service) tokencommitproc() {
- commits := make(map[int32]*databus.Message, s.c.Group.AsoBinLog.Size)
- for {
- done := <-s.tokenDoneChan
- // merge partitions to commit offset
- for _, d := range done {
- d.done = true
- }
- s.mu.Lock()
- for ; s.head != nil && s.head.done; s.head = s.head.next {
- commits[s.head.data.Partition] = s.head.data
- }
- s.mu.Unlock()
- for k, m := range commits {
- log.Info("tokencommitproc committed, key:%s partition:%d offset:%d", m.Key, m.Partition, m.Offset)
- m.Commit()
- delete(commits, k)
- }
- }
- }
- func (s *Service) tokenmergeproc(c chan *message) {
- var (
- err error
- max = s.c.Group.AsoBinLog.Size
- merges = make([]*model.AccessInfo, 0, max)
- marked = make([]*message, 0, max)
- ticker = time.NewTicker(time.Duration(s.c.Group.AsoBinLog.Ticker))
- )
- for {
- select {
- case msg, ok := <-c:
- if !ok {
- log.Error("s.tokenmergeproc closed")
- return
- }
- bmsg := &model.BMsg{}
- if err = json.Unmarshal(msg.data.Value, bmsg); err != nil {
- log.Error("json.Unmarshal(%s) error(%v)", string(msg.data.Value), err)
- continue
- }
- if bmsg.Action == "delete" && strings.HasPrefix(bmsg.Table, "aso_app_perm") {
- t := &model.AccessInfo{}
- if err = json.Unmarshal(bmsg.New, t); err != nil {
- log.Error("json.Unmarshal(%s) error(%v)", string(bmsg.New), err)
- continue
- }
- merges = append(merges, t)
- }
- marked = append(marked, msg)
- if len(marked) < max && len(merges) < max {
- continue
- }
- case <-ticker.C:
- }
- if len(merges) > 0 {
- s.cleanTokens(merges)
- merges = make([]*model.AccessInfo, 0, max)
- }
- if len(marked) > 0 {
- s.tokenDoneChan <- marked
- marked = make([]*message, 0, max)
- }
- }
- }
- // cleanTokens clean tokens.
- func (s *Service) cleanTokens(tokens []*model.AccessInfo) {
- for _, token := range tokens {
- s.cleanToken(token)
- }
- }
- // cleanToken to notify other clean access token.
- func (s *Service) cleanToken(token *model.AccessInfo) (err error) {
- if token == nil || token.Expires < time.Now().Unix() {
- return
- }
- isGame := false
- for _, id := range s.gameAppIDs {
- if id == token.AppID {
- isGame = true
- break
- }
- }
- if !isGame {
- return
- }
- for {
- if err = s.d.DelCache(context.TODO(), token.Token); err == nil {
- break
- }
- time.Sleep(_retryDuration)
- }
- for i := 0; i < _retryCount; i++ {
- arg := &igmdl.CleanCacheArgs{
- Token: token.Token,
- Mid: token.Mid,
- }
- if err = s.igRPC.DelCache(context.TODO(), arg); err == nil {
- break
- }
- log.Error("service.identifyGameRPC.DelCache(%+v) error(%v)", arg, err)
- time.Sleep(_retryDuration)
- }
- for i := 0; i < _retryCount; i++ {
- if err = s.d.NotifyGame(token, _changePwd); err == nil {
- return
- }
- time.Sleep(_retryDuration)
- }
- log.Error("notify err, token(%+v)", token)
- return
- }
|