dao.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. package archive
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/interface/main/app-feed/conf"
  6. "go-common/app/service/main/archive/api"
  7. arcrpc "go-common/app/service/main/archive/api/gorpc"
  8. "go-common/app/service/main/archive/model/archive"
  9. "go-common/library/cache/memcache"
  10. "go-common/library/log"
  11. "go-common/library/net/metadata"
  12. "go-common/library/sync/errgroup"
  13. "github.com/pkg/errors"
  14. )
  15. // Dao is archive dao.
  16. type Dao struct {
  17. // rpc
  18. arcRPC *arcrpc.Service2
  19. // mc
  20. mc *memcache.Pool
  21. expireMc int32
  22. }
  23. // New new a archive dao.
  24. func New(c *conf.Config) (d *Dao) {
  25. d = &Dao{
  26. // rpc
  27. arcRPC: arcrpc.New2(c.ArchiveRPC),
  28. // mc
  29. mc: memcache.NewPool(c.Memcache.Feed.Config),
  30. expireMc: int32(time.Duration(c.Memcache.Feed.ExpireArchive) / time.Second),
  31. }
  32. return
  33. }
  34. func (d *Dao) PingMC(c context.Context) (err error) {
  35. conn := d.mc.Get(c)
  36. item := &memcache.Item{Key: "ping", Value: []byte{1}, Flags: memcache.FlagRAW, Expiration: d.expireMc}
  37. err = conn.Set(item)
  38. conn.Close()
  39. return
  40. }
  41. // Archives multi get archives.
  42. func (d *Dao) Archives(c context.Context, aids []int64) (am map[int64]*api.Arc, err error) {
  43. if len(aids) == 0 {
  44. return
  45. }
  46. g, ctx := errgroup.WithContext(c)
  47. g.Go(func() (err error) {
  48. var missed []int64
  49. if am, missed, err = d.arcsCache(ctx, aids); err != nil {
  50. missed = aids
  51. log.Error("%+v", err)
  52. err = nil
  53. }
  54. if len(missed) == 0 {
  55. return
  56. }
  57. var tmp map[int64]*api.Arc
  58. arg := &archive.ArgAids2{Aids: missed}
  59. if tmp, err = d.arcRPC.Archives3(c, arg); err != nil {
  60. err = errors.Wrapf(err, "%v", arg)
  61. return
  62. }
  63. for aid, a := range tmp {
  64. am[aid] = a
  65. }
  66. return
  67. })
  68. var stm map[int64]*api.Stat
  69. g.Go(func() (err error) {
  70. var missed []int64
  71. if stm, missed, err = d.statsCache(ctx, aids); err != nil {
  72. missed = aids
  73. log.Error("%+v", err)
  74. err = nil
  75. }
  76. if len(missed) == 0 {
  77. return
  78. }
  79. tmp, err := d.arcRPC.Stats3(ctx, &archive.ArgAids2{Aids: missed})
  80. if err != nil {
  81. log.Error("%+v", err)
  82. err = nil
  83. return
  84. }
  85. for _, st := range tmp {
  86. stm[st.Aid] = st
  87. }
  88. return
  89. })
  90. if err = g.Wait(); err != nil {
  91. return
  92. }
  93. for aid, arc := range am {
  94. if st, ok := stm[aid]; ok {
  95. arc.Stat = *st
  96. }
  97. }
  98. return
  99. }
  100. // ArchivesWithPlayer archives witch player
  101. func (d *Dao) ArchivesWithPlayer(c context.Context, aids []int64, qn int, platform string, fnver, fnval, forceHost, build int) (res map[int64]*archive.ArchiveWithPlayer, err error) {
  102. if len(aids) == 0 {
  103. return
  104. }
  105. ip := metadata.String(c, metadata.RemoteIP)
  106. arg := &archive.ArgPlayer{Aids: aids, Qn: qn, Platform: platform, Fnval: fnval, Fnver: fnver, RealIP: ip, ForceHost: forceHost, Build: build}
  107. if res, err = d.arcRPC.ArchivesWithPlayer(c, arg); err != nil {
  108. err = errors.Wrapf(err, "%v", arg)
  109. }
  110. return
  111. }