123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- package service
- import (
- "context"
- "encoding/json"
- "fmt"
- "sync"
- "time"
- appmdl "go-common/app/interface/main/app-player/model/archive"
- "go-common/app/job/main/app-player/conf"
- "go-common/app/job/main/app-player/dao"
- "go-common/app/job/main/app-player/model"
- arcrpc "go-common/app/service/main/archive/api"
- "go-common/library/ecode"
- "go-common/library/log"
- "go-common/library/queue/databus"
- )
- const (
- _updateAct = "update"
- _insertAct = "insert"
- _tableArchive = "archive"
- )
- // Service is service.
- type Service struct {
- c *conf.Config
- dao *dao.Dao
- arcRPC arcrpc.ArchiveClient
- // sub
- archiveNotifySub *databus.Databus
- waiter sync.WaitGroup
- closed bool
- }
- // New new a service.
- func New(c *conf.Config) (s *Service) {
- s = &Service{
- c: c,
- dao: dao.New(c),
- archiveNotifySub: databus.New(c.ArchiveNotifySub),
- closed: false,
- }
- var err error
- s.arcRPC, err = arcrpc.NewClient(nil)
- if err != nil {
- panic(fmt.Sprintf("archive NewClient error(%v)", err))
- }
- s.waiter.Add(1)
- go s.arcConsumeproc()
- s.waiter.Add(1)
- go s.retryproc()
- return
- }
- // Close Databus consumer close.
- func (s *Service) Close() {
- s.closed = true
- s.archiveNotifySub.Close()
- s.waiter.Wait()
- }
- // Ping is
- func (s *Service) Ping(c context.Context) (err error) {
- if err = s.dao.PingMc(c); err != nil {
- return
- }
- return
- }
- // arcConsumeproc consumer archive
- func (s *Service) arcConsumeproc() {
- var (
- msg *databus.Message
- ok bool
- err error
- )
- msgs := s.archiveNotifySub.Messages()
- for {
- if msg, ok = <-msgs; !ok {
- log.Info("arc databus Consumer exit")
- break
- }
- log.Info("got databus message(%s)", msg.Value)
- msg.Commit()
- var ms = &model.Message{}
- if err = json.Unmarshal(msg.Value, ms); err != nil {
- log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
- continue
- }
- switch ms.Table {
- case _tableArchive:
- s.archiveUpdate(ms.Action, ms.New)
- }
- }
- s.waiter.Done()
- }
- func (s *Service) archiveUpdate(action string, nwMsg []byte) {
- nw := &model.ArcMsg
- if err := json.Unmarshal(nwMsg, nw); err != nil {
- log.Error("json.Unmarshal(%s) error(%v)", nwMsg, err)
- return
- }
- switch action {
- case _updateAct, _insertAct:
- s.upArcCache(nw.Aid)
- }
- }
- func (s *Service) upArcCache(aid int64) {
- var (
- view *arcrpc.ViewReply
- arc *appmdl.Info
- cids []int64
- err error
- )
- defer func() {
- if err != nil {
- retry := &model.Retry{Aid: aid}
- s.dao.PushList(context.Background(), retry)
- log.Warn("upArcCache fail(%+v)", retry)
- }
- }()
- c := context.Background()
- if view, err = s.arcRPC.View(c, &arcrpc.ViewRequest{Aid: aid}); err != nil {
- if ecode.Cause(err).Equal(ecode.NothingFound) {
- err = nil
- return
- }
- log.Error("s.arcRPC.View3(%d) error(%v)", aid, err)
- return
- }
- for _, p := range view.Pages {
- cids = append(cids, p.Cid)
- }
- arc = &appmdl.Info{
- Aid: aid,
- Cids: cids,
- State: view.Arc.State,
- Mid: view.Arc.Author.Mid,
- Attribute: view.Arc.Attribute,
- }
- if err = s.dao.AddArchiveCache(context.Background(), aid, arc); err == nil {
- log.Info("update view cahce aid(%d) success", aid)
- }
- }
- func (s *Service) retryproc() {
- for {
- if s.closed {
- break
- }
- var (
- retry = &model.Retry{}
- bs []byte
- err error
- )
- if bs, err = s.dao.PopList(context.Background()); err != nil || len(bs) == 0 {
- time.Sleep(5 * time.Second)
- continue
- }
- if err = json.Unmarshal(bs, retry); err != nil {
- log.Error("json.Unmarshal(%s) error(%v)", bs, err)
- continue
- }
- log.Info("retry data(%+v) start", retry)
- if retry.Aid != 0 {
- s.upArcCache(retry.Aid)
- }
- log.Info("retry data(%+v) end", retry)
- }
- s.waiter.Done()
- }
|