main-stream.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. package dao
  2. import (
  3. "context"
  4. "crypto/md5"
  5. "encoding/hex"
  6. "fmt"
  7. "go-common/app/service/video/stream-mng/common"
  8. "go-common/app/service/video/stream-mng/model"
  9. "go-common/library/database/sql"
  10. "go-common/library/log"
  11. "time"
  12. "github.com/pkg/errors"
  13. )
  14. /*
  15. 全新推流结构
  16. 所需功能:
  17. 1. 创建流 (从老表搬运) done
  18. 2. 校验流/读取 done
  19. 3. 开关播回调
  20. 4. 切上行
  21. 5. 清理互推标记
  22. */
  23. const (
  24. // 创建流
  25. _insertMainStream = "INSERT INTO `main_stream` (room_id, stream_name, `key`, default_vendor, options) VALUES (?, ?, ?, ?, ?);"
  26. // 读取流
  27. _getMainStreamWithoutConds = "SELECT `room_id`, `stream_name`, `key`, `default_vendor`, `origin_upstream`, `streaming`, `last_stream_time`, `options` from `main_stream` WHERE "
  28. _getMultiMainStreamByRID = "SELECT `room_id`, `stream_name`, `key`, `default_vendor`, `origin_upstream`, `streaming`, `last_stream_time`, `options` from `main_stream` WHERE room_id = %d"
  29. // 切上行
  30. _changeDefaultVendor = "UPDATE `main_stream` SET `default_vendor` = ? WHERE `room_id` = ? AND status = 1"
  31. // 切options
  32. _changeOptions = "UPDATE `main_stream` SET `options` = ? WHERE `room_id` = ? AND status = 1 AND `options` = ?"
  33. // 清理互推
  34. _clearAllStreaming = "UPDATE `main_stream` SET `origin_upstream` = 0, `streaming` = 0, `options` = ? WHERE `room_id` = ? AND `options` = ? AND status = 1"
  35. // 开关回调
  36. _notifyMainStreamOrigin = "UPDATE `main_stream` SET `origin_upstream` = ?, `streaming` = ? WHERE `room_id` = ? and `streaming` = ? and status = 1 limit 1"
  37. _notifyMainStreamOriginClose = "UPDATE `main_stream` SET `options` = ?,`origin_upstream` = 0, `streaming` = 0, `last_stream_time` = CURRENT_TIMESTAMP WHERE `room_id` = ? AND `options` = ? AND `status` = 1 limit 1"
  38. _notifyMainStreamForward = "UPDATE `main_stream` SET `streaming` = ? WHERE `room_id` = ? and `streaming` = ? and status = 1 limit 1"
  39. )
  40. // CreateNewStream used to create new Stream record
  41. func (d *Dao) CreateNewStream(c context.Context, stream *model.MainStream) (*model.MainStream, error) {
  42. if stream.RoomID <= 0 {
  43. return stream, fmt.Errorf("room id can not be empty")
  44. }
  45. if stream.StreamName == "" {
  46. return stream, fmt.Errorf("stream name can not be empty")
  47. }
  48. if stream.Key == "" {
  49. h := md5.New()
  50. h.Write([]byte(fmt.Sprintf("%s%d", stream.StreamName, time.Now().Nanosecond())))
  51. stream.Key = hex.EncodeToString(h.Sum(nil))
  52. }
  53. if stream.DefaultVendor == 0 {
  54. stream.DefaultVendor = 1
  55. }
  56. res, err := d.stmtMainStreamCreate.Exec(c, stream.RoomID, stream.StreamName, stream.Key, stream.DefaultVendor, stream.Options)
  57. if err != nil {
  58. return stream, err
  59. }
  60. stream.ID, err = res.LastInsertId()
  61. return stream, nil
  62. }
  63. // GetMainStreamFromDB 从DB中读取流信息
  64. // roomID 和 streamName 可以只传一个,传哪个就用哪个查询,否则必须两者对应
  65. func (d *Dao) GetMainStreamFromDB(c context.Context, roomID int64, streamName string) (*model.MainStream, error) {
  66. if roomID <= 0 && streamName == "" {
  67. return nil, errors.New("roomID and streamName cannot be empty at SAME time")
  68. }
  69. var row *sql.Row
  70. if roomID > 0 && streamName != "" {
  71. q := fmt.Sprintf("%s `room_id` = ? AND `stream_name` = ? AND status = 1", _getMainStreamWithoutConds)
  72. row = d.db.QueryRow(c, q, roomID, streamName)
  73. } else if roomID > 0 && streamName == "" {
  74. q := fmt.Sprintf("%s `room_id` = ? AND status = 1", _getMainStreamWithoutConds)
  75. row = d.db.QueryRow(c, q, roomID)
  76. } else if roomID <= 0 && streamName != "" {
  77. q := fmt.Sprintf("%s `stream_name` = ? AND status = 1", _getMainStreamWithoutConds)
  78. row = d.db.QueryRow(c, q, streamName)
  79. }
  80. stream := new(model.MainStream)
  81. err := row.Scan(&stream.RoomID, &stream.StreamName, &stream.Key,
  82. &stream.DefaultVendor, &stream.OriginUpstream, &stream.Streaming,
  83. &stream.LastStreamTime, &stream.Options)
  84. if err != nil {
  85. return nil, err
  86. }
  87. return stream, nil
  88. }
  89. // GetMultiMainStreamFromDB 批量从main-stream读取
  90. func (d *Dao) GetMultiMainStreamFromDB(c context.Context, rids []int64) (mainStream []*model.MainStream, err error) {
  91. len := len(rids)
  92. muSql := ""
  93. for i := 0; i < len; i++ {
  94. ss := fmt.Sprintf(_getMultiMainStreamByRID, rids[i])
  95. if i == 0 {
  96. muSql = fmt.Sprintf("%s%s", muSql, ss)
  97. } else {
  98. muSql = fmt.Sprintf("%s UNION %s", muSql, ss)
  99. }
  100. }
  101. var rows *sql.Rows
  102. if rows, err = d.db.Query(c, muSql); err != nil {
  103. err = errors.WithStack(err)
  104. return
  105. }
  106. defer rows.Close()
  107. for rows.Next() {
  108. stream := new(model.MainStream)
  109. if err = rows.Scan(&stream.RoomID, &stream.StreamName, &stream.Key,
  110. &stream.DefaultVendor, &stream.OriginUpstream, &stream.Streaming,
  111. &stream.LastStreamTime, &stream.Options); err != nil {
  112. if err == sql.ErrNoRows {
  113. continue
  114. }
  115. err = errors.WithStack(err)
  116. return
  117. }
  118. mainStream = append(mainStream, stream)
  119. }
  120. err = rows.Err()
  121. return
  122. }
  123. // ChangeDefaultVendor 切换默认上行
  124. func (d *Dao) ChangeDefaultVendor(c context.Context, roomID int64, newVendor int64) error {
  125. if roomID <= 0 {
  126. return errors.New("invalid roomID")
  127. }
  128. if _, ok := common.BitwiseMapName[newVendor]; !ok {
  129. return errors.New("invalid vendor")
  130. }
  131. _, err := d.stmtMainStreamChangeDefaultVendor.Exec(c, newVendor, roomID)
  132. return err
  133. }
  134. // ChangeMainStreamOptions 切换Options
  135. func (d *Dao) ChangeMainStreamOptions(c context.Context, roomID int64, newOptions int64, options int64) error {
  136. if roomID <= 0 {
  137. return errors.New("invalid roomID")
  138. }
  139. _, err := d.stmtMainStreamChangeOptions.Exec(c, newOptions, roomID, options)
  140. return err
  141. }
  142. // ClearMainStreaming 清理互推标记
  143. func (d *Dao) ClearMainStreaming(c context.Context, roomID int64, newoptions int64, options int64) error {
  144. if roomID <= 0 {
  145. return errors.New("invalid roomID")
  146. }
  147. _, err := d.stmtMainStreamClearAllStreaming.Exec(c, newoptions, roomID, options)
  148. return err
  149. }
  150. // MainStreamNotify 开关播回调
  151. // @param roomID 房间号
  152. // @param vendor 上行 CDN 位
  153. // @param isOpen 是否是开播 true 开播 false 关播
  154. // @param isOrigin 是否是原始上行 true 是 false 转推
  155. func (d *Dao) MainStreamNotify(c context.Context, roomID, vendor int64, isOpen bool, isOrigin bool, options int64, newoptions int64) error {
  156. if _, ok := common.BitwiseMapName[vendor]; !ok {
  157. return fmt.Errorf("Unknow vendor %d", vendor)
  158. }
  159. log.Infov(c, log.KV("roomID", roomID), log.KV("vendor", vendor), log.KV("isOpen", isOpen), log.KV("isOrigin", isOrigin), log.KV("options", options), log.KV("newoptions", newoptions))
  160. // "UPDATE `main_stream` SET `origin_upstream` = ?, `streaming` = ? WHERE `room_id` = ? AND `streaming` = ? AND `origin_upstream` = 0 and status = 1 limit 1"
  161. // "UPDATE `main_stream` SET `streaming` = ? WHERE `room_id` = ? and `streaming` = ? and status = 1 limit 1"
  162. ms, err := d.GetMainStreamFromDB(c, roomID, "")
  163. if ms == nil || err != nil {
  164. return fmt.Errorf("cannot found main stream by roomid (%d) with error:%v", roomID, err)
  165. }
  166. // 开播
  167. if isOpen {
  168. if isOrigin { // 主推
  169. _, err := d.db.Exec(c, _notifyMainStreamOrigin, vendor, ms.Streaming|vendor, roomID, ms.Streaming)
  170. return err
  171. }
  172. // 转推
  173. _, err := d.db.Exec(c, _notifyMainStreamForward, ms.Streaming|vendor, roomID, ms.Streaming)
  174. return err
  175. } else {
  176. log.Infov(c, log.KV("----test----", fmt.Sprintf("---- %v ----- %v ---- %v ---- %v -", _notifyMainStreamOriginClose, newoptions, roomID, options)))
  177. // 关播的时候, 必须是当前的origin=传递过来的cdn才可以关, 修复开关播的时序性问题
  178. if isOrigin && ms.OriginUpstream == vendor {
  179. _, err := d.db.Exec(c, _notifyMainStreamOriginClose, newoptions, roomID, options)
  180. return err
  181. }
  182. // 转推
  183. _, err := d.db.Exec(c, _notifyMainStreamForward, ms.Streaming&^vendor, roomID, ms.Streaming)
  184. return err
  185. }
  186. }