123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
- package service
- import (
- "context"
- "time"
- "go-common/app/job/main/member/model/queue"
- "go-common/library/log"
- )
- // Item is
- type Item struct {
- Mid int64
- Time time.Time
- Action string
- }
- // Compare is
- func (i *Item) Compare(other queue.Item) int {
- o := asItem(other)
- if o == nil {
- return -1
- }
- if i.Time.Equal(o.Time) {
- return 0
- }
- if i.Time.After(o.Time) {
- return 1
- }
- return -1
- }
- // HashCode is
- func (i *Item) HashCode() int64 {
- return i.Mid
- }
- func asItem(in queue.Item) *Item {
- o, ok := in.(*Item)
- if !ok {
- return nil
- }
- return o
- }
- func asItems(in []queue.Item) []*Item {
- out := make([]*Item, 0, len(in))
- for _, i := range in {
- item := asItem(i)
- if item == nil {
- continue
- }
- out = append(out, item)
- }
- return out
- }
- func (s *Service) cachedelayproc(ctx context.Context) {
- fiveSeconds := time.Second * 5
- t := time.NewTicker(fiveSeconds)
- delayed := func(t time.Time) bool {
- top := asItem(s.cachepq.Peek())
- if top == nil {
- log.Info("Empty cache queue top at: %v", t)
- return false
- }
- if t.Sub(top.Time) < fiveSeconds {
- log.Info("Top item is in five seconds, skip and waiting for next tick")
- return false
- }
- return true
- }
- for ti := range t.C {
- if !delayed(ti) {
- continue
- }
- for {
- qitems, err := s.cachepq.Get(1)
- if err != nil {
- log.Error("Failed to get queue items from cache queue: %+v", err)
- return
- }
- items := asItems(qitems)
- for _, it := range items {
- log.Info("Notify purge cache in delay queue with mid: %d", it.Mid)
- s.dao.NotifyPurgeCache(ctx, it.Mid, it.Action)
- }
- if s.cachepq.Empty() || !delayed(time.Now()) {
- break
- }
- }
- }
- }
|