official-stream.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "go-common/app/service/video/stream-mng/model"
  6. "go-common/library/database/sql"
  7. "go-common/library/log"
  8. "time"
  9. "github.com/pkg/errors"
  10. )
  11. const (
  12. _getOfficialStreamByName = "SELECT id, room_id, src, `name`, `key`, up_rank, down_rank, `status` FROM `sv_ls_stream` WHERE name = ?"
  13. _getOfficialStreamByRoomID = "SELECT id, room_id, src, `name`, `key`, up_rank, down_rank, `status` FROM `sv_ls_stream` WHERE room_id = ?"
  14. _getMultiOfficalStreamByRID = "SELECT id, room_id, src, `name`, `key`, up_rank, down_rank, `status` FROM `sv_ls_stream` WHERE room_id = %d"
  15. _insertOfficialStream = "INSERT INTO `sv_ls_stream` (room_id, `name`, `src`,`key`, `status`, up_rank, down_rank, last_status_updated_at, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?,?,?)"
  16. _updateUpOfficialStreamStatus = "UPDATE `sv_ls_stream` SET `up_rank` = 1,`last_status_updated_at` = CURRENT_TIMESTAMP WHERE `room_id` = ? and `src` = ?"
  17. _updateForwardOfficialStreamStatus = "UPDATE `sv_ls_stream` SET `up_rank` = 0,`last_status_updated_at` = CURRENT_TIMESTAMP WHERE `room_id` = ? and `src` != ?"
  18. _updateOfficalStreamUpRankStatus = "UPDATE `sv_ls_stream` SET `up_rank` = ?,`last_status_updated_at` = CURRENT_TIMESTAMP WHERE `room_id` = ? AND `up_rank` = ?;"
  19. )
  20. // GetOfficialStreamByName 根据流名查流信息, 可以获取多条记录
  21. func (d *Dao) GetOfficialStreamByName(c context.Context, name string) (infos []*model.OfficialStream, err error) {
  22. var rows *sql.Rows
  23. if rows, err = d.db.Query(c, _getOfficialStreamByName, name); err != nil {
  24. err = errors.WithStack(err)
  25. return
  26. }
  27. defer rows.Close()
  28. for rows.Next() {
  29. info := new(model.OfficialStream)
  30. if err = rows.Scan(&info.ID, &info.RoomID,
  31. &info.Src, &info.Name, &info.Key,
  32. &info.UpRank, &info.DownRank, &info.Status); err != nil {
  33. log.Warn("sv_ls_stream sql err = %v", err)
  34. err = errors.WithStack(err)
  35. infos = nil
  36. return
  37. }
  38. infos = append(infos, info)
  39. }
  40. err = rows.Err()
  41. return
  42. }
  43. // GetOfficialStreamByRoomID 根据roomid查询流信息, 可以获取多条记录
  44. func (d *Dao) GetOfficialStreamByRoomID(c context.Context, rid int64) (infos []*model.OfficialStream, err error) {
  45. var rows *sql.Rows
  46. if rows, err = d.db.Query(c, _getOfficialStreamByRoomID, rid); err != nil {
  47. err = errors.WithStack(err)
  48. return
  49. }
  50. defer rows.Close()
  51. for rows.Next() {
  52. info := new(model.OfficialStream)
  53. if err = rows.Scan(&info.ID, &info.RoomID,
  54. &info.Src, &info.Name, &info.Key,
  55. &info.UpRank, &info.DownRank, &info.Status); err != nil {
  56. log.Warn("sv_ls_stream sql err = %v", err)
  57. err = errors.WithStack(err)
  58. infos = nil
  59. return
  60. }
  61. infos = append(infos, info)
  62. }
  63. err = rows.Err()
  64. // 查询多个数据,不会报错, 只能判断为空
  65. if err == nil && len(infos) == 0 && rid < 10000 {
  66. infos = append(infos, &model.OfficialStream{
  67. RoomID: rid,
  68. Name: "miss",
  69. Src: 32,
  70. UpRank: 1,
  71. Key: "miss",
  72. })
  73. }
  74. return
  75. }
  76. // GetMultiOfficalStreamByRID 批量获取
  77. func (d *Dao) GetMultiOfficalStreamByRID(c context.Context, rids []int64) (infos []*model.OfficialStream, err error) {
  78. len := len(rids)
  79. muSql := ""
  80. for i := 0; i < len; i++ {
  81. ss := fmt.Sprintf(_getMultiOfficalStreamByRID, rids[i])
  82. if i == 0 {
  83. muSql = fmt.Sprintf("%s%s", muSql, ss)
  84. } else {
  85. muSql = fmt.Sprintf("%s UNION %s", muSql, ss)
  86. }
  87. }
  88. var rows *sql.Rows
  89. if rows, err = d.db.Query(c, muSql); err != nil {
  90. err = errors.WithStack(err)
  91. return
  92. }
  93. defer rows.Close()
  94. for rows.Next() {
  95. info := new(model.OfficialStream)
  96. if err = rows.Scan(&info.ID, &info.RoomID,
  97. &info.Src, &info.Name, &info.Key,
  98. &info.UpRank, &info.DownRank, &info.Status); err != nil {
  99. log.Warn("sv_ls_stream sql err = %v", err)
  100. if err == sql.ErrNoRows {
  101. continue
  102. } else {
  103. err = errors.WithStack(err)
  104. infos = nil
  105. return infos, err
  106. }
  107. }
  108. infos = append(infos, info)
  109. }
  110. err = rows.Err()
  111. return
  112. }
  113. // CreateOfficialStream 创建正式流
  114. func (d *Dao) CreateOfficialStream(c context.Context, infos []*model.OfficialStream) (err error) {
  115. tx, err := d.db.Begin(c)
  116. if err != nil {
  117. return err
  118. }
  119. defer func() {
  120. if err != nil {
  121. tx.Rollback()
  122. }
  123. }()
  124. ts := time.Now().Format("2006-01-02 15:04:05")
  125. for _, v := range infos {
  126. if _, err = d.stmtLegacyStreamCreate.Exec(c, v.RoomID, v.Name, v.Src, v.Key, v.Status, v.UpRank, v.DownRank, ts, ts, ts); err != nil {
  127. return err
  128. }
  129. }
  130. err = tx.Commit()
  131. return err
  132. }
  133. // UpdateOfficialStreamStatus 切换cdn时,更新流状态
  134. func (d *Dao) UpdateOfficialStreamStatus(c context.Context, rid int64, src int8) (err error) {
  135. // 事务操作, 同时操作多条记录,任何一条失败均回滚
  136. tx, err := d.db.Begin(c)
  137. if err != nil {
  138. return err
  139. }
  140. defer func() {
  141. if err != nil {
  142. tx.Rollback()
  143. }
  144. }()
  145. if _, err = d.stmtLegacyStreamEnableNewUpRank.Exec(c, rid, src); err != nil {
  146. return err
  147. }
  148. if _, err = d.stmtLegacyStreamDisableUpRank.Exec(c, rid, src); err != nil {
  149. return err
  150. }
  151. err = tx.Commit()
  152. return err
  153. }
  154. // UpdateOfficialUpRankStatus 清理互推标准,更新up_rank
  155. func (d *Dao) UpdateOfficialUpRankStatus(c context.Context, rid int64, whereSrc int8, toSrc int8) error {
  156. res, err := d.stmtLegacyStreamClearStreamFoward.Exec(c, toSrc, rid, whereSrc)
  157. if err != nil {
  158. return err
  159. }
  160. _, err = res.RowsAffected()
  161. return err
  162. }