backup-stream.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. package dao
  2. import (
  3. "context"
  4. "crypto/md5"
  5. "encoding/hex"
  6. "fmt"
  7. "go-common/app/service/video/stream-mng/model"
  8. "go-common/library/log"
  9. "math/rand"
  10. "strings"
  11. "time"
  12. "go-common/library/database/sql"
  13. "github.com/pkg/errors"
  14. "go-common/app/service/video/stream-mng/common"
  15. )
  16. const (
  17. _maxRetryTimes = 5
  18. _vendorBVC = 1
  19. _vendorKS = 2
  20. _vendorQN = 4
  21. _vendorTC = 8
  22. _vendorWS = 16
  23. _getBackupStreamByRoomID = "SELECT `room_id`,`stream_name`, `key`,`default_vendor`, `origin_upstream`,`streaming`,`last_stream_time`, `expires_at`, `options` from `backup_stream` WHERE `room_id` = ? and status = 1"
  24. _getBackupStreamByStreamName = "SELECT `room_id`,`stream_name`, `key`, `default_vendor`, `origin_upstream`, `streaming`, `last_stream_time`, `expires_at`, `options` from `backup_stream` WHERE `stream_name` = ? and status = 1;"
  25. _getMultiBackupStreamByRID = "SELECT `room_id`,`stream_name`, `key`, `default_vendor`, `origin_upstream`, `streaming`, `last_stream_time`, `expires_at`, `options` from `backup_stream` WHERE `room_id` = %d and status = 1"
  26. _getBackupRoom = "SELECT distinct room_id from `backup_stream`"
  27. _insertBackupStream = "INSERT INTO `backup_stream` (room_id, stream_name, `key`, default_vendor, expires_at, options) VALUES (?, ?, ?, ?, ?, ?);"
  28. )
  29. // GetBackupStreamByRoomID 根据roomid获取备用流信息
  30. func (d *Dao) GetBackupStreamByRoomID(ctx context.Context, rid int64) (infos []*model.BackupStream, err error) {
  31. var rows *sql.Rows
  32. if rows, err = d.db.Query(ctx, _getBackupStreamByRoomID, rid); err != nil {
  33. err = errors.WithStack(err)
  34. return
  35. }
  36. defer rows.Close()
  37. for rows.Next() {
  38. bs := new(model.BackupStream)
  39. if err = rows.Scan(&bs.RoomID, &bs.StreamName, &bs.Key, &bs.DefaultVendor, &bs.OriginUpstream, &bs.Streaming, &bs.LastStreamTime, &bs.ExpiresAt, &bs.Options); err != nil {
  40. err = errors.WithStack(err)
  41. return
  42. }
  43. infos = append(infos, bs)
  44. }
  45. err = rows.Err()
  46. return
  47. }
  48. // GetMultiBackupStreamByRID 批量查询备用流
  49. func (d *Dao) GetMultiBackupStreamByRID(c context.Context, rids []int64) (infos []*model.BackupStream, err error) {
  50. len := len(rids)
  51. muSql := ""
  52. for i := 0; i < len; i++ {
  53. ss := fmt.Sprintf(_getMultiBackupStreamByRID, rids[i])
  54. if i == 0 {
  55. muSql = fmt.Sprintf("%s%s", muSql, ss)
  56. } else {
  57. muSql = fmt.Sprintf("%s UNION %s", muSql, ss)
  58. }
  59. }
  60. var rows *sql.Rows
  61. if rows, err = d.db.Query(c, muSql); err != nil {
  62. err = errors.WithStack(err)
  63. return
  64. }
  65. defer rows.Close()
  66. for rows.Next() {
  67. bs := new(model.BackupStream)
  68. if err = rows.Scan(&bs.RoomID, &bs.StreamName, &bs.Key, &bs.DefaultVendor, &bs.OriginUpstream, &bs.Streaming, &bs.LastStreamTime, &bs.ExpiresAt, &bs.Options); err != nil {
  69. if err == sql.ErrNoRows {
  70. continue
  71. }
  72. err = errors.WithStack(err)
  73. return
  74. }
  75. infos = append(infos, bs)
  76. }
  77. err = rows.Err()
  78. return
  79. }
  80. // GetBackupRoom 临时获取所有的房间号
  81. func (d *Dao) GetBackupRoom(ctx context.Context) (res map[int64]int64, err error) {
  82. res = map[int64]int64{}
  83. var rows *sql.Rows
  84. if rows, err = d.db.Query(ctx, _getBackupRoom); err != nil {
  85. err = errors.WithStack(err)
  86. return
  87. }
  88. defer rows.Close()
  89. for rows.Next() {
  90. bs := new(model.BackupStream)
  91. if err = rows.Scan(&bs.RoomID); err != nil {
  92. err = errors.WithStack(err)
  93. return
  94. }
  95. res[bs.RoomID] = bs.RoomID
  96. }
  97. err = rows.Err()
  98. return
  99. }
  100. // CreateBackupStream 创建备用流
  101. func (d *Dao) CreateBackupStream(ctx context.Context, bs *model.BackupStream) (*model.BackupStream, error) {
  102. if bs.StreamName == "" {
  103. bs.StreamName = fmt.Sprintf("live_%d_bs_%d", bs.RoomID, rand.Intn(9899999)+100000)
  104. }
  105. if bs.Key == "" {
  106. h := md5.New()
  107. h.Write([]byte(fmt.Sprintf("%s%d", bs.StreamName, time.Now().Nanosecond())))
  108. bs.Key = hex.EncodeToString(h.Sum(nil))
  109. }
  110. if bs.DefaultVendor == 0 {
  111. bs.DefaultVendor = 1
  112. }
  113. // 当传入的默认上行不是五家cdn
  114. if _, ok := common.BitwiseMapSrc[bs.DefaultVendor]; !ok {
  115. bs.DefaultVendor = 1
  116. }
  117. if bs.ExpiresAt.Before(time.Now()) {
  118. bs.ExpiresAt = time.Now().Add(time.Hour * 336) // 14 * 24
  119. }
  120. res, err := d.stmtBackupStreamCreate.Exec(ctx, bs.RoomID, bs.StreamName, bs.Key, bs.DefaultVendor, bs.ExpiresAt.Format("2006-01-02 15:04:05"), bs.Options)
  121. if err != nil {
  122. return bs, err
  123. }
  124. bs.ID, err = res.LastInsertId()
  125. return bs, err
  126. }
  127. // GetBackupStreamByStreamName 根据流名查询备用流
  128. func (d *Dao) GetBackupStreamByStreamName(c context.Context, sn string) (*model.BackupStream, error) {
  129. row := d.db.QueryRow(c, _getBackupStreamByStreamName, sn)
  130. bs := &model.BackupStream{}
  131. err := row.Scan(&bs.RoomID, &bs.StreamName, &bs.Key,
  132. &bs.DefaultVendor, &bs.OriginUpstream, &bs.Streaming,
  133. &bs.LastStreamTime, &bs.ExpiresAt, &bs.Options)
  134. if err != nil {
  135. return nil, err
  136. }
  137. return bs, nil
  138. }
  139. const (
  140. _setOriginUpstream = "UPDATE `backup_stream` SET `origin_upstream` = ?, `streaming` = ? WHERE `stream_name` = ? and `origin_upstream` = 0;"
  141. _setForwardUpstream = "UPDATE `backup_stream` SET `streaming` = ? WHERE `stream_name` = ? and `streaming` = ?;"
  142. _setOriginUpstreamOnClose = "UPDATE `backup_stream` SET `origin_upstream` = 0, `streaming` = 0, `last_stream_time` = CURRENT_TIMESTAMP WHERE `stream_name` = ? and `origin_upstream` != 0;"
  143. _setForwardUpstreamOnClose = "UPDATE `backup_stream` SET `streaming` = ? WHERE `stream_name` = ? and `streaming` = ?;"
  144. )
  145. var cdnBitwiseMap = map[string]int64{
  146. "bvc": _vendorBVC,
  147. "ks": _vendorKS,
  148. "js": _vendorKS, // alias
  149. "qn": _vendorQN,
  150. "tc": _vendorTC,
  151. "tx": _vendorTC, // alias
  152. "txy": _vendorTC, // alias
  153. "ws": _vendorWS,
  154. }
  155. // SetBackupStreamStreamingStatus
  156. func (d *Dao) SetBackupStreamStreamingStatus(c context.Context, p *model.StreamingNotifyParam, bs *model.BackupStream, open bool) (*model.BackupStream, error) {
  157. bitwise, ok := cdnBitwiseMap[strings.ToLower(p.SRC)]
  158. if !ok {
  159. return nil, errors.New("unknown src:" + p.SRC)
  160. }
  161. for i := 1; i <= _maxRetryTimes; i++ {
  162. if open { // 开播
  163. if bs.Streaming&bitwise == bitwise {
  164. return bs, nil
  165. }
  166. if p.Type.String() == "0" { // 主推
  167. if bs.OriginUpstream == 0 { // 只有当前没有原始上行时才去尝试更新主推记录
  168. res, err := d.db.Exec(c, _setOriginUpstream, bitwise, bitwise, bs.StreamName)
  169. if err != nil {
  170. log.Errorw(c, "backup_stream_update_origin_record", err)
  171. } else {
  172. ra, err := res.RowsAffected()
  173. if err != nil {
  174. log.Errorw(c, "backup_stream_update_origin_record_rows_affected", err)
  175. }
  176. if ra == 1 { // 成功
  177. bs.Streaming = bitwise
  178. bs.OriginUpstream = bitwise
  179. return bs, nil
  180. }
  181. // 影响行数为 0,可能是发生了错误
  182. // 也可能是因为原始数据已经发生变更,等待后面重新读取DB中的数据。
  183. }
  184. } else { // 目前已经有上行了。在这里处理。
  185. if bitwise != bs.OriginUpstream {
  186. return bs, errors.New("origin upstream already exists")
  187. }
  188. }
  189. } else { // 转推
  190. if bs.OriginUpstream == 0 {
  191. return bs, errors.New("origin upstream not exists")
  192. }
  193. res, err := d.db.Exec(c, _setForwardUpstream, bs.Streaming|bitwise, bs.StreamName, bs.Streaming)
  194. if err != nil {
  195. log.Errorw(c, "backup_stream_update_forward_record", err)
  196. } else {
  197. ra, err := res.RowsAffected()
  198. if err != nil {
  199. log.Errorw(c, "backup_stream_update_forward_record_rows_affected", err)
  200. }
  201. if ra == 1 { // 成功
  202. bs.Streaming = bs.Streaming | bitwise
  203. return bs, nil
  204. }
  205. }
  206. }
  207. } else { // 关播
  208. if p.Type.String() == "0" { // 主推
  209. if bs.OriginUpstream != bitwise { // 如果不是当前主推,直接拒绝
  210. return bs, errors.New("permission denied")
  211. }
  212. res, err := d.db.Exec(c, _setOriginUpstreamOnClose, bs.StreamName)
  213. if err != nil {
  214. log.Errorw(c, "backup_stream_update_onclose_origin_record", err)
  215. } else {
  216. ra, err := res.RowsAffected()
  217. if err != nil {
  218. log.Errorw(c, "backup_stream_update_origin_onclose_record_rows_affected", err)
  219. }
  220. if ra == 1 { // 成功
  221. bs.Streaming = 0
  222. bs.OriginUpstream = 0
  223. return bs, nil
  224. }
  225. }
  226. // 影响行数为 0,可能是发生了错误
  227. // 也可能是因为原始数据已经发生变更,等待后面重新读取DB中的数据。
  228. } else { // 转推
  229. if bs.OriginUpstream == bitwise {
  230. return bs, errors.New("invalid params. you are origin upstream.")
  231. }
  232. res, err := d.db.Exec(c, _setForwardUpstreamOnClose, bs.Streaming&^bitwise, bs.StreamName, bs.Streaming)
  233. if err != nil {
  234. log.Errorw(c, "backup_stream_update_onclose_forward_record", err)
  235. } else {
  236. ra, err := res.RowsAffected()
  237. if err != nil {
  238. log.Errorw(c, "backup_stream_update_forward_onclose_record_rows_affected", err)
  239. }
  240. if ra == 1 { // 成功
  241. bs.Streaming = bs.Streaming &^ bitwise
  242. return bs, nil
  243. }
  244. }
  245. }
  246. }
  247. time.Sleep(time.Millisecond * 100)
  248. bs, err := d.GetBackupStreamByStreamName(c, bs.StreamName)
  249. if err != nil {
  250. log.Errorw(c, "backup_stream_refresh_record_row", err)
  251. return bs, errors.New("system busy")
  252. }
  253. }
  254. return bs, errors.New("update backup stream failed")
  255. }