databus.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. package service
  2. import (
  3. "context"
  4. "strconv"
  5. "time"
  6. "go-common/app/admin/main/videoup/model/archive"
  7. "go-common/app/admin/main/videoup/model/message"
  8. "go-common/app/admin/main/videoup/model/up"
  9. "go-common/library/log"
  10. )
  11. func (s *Service) busFirstRound(aid int64, fn, encodePurpose string, videoDesign *message.VideoDesign, status int16, encoding int8, regionID, typeID int16, fans int64, adminChange bool) (err error) {
  12. var msg = &message.Videoup{
  13. Route: message.RouteFirstRound,
  14. Fans: fans,
  15. Timestamp: time.Now().Unix(),
  16. Aid: aid,
  17. Filename: fn,
  18. Status: status,
  19. Xcode: encoding,
  20. EncodeRegionID: regionID,
  21. VideoDesign: videoDesign,
  22. AdminChange: adminChange,
  23. EncodeTypeID: typeID,
  24. }
  25. if len(encodePurpose) != 0 {
  26. msg.EncodePurpose = encodePurpose
  27. }
  28. log.Info("filename(%s) start to send firstRound(%+v) to databus", fn, msg)
  29. if err = s.busSendMsg(msg); err != nil {
  30. s.msgCh <- msg
  31. }
  32. return
  33. }
  34. func (s *Service) busUGCFirstRound(aid int64, fn, encodePurpose string, videoDesign *message.VideoDesign, status int16, encoding int8, regionID, typeID int16, fans int64, adminChange bool) (err error) {
  35. var msg = &message.Videoup{
  36. Route: message.RouteUGCFirstRound,
  37. Fans: fans,
  38. Timestamp: time.Now().Unix(),
  39. Aid: aid,
  40. Filename: fn,
  41. Status: status,
  42. Xcode: encoding,
  43. EncodeRegionID: regionID,
  44. VideoDesign: videoDesign,
  45. AdminChange: adminChange,
  46. EncodeTypeID: typeID,
  47. }
  48. if len(encodePurpose) != 0 {
  49. msg.EncodePurpose = encodePurpose
  50. }
  51. log.Info("filename(%s) start to send UGCFirstRound(%+v) to databus", fn, msg)
  52. if err = s.busSendMsg(msg); err != nil {
  53. s.msgCh <- msg
  54. }
  55. return
  56. }
  57. func (s *Service) busArchiveForceSync(aid int64) (err error) {
  58. var msg = &message.Videoup{
  59. Route: message.RouteForceSync,
  60. Timestamp: time.Now().Unix(),
  61. Aid: aid,
  62. }
  63. log.Info("aid(%d) send busArchiveForceSync to databus ", aid)
  64. if err = s.busSendMsg(msg); err != nil {
  65. s.msgCh <- msg
  66. }
  67. return
  68. }
  69. func (s *Service) busModifyArchive(aid int64, aChange, vChange bool) (err error) {
  70. var msg = &message.Videoup{
  71. Route: message.RouteModifyArchive,
  72. Timestamp: time.Now().Unix(),
  73. Aid: aid,
  74. EditArchive: aChange,
  75. EditVideo: vChange,
  76. }
  77. log.Info("aid(%d) send modifyArchive to databus by EditArchive(%v) EditVideo(%v)", aid, aChange, vChange)
  78. if err = s.busSendMsg(msg); err != nil {
  79. s.msgCh <- msg
  80. }
  81. return
  82. }
  83. func (s *Service) busSecondRound(aid, missionID int64, notify bool, email, changeTypeID, changeCopyright, changeTitle, ChangeCover bool, fromList string, ap *archive.ArcParam) (err error) {
  84. sendEmail := true
  85. if ap != nil {
  86. sendEmail = !ap.NoEmail
  87. }
  88. var msg = &message.Videoup{
  89. Route: message.RouteSecondRound,
  90. Aid: aid,
  91. Notify: notify,
  92. MissionID: missionID,
  93. Timestamp: time.Now().Unix(),
  94. AdminChange: email,
  95. ChangeTypeID: changeTypeID,
  96. ChangeCopyright: changeCopyright,
  97. ChangeTitle: changeTitle,
  98. ChangeCover: ChangeCover,
  99. FromList: fromList,
  100. SendEmail: sendEmail,
  101. }
  102. log.Info("aid(%d) start to send secondRound msg(%+v) to databus", aid, msg)
  103. if err = s.busSendMsg(msg); err != nil {
  104. s.msgCh <- msg
  105. }
  106. return
  107. }
  108. func (s *Service) busSecondRoundUpCredit(aid, cid, mid, uid int64, state, round int8, reasonID int64, reason string) (err error) {
  109. if mid == 0 || aid == 0 {
  110. return
  111. }
  112. var msg = &up.CreditLog{
  113. Type: round,
  114. Optyte: state,
  115. Reason: reasonID,
  116. BusinessType: up.CreditBusinessTypeArchive,
  117. MID: mid,
  118. OID: aid,
  119. UID: uid,
  120. Content: reason,
  121. Ctime: time.Now().Unix(),
  122. Extra: map[string]interface{}{"cid": cid},
  123. }
  124. var c = context.TODO()
  125. log.Info("aid(%d) start to send busSecondRoundUpCredit msg(%+v) to databus", aid, msg)
  126. if err = s.upCreditPub.Send(c, string(msg.OID)+string(msg.UID), msg); err != nil {
  127. log.Error("aid(%d) s.upCreditPub.Send(%+v) error(%v)", msg.OID, msg, err)
  128. }
  129. return
  130. }
  131. //func (s *Service) busDeleteVideo(aid int64, filename string) (err error) {
  132. // var msg = &message.Videoup{
  133. // Route: message.RouteDeleteVideo,
  134. // Timestamp: time.Now().Unix(),
  135. // Aid: aid,
  136. // Filename: filename,
  137. // }
  138. // log.Info("aid(%d) filename(%s) start to send deleteVideo to databus", aid, filename)
  139. // if err = s.busSendMsg(msg); err != nil {
  140. // s.msgCh <- msg
  141. // }
  142. // return
  143. //}
  144. func (s *Service) busSendMsg(msg *message.Videoup) (err error) {
  145. var c = context.TODO()
  146. switch msg.Route {
  147. case message.RouteFirstRound, message.RouteUGCFirstRound, message.RouteDeleteVideo:
  148. if err = s.videoupPub.Send(c, msg.Filename, msg); err != nil {
  149. log.Error("filename(%s) %s s.videoupPub.Send(%+v) error(%v)", msg.Filename, msg.Route, msg, err)
  150. }
  151. case message.RouteSecondRound, message.RouteModifyArchive, message.RouteForceSync:
  152. if err = s.videoupPub.Send(c, strconv.FormatInt(msg.Aid, 10), msg); err != nil {
  153. log.Error("aid(%d) %s s.videoupPub.Send(%+v) error(%v)", msg.Aid, msg.Route, msg, err)
  154. }
  155. default:
  156. log.Warn("databuserr can't process the type (%s)", msg.Route)
  157. }
  158. return
  159. }
  160. // databus err proc
  161. func (s *Service) msgproc() {
  162. // NOTE: chan
  163. s.wg.Add(1)
  164. go func() {
  165. var (
  166. c = context.TODO()
  167. msg *message.Videoup
  168. ok bool
  169. err error
  170. )
  171. defer s.wg.Done()
  172. for {
  173. if msg, ok = <-s.msgCh; !ok {
  174. log.Info("msgproc s.msgCh stop")
  175. return
  176. }
  177. log.Info("aid(%d) filename(%s) get msg(%+v) from s.msgCh", msg.Aid, msg.Filename, msg)
  178. if err = s.busSendMsg(msg); err != nil {
  179. s.busCache.PushMsgCache(c, msg)
  180. time.Sleep(100 * time.Millisecond)
  181. }
  182. }
  183. }()
  184. // NOTE: from redis list when chan error
  185. s.wg.Add(1)
  186. go func() {
  187. var (
  188. c = context.TODO()
  189. msg *message.Videoup
  190. err error
  191. )
  192. defer s.wg.Done()
  193. for {
  194. if s.closed {
  195. log.Info("second msgproc service is close")
  196. return
  197. }
  198. if msg, err = s.busCache.PopMsgCache(c); err != nil {
  199. log.Error("msgproc s.busCache.PopMsgCache() error(%v)", err)
  200. time.Sleep(100 * time.Millisecond)
  201. continue
  202. }
  203. if msg == nil {
  204. select {
  205. case <-time.After(3 * time.Minute):
  206. continue
  207. case <-s.stop:
  208. return
  209. }
  210. }
  211. log.Info("aid(%d) filename(%s) get msg(%+v) from redis", msg.Aid, msg.Filename, msg)
  212. if err = s.busSendMsg(msg); err != nil {
  213. s.busCache.PushMsgCache(c, msg)
  214. time.Sleep(100 * time.Millisecond)
  215. }
  216. }
  217. }()
  218. }