dao.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. package archive
  2. import (
  3. "context"
  4. "runtime"
  5. "time"
  6. "go-common/app/interface/main/app-view/conf"
  7. "go-common/app/interface/main/app-view/model/view"
  8. "go-common/app/service/main/archive/api"
  9. "go-common/app/service/main/archive/model/archive"
  10. "go-common/library/sync/errgroup"
  11. history "go-common/app/interface/main/history/model"
  12. hisrpc "go-common/app/interface/main/history/rpc/client"
  13. arcrpc "go-common/app/service/main/archive/api/gorpc"
  14. "go-common/library/cache/memcache"
  15. "go-common/library/log"
  16. bm "go-common/library/net/http/blademaster"
  17. "go-common/library/net/metadata"
  18. "github.com/pkg/errors"
  19. )
  20. // Dao is archive dao.
  21. type Dao struct {
  22. // http client
  23. client *bm.Client
  24. httpClient *bm.Client
  25. realteURL string
  26. commercialURL string
  27. relateRecURL string
  28. playURL string
  29. // rpc
  30. arcRPC *arcrpc.Service2
  31. hisRPC *hisrpc.Service
  32. // memcache
  33. arcMc *memcache.Pool
  34. expireArc int32
  35. expireRlt int32
  36. // chan
  37. mCh chan func()
  38. }
  39. // New new a archive dao.
  40. func New(c *conf.Config) (d *Dao) {
  41. d = &Dao{
  42. // http client
  43. client: bm.NewClient(c.HTTPWrite),
  44. httpClient: bm.NewClient(c.HTTPClient),
  45. arcRPC: arcrpc.New2(c.ArchiveRPC),
  46. hisRPC: hisrpc.New(c.HisRPC),
  47. realteURL: c.Host.Data + _realteURL,
  48. commercialURL: c.Host.APICo + _commercialURL,
  49. relateRecURL: c.Host.Data + _relateRecURL,
  50. playURL: c.Host.Bvcvod + _playURL,
  51. // memcache
  52. arcMc: memcache.NewPool(c.Memcache.Archive.Config),
  53. expireArc: int32(time.Duration(c.Memcache.Archive.ArchiveExpire) / time.Second),
  54. expireRlt: int32(time.Duration(c.Memcache.Archive.RelateExpire) / time.Second),
  55. // mc proc
  56. mCh: make(chan func(), 10240),
  57. }
  58. // video db
  59. for i := 0; i < runtime.NumCPU()*2; i++ {
  60. go d.cacheproc()
  61. }
  62. return
  63. }
  64. // Ping ping check memcache connection
  65. func (d *Dao) Ping(c context.Context) (err error) {
  66. return d.pingMC(c)
  67. }
  68. // Archive3 get archive.
  69. func (d *Dao) Archive3(c context.Context, aid int64) (a *api.Arc, err error) {
  70. arg := &archive.ArgAid2{Aid: aid}
  71. if a, err = d.arcRPC.Archive3(c, arg); err != nil {
  72. log.Error("d.arcRPC.Archive3(%v) error(%+v)", arg, err)
  73. return
  74. }
  75. return
  76. }
  77. // Archives multi get archives.
  78. func (d *Dao) Archives(c context.Context, aids []int64) (as map[int64]*api.Arc, err error) {
  79. if len(aids) == 0 {
  80. return
  81. }
  82. var stm map[int64]*api.Stat
  83. g, ctx := errgroup.WithContext(c)
  84. g.Go(func() (err error) {
  85. var missed []int64
  86. if as, missed, err = d.arcsCache(ctx, aids); err != nil {
  87. as = make(map[int64]*api.Arc, len(aids))
  88. missed = aids
  89. log.Error("%+v", err)
  90. err = nil
  91. }
  92. if len(missed) == 0 {
  93. return
  94. }
  95. var tmp map[int64]*api.Arc
  96. arg := &archive.ArgAids2{Aids: missed}
  97. if tmp, err = d.arcRPC.Archives3(ctx, arg); err != nil {
  98. log.Error("d.arcRPC.Archives3(%v) error(%v)", arg, err)
  99. return
  100. }
  101. for aid, a := range tmp {
  102. as[aid] = a
  103. }
  104. return
  105. })
  106. g.Go(func() (err error) {
  107. var missed []int64
  108. if stm, missed, err = d.statsCache(ctx, aids); err != nil {
  109. stm = make(map[int64]*api.Stat, len(aids))
  110. missed = aids
  111. log.Error("%+v", err)
  112. err = nil
  113. }
  114. if len(missed) == 0 {
  115. return
  116. }
  117. var tmp map[int64]*api.Stat
  118. arg := &archive.ArgAids2{Aids: missed}
  119. if tmp, err = d.arcRPC.Stats3(ctx, arg); err != nil {
  120. log.Error("d.arcRPC.Stats3(%v) error(%v)", arg, err)
  121. err = nil
  122. return
  123. }
  124. for aid, st := range tmp {
  125. stm[aid] = st
  126. }
  127. return
  128. })
  129. if err = g.Wait(); err != nil {
  130. log.Error("%+v", err)
  131. return
  132. }
  133. for aid, a := range as {
  134. if st, ok := stm[aid]; ok {
  135. a.Stat = *st
  136. }
  137. }
  138. return
  139. }
  140. // Shot get video shot.
  141. func (d *Dao) Shot(c context.Context, aid, cid int64) (shot *archive.Videoshot, err error) {
  142. ip := metadata.String(c, metadata.RemoteIP)
  143. arg := &archive.ArgCid2{Aid: aid, Cid: cid, RealIP: ip}
  144. return d.arcRPC.Videoshot2(c, arg)
  145. }
  146. // UpCount2 get upper count.
  147. func (d *Dao) UpCount2(c context.Context, mid int64) (cnt int, err error) {
  148. arg := &archive.ArgUpCount2{Mid: mid}
  149. if cnt, err = d.arcRPC.UpCount2(c, arg); err != nil {
  150. err = errors.Wrapf(err, "%v", arg)
  151. }
  152. return
  153. }
  154. // UpArcs3 get upper archives.
  155. func (d *Dao) UpArcs3(c context.Context, mid int64, pn, ps int) (as []*api.Arc, err error) {
  156. arg := &archive.ArgUpArcs2{Mid: mid, Pn: pn, Ps: ps}
  157. if as, err = d.arcRPC.UpArcs3(c, arg); err != nil {
  158. err = errors.Wrapf(err, "%v", arg)
  159. }
  160. return
  161. }
  162. // Progress is archive plays progress .
  163. func (d *Dao) Progress(c context.Context, aid, mid int64) (h *view.History, err error) {
  164. ip := metadata.String(c, metadata.RemoteIP)
  165. arg := &history.ArgPro{Mid: mid, Aids: []int64{aid}, RealIP: ip}
  166. his, err := d.hisRPC.Progress(c, arg)
  167. if err != nil {
  168. log.Error("d.hisRPC.Progress(%v) error(%v)", arg, err)
  169. return
  170. }
  171. if his[aid] != nil {
  172. h = &view.History{Cid: his[aid].Cid, Progress: his[aid].Pro}
  173. }
  174. return
  175. }
  176. // Archive 用的时候注意了!!!这个方法得到的稿件Stat不是最新的!!!
  177. func (d *Dao) Archive(c context.Context, aid int64) (a *api.Arc, err error) {
  178. if a, err = d.arcCache(c, aid); err != nil {
  179. log.Error("%+v", err)
  180. } else if a != nil {
  181. return
  182. }
  183. if a, err = d.arcRPC.Archive3(c, &archive.ArgAid2{Aid: aid}); err != nil {
  184. log.Error("d.arcRPC.Archive3(%d) error(%v)", aid, err)
  185. return
  186. }
  187. return
  188. }
  189. // addCache add archive to mc or redis
  190. func (d *Dao) addCache(f func()) {
  191. select {
  192. case d.mCh <- f:
  193. default:
  194. log.Warn("cacheproc chan full")
  195. }
  196. }
  197. // cacheproc write memcache and stat redis use goroutine
  198. func (d *Dao) cacheproc() {
  199. for {
  200. f := <-d.mCh
  201. f()
  202. }
  203. }