daoAnchor.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package dao
  2. import (
  3. "context"
  4. "time"
  5. "go-common/library/ecode"
  6. "fmt"
  7. daoAnchorV0 "go-common/app/service/live/dao-anchor/api/grpc/v0"
  8. "go-common/app/service/live/dao-anchor/api/grpc/v1"
  9. "go-common/library/log"
  10. )
  // Live-status codes plus the paging and retry settings of this package.
  const (
  	// LIVE_OPEN is the status code for a room whose broadcast has started.
  	LIVE_OPEN = 1
  	// LIVE_CLOSE is the status code for a room whose broadcast has ended.
  	LIVE_CLOSE = 0
  	// LIVE_ROUND is the status code for a room in carousel (rotation) playback.
  	LIVE_ROUND = 2
  	// PAGE_SIZE is the number of entries requested per page.
  	PAGE_SIZE = 100
  	// RETRY_TIME is the number of failed attempts allowed for an API call.
  	RETRY_TIME = 3
  )
  21. //GetAllLiveRoom 获取在播列表
  22. func (d *Dao) GetAllLiveRoom(c context.Context, fields []string) (resp map[int64]*v1.RoomData, err error) {
  23. page := 0
  24. resp = map[int64]*v1.RoomData{}
  25. retry := 1
  26. for true {
  27. reply, err := d.daoAnchorApi.RoomOnlineList(c, &v1.RoomOnlineListReq{Fields: fields, Page: int64(page), PageSize: PAGE_SIZE})
  28. if err != nil {
  29. if retry >= RETRY_TIME {
  30. break
  31. }
  32. retry++
  33. time.Sleep(time.Second * 3)
  34. log.Errorw(c, "log", fmt.Sprintf("getAllLiveRoom_RoomOnlineList_error:page=%d;err=%v", page, err))
  35. continue
  36. }
  37. if len(reply.RoomDataList) <= 0 {
  38. break
  39. }
  40. page = page + 1
  41. roomDataList := reply.RoomDataList
  42. for roomId, info := range roomDataList {
  43. v := info
  44. resp[roomId] = v
  45. }
  46. time.Sleep(time.Millisecond)
  47. }
  48. return
  49. }
  50. //GetAllLiveRoomIds 获取在播列表
  51. func (d *Dao) GetAllLiveRoomIds(c context.Context) (resp []int64, err error) {
  52. page := 0
  53. reply, err := d.daoAnchorApi.RoomOnlineListByArea(c, &v1.RoomOnlineListByAreaReq{})
  54. if err != nil {
  55. log.Errorw(c, "log", fmt.Sprintf("GetAllLiveRoomIds_RoomOnlineList_error:page=%d;err=%v", page, err))
  56. return
  57. }
  58. if len(reply.RoomIds) <= 0 {
  59. return
  60. }
  61. resp = reply.RoomIds
  62. return
  63. }
  64. //GetInfosByRoomIds 获取主播房间信息
  65. func (d *Dao) GetInfosByRoomIds(c context.Context, roomIds []int64, fields []string) (resp map[int64]*v1.RoomData, err error) {
  66. if roomIds == nil {
  67. err = ecode.InvalidParam
  68. log.Errorw(c, "log", fmt.Sprintf("getInfosByRoomIds_params_error:%v", roomIds))
  69. return
  70. }
  71. reply, err := d.daoAnchorApi.FetchRoomByIDs(c, &v1.RoomByIDsReq{RoomIds: roomIds, Fields: fields})
  72. if err != nil {
  73. log.Errorw(c, "log", fmt.Sprintf("getInfosByRoomIds_FetchRoomByIDs_error:reply=%v;err=%v", reply, err))
  74. return
  75. }
  76. if reply == nil {
  77. err = ecode.CallDaoAnchorError
  78. log.Errorw(c, "log", "getInfosByRoomIds_FetchRoomByIDs_error")
  79. return
  80. }
  81. resp = reply.RoomDataSet
  82. return
  83. }
  84. //UpdateRoomEx ...
  85. func (d *Dao) UpdateRoomEx(c context.Context, roomId int64, fields []string, keyFrame string) (resp int64, err error) {
  86. if roomId < 0 {
  87. err = ecode.InvalidParam
  88. log.Errorw(c, "log", fmt.Sprintf("updateRoom_params_error:%v", roomId))
  89. return
  90. }
  91. reply, err := d.daoAnchorApi.RoomExtendUpdate(c, &v1.RoomExtendUpdateReq{Fields: fields, RoomId: roomId, Keyframe: keyFrame})
  92. if err != nil {
  93. log.Errorw(c, "log", fmt.Sprintf("updateRoom_RoomUpdate_error:reply=%v;err=%v", reply, err))
  94. return
  95. }
  96. resp = reply.AffectedRows
  97. return
  98. }
  99. func (d *Dao) CreateCacheList(c context.Context, roomIds []int64, content string) (err error) {
  100. if len(roomIds) <= 0 || content == "" {
  101. log.Errorw(c, "log", fmt.Sprintf("CreateCacheList_params_error:room_id=%v;content=%s", roomIds, content))
  102. return
  103. }
  104. reply, err := d.daoAnchorApiV0.CreateLiveCacheList(c, &daoAnchorV0.CreateLiveCacheListReq{RoomIds: roomIds, Content: content})
  105. if err != nil {
  106. log.Errorw(c, "log", fmt.Sprintf("CreateCacheList_error:reply=%v;err=%v", reply, err))
  107. return
  108. }
  109. log.Info("CreateCacheList_info:roomIds=%v;content=%s;reply=%v", roomIds, content, reply)
  110. return
  111. }
  112. func (d *Dao) GetContentMap(c context.Context) (resp map[string]int64, err error) {
  113. reply, err := d.daoAnchorApiV0.GetContentMap(c, &daoAnchorV0.GetContentMapReq{})
  114. if err != nil {
  115. log.Errorw(c, "log", fmt.Sprintf("GetContentMap_error:reply=%v;err=%v", reply, err))
  116. return
  117. }
  118. resp = reply.List
  119. return
  120. }
  121. func (d *Dao) CreateDBData(c context.Context, roomIds []int64, content string) (err error) {
  122. if len(roomIds) <= 0 || content == "" {
  123. log.Errorw(c, "log", fmt.Sprintf("CreateCacheList_params_error:room_id=%v;content=%s", roomIds, content))
  124. return
  125. }
  126. reply, err := d.daoAnchorApiV0.CreateDBData(c, &daoAnchorV0.CreateDBDataReq{RoomIds: roomIds, Content: content})
  127. if err != nil {
  128. log.Errorw(c, "log", fmt.Sprintf("CreateCacheList_error:reply=%v;err=%v", reply, err))
  129. return
  130. }
  131. return
  132. }