123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383 |
- package dao
- import (
- "context"
- "errors"
- "fmt"
- "go-common/app/service/video/stream-mng/common"
- "go-common/app/service/video/stream-mng/model"
- "go-common/library/log"
- "go-common/library/net/metadata"
- "go-common/library/sync/errgroup"
- )
- // RawStreamFullInfo 直接从数据库中查询流信息,可传入流名, 也可传入rid
- func (d *Dao) RawStreamFullInfo(c context.Context, id int64, sname string) (res *model.StreamFullInfo, err error) {
- var (
- official []*model.OfficialStream
- backup []*model.StreamBase
- mainStream *model.MainStream
- )
- if sname != "" {
- official, err = d.GetOfficialStreamByName(c, sname)
- // 可以从原表中查询到
- if err == nil && official != nil && len(official) > 0 {
- id = official[0].RoomID
- goto END
- }
- var backUpInfo *model.BackupStream
- // 原表中查询不到
- backUpInfo, err = d.GetBackupStreamByStreamName(c, sname)
- if err != nil {
- log.Errorv(c, log.KV("log", fmt.Sprintf("sql backup_stream err = %v", err)))
- return
- }
- if backUpInfo == nil {
- err = fmt.Errorf("can not find any info by %s", sname)
- return
- }
- id = backUpInfo.RoomID
- }
- END:
- // todo 这里用老的errgroup, 新errgroup2 暂时未有人用,bug未知
- group, errCtx := errgroup.WithContext(c)
- // 如果还未查sv_ls_stream则需要查询
- if id > 0 && len(official) == 0 {
- group.Go(func() (err error) {
- log.Warn("group offical")
- if official, err = d.GetOfficialStreamByRoomID(errCtx, id); err != nil {
- log.Errorv(errCtx, log.KV("log", fmt.Sprintf("group offical err=%v", err)))
- }
- return nil
- })
- }
- if id > 0 {
- group.Go(func() (err error) {
- log.Warn("group main")
- if mainStream, err = d.GetMainStreamFromDB(errCtx, id, ""); err != nil {
- log.Errorv(errCtx, log.KV("log", fmt.Sprintf("group main err=%v", err)))
- }
- return nil
- })
- group.Go(func() (err error) {
- log.Warn("group back")
- back, err := d.GetBackupStreamByRoomID(errCtx, id)
- if err != nil {
- log.Errorv(errCtx, log.KV("log", fmt.Sprintf("group backup err=%v", err)))
- } else {
- backup = d.formatBackup2BaseInfo(c, back)
- }
- return nil
- })
- }
- err = group.Wait()
- if err != nil {
- return
- }
- if len(official) == 0 {
- err = fmt.Errorf("can not find any info by room_id=%d", id)
- return
- }
- return d.formatStreamFullInfo(c, official, backup, mainStream)
- }
- // RawStreamRIDByName 查询rid
- func (d *Dao) RawStreamRIDByName(c context.Context, sname string) (res *model.StreamFullInfo, err error) {
- return d.RawStreamFullInfo(c, 0, sname)
- }
- // RawMultiStreamInfo 批量查询流信息
- func (d *Dao) RawMultiStreamInfo(c context.Context, rids []int64) (res map[int64]*model.StreamFullInfo, err error) {
- var (
- official []*model.OfficialStream
- backup []*model.BackupStream
- mainStream []*model.MainStream
- )
- group, errCtx := errgroup.WithContext(c)
- group.Go(func() (err error) {
- if official, err = d.GetMultiOfficalStreamByRID(errCtx, rids); err != nil {
- log.Errorv(errCtx, log.KV("log", fmt.Sprintf("group offical err=%v", err)))
- }
- return nil
- })
- group.Go(func() (err error) {
- if backup, err = d.GetMultiBackupStreamByRID(errCtx, rids); err != nil {
- log.Errorv(errCtx, log.KV("log", fmt.Sprintf("group back err=%v", err)))
- }
- return nil
- })
- group.Go(func() (err error) {
- if mainStream, err = d.GetMultiMainStreamFromDB(errCtx, rids); err != nil {
- log.Errorv(errCtx, log.KV("log", fmt.Sprintf("group back err=%v", err)))
- }
- return nil
- })
- err = group.Wait()
- if err != nil {
- return
- }
- // 把rid相同的放为一组
- ridMapOfficial := map[int64][]*model.OfficialStream{}
- for _, v := range official {
- ridMapOfficial[v.RoomID] = append(ridMapOfficial[v.RoomID], v)
- }
- ridMapBackup := map[int64][]*model.BackupStream{}
- for _, v := range backup {
- ridMapBackup[v.RoomID] = append(ridMapBackup[v.RoomID], v)
- }
- ridMapBackupBase := map[int64][]*model.StreamBase{}
- for id, v := range ridMapBackup {
- ridMapBackupBase[id] = d.formatBackup2BaseInfo(c, v)
- }
- ridMapMain := map[int64]*model.MainStream{}
- for _, v := range mainStream {
- ridMapMain[v.RoomID] = v
- }
- infos := map[int64]*model.StreamFullInfo{}
- flag := false
- for id, v := range ridMapOfficial {
- flag = true
- infos[id], _ = d.formatStreamFullInfo(c, v, ridMapBackupBase[id], ridMapMain[id])
- }
- if flag {
- return infos, nil
- }
- log.Errorv(c, log.KV("log", fmt.Errorf("can not find any info by room_ids=%d", rids)))
- return nil, nil
- }
- // formatStreamFullInfo 格式化流信息
- func (d *Dao) formatStreamFullInfo(c context.Context, official []*model.OfficialStream, backup []*model.StreamBase, main *model.MainStream) (*model.StreamFullInfo, error) {
- resp := &model.StreamFullInfo{}
- resp.List = []*model.StreamBase{}
- var roomID int64
- roomID = official[0].RoomID
- resp.RoomID = official[0].RoomID
- base := &model.StreamBase{}
- base.StreamName = official[0].Name
- base.Type = 1
- base.Key = official[0].Key
- if main != nil {
- base.Options = main.Options
- if 4&base.Options == 4 {
- base.Wmask = true
- }
- if 8&base.Options == 8 {
- base.Mmask = true
- }
- }
- for _, item := range official {
- if item.UpRank == 1 {
- if val, ok := common.SrcMapBitwise[item.Src]; ok {
- // todo origin为main-stream取
- if main != nil {
- base.Origin = main.OriginUpstream
- } else {
- // 做个兜底逻辑, main-stream中没有这个数据,但是sv_ls_stream确实在播
- base.Origin = val
- }
- base.DefaultUpStream = val
- } else {
- // 如果上行不在现在的任意一家, 则重新设置上行
- if err := d.UpdateOfficialStreamStatus(c, roomID, common.BVCSrc); err == nil {
- if main != nil {
- base.Origin = main.OriginUpstream
- } else {
- base.Origin = common.BitWiseBVC
- }
- base.DefaultUpStream = common.BitWiseBVC
- go func(c context.Context, rid int64, fromOrigin int8, toOrigin int64, sname string) {
- d.UpdateStreamStatusCache(c, &model.StreamStatus{
- RoomID: rid,
- StreamName: sname,
- DefaultChange: true,
- DefaultUpStream: toOrigin,
- })
- // 插入日志
- d.InsertChangeLog(c, &model.StreamChangeLog{
- RoomID: rid,
- FromOrigin: int64(fromOrigin),
- ToOrigin: toOrigin,
- Reason: fmt.Sprintf("上行不在五家CDN,old origin=%d", fromOrigin),
- OperateName: "auto_change",
- Source: "background",
- })
- }(metadata.WithContext(c), roomID, item.Src, common.BitWiseBVC, item.Name)
- }
- }
- } else if item.UpRank == 2 {
- if val, ok := common.SrcMapBitwise[item.Src]; ok {
- base.Forward = append(base.Forward, val)
- }
- }
- }
- resp.List = append(resp.List, base)
- if len(backup) > 0 {
- for _, v := range backup {
- resp.List = append(resp.List, v)
- }
- }
- d.liveAside.Do(c, func(ctx context.Context) {
- d.diffStreamInfo(ctx, resp, main)
- })
- return resp, nil
- }
- // formatBackup2Base backup 格式化为base
- func (d *Dao) formatBackup2BaseInfo(c context.Context, back []*model.BackupStream) (resp []*model.StreamBase) {
- if len(back) > 0 {
- for _, b := range back {
- bs := &model.StreamBase{}
- bs.StreamName = b.StreamName
- bs.Type = 2
- bs.Key = b.Key
- // 原始上行
- bs.Origin = b.OriginUpstream
- bs.DefaultUpStream = b.DefaultVendor
- bs.Options = b.Options
- // 位运算:可满足9家cdn
- var n int64
- for n = 256; n > 0; n /= 2 {
- if (b.Streaming&n) == n && n != bs.Origin {
- bs.Forward = append(bs.Forward, n)
- }
- }
- resp = append(resp, bs)
- }
- }
- return
- }
- // 比较新表和老表
- func (d *Dao) diffStreamInfo(c context.Context, info *model.StreamFullInfo, mainStream *model.MainStream) {
- if info != nil && info.RoomID != 0 && len(info.List) > 0 {
- if mainStream == nil {
- d.syncMainStream(c, info.RoomID, "")
- log.Infov(c, log.KV("log", fmt.Sprintf("diff_err:can find any info, room_id=%d", info.RoomID)))
- return
- }
- offical := info.List[0]
- if mainStream.StreamName != offical.StreamName {
- log.Infov(c, log.KV("log", fmt.Sprintf("diff_err:stream name is different,room_id=%d", info.RoomID)))
- return
- }
- if mainStream.Key != offical.Key {
- log.Infov(c, log.KV("log", fmt.Sprintf("diff_err:key is different,room_id=%d", info.RoomID)))
- return
- }
- if mainStream.DefaultVendor != offical.DefaultUpStream {
- log.Infov(c, log.KV("log", fmt.Sprintf("diff_err:DefaultVendor is different,room_id=%d,main=%d,offical=%d", info.RoomID, mainStream.DefaultVendor, offical.DefaultUpStream)))
- return
- }
- if mainStream.OriginUpstream != 0 && (mainStream.OriginUpstream != mainStream.DefaultVendor) {
- log.Infov(c, log.KV("log", fmt.Sprintf("diff_err:OriginUpstream is different,room_id=%d, main origin=%d, main default=%d", info.RoomID, mainStream.OriginUpstream, mainStream.DefaultVendor)))
- return
- }
- streaming := offical.DefaultUpStream
- for _, v := range offical.Forward {
- streaming += v
- }
- if mainStream.Streaming != streaming {
- log.Infov(c, log.KV("log", fmt.Sprintf("diff_err:Streaming is different,room_id=%d, main=%d, offical=%d", info.RoomID, mainStream.Streaming, streaming)))
- return
- }
- }
- }
- func (d *Dao) syncMainStream(c context.Context, roomID int64, streamName string) error {
- if roomID <= 0 && streamName == "" {
- return errors.New("invalid params")
- }
- var err error
- exists, err := d.GetMainStreamFromDB(c, roomID, streamName)
- if err != nil && err.Error() != "sql: no rows in result set" {
- log.Errorv(c, log.KV("log", fmt.Sprintf("sync_stream_data_error = %v", err)))
- return err
- }
- if exists != nil && (exists.RoomID == roomID || exists.StreamName == streamName) {
- return nil
- }
- var full *model.StreamFullInfo
- if roomID > 0 && streamName == "" {
- full, err = d.StreamFullInfo(c, roomID, "")
- } else if roomID <= 0 && streamName != "" {
- full, err = d.StreamFullInfo(c, 0, streamName)
- }
- if err != nil {
- return err
- }
- if full == nil {
- return errors.New("unknow response")
- }
- for _, ss := range full.List {
- if ss.Type == 1 {
- ms := &model.MainStream{
- RoomID: full.RoomID,
- StreamName: ss.StreamName,
- Key: ss.Key,
- DefaultVendor: ss.DefaultUpStream,
- Status: 1,
- }
- if ms.DefaultVendor == 0 {
- ms.DefaultVendor = 1
- }
- _, err := d.CreateNewStream(c, ms)
- if err != nil {
- log.Errorv(c, log.KV("log", fmt.Sprintf("sync_stream_data_error = %v", err)))
- }
- break
- }
- }
- return nil
- }
|