123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 |
- package service
- import (
- "context"
- "strconv"
- "time"
- "go-common/app/admin/main/videoup/model/archive"
- "go-common/app/admin/main/videoup/model/message"
- "go-common/app/admin/main/videoup/model/up"
- "go-common/library/log"
- )
- func (s *Service) busFirstRound(aid int64, fn, encodePurpose string, videoDesign *message.VideoDesign, status int16, encoding int8, regionID, typeID int16, fans int64, adminChange bool) (err error) {
- var msg = &message.Videoup{
- Route: message.RouteFirstRound,
- Fans: fans,
- Timestamp: time.Now().Unix(),
- Aid: aid,
- Filename: fn,
- Status: status,
- Xcode: encoding,
- EncodeRegionID: regionID,
- VideoDesign: videoDesign,
- AdminChange: adminChange,
- EncodeTypeID: typeID,
- }
- if len(encodePurpose) != 0 {
- msg.EncodePurpose = encodePurpose
- }
- log.Info("filename(%s) start to send firstRound(%+v) to databus", fn, msg)
- if err = s.busSendMsg(msg); err != nil {
- s.msgCh <- msg
- }
- return
- }
- func (s *Service) busUGCFirstRound(aid int64, fn, encodePurpose string, videoDesign *message.VideoDesign, status int16, encoding int8, regionID, typeID int16, fans int64, adminChange bool) (err error) {
- var msg = &message.Videoup{
- Route: message.RouteUGCFirstRound,
- Fans: fans,
- Timestamp: time.Now().Unix(),
- Aid: aid,
- Filename: fn,
- Status: status,
- Xcode: encoding,
- EncodeRegionID: regionID,
- VideoDesign: videoDesign,
- AdminChange: adminChange,
- EncodeTypeID: typeID,
- }
- if len(encodePurpose) != 0 {
- msg.EncodePurpose = encodePurpose
- }
- log.Info("filename(%s) start to send UGCFirstRound(%+v) to databus", fn, msg)
- if err = s.busSendMsg(msg); err != nil {
- s.msgCh <- msg
- }
- return
- }
- func (s *Service) busArchiveForceSync(aid int64) (err error) {
- var msg = &message.Videoup{
- Route: message.RouteForceSync,
- Timestamp: time.Now().Unix(),
- Aid: aid,
- }
- log.Info("aid(%d) send busArchiveForceSync to databus ", aid)
- if err = s.busSendMsg(msg); err != nil {
- s.msgCh <- msg
- }
- return
- }
- func (s *Service) busModifyArchive(aid int64, aChange, vChange bool) (err error) {
- var msg = &message.Videoup{
- Route: message.RouteModifyArchive,
- Timestamp: time.Now().Unix(),
- Aid: aid,
- EditArchive: aChange,
- EditVideo: vChange,
- }
- log.Info("aid(%d) send modifyArchive to databus by EditArchive(%v) EditVideo(%v)", aid, aChange, vChange)
- if err = s.busSendMsg(msg); err != nil {
- s.msgCh <- msg
- }
- return
- }
- func (s *Service) busSecondRound(aid, missionID int64, notify bool, email, changeTypeID, changeCopyright, changeTitle, ChangeCover bool, fromList string, ap *archive.ArcParam) (err error) {
- sendEmail := true
- if ap != nil {
- sendEmail = !ap.NoEmail
- }
- var msg = &message.Videoup{
- Route: message.RouteSecondRound,
- Aid: aid,
- Notify: notify,
- MissionID: missionID,
- Timestamp: time.Now().Unix(),
- AdminChange: email,
- ChangeTypeID: changeTypeID,
- ChangeCopyright: changeCopyright,
- ChangeTitle: changeTitle,
- ChangeCover: ChangeCover,
- FromList: fromList,
- SendEmail: sendEmail,
- }
- log.Info("aid(%d) start to send secondRound msg(%+v) to databus", aid, msg)
- if err = s.busSendMsg(msg); err != nil {
- s.msgCh <- msg
- }
- return
- }
- func (s *Service) busSecondRoundUpCredit(aid, cid, mid, uid int64, state, round int8, reasonID int64, reason string) (err error) {
- if mid == 0 || aid == 0 {
- return
- }
- var msg = &up.CreditLog{
- Type: round,
- Optyte: state,
- Reason: reasonID,
- BusinessType: up.CreditBusinessTypeArchive,
- MID: mid,
- OID: aid,
- UID: uid,
- Content: reason,
- Ctime: time.Now().Unix(),
- Extra: map[string]interface{}{"cid": cid},
- }
- var c = context.TODO()
- log.Info("aid(%d) start to send busSecondRoundUpCredit msg(%+v) to databus", aid, msg)
- if err = s.upCreditPub.Send(c, string(msg.OID)+string(msg.UID), msg); err != nil {
- log.Error("aid(%d) s.upCreditPub.Send(%+v) error(%v)", msg.OID, msg, err)
- }
- return
- }
- //func (s *Service) busDeleteVideo(aid int64, filename string) (err error) {
- // var msg = &message.Videoup{
- // Route: message.RouteDeleteVideo,
- // Timestamp: time.Now().Unix(),
- // Aid: aid,
- // Filename: filename,
- // }
- // log.Info("aid(%d) filename(%s) start to send deleteVideo to databus", aid, filename)
- // if err = s.busSendMsg(msg); err != nil {
- // s.msgCh <- msg
- // }
- // return
- //}
- func (s *Service) busSendMsg(msg *message.Videoup) (err error) {
- var c = context.TODO()
- switch msg.Route {
- case message.RouteFirstRound, message.RouteUGCFirstRound, message.RouteDeleteVideo:
- if err = s.videoupPub.Send(c, msg.Filename, msg); err != nil {
- log.Error("filename(%s) %s s.videoupPub.Send(%+v) error(%v)", msg.Filename, msg.Route, msg, err)
- }
- case message.RouteSecondRound, message.RouteModifyArchive, message.RouteForceSync:
- if err = s.videoupPub.Send(c, strconv.FormatInt(msg.Aid, 10), msg); err != nil {
- log.Error("aid(%d) %s s.videoupPub.Send(%+v) error(%v)", msg.Aid, msg.Route, msg, err)
- }
- default:
- log.Warn("databuserr can't process the type (%s)", msg.Route)
- }
- return
- }
- // databus err proc
- func (s *Service) msgproc() {
- // NOTE: chan
- s.wg.Add(1)
- go func() {
- var (
- c = context.TODO()
- msg *message.Videoup
- ok bool
- err error
- )
- defer s.wg.Done()
- for {
- if msg, ok = <-s.msgCh; !ok {
- log.Info("msgproc s.msgCh stop")
- return
- }
- log.Info("aid(%d) filename(%s) get msg(%+v) from s.msgCh", msg.Aid, msg.Filename, msg)
- if err = s.busSendMsg(msg); err != nil {
- s.busCache.PushMsgCache(c, msg)
- time.Sleep(100 * time.Millisecond)
- }
- }
- }()
- // NOTE: from redis list when chan error
- s.wg.Add(1)
- go func() {
- var (
- c = context.TODO()
- msg *message.Videoup
- err error
- )
- defer s.wg.Done()
- for {
- if s.closed {
- log.Info("second msgproc service is close")
- return
- }
- if msg, err = s.busCache.PopMsgCache(c); err != nil {
- log.Error("msgproc s.busCache.PopMsgCache() error(%v)", err)
- time.Sleep(100 * time.Millisecond)
- continue
- }
- if msg == nil {
- select {
- case <-time.After(3 * time.Minute):
- continue
- case <-s.stop:
- return
- }
- }
- log.Info("aid(%d) filename(%s) get msg(%+v) from redis", msg.Aid, msg.Filename, msg)
- if err = s.busSendMsg(msg); err != nil {
- s.busCache.PushMsgCache(c, msg)
- time.Sleep(100 * time.Millisecond)
- }
- }
- }()
- }
|