service.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "sync"
  7. "time"
  8. appmdl "go-common/app/interface/main/app-player/model/archive"
  9. "go-common/app/job/main/app-player/conf"
  10. "go-common/app/job/main/app-player/dao"
  11. "go-common/app/job/main/app-player/model"
  12. arcrpc "go-common/app/service/main/archive/api"
  13. "go-common/library/ecode"
  14. "go-common/library/log"
  15. "go-common/library/queue/databus"
  16. )
  17. const (
  18. _updateAct = "update"
  19. _insertAct = "insert"
  20. _tableArchive = "archive"
  21. )
  22. // Service is service.
  23. type Service struct {
  24. c *conf.Config
  25. dao *dao.Dao
  26. arcRPC arcrpc.ArchiveClient
  27. // sub
  28. archiveNotifySub *databus.Databus
  29. waiter sync.WaitGroup
  30. closed bool
  31. }
  32. // New new a service.
  33. func New(c *conf.Config) (s *Service) {
  34. s = &Service{
  35. c: c,
  36. dao: dao.New(c),
  37. archiveNotifySub: databus.New(c.ArchiveNotifySub),
  38. closed: false,
  39. }
  40. var err error
  41. s.arcRPC, err = arcrpc.NewClient(nil)
  42. if err != nil {
  43. panic(fmt.Sprintf("archive NewClient error(%v)", err))
  44. }
  45. s.waiter.Add(1)
  46. go s.arcConsumeproc()
  47. s.waiter.Add(1)
  48. go s.retryproc()
  49. return
  50. }
  51. // Close Databus consumer close.
  52. func (s *Service) Close() {
  53. s.closed = true
  54. s.archiveNotifySub.Close()
  55. s.waiter.Wait()
  56. }
  57. // Ping is
  58. func (s *Service) Ping(c context.Context) (err error) {
  59. if err = s.dao.PingMc(c); err != nil {
  60. return
  61. }
  62. return
  63. }
  64. // arcConsumeproc consumer archive
  65. func (s *Service) arcConsumeproc() {
  66. var (
  67. msg *databus.Message
  68. ok bool
  69. err error
  70. )
  71. msgs := s.archiveNotifySub.Messages()
  72. for {
  73. if msg, ok = <-msgs; !ok {
  74. log.Info("arc databus Consumer exit")
  75. break
  76. }
  77. log.Info("got databus message(%s)", msg.Value)
  78. msg.Commit()
  79. var ms = &model.Message{}
  80. if err = json.Unmarshal(msg.Value, ms); err != nil {
  81. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  82. continue
  83. }
  84. switch ms.Table {
  85. case _tableArchive:
  86. s.archiveUpdate(ms.Action, ms.New)
  87. }
  88. }
  89. s.waiter.Done()
  90. }
  91. func (s *Service) archiveUpdate(action string, nwMsg []byte) {
  92. nw := &model.ArcMsg
  93. if err := json.Unmarshal(nwMsg, nw); err != nil {
  94. log.Error("json.Unmarshal(%s) error(%v)", nwMsg, err)
  95. return
  96. }
  97. switch action {
  98. case _updateAct, _insertAct:
  99. s.upArcCache(nw.Aid)
  100. }
  101. }
  102. func (s *Service) upArcCache(aid int64) {
  103. var (
  104. view *arcrpc.ViewReply
  105. arc *appmdl.Info
  106. cids []int64
  107. err error
  108. )
  109. defer func() {
  110. if err != nil {
  111. retry := &model.Retry{Aid: aid}
  112. s.dao.PushList(context.Background(), retry)
  113. log.Warn("upArcCache fail(%+v)", retry)
  114. }
  115. }()
  116. c := context.Background()
  117. if view, err = s.arcRPC.View(c, &arcrpc.ViewRequest{Aid: aid}); err != nil {
  118. if ecode.Cause(err).Equal(ecode.NothingFound) {
  119. err = nil
  120. return
  121. }
  122. log.Error("s.arcRPC.View3(%d) error(%v)", aid, err)
  123. return
  124. }
  125. for _, p := range view.Pages {
  126. cids = append(cids, p.Cid)
  127. }
  128. arc = &appmdl.Info{
  129. Aid: aid,
  130. Cids: cids,
  131. State: view.Arc.State,
  132. Mid: view.Arc.Author.Mid,
  133. Attribute: view.Arc.Attribute,
  134. }
  135. if err = s.dao.AddArchiveCache(context.Background(), aid, arc); err == nil {
  136. log.Info("update view cahce aid(%d) success", aid)
  137. }
  138. }
  139. func (s *Service) retryproc() {
  140. for {
  141. if s.closed {
  142. break
  143. }
  144. var (
  145. retry = &model.Retry{}
  146. bs []byte
  147. err error
  148. )
  149. if bs, err = s.dao.PopList(context.Background()); err != nil || len(bs) == 0 {
  150. time.Sleep(5 * time.Second)
  151. continue
  152. }
  153. if err = json.Unmarshal(bs, retry); err != nil {
  154. log.Error("json.Unmarshal(%s) error(%v)", bs, err)
  155. continue
  156. }
  157. log.Info("retry data(%+v) start", retry)
  158. if retry.Aid != 0 {
  159. s.upArcCache(retry.Aid)
  160. }
  161. log.Info("retry data(%+v) end", retry)
  162. }
  163. s.waiter.Done()
  164. }