123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 |
- package service
- import (
- "context"
- "encoding/json"
- "fmt"
- "strconv"
- "sync"
- "time"
- dm2rpc "go-common/app/interface/main/dm2/rpc/client"
- "go-common/app/job/main/archive/conf"
- "go-common/app/job/main/archive/dao/archive"
- "go-common/app/job/main/archive/dao/email"
- "go-common/app/job/main/archive/dao/monitor"
- "go-common/app/job/main/archive/dao/reply"
- "go-common/app/job/main/archive/dao/result"
- dbusmdl "go-common/app/job/main/archive/model/databus"
- resmdl "go-common/app/job/main/archive/model/result"
- "go-common/app/job/main/archive/model/retry"
- accgrpc "go-common/app/service/main/account/api"
- arcrpc "go-common/app/service/main/archive/api/gorpc"
- xredis "go-common/library/cache/redis"
- "go-common/library/log"
- "go-common/library/queue/databus"
- )
- // Service service
- type Service struct {
- c *conf.Config
- closeRetry bool
- closeSub bool
- archiveDao *archive.Dao
- emailDao *email.Dao
- monitorDao *monitor.Dao
- replyDao *reply.Dao
- resultDao *result.Dao
- redis *xredis.Pool
- waiter sync.WaitGroup
- videoupSub *databus.Databus
- archiveResultPub *databus.Databus
- dmPub *databus.Databus
- dmSub *databus.Databus
- cacheSub *databus.Databus
- accountNotifySub *databus.Databus
- sfTpsCache map[int16]int16
- adtTpsCache map[int16]struct{}
- arcServices []*arcrpc.Service2
- accGRPC accgrpc.AccountClient
- dm2RPC *dm2rpc.Service
- // databus channel
- videoupAids []chan int64
- pgcAids chan int64
- // dm count
- dmCids map[int64]struct{}
- dmMu sync.Mutex
- notifyMid map[int64]struct{}
- notifyMu sync.Mutex
- }
- // New is archive service implementation.
- func New(c *conf.Config) (s *Service) {
- s = &Service{
- c: c,
- archiveDao: archive.New(c),
- emailDao: email.New(c),
- monitorDao: monitor.New(c),
- replyDao: reply.New(c),
- resultDao: result.New(c),
- dm2RPC: dm2rpc.New(c.Dm2RPC),
- videoupSub: databus.New(c.VideoupSub),
- dmSub: databus.New(c.DmSub),
- dmPub: databus.New(c.DmPub),
- archiveResultPub: databus.New(c.ArchiveResultPub),
- cacheSub: databus.New(c.CacheSub),
- accountNotifySub: databus.New(c.AccountNotifySub),
- redis: xredis.NewPool(c.Redis),
- pgcAids: make(chan int64, 1024),
- dmCids: make(map[int64]struct{}),
- notifyMid: make(map[int64]struct{}),
- arcServices: make([]*arcrpc.Service2, 0),
- }
- var err error
- if s.accGRPC, err = accgrpc.NewClient(nil); err != nil {
- panic(fmt.Sprintf("account.service grpc not found!!!!!!!!!!!! error(%v)", err))
- }
- for _, sc := range s.c.ArchiveServices {
- s.arcServices = append(s.arcServices, arcrpc.New2(sc))
- }
- for i := 0; i < s.c.ChanSize; i++ {
- s.videoupAids = append(s.videoupAids, make(chan int64, 1024))
- s.waiter.Add(1)
- go s.consumerVideoup(i)
- s.waiter.Add(1)
- go s.pgcConsumer()
- }
- s.loadType()
- go s.cacheproc()
- // sync archive_result db!!!!!!!
- s.waiter.Add(1)
- go s.videoupConsumer()
- s.waiter.Add(1)
- go s.dmConsumer()
- // check consumer
- go s.checkConsume()
- s.waiter.Add(1)
- go s.retryproc()
- s.waiter.Add(1)
- go s.dmCounter()
- s.waiter.Add(1)
- go s.cachesubproc()
- s.waiter.Add(1)
- go s.accountNotifyproc()
- s.waiter.Add(1)
- go s.clearMidCache()
- return s
- }
- func (s *Service) sendNotify(upInfo *resmdl.ArchiveUpInfo) {
- var (
- nw []byte
- old []byte
- err error
- msg *dbusmdl.Message
- c = context.TODO()
- rt = &retry.Info{}
- )
- if nw, err = json.Marshal(upInfo.Nw); err != nil {
- log.Error("json.Marshal(%+v) error(%v)", upInfo.Nw, err)
- return
- }
- if old, err = json.Marshal(upInfo.Old); err != nil {
- log.Error("json.Marshal(%+v) error(%v)", upInfo.Old, err)
- return
- }
- msg = &dbusmdl.Message{Action: upInfo.Action, Table: upInfo.Table, New: nw, Old: old}
- if err = s.archiveResultPub.Send(c, strconv.FormatInt(upInfo.Nw.AID, 10), msg); err != nil {
- log.Error("s.archiveResultPub.Send(%+v) error(%v)", msg, err)
- rt.Action = retry.FailDatabus
- rt.Data.Aid = upInfo.Nw.AID
- rt.Data.DatabusMsg = upInfo
- s.PushFail(c, rt)
- return
- }
- msgStr, _ := json.Marshal(msg)
- log.Info("sendNotify(%s) successed", msgStr)
- }
- func (s *Service) loadType() {
- tpm, err := s.archiveDao.TypeMapping(context.TODO())
- if err != nil {
- log.Error("s.dede.TypeMapping error(%v)", err)
- return
- }
- s.sfTpsCache = tpm
- // audit types
- adt, err := s.archiveDao.AuditTypesConf(context.TODO())
- if err != nil {
- log.Error("s.dede.AuditTypesConf error(%v)", err)
- return
- }
- s.adtTpsCache = adt
- }
- func (s *Service) cacheproc() {
- for {
- time.Sleep(1 * time.Minute)
- s.loadType()
- }
- }
- // check consumer stat
- func (s *Service) checkConsume() {
- if s.c.Env != "pro" {
- return
- }
- for {
- time.Sleep(1 * time.Minute)
- for i := 0; i < s.c.ChanSize; i++ {
- if l := len(s.videoupAids[i]); l > s.c.MonitorSize {
- s.monitorDao.Send(context.TODO(), s.c.WeChantUsers, fmt.Sprintf("archive-job报警了啊\n UGC的chan太大了!!!\n s.videoupAids[%d] size(%d) is too large\n 是不是有人在刷数据!!!!", i, l), s.c.WeChatToken, s.c.WeChatSecret)
- }
- }
- if l := len(s.pgcAids); l > s.c.MonitorSize {
- s.monitorDao.Send(context.TODO(), s.c.WeChantUsers, fmt.Sprintf("archive-job报警了啊\n PGC的chan太大了!!!\n chan size(%d) is too large \n 是不是有人在刷数据!!!!", l), s.c.WeChatToken, s.c.WeChatSecret)
- }
- }
- }
- // Close kafaka consumer close.
- func (s *Service) Close() (err error) {
- s.closeSub = true
- time.Sleep(2 * time.Second)
- s.videoupSub.Close()
- s.closeRetry = true
- s.waiter.Wait()
- return
- }
|