123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290 |
- package dao
- import (
- "context"
- "crypto/md5"
- "encoding/hex"
- "fmt"
- "go-common/app/service/video/stream-mng/model"
- "go-common/library/log"
- "math/rand"
- "strings"
- "time"
- "go-common/library/database/sql"
- "github.com/pkg/errors"
- "go-common/app/service/video/stream-mng/common"
- )
- const (
- _maxRetryTimes = 5
- _vendorBVC = 1
- _vendorKS = 2
- _vendorQN = 4
- _vendorTC = 8
- _vendorWS = 16
- _getBackupStreamByRoomID = "SELECT `room_id`,`stream_name`, `key`,`default_vendor`, `origin_upstream`,`streaming`,`last_stream_time`, `expires_at`, `options` from `backup_stream` WHERE `room_id` = ? and status = 1"
- _getBackupStreamByStreamName = "SELECT `room_id`,`stream_name`, `key`, `default_vendor`, `origin_upstream`, `streaming`, `last_stream_time`, `expires_at`, `options` from `backup_stream` WHERE `stream_name` = ? and status = 1;"
- _getMultiBackupStreamByRID = "SELECT `room_id`,`stream_name`, `key`, `default_vendor`, `origin_upstream`, `streaming`, `last_stream_time`, `expires_at`, `options` from `backup_stream` WHERE `room_id` = %d and status = 1"
- _getBackupRoom = "SELECT distinct room_id from `backup_stream`"
- _insertBackupStream = "INSERT INTO `backup_stream` (room_id, stream_name, `key`, default_vendor, expires_at, options) VALUES (?, ?, ?, ?, ?, ?);"
- )
- // GetBackupStreamByRoomID 根据roomid获取备用流信息
- func (d *Dao) GetBackupStreamByRoomID(ctx context.Context, rid int64) (infos []*model.BackupStream, err error) {
- var rows *sql.Rows
- if rows, err = d.db.Query(ctx, _getBackupStreamByRoomID, rid); err != nil {
- err = errors.WithStack(err)
- return
- }
- defer rows.Close()
- for rows.Next() {
- bs := new(model.BackupStream)
- if err = rows.Scan(&bs.RoomID, &bs.StreamName, &bs.Key, &bs.DefaultVendor, &bs.OriginUpstream, &bs.Streaming, &bs.LastStreamTime, &bs.ExpiresAt, &bs.Options); err != nil {
- err = errors.WithStack(err)
- return
- }
- infos = append(infos, bs)
- }
- err = rows.Err()
- return
- }
- // GetMultiBackupStreamByRID 批量查询备用流
- func (d *Dao) GetMultiBackupStreamByRID(c context.Context, rids []int64) (infos []*model.BackupStream, err error) {
- len := len(rids)
- muSql := ""
- for i := 0; i < len; i++ {
- ss := fmt.Sprintf(_getMultiBackupStreamByRID, rids[i])
- if i == 0 {
- muSql = fmt.Sprintf("%s%s", muSql, ss)
- } else {
- muSql = fmt.Sprintf("%s UNION %s", muSql, ss)
- }
- }
- var rows *sql.Rows
- if rows, err = d.db.Query(c, muSql); err != nil {
- err = errors.WithStack(err)
- return
- }
- defer rows.Close()
- for rows.Next() {
- bs := new(model.BackupStream)
- if err = rows.Scan(&bs.RoomID, &bs.StreamName, &bs.Key, &bs.DefaultVendor, &bs.OriginUpstream, &bs.Streaming, &bs.LastStreamTime, &bs.ExpiresAt, &bs.Options); err != nil {
- if err == sql.ErrNoRows {
- continue
- }
- err = errors.WithStack(err)
- return
- }
- infos = append(infos, bs)
- }
- err = rows.Err()
- return
- }
- // GetBackupRoom 临时获取所有的房间号
- func (d *Dao) GetBackupRoom(ctx context.Context) (res map[int64]int64, err error) {
- res = map[int64]int64{}
- var rows *sql.Rows
- if rows, err = d.db.Query(ctx, _getBackupRoom); err != nil {
- err = errors.WithStack(err)
- return
- }
- defer rows.Close()
- for rows.Next() {
- bs := new(model.BackupStream)
- if err = rows.Scan(&bs.RoomID); err != nil {
- err = errors.WithStack(err)
- return
- }
- res[bs.RoomID] = bs.RoomID
- }
- err = rows.Err()
- return
- }
- // CreateBackupStream 创建备用流
- func (d *Dao) CreateBackupStream(ctx context.Context, bs *model.BackupStream) (*model.BackupStream, error) {
- if bs.StreamName == "" {
- bs.StreamName = fmt.Sprintf("live_%d_bs_%d", bs.RoomID, rand.Intn(9899999)+100000)
- }
- if bs.Key == "" {
- h := md5.New()
- h.Write([]byte(fmt.Sprintf("%s%d", bs.StreamName, time.Now().Nanosecond())))
- bs.Key = hex.EncodeToString(h.Sum(nil))
- }
- if bs.DefaultVendor == 0 {
- bs.DefaultVendor = 1
- }
- // 当传入的默认上行不是五家cdn
- if _, ok := common.BitwiseMapSrc[bs.DefaultVendor]; !ok {
- bs.DefaultVendor = 1
- }
- if bs.ExpiresAt.Before(time.Now()) {
- bs.ExpiresAt = time.Now().Add(time.Hour * 336) // 14 * 24
- }
- res, err := d.stmtBackupStreamCreate.Exec(ctx, bs.RoomID, bs.StreamName, bs.Key, bs.DefaultVendor, bs.ExpiresAt.Format("2006-01-02 15:04:05"), bs.Options)
- if err != nil {
- return bs, err
- }
- bs.ID, err = res.LastInsertId()
- return bs, err
- }
- // GetBackupStreamByStreamName 根据流名查询备用流
- func (d *Dao) GetBackupStreamByStreamName(c context.Context, sn string) (*model.BackupStream, error) {
- row := d.db.QueryRow(c, _getBackupStreamByStreamName, sn)
- bs := &model.BackupStream{}
- err := row.Scan(&bs.RoomID, &bs.StreamName, &bs.Key,
- &bs.DefaultVendor, &bs.OriginUpstream, &bs.Streaming,
- &bs.LastStreamTime, &bs.ExpiresAt, &bs.Options)
- if err != nil {
- return nil, err
- }
- return bs, nil
- }
- const (
- _setOriginUpstream = "UPDATE `backup_stream` SET `origin_upstream` = ?, `streaming` = ? WHERE `stream_name` = ? and `origin_upstream` = 0;"
- _setForwardUpstream = "UPDATE `backup_stream` SET `streaming` = ? WHERE `stream_name` = ? and `streaming` = ?;"
- _setOriginUpstreamOnClose = "UPDATE `backup_stream` SET `origin_upstream` = 0, `streaming` = 0, `last_stream_time` = CURRENT_TIMESTAMP WHERE `stream_name` = ? and `origin_upstream` != 0;"
- _setForwardUpstreamOnClose = "UPDATE `backup_stream` SET `streaming` = ? WHERE `stream_name` = ? and `streaming` = ?;"
- )
- var cdnBitwiseMap = map[string]int64{
- "bvc": _vendorBVC,
- "ks": _vendorKS,
- "js": _vendorKS, // alias
- "qn": _vendorQN,
- "tc": _vendorTC,
- "tx": _vendorTC, // alias
- "txy": _vendorTC, // alias
- "ws": _vendorWS,
- }
- // SetBackupStreamStreamingStatus
- func (d *Dao) SetBackupStreamStreamingStatus(c context.Context, p *model.StreamingNotifyParam, bs *model.BackupStream, open bool) (*model.BackupStream, error) {
- bitwise, ok := cdnBitwiseMap[strings.ToLower(p.SRC)]
- if !ok {
- return nil, errors.New("unknown src:" + p.SRC)
- }
- for i := 1; i <= _maxRetryTimes; i++ {
- if open { // 开播
- if bs.Streaming&bitwise == bitwise {
- return bs, nil
- }
- if p.Type.String() == "0" { // 主推
- if bs.OriginUpstream == 0 { // 只有当前没有原始上行时才去尝试更新主推记录
- res, err := d.db.Exec(c, _setOriginUpstream, bitwise, bitwise, bs.StreamName)
- if err != nil {
- log.Errorw(c, "backup_stream_update_origin_record", err)
- } else {
- ra, err := res.RowsAffected()
- if err != nil {
- log.Errorw(c, "backup_stream_update_origin_record_rows_affected", err)
- }
- if ra == 1 { // 成功
- bs.Streaming = bitwise
- bs.OriginUpstream = bitwise
- return bs, nil
- }
- // 影响行数为 0,可能是发生了错误
- // 也可能是因为原始数据已经发生变更,等待后面重新读取DB中的数据。
- }
- } else { // 目前已经有上行了。在这里处理。
- if bitwise != bs.OriginUpstream {
- return bs, errors.New("origin upstream already exists")
- }
- }
- } else { // 转推
- if bs.OriginUpstream == 0 {
- return bs, errors.New("origin upstream not exists")
- }
- res, err := d.db.Exec(c, _setForwardUpstream, bs.Streaming|bitwise, bs.StreamName, bs.Streaming)
- if err != nil {
- log.Errorw(c, "backup_stream_update_forward_record", err)
- } else {
- ra, err := res.RowsAffected()
- if err != nil {
- log.Errorw(c, "backup_stream_update_forward_record_rows_affected", err)
- }
- if ra == 1 { // 成功
- bs.Streaming = bs.Streaming | bitwise
- return bs, nil
- }
- }
- }
- } else { // 关播
- if p.Type.String() == "0" { // 主推
- if bs.OriginUpstream != bitwise { // 如果不是当前主推,直接拒绝
- return bs, errors.New("permission denied")
- }
- res, err := d.db.Exec(c, _setOriginUpstreamOnClose, bs.StreamName)
- if err != nil {
- log.Errorw(c, "backup_stream_update_onclose_origin_record", err)
- } else {
- ra, err := res.RowsAffected()
- if err != nil {
- log.Errorw(c, "backup_stream_update_origin_onclose_record_rows_affected", err)
- }
- if ra == 1 { // 成功
- bs.Streaming = 0
- bs.OriginUpstream = 0
- return bs, nil
- }
- }
- // 影响行数为 0,可能是发生了错误
- // 也可能是因为原始数据已经发生变更,等待后面重新读取DB中的数据。
- } else { // 转推
- if bs.OriginUpstream == bitwise {
- return bs, errors.New("invalid params. you are origin upstream.")
- }
- res, err := d.db.Exec(c, _setForwardUpstreamOnClose, bs.Streaming&^bitwise, bs.StreamName, bs.Streaming)
- if err != nil {
- log.Errorw(c, "backup_stream_update_onclose_forward_record", err)
- } else {
- ra, err := res.RowsAffected()
- if err != nil {
- log.Errorw(c, "backup_stream_update_forward_onclose_record_rows_affected", err)
- }
- if ra == 1 { // 成功
- bs.Streaming = bs.Streaming &^ bitwise
- return bs, nil
- }
- }
- }
- }
- time.Sleep(time.Millisecond * 100)
- bs, err := d.GetBackupStreamByStreamName(c, bs.StreamName)
- if err != nil {
- log.Errorw(c, "backup_stream_refresh_record_row", err)
- return bs, errors.New("system busy")
- }
- }
- return bs, errors.New("update backup stream failed")
- }
|