localcache.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "go-common/app/service/video/stream-mng/model"
  7. "go-common/library/log"
  8. "net/http"
  9. "time"
  10. )
  11. const (
  12. _localLiveStreamList = "live_stream"
  13. _localStreamInfo = "rid:%d"
  14. _liveExpiredTime = 600
  15. _localStreamExpiredTime = 1
  16. )
  17. type OnAirStream struct {
  18. StreamName string `json:"stream_name, omitempty"`
  19. }
  20. type OnAirStreamList struct {
  21. List []*OnAirStream `json:"list,omitempty"`
  22. }
  23. type AllOnAirStream struct {
  24. M map[string]*OnAirStreamList `json:"m,omitempty"`
  25. }
  26. func (d *Dao) getLocalLiveStreamListKey() string {
  27. return _localLiveStreamList
  28. }
  29. func (d *Dao) getLocalStreamInfoKey(rid int64) string {
  30. return fmt.Sprintf(_localStreamInfo, rid)
  31. }
  32. // loadLiveStreamList 判断流是否在播
  33. func (d *Dao) LoadLiveStreamList(c context.Context, rids []int64) map[int64]bool {
  34. list, _ := d.localCache.Get(d.getLocalLiveStreamListKey())
  35. isLive := map[int64]bool{}
  36. if res, ok := list.(map[string]int); ok {
  37. for _, v := range rids {
  38. // rid => stream
  39. info, err := d.streamFullInfo(c, v, "")
  40. if err != nil || info == nil {
  41. continue
  42. }
  43. var sname string
  44. var origin int64
  45. for _, v := range info.List {
  46. if v.Type == 1 {
  47. sname = v.StreamName
  48. origin = v.Origin
  49. break
  50. }
  51. }
  52. // 不在在播列表&orgin 为0===》不在播
  53. if _, exe := res[sname]; !exe {
  54. if origin == 0 {
  55. isLive[v] = false
  56. continue
  57. }
  58. }
  59. isLive[v] = true
  60. }
  61. } else {
  62. // 当从缓存中获取失败 or 项目刚刚启动
  63. for _, v := range rids {
  64. isLive[v] = true
  65. }
  66. }
  67. return isLive
  68. }
  69. // StoreLiveStreamList 刷新在播列表缓存
  70. func (d *Dao) StoreLiveStreamList() {
  71. ctx := context.Background()
  72. type liveStream struct {
  73. Code int `json:"code,omitempty"`
  74. Data *AllOnAirStream `json:"data,omitempty"`
  75. }
  76. resp := &liveStream{}
  77. uri := d.getLiveStreamUrl("/api/live/vendor/onairstreamlist?cdn=bvc")
  78. err := d.NewRequst(ctx, http.MethodGet, uri, nil, nil, nil, resp)
  79. if err != nil {
  80. log.Errorv(ctx, log.KV("log", fmt.Sprintf("http_live_err=%v", err)))
  81. return
  82. }
  83. res := map[string]int{}
  84. if resp.Code == 0 && resp.Data != nil && resp.Data.M != nil && len(resp.Data.M["BVC"].List) > 0 {
  85. for _, v := range resp.Data.M["BVC"].List {
  86. res[v.StreamName] = 1
  87. }
  88. } else {
  89. res, _ := json.Marshal(resp)
  90. log.Errorv(ctx, log.KV("log", fmt.Sprintf("http_live_err=%v", string(res))))
  91. }
  92. // 存10分钟
  93. d.localCache.SetWithExpire(d.getLocalLiveStreamListKey(), res, _liveExpiredTime*time.Second)
  94. }
  95. // storeStreamInfo 存储单个流信息
  96. func (d *Dao) storeStreamInfo(c context.Context, info *model.StreamFullInfo) {
  97. if info == nil || info.RoomID < 0 {
  98. return
  99. }
  100. key := d.getLocalStreamInfoKey(info.RoomID)
  101. d.localCache.SetWithExpire(key, info, _localStreamExpiredTime*time.Second)
  102. }
  103. // loadStreamInfo 读取单个信息
  104. func (d *Dao) loadStreamInfo(c context.Context, rid int64) *model.StreamFullInfo {
  105. key := d.getLocalStreamInfoKey(rid)
  106. info, _ := d.localCache.Get(key)
  107. if res, ok := info.(*model.StreamFullInfo); ok {
  108. if res != nil && res.RoomID > 0 {
  109. return res
  110. }
  111. }
  112. return nil
  113. }
  114. // storeMultiStreamInfo 存储多路流
  115. func (d *Dao) storeMultiStreamInfo(c context.Context, infos map[int64]*model.StreamFullInfo) {
  116. for _, v := range infos {
  117. key := d.getLocalStreamInfoKey(v.RoomID)
  118. //log.Warn("key=%v", key)
  119. d.localCache.SetWithExpire(key, v, _localStreamExpiredTime*time.Second)
  120. }
  121. }
  122. // loadMultiStreamInfo 读取批量信息
  123. func (d *Dao) loadMultiStreamInfo(c context.Context, rids []int64) (map[int64]*model.StreamFullInfo, []int64) {
  124. infos := map[int64]*model.StreamFullInfo{}
  125. missRids := []int64{}
  126. for _, v := range rids {
  127. key := d.getLocalStreamInfoKey(v)
  128. info, _ := d.localCache.Get(key)
  129. if res, ok := info.(*model.StreamFullInfo); ok {
  130. if res != nil && res.RoomID > 0 {
  131. infos[v] = res
  132. }
  133. } else {
  134. missRids = append(missRids, v)
  135. }
  136. }
  137. return infos, missRids
  138. }