dao.go 6.2 KB


  1. package archive
  2. import (
  3. "context"
  4. "runtime"
  5. "time"
  6. "go-common/app/interface/main/app-intl/conf"
  7. arcmdl "go-common/app/interface/main/app-intl/model/player/archive"
  8. "go-common/app/interface/main/app-intl/model/view"
  9. history "go-common/app/interface/main/history/model"
  10. hisrpc "go-common/app/interface/main/history/rpc/client"
  11. "go-common/app/service/main/archive/api"
  12. arcrpc "go-common/app/service/main/archive/api/gorpc"
  13. "go-common/app/service/main/archive/model/archive"
  14. "go-common/library/cache/memcache"
  15. "go-common/library/ecode"
  16. "go-common/library/log"
  17. bm "go-common/library/net/http/blademaster"
  18. "go-common/library/net/metadata"
  19. "go-common/library/sync/errgroup"
  20. "github.com/pkg/errors"
  21. )
  22. // Dao is archive dao.
  23. type Dao struct {
  24. // http client
  25. client *bm.Client
  26. realteURL string
  27. commercialURL string
  28. relateRecURL string
  29. playURL string
  30. // rpc
  31. arcRPC *arcrpc.Service2
  32. arcRPC2 *arcrpc.Service2
  33. hisRPC *hisrpc.Service
  34. // mc
  35. mc *memcache.Pool
  36. expireMc int32
  37. expireRlt int32
  38. // chan
  39. mCh chan func()
  40. }
  41. // New new a archive dao.
  42. func New(c *conf.Config) (d *Dao) {
  43. d = &Dao{
  44. // http client
  45. client: bm.NewClient(c.HTTPWrite),
  46. realteURL: c.Host.Data + _realteURL,
  47. commercialURL: c.Host.APICo + _commercialURL,
  48. relateRecURL: c.Host.Data + _relateRecURL,
  49. playURL: c.Host.Bvcvod + _playURL,
  50. // rpc
  51. arcRPC: arcrpc.New2(c.ArchiveRPC),
  52. arcRPC2: arcrpc.New2(c.ArchiveRPC2),
  53. hisRPC: hisrpc.New(c.HisRPC),
  54. // mc
  55. mc: memcache.NewPool(c.Memcache.Feed.Config),
  56. expireMc: int32(time.Duration(c.Memcache.Feed.Expire) / time.Second),
  57. expireRlt: int32(time.Duration(c.Memcache.Archive.RelateExpire) / time.Second),
  58. // mc proc
  59. mCh: make(chan func(), 10240),
  60. }
  61. for i := 0; i < runtime.NumCPU()*2; i++ {
  62. go d.cacheproc()
  63. }
  64. return
  65. }
  66. // Ping ping check memcache connection
  67. func (d *Dao) Ping(c context.Context) (err error) {
  68. return d.pingMC(c)
  69. }
  70. // Archives multi get archives.
  71. func (d *Dao) Archives(c context.Context, aids []int64) (am map[int64]*api.Arc, err error) {
  72. if len(aids) == 0 {
  73. return
  74. }
  75. g, ctx := errgroup.WithContext(c)
  76. g.Go(func() (err error) {
  77. var missed []int64
  78. if am, missed, err = d.arcsCache(ctx, aids); err != nil {
  79. missed = aids
  80. log.Error("%+v", err)
  81. err = nil
  82. }
  83. if len(missed) == 0 {
  84. return
  85. }
  86. var tmp map[int64]*api.Arc
  87. arg := &archive.ArgAids2{Aids: missed}
  88. if tmp, err = d.arcRPC.Archives3(ctx, arg); err != nil {
  89. err = errors.Wrapf(err, "%v", arg)
  90. return
  91. }
  92. for aid, a := range tmp {
  93. am[aid] = a
  94. }
  95. return
  96. })
  97. var stm map[int64]*api.Stat
  98. g.Go(func() (err error) {
  99. var missed []int64
  100. if stm, missed, err = d.statsCache(ctx, aids); err != nil {
  101. missed = aids
  102. log.Error("%+v", err)
  103. err = nil
  104. }
  105. if len(missed) == 0 {
  106. return
  107. }
  108. tmp, err := d.arcRPC.Stats3(ctx, &archive.ArgAids2{Aids: missed})
  109. if err != nil {
  110. log.Error("%+v", err)
  111. err = nil
  112. return
  113. }
  114. for _, st := range tmp {
  115. stm[st.Aid] = st
  116. }
  117. return
  118. })
  119. if err = g.Wait(); err != nil {
  120. return
  121. }
  122. for aid, arc := range am {
  123. if st, ok := stm[aid]; ok {
  124. arc.Stat = *st
  125. }
  126. }
  127. return
  128. }
  129. // ArchivesWithPlayer archives witch player
  130. func (d *Dao) ArchivesWithPlayer(c context.Context, aids []int64, qn int, platform string, fnver, fnval int) (res map[int64]*archive.ArchiveWithPlayer, err error) {
  131. if len(aids) == 0 {
  132. return
  133. }
  134. // 国际版暂时不秒开
  135. // ip := metadata.String(c, metadata.RemoteIP)
  136. ip := ""
  137. arg := &archive.ArgPlayer{Aids: aids, Qn: qn, Platform: platform, Fnval: fnval, Fnver: fnver, RealIP: ip}
  138. if res, err = d.arcRPC.ArchivesWithPlayer(c, arg); err != nil {
  139. err = errors.Wrapf(err, "%v", arg)
  140. }
  141. return
  142. }
  143. // Archive get archive mc->rpc.
  144. func (d *Dao) Archive(c context.Context, aid int64) (a *api.Arc, err error) {
  145. if a, err = d.arcCache(c, aid); err != nil {
  146. log.Error("%+v", err)
  147. } else if a != nil {
  148. return
  149. }
  150. arg := &archive.ArgAid2{Aid: aid}
  151. if a, err = d.arcRPC.Archive3(c, arg); err != nil {
  152. log.Error("d.arcRPC.Archive3(%v) error(%v)", arg, err)
  153. if a, err = d.arcRPC2.Archive3(c, arg); err != nil {
  154. err = errors.Wrapf(err, "%v", arg)
  155. }
  156. }
  157. return
  158. }
  159. // Archive3 get archive.
  160. func (d *Dao) Archive3(c context.Context, aid int64) (a *api.Arc, err error) {
  161. arg := &archive.ArgAid2{Aid: aid}
  162. if a, err = d.arcRPC.Archive3(c, arg); err != nil {
  163. log.Error("d.arcRPC.Archive3(%v) error(%+v)", arg, err)
  164. if a, err = d.arcRPC2.Archive3(c, arg); err != nil {
  165. err = errors.Wrapf(err, "d.arcRPC2.Archive3(%v)", arg)
  166. return
  167. }
  168. }
  169. return
  170. }
  171. // Progress is archive plays progress .
  172. func (d *Dao) Progress(c context.Context, aid, mid int64) (h *view.History, err error) {
  173. ip := metadata.String(c, metadata.RemoteIP)
  174. arg := &history.ArgPro{Mid: mid, Aids: []int64{aid}, RealIP: ip}
  175. his, err := d.hisRPC.Progress(c, arg)
  176. if err != nil {
  177. log.Error("d.hisRPC.Progress(%v) error(%v)", arg, err)
  178. return
  179. }
  180. if his[aid] != nil {
  181. h = &view.History{Cid: his[aid].Cid, Progress: his[aid].Pro}
  182. }
  183. return
  184. }
  185. // UpCount2 get upper count.
  186. func (d *Dao) UpCount2(c context.Context, mid int64) (cnt int, err error) {
  187. arg := &archive.ArgUpCount2{Mid: mid}
  188. if cnt, err = d.arcRPC.UpCount2(c, arg); err != nil {
  189. err = errors.Wrapf(err, "%v", arg)
  190. }
  191. return
  192. }
  193. // ArchiveCache is
  194. func (d *Dao) ArchiveCache(c context.Context, aid int64) (arc *arcmdl.Info, err error) {
  195. var (
  196. vp *archive.View3
  197. cids []int64
  198. )
  199. if vp, err = d.ViewCache(c, aid); err != nil {
  200. log.Error("%+v", err)
  201. }
  202. if vp == nil || vp.Archive3 == nil || len(vp.Pages) == 0 || vp.AttrVal(archive.AttrBitIsMovie) == archive.AttrYes {
  203. if vp, err = d.View3(c, aid); err != nil {
  204. log.Error("%+v", err)
  205. err = ecode.NothingFound
  206. return
  207. }
  208. }
  209. if vp == nil || vp.Archive3 == nil || len(vp.Pages) == 0 {
  210. err = ecode.NothingFound
  211. return
  212. }
  213. for _, p := range vp.Pages {
  214. cids = append(cids, p.Cid)
  215. }
  216. arc = &arcmdl.Info{
  217. Aid: vp.Aid,
  218. State: vp.State,
  219. Mid: vp.Author.Mid,
  220. Cids: cids,
  221. Attribute: vp.Attribute,
  222. }
  223. return
  224. }
  225. // addCache add archive to mc or redis
  226. func (d *Dao) addCache(f func()) {
  227. select {
  228. case d.mCh <- f:
  229. default:
  230. log.Warn("cacheproc chan full")
  231. }
  232. }
  233. // cacheproc write memcache and stat redis use goroutine
  234. func (d *Dao) cacheproc() {
  235. for {
  236. f := <-d.mCh
  237. f()
  238. }
  239. }