123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467 |
- package service
- import (
- "context"
- "encoding/json"
- "fmt"
- "strings"
- "time"
- "go-common/app/job/main/videoup/model/archive"
- "go-common/app/job/main/videoup/model/message"
- "go-common/app/job/main/videoup/model/redis"
- xsql "go-common/library/database/sql"
- "go-common/library/log"
- )
- // bvc2VuConsumer is bvc 2 videoup message consumer.
- func (s *Service) bvc2VuConsumer() {
- defer s.wg.Done()
- var (
- msgs = s.bvc2VuSub.Messages()
- err error
- c = context.TODO()
- )
- for {
- func() {
- msg, ok := <-msgs
- if !ok {
- log.Error("s,bvc2VuSub.Message closed")
- return
- }
- defer s.Rescue(string(msg.Value))
- msg.Commit()
- s.bvc2VuMo++
- m := &message.BvcVideo{}
- if err = json.Unmarshal(msg.Value, m); err != nil {
- log.Error("json.Unmarshal(%v) error(%v)", string(msg.Value), err)
- return
- }
- if time.Now().Unix()-m.Timestamp > s.c.BvcConsumeTimeout {
- log.Info("bvcMessage consume delayed! key(%s) value(%s) partition(%d) offset(%d) route(%s) commit start", msg.Key, msg.Value, msg.Partition, msg.Offset, m.Route)
- s.bvc2VuDelayMo++
- }
- log.Info("bvcMessage key(%s) value(%s) partition(%d) offset(%d) route(%s) commit start", msg.Key, msg.Value, msg.Partition, msg.Offset, m.Route)
- s.promDatabus.Incr(m.Route)
- switch m.Route {
- case message.RouteUploadInfo:
- s.promVideoS.Incr("xcode_sd")
- s.uploadInfo(c, m)
- case message.RouteXcodeSDFail:
- err = s.xcodeSDFail(c, m)
- case message.RouteXcodeSdFinish:
- s.promVideoE.Incr("xcode_sd")
- s.promVideoS.Incr("first_round")
- err = s.xcodeSDFinish(c, m)
- case message.RouteXcodeHDFail:
- log.Info("bvcMessage key(%s) value(%s) partition(%d) offset(%d) route(%s) before enter func", msg.Key, msg.Value, msg.Partition, msg.Offset, m.Route)
- err = s.xcodeHDFail(c, m)
- case message.RouteXcodeHDFinish:
- s.promVideoE.Incr("xcode_hd")
- s.promVideoS.Incr("dispatch")
- err = s.xcodeHDFinish(c, m)
- case message.RouteDispatchRunning:
- err = s.dispatchRunning(c, m)
- case message.RouteDispatchFinish:
- s.promVideoE.Incr("dispatch")
- err = s.dispatchFinish(c, m)
- case message.RouteVideoshotpv:
- err = s.videoshotPv(c, m)
- default:
- log.Warn("bvc2VuConsumer unknown route(%s) message(%s)", m.Route, msg.Value)
- }
- if err == nil {
- log.Info("bvcMessage key(%s) value(%s) partition(%d) offset(%d) commit", msg.Key, msg.Value, msg.Partition, msg.Offset)
- } else {
- log.Error("bvcMessage key(%s) value(%s) partition(%d) offset(%d) no commit error(%v)", msg.Key, msg.Value, msg.Partition, msg.Offset, err)
- }
- }()
- }
- }
- func (s *Service) videoshotPv(c context.Context, m *message.BvcVideo) (err error) {
- var count = len(m.ImgURLs)
- if count == 0 {
- return
- }
- var v *archive.Video
- if v, err = s.arc.NewVideo(c, m.Filename); err != nil {
- return
- }
- if v == nil {
- log.Warn("filename(%s) videoshotpv video not exist", m.Filename)
- return
- }
- s.arc.AddVideoShot(c, v.Cid, count)
- // double write...
- //s.videoshotAdd(v.Cid, m.BinURL, m.ImgURLs)
- return
- }
- func (s *Service) uploadInfo(c context.Context, m *message.BvcVideo) (err error) {
- s.redis.AddFilename(c, m.Filename)
- log.Info("filename(%s) upload success", m.Filename)
- return
- }
- func (s *Service) xcodeSDFail(c context.Context, m *message.BvcVideo) (err error) {
- var (
- v *archive.Video
- a *archive.Archive
- )
- if v, a, err = s.archiveVideo(c, m.Filename); err != nil {
- log.Error("s.archiveVideo(%s) error(%v)", m.Filename, err)
- return
- }
- if v.XcodeState >= archive.VideoXcodeSDFail {
- // NOTE: xcodeFail=1, xcodeState must uploadInfo=0
- log.Warn("archive(%d) video(%s) already(%d)", a.Aid, m.Filename, v.XcodeState)
- return // NOTE: is or not return???
- }
- v.Status = archive.VideoStatusXcodeFail
- v.XcodeState = archive.VideoXcodeSDFail
- v.FailCode = archive.XcodeFailCodes[m.FailInfo]
- // begin transcation
- var (
- tx *xsql.Tx
- change bool
- )
- if tx, err = s.arc.BeginTran(c); err != nil {
- log.Error("s.arc.BeginTran archive(%d) filename(%s) error(%v)", a.Aid, m.Filename, err)
- return
- }
- log.Info("archive(%d) filename(%s) begin sd_fail transcation a_state(%d) v_status(%d)", a.Aid, v.Filename, a.State, v.Status)
- if err = s.tranVideo(c, tx, a, v); err != nil {
- tx.Rollback()
- log.Error("s.tranVideo(%d, %s) error(%v)", a.Aid, v.Filename, err)
- return
- }
- log.Info("archive(%d) filename(%s) sd_fail tranVideo fininsh a_state(%d) v_status(%d)", a.Aid, v.Filename, a.State, v.Status)
- if change, err = s.tranArchive(c, tx, a, v, nil); err != nil {
- tx.Rollback()
- log.Error("s.tranArchive(%d, %s) error(%v)", a.Aid, v.Filename, err)
- return
- }
- log.Info("archive(%d) filename(%s) sd_fail tranArchive fininsh a_state(%d) v_status(%d)", a.Aid, v.Filename, a.State, v.Status)
- if err = tx.Commit(); err != nil {
- log.Error("tx.Commit(%d, %s) error(%v)", a.Aid, v.Filename, err)
- return
- }
- log.Info("archive(%d) filename(%s) end sd_fail transcation a_state(%d) v_status(%d)", a.Aid, v.Filename, a.State, v.Status)
- if change {
- s.sendMsg(c, a, v)
- }
- return
- }
- func (s *Service) xcodeSDFinish(c context.Context, m *message.BvcVideo) (err error) {
- var (
- v *archive.Video
- a *archive.Archive
- )
- if v, a, err = s.archiveVideo(c, m.Filename); err != nil {
- log.Error("s.archiveVideo(%s) error(%v)", m.Filename, err)
- return
- }
- // if v.Playurl == m.PlayURL && v.Duration == m.Duration { // NOTE: check playurl&duration or xcode_state???
- if v.XcodeState >= archive.VideoXcodeSDFinish {
- // NOTE: sdFinish=2, xcideState must uploadInfo=0||xcodesdfail=1
- log.Warn("archive(%d) video(%s) already(%d)", a.Aid, m.Filename, v.XcodeState)
- return // NOTE: is or not return???
- }
- // if video already deleted, no dispatch no update
- if v.Status == archive.VideoStatusDelete {
- log.Info("xcodeSDFinish archive(%d) video(%s) video already deleted", a.Aid, m.Filename)
- v.Status = archive.VideoStatusDelete
- } else {
- v.Status = archive.VideoStatusWait // NOTE: default -1
- }
- // if archive already deleted, video state should be?
- if a.State == archive.StateForbidUpDelete {
- log.Info("xcodeSDFinish archive(%d) video(%s) archive already deleted", a.Aid, m.Filename)
- v.Status = archive.VideoStatusDelete
- }
- v.XcodeState = archive.VideoXcodeSDFinish
- v.Playurl = m.PlayURL
- v.Duration = m.Duration
- // begin transcation
- var (
- tx *xsql.Tx
- )
- if tx, err = s.arc.BeginTran(c); err != nil {
- log.Error("s.arc.BeginTran archive(%d) filename(%s) error(%v)", a.Aid, m.Filename, err)
- return
- }
- log.Info("archive(%d) filename(%s) begin sd_finish transcation a_state(%d) v_status(%d)", a.Aid, v.Filename, a.State, v.Status)
- if err = s.tranVideo(c, tx, a, v); err != nil {
- tx.Rollback()
- log.Error("s.tranVideo(%d, %s) error(%v)", a.Aid, v.Filename, err)
- return
- }
- if v.Status != archive.VideoStatusDelete {
- log.Info("archive(%d) filename(%s) sd_finish tranVideo fininsh a_state(%d) v_status(%d)", a.Aid, v.Filename, a.State, v.Status)
- if _, err = s.tranArchive(c, tx, a, v, nil); err != nil {
- tx.Rollback()
- log.Error("s.tranArchive(%d, %s) error(%v)", a.Aid, v.Filename, err)
- return
- }
- }
- if err = s.tranArcCover(c, tx, a, v); err != nil {
- tx.Rollback()
- log.Error("s.tranArcCover(%d, %s) error(%v)", a.Aid, v.Filename, err)
- return
- }
- log.Info("archive(%d) filename(%s) sd_finish tranArchive fininsh a_state(%d) v_status(%d)", a.Aid, v.Filename, a.State, v.Status)
- if _, err = s.arc.TranVideoOper(c, tx, a.Aid, v.ID, v.Status, v.Attribute); err != nil {
- tx.Rollback()
- log.Error("s.arc.TranVideoOper(%d, %d, %d) error(%v)", a.Aid, v.ID, v.Status, err)
- return
- }
- if err = tx.Commit(); err != nil {
- log.Error("tx.Commit(%d, %s) error(%v)", a.Aid, v.Filename, err)
- return
- }
- if s.canDo(a.Mid) {
- s.syncRetry(context.TODO(), a.Aid, 0, redis.ActionForVideocovers, a.Cover, a.Cover)
- }
- log.Info("archive(%d) filename(%s) end sd_finish transcation a_state(%d) v_status(%d)", a.Aid, v.Filename, a.State, v.Status)
- return
- }
- func (s *Service) xcodeHDFail(c context.Context, m *message.BvcVideo) (err error) {
- log.Info("xcode hd fail filename (%s)", m.Filename)
- var (
- v *archive.Video
- a *archive.Archive
- )
- if v, a, err = s.archiveVideo(c, m.Filename); err != nil {
- log.Error("s.archiveVideo(%s) error(%v)", m.Filename, err)
- return
- }
- if v.XcodeState >= archive.VideoXcodeHDFail {
- // NOTE: hdfail=3, xcodeState must uploadInfo=0||xcodesdfail=1||sdfinish=2
- log.Warn("archive(%d) video(%s) already(%d)", a.Aid, m.Filename, v.XcodeState)
- return // NOTE: is or not return???
- }
- v.Status = archive.VideoStatusXcodeFail
- v.XcodeState = archive.VideoXcodeHDFail
- v.FailCode = archive.XcodeFailCodes[m.FailInfo]
- // begin transcation
- var (
- tx *xsql.Tx
- change bool
- )
- if tx, err = s.arc.BeginTran(c); err != nil {
- log.Error("s.arc.BeginTran archive(%d) filename(%s) error(%v)", a.Aid, m.Filename, err)
- return
- }
- log.Info("archive(%d) filename(%s) begin hd_fail transcation a_state(%d) v_status(%d)", a.Aid, v.Filename, a.State, v.Status)
- if err = s.tranVideo(c, tx, a, v); err != nil {
- tx.Rollback()
- log.Error("s.tranVideo(%d, %s) error(%v)", a.Aid, v.Filename, err)
- return
- }
- log.Info("archive(%d) filename(%s) hd_fail tranVideo fininsh a_state(%d) v_status(%d)", a.Aid, v.Filename, a.State, v.Status)
- if change, err = s.tranArchive(c, tx, a, v, nil); err != nil {
- tx.Rollback()
- log.Error("s.tranArchive(%d, %s) error(%v)", a.Aid, v.Filename, err)
- return
- }
- log.Info("archive(%d) filename(%s) hd_fail tranArchive fininsh a_state(%d) v_status(%d)", a.Aid, v.Filename, a.State, v.Status)
- if err = tx.Commit(); err != nil {
- log.Error("tx.Commit(%d, %s) error(%v)", a.Aid, v.Filename, err)
- return
- }
- log.Info("archive(%d) filename(%s) end hd_fail transcation a_state(%d) v_status(%d)", a.Aid, v.Filename, a.State, v.Status)
- if change {
- s.sendMsg(c, a, v)
- }
- return
- }
- func (s *Service) xcodeHDFinish(c context.Context, m *message.BvcVideo) (err error) {
- var (
- v *archive.Video
- a *archive.Archive
- )
- if v, a, err = s.archiveVideo(c, m.Filename); err != nil {
- log.Error("s.archiveVideo(%s) error(%v)", m.Filename, err)
- return
- }
- if v.XcodeState >= archive.VideoXcodeHDFinish {
- // NOTE: hdFinish=3, xcodeState must uploadInfo=0||xcodesdfail=1||sdfinish=2||hdfail=3
- log.Warn("archive(%d) video(%s) already(%d)", a.Aid, m.Filename, v.XcodeState)
- return // NOTE: is or not return???
- }
- // make sure filename not exist in redis, otherwise videoup can not submit!!!
- s.redis.DelFilename(c, m.Filename)
- log.Info("filename(%s) del_filename from redis success", m.Filename)
- // start deal hd finish
- v.XcodeState = archive.VideoXcodeHDFinish
- v.Resolutions = m.Resolutions
- v.Filesize = m.Filesize
- v.Duration = m.Duration
- v.Dimensions = fmt.Sprintf("%d,%d,%d", m.Width, m.Height, m.Rotate)
- // begin transcation
- var (
- tx *xsql.Tx
- )
- if tx, err = s.arc.BeginTran(c); err != nil {
- log.Error("s.arc.BeginTran archive(%d) filename(%s) error(%v)", a.Aid, m.Filename, err)
- return
- }
- log.Info("archive(%d) filename(%s) begin hd_finish transcation a_state(%d) v_status(%d)", a.Aid, v.Filename, a.State, v.Status)
- if err = s.tranVideo(c, tx, a, v); err != nil {
- tx.Rollback()
- log.Error("s.tranVideo(%d, %s) error(%v)", a.Aid, v.Filename, err)
- return
- }
- log.Info("archive(%d) filename(%s) hd_finish tranVideo fininsh a_state(%d) v_status(%d)", a.Aid, v.Filename, a.State, v.Status)
- // only hd5???
- var attr int32
- if strings.Contains(m.Resolutions, "hdflv2") || strings.Contains(m.Resolutions, "112") {
- attr = archive.AttrYes
- } else {
- attr = archive.AttrNo
- }
- if _, err = s.arc.TxUpAttrBit(tx, a.Aid, attr, archive.AttrBitHasHD5); err != nil {
- tx.Rollback()
- log.Error("s.arc.TxUpAttrBit(%d, %d, hd5) error(%v)", a.Aid, attr, err)
- return
- }
- log.Info("archive(%d) filename(%s) hd_finish attrBitHD5 fininsh a_state(%d) v_status(%d)", a.Aid, v.Filename, a.State, v.Status)
- if err = tx.Commit(); err != nil {
- log.Error("tx.Commit(%d, %s) error(%v)", a.Aid, v.Filename, err)
- return
- }
- log.Info("archive(%d) filename(%s) end hd_finish transcation a_state(%d) v_status(%d)", a.Aid, v.Filename, a.State, v.Status)
- return
- }
- func (s *Service) dispatchRunning(c context.Context, m *message.BvcVideo) (err error) {
- var (
- v *archive.Video
- a *archive.Archive
- )
- if v, a, err = s.archiveVideo(c, m.Filename); err != nil {
- log.Error("s.archiveVideo(%s) error(%v)", m.Filename, err)
- return
- }
- if v.XcodeState >= archive.VideoDispatchRunning {
- // NOTE: dispathRun=4, xcodeState must uploadInfo=0||xcodefail=1||sdfinish=2||xcodehdfail=3||hdFinish=4
- log.Warn("archive(%d) video(%s) already(%d)", a.Aid, m.Filename, v.XcodeState)
- return // NOTE: is or not return???
- }
- v.XcodeState = archive.VideoDispatchRunning
- // begin transcation
- var (
- tx *xsql.Tx
- )
- if tx, err = s.arc.BeginTran(c); err != nil {
- log.Error("s.arc.BeginTran archive(%d) filename(%s) error(%v)", a.Aid, m.Filename, err)
- return
- }
- log.Info("archive(%d) filename(%s) begin dispatch_run transcation a_state(%d) v_status(%d)", a.Aid, v.Filename, a.State, v.Status)
- if err = s.tranVideo(c, tx, a, v); err != nil {
- tx.Rollback()
- log.Error("s.tranVideo(%d, %s) error(%v)", a.Aid, v.Filename, err)
- return
- }
- log.Info("archive(%d) filename(%s) dispatch_run tranVideo fininsh a_state(%d) v_status(%d)", a.Aid, v.Filename, a.State, v.Status)
- if _, err = s.tranArchive(c, tx, a, v, nil); err != nil {
- tx.Rollback()
- log.Error("s.tranArchive(%d, %s) error(%v)", a.Aid, v.Filename, err)
- return
- }
- log.Info("archive(%d) filename(%s) dispatch_run tranArchive fininsh a_state(%d) v_status(%d)", a.Aid, v.Filename, a.State, v.Status)
- if err = tx.Commit(); err != nil {
- log.Error("tx.Commit(%d, %s) error(%v)", a.Aid, v.Filename, err)
- return
- }
- log.Info("archive(%d) filename(%s) end dispatch_run transcation a_state(%d) v_status(%d)", a.Aid, v.Filename, a.State, v.Status)
- return
- }
- func (s *Service) dispatchFinish(c context.Context, m *message.BvcVideo) (err error) {
- var (
- v *archive.Video
- a *archive.Archive
- )
- if v, a, err = s.archiveVideo(c, m.Filename); err != nil {
- log.Error("s.archiveVideo(%s) error(%v)", m.Filename, err)
- return
- }
- if v.XcodeState >= archive.VideoDispatchFinish {
- // NOTE: dispathFinish=5, xcodeState must uploadInfo=0||xcodefail=1||sdfinish=2||hdFinish=3||dispathRun=4
- log.Warn("archive(%d) video(%s) already(%d)", a.Aid, m.Filename, v.XcodeState)
- return // NOTE: is or not return???
- }
- v.XcodeState = archive.VideoDispatchFinish
- // begin transcation
- var (
- tx *xsql.Tx
- sChange, rChange bool
- )
- if tx, err = s.arc.BeginTran(c); err != nil {
- log.Error("s.arc.BeginTran archive(%d) filename(%s) error(%v)", a.Aid, m.Filename, err)
- return
- }
- log.Info("archive(%d) filename(%s) begin dispatch_finish transcation a_state(%d) v_status(%d)", a.Aid, v.Filename, a.State, v.Status)
- if err = s.tranVideo(c, tx, a, v); err != nil {
- tx.Rollback()
- log.Error("s.tranVideo(%d, %s) error(%v)", a.Aid, v.Filename, err)
- return
- }
- log.Info("archive(%d) filename(%s) dispatch_finish tranVideo fininsh a_state(%d) v_status(%d)", a.Aid, v.Filename, a.State, v.Status)
- if sChange, err = s.tranArchive(c, tx, a, v, nil); err != nil {
- tx.Rollback()
- log.Error("s.tranArchive(%d, %s) error(%v)", a.Aid, v.Filename, err)
- return
- }
- log.Info("archive(%d) filename(%s) dispatch_finish tranArchive fininsh a_state(%d) v_status(%d)", a.Aid, v.Filename, a.State, v.Status)
- var round int8
- if round, err = s.tranRound(c, tx, a); err != nil {
- tx.Rollback()
- return
- }
- rChange = round != a.Round
- log.Info("archive(%d) filename(%s) dispatch_finish tranRound fininsh old_round(%d) new_round(%d)", a.Aid, v.Filename, a.Round, round)
- a.Round = round
- if sChange || rChange {
- if err = s.tranArchiveOper(tx, a); err != nil {
- tx.Rollback()
- return
- }
- }
- log.Info("archive(%d) filename(%s) dispatch_finish round_opr fininsh round(%d)", a.Aid, v.Filename, a.Round)
- if err = tx.Commit(); err != nil {
- log.Error("tx.Commit(%d, %s) error(%v)", a.Aid, v.Filename, err)
- return
- }
- log.Info("archive(%d) filename(%s) end dispatch_finish transcation a_state(%d) v_status(%d)", a.Aid, v.Filename, a.State, v.Status)
- if sChange {
- //稿件二审in/out量监控 每个aid 只统计一次 (1,自动过审2,手动审核需分开处理)(多P也只统计一次) start,end
- var had bool
- if archive.NormalState(a.State) {
- //monitor second_round 自动开放 in/out diff
- s.promVideoS.Incr("second_round")
- s.promVideoE.Incr("second_round")
- //auto open
- s.syncBVC(c, a)
- s.sendAuditMsg(c, message.RouteAutoOpen, a.Aid)
- if is, _ := s.IsUpperFirstPass(c, a.Mid, a.Aid); is {
- go s.sendNewUpperMsg(c, a.Mid, a.Aid)
- }
- } else if had, _ = s.redis.SetMonitorCache(c, a.Aid); had {
- s.promVideoS.Incr("second_round")
- }
- s.sendMsg(c, a, v)
- if a.State == archive.StateForbidFixed {
- s.addClickToRedis(c, a.Aid)
- }
- }
- return
- }
|