dao.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. package archive
  2. import (
  3. "context"
  4. "fmt"
  5. "runtime"
  6. "go-common/app/interface/main/app-player/conf"
  7. "go-common/app/interface/main/app-player/model/archive"
  8. arcrpc "go-common/app/service/main/archive/api"
  9. "go-common/library/cache/memcache"
  10. "go-common/library/log"
  11. )
  12. // Dao is archive dao.
  13. type Dao struct {
  14. // memcache
  15. arcMc *memcache.Pool
  16. // chan
  17. mCh chan func()
  18. // rpc
  19. arcRPC arcrpc.ArchiveClient
  20. }
  21. // New new a archive dao.
  22. func New(c *conf.Config) (d *Dao) {
  23. d = &Dao{
  24. // memcache
  25. arcMc: memcache.NewPool(c.Memcache),
  26. // mc proc
  27. mCh: make(chan func(), 1024),
  28. }
  29. var err error
  30. d.arcRPC, err = arcrpc.NewClient(c.ArchiveClient)
  31. if err != nil {
  32. panic(fmt.Sprintf("archive NewClient error(%v)", err))
  33. }
  34. for i := 0; i < runtime.NumCPU(); i++ {
  35. go d.cacheproc()
  36. }
  37. return
  38. }
  39. // Ping ping check memcache connection
  40. func (d *Dao) Ping(c context.Context) (err error) {
  41. return d.pingMC(c)
  42. }
  43. // addCache add archive to mc or redis
  44. func (d *Dao) addCache(f func()) {
  45. select {
  46. case d.mCh <- f:
  47. default:
  48. log.Warn("cacheproc chan full")
  49. }
  50. }
  51. // cacheproc write memcache and stat redis use goroutine
  52. func (d *Dao) cacheproc() {
  53. for {
  54. f, ok := <-d.mCh
  55. if !ok {
  56. return
  57. }
  58. f()
  59. }
  60. }
  61. // ArchiveCache is
  62. func (d *Dao) ArchiveCache(c context.Context, aid int64) (arc *archive.Info, err error) {
  63. if arc, err = d.archiveCache(c, aid); err != nil {
  64. log.Error("%+v", err)
  65. err = nil
  66. }
  67. if arc != nil {
  68. return
  69. }
  70. var (
  71. view *arcrpc.ViewReply
  72. cids []int64
  73. )
  74. if view, err = d.arcRPC.View(c, &arcrpc.ViewRequest{Aid: aid}); err != nil {
  75. log.Error("d.arcRPC.View3(%d) error(%+v)", aid, err)
  76. return
  77. }
  78. for _, p := range view.Pages {
  79. cids = append(cids, p.Cid)
  80. }
  81. arc = &archive.Info{
  82. Aid: view.Arc.Aid,
  83. State: view.Arc.State,
  84. Mid: view.Arc.Author.Mid,
  85. Cids: cids,
  86. Attribute: view.Arc.Attribute,
  87. }
  88. d.addCache(func() {
  89. d.addArchiveCache(context.Background(), aid, arc)
  90. })
  91. return
  92. }
  93. // Views is
  94. func (d *Dao) Views(c context.Context, aids []int64) (arcs map[int64]*archive.Info, err error) {
  95. var reply *arcrpc.ViewsReply
  96. if reply, err = d.arcRPC.Views(c, &arcrpc.ViewsRequest{Aids: aids}); err != nil {
  97. return
  98. }
  99. arcs = make(map[int64]*archive.Info)
  100. for _, v := range reply.Views {
  101. var (
  102. info = new(archive.Info)
  103. cids []int64
  104. )
  105. info.Aid = v.Arc.Aid
  106. info.State = v.Arc.State
  107. info.Mid = v.Arc.Author.Mid
  108. info.Attribute = v.Arc.Attribute
  109. for _, p := range v.Pages {
  110. cids = append(cids, p.Cid)
  111. }
  112. info.Cids = cids
  113. arcs[info.Aid] = info
  114. }
  115. return
  116. }