wechat.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package service
  2. import (
  3. "context"
  4. "sync"
  5. tagmdl "go-common/app/interface/main/tag/model"
  6. "go-common/app/interface/main/web/model"
  7. arcmdl "go-common/app/service/main/archive/api"
  8. "go-common/library/log"
  9. "go-common/library/net/metadata"
  10. "go-common/library/sync/errgroup"
  11. )
  12. const (
  13. _tagBlkSize = 50
  14. _tagArcType = 3
  15. )
  16. var (
  17. _emptyWxArc = make([]*model.WxArchive, 0)
  18. _emptyWxArcTag = make([]*model.WxArcTag, 0)
  19. )
  20. // WxHot get wx hot archives.
  21. func (s *Service) WxHot(c context.Context, pn, ps int) (res []*model.WxArchive, count int, err error) {
  22. var (
  23. addCache = true
  24. aids []int64
  25. wxArcs, allRes []*model.WxArchive
  26. arcs map[int64]*model.WxArchive
  27. )
  28. defer func() {
  29. res, count = wxHotPage(allRes, pn, ps)
  30. }()
  31. if wxArcs, err = s.dao.WxHotCache(c); err != nil {
  32. err = nil
  33. addCache = false
  34. } else if len(wxArcs) > 0 {
  35. allRes = s.fmtWxArcs(c, wxArcs)
  36. return
  37. }
  38. if aids, err = s.dao.WxHot(c); err != nil {
  39. err = nil
  40. } else if len(aids) > s.c.Rule.MinWxHotCount {
  41. if arcs, err = s.archiveWithTag(c, aids); err != nil {
  42. err = nil
  43. } else if len(arcs) > 0 {
  44. for _, aid := range aids {
  45. if arc, ok := arcs[aid]; ok && arc != nil {
  46. allRes = append(allRes, arc)
  47. }
  48. }
  49. if addCache {
  50. s.cache.Do(c, func(c context.Context) {
  51. s.dao.SetWxHotCache(c, allRes)
  52. })
  53. }
  54. return
  55. }
  56. } else {
  57. log.Error("s.dao.WxHot len(%d)", len(allRes))
  58. }
  59. allRes, err = s.dao.WxHotBakCache(c)
  60. return
  61. }
  62. func wxHotPage(list []*model.WxArchive, pn, ps int) (res []*model.WxArchive, count int) {
  63. count = len(list)
  64. start := (pn - 1) * ps
  65. end := start + ps - 1
  66. if count == 0 || count < start {
  67. res = _emptyWxArc
  68. return
  69. }
  70. if count > end {
  71. res = list[start : end+1]
  72. } else {
  73. res = list[start:]
  74. }
  75. return
  76. }
  77. func (s *Service) archiveWithTag(c context.Context, aids []int64) (list map[int64]*model.WxArchive, err error) {
  78. var (
  79. arcErr, tagErr error
  80. arcsReply *arcmdl.ArcsReply
  81. tags map[int64][]*tagmdl.Tag
  82. mutex = sync.Mutex{}
  83. ip = metadata.String(c, metadata.RemoteIP)
  84. )
  85. group, errCtx := errgroup.WithContext(c)
  86. group.Go(func() error {
  87. if arcsReply, arcErr = s.arcClient.Arcs(errCtx, &arcmdl.ArcsRequest{Aids: aids}); arcErr != nil {
  88. log.Error("s.arcClient.Arcs(%d) error %v", aids, err)
  89. return arcErr
  90. }
  91. return nil
  92. })
  93. aidsLen := len(aids)
  94. tags = make(map[int64][]*tagmdl.Tag, aidsLen)
  95. for i := 0; i < aidsLen; i += _tagBlkSize {
  96. var partAids []int64
  97. if i+_tagBlkSize > aidsLen {
  98. partAids = aids[i:]
  99. } else {
  100. partAids = aids[i : i+_tagBlkSize]
  101. }
  102. group.Go(func() (err error) {
  103. var tmpRes map[int64][]*tagmdl.Tag
  104. arg := &tagmdl.ArgResTags{Oids: partAids, Type: _tagArcType, RealIP: ip}
  105. if tmpRes, tagErr = s.tag.ResTags(errCtx, arg); tagErr != nil {
  106. log.Error("s.tag.ResTag(%+v) error(%v)", arg, tagErr)
  107. return
  108. }
  109. mutex.Lock()
  110. for aid, tmpTags := range tmpRes {
  111. tags[aid] = tmpTags
  112. }
  113. mutex.Unlock()
  114. return nil
  115. })
  116. }
  117. if err = group.Wait(); err != nil {
  118. return
  119. }
  120. list = make(map[int64]*model.WxArchive, len(aids))
  121. for _, aid := range aids {
  122. if arc, ok := arcsReply.Arcs[aid]; ok && arc.IsNormal() {
  123. wxArc := new(model.WxArchive)
  124. wxArc.FromArchive(arc)
  125. if tag, ok := tags[aid]; ok {
  126. for _, v := range tag {
  127. wxArc.Tags = append(wxArc.Tags, &model.WxArcTag{ID: v.ID, Name: v.Name})
  128. }
  129. }
  130. if len(wxArc.Tags) == 0 {
  131. wxArc.Tags = _emptyWxArcTag
  132. }
  133. list[aid] = wxArc
  134. }
  135. }
  136. return
  137. }
  138. func (s *Service) fmtWxArcs(c context.Context, arcs []*model.WxArchive) (res []*model.WxArchive) {
  139. var (
  140. aids []int64
  141. newArcs map[int64]*model.WxArchive
  142. err error
  143. )
  144. for _, arc := range arcs {
  145. aids = append(aids, arc.Aid)
  146. }
  147. if newArcs, err = s.archiveWithTag(c, aids); err != nil {
  148. log.Error("fmtWxArcStat archiveWithTag aids(%+v) error(%v)", aids, err)
  149. return
  150. }
  151. for _, arc := range arcs {
  152. if newArc, ok := newArcs[arc.Aid]; ok {
  153. res = append(res, newArc)
  154. } else {
  155. res = append(res, arc)
  156. }
  157. }
  158. return
  159. }