video.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. package archive
  2. import (
  3. "sync"
  4. xtime "time"
  5. "go-common/library/log"
  6. "go-common/library/time"
  7. )
  // Video state and status constants.
  const (
  	// Video xcode and dispatch state (typed int8).
  	// NOTE(review): the meaning of each value is read off its identifier
  	// only — confirm against the transcode/dispatch pipeline.
  	VideoUploadInfo      int8 = 0
  	VideoXcodeSDFail     int8 = 1
  	VideoXcodeSDFinish   int8 = 2
  	VideoXcodeHDFail     int8 = 3
  	VideoXcodeHDFinish   int8 = 4
  	VideoDispatchRunning int8 = 5
  	VideoDispatchFinish  int8 = 6

  	// Video status (typed int16).
  	VideoStatusOpen         int16 = 0
  	VideoStatusAccess       int16 = 10000
  	VideoStatusWait         int16 = -1
  	VideoStatusRecicle      int16 = -2 // spelling kept: the identifier is exported
  	VideoStatusLock         int16 = -4
  	VideoStatusXcodeFail    int16 = -16
  	VideoStatusSubmit       int16 = -30
  	VideoStatusUploadSubmit int16 = -50
  	VideoStatusDelete       int16 = -100

  	// Xcode fail: an untyped zero constant.
  	XcodeFailZero = 0
  )
  31. // Video is archive_video model.
  32. type Video struct {
  33. ID int64 `json:"-"`
  34. Aid int64 `json:"aid"`
  35. Title string `json:"title"`
  36. Desc string `json:"desc"`
  37. Filename string `json:"filename"`
  38. SrcType string `json:"src_type"`
  39. Cid int64 `json:"cid"`
  40. Sid int64 `json:"-"`
  41. Duration int64 `json:"duration"`
  42. Filesize int64 `json:"-"`
  43. Resolutions string `json:"-"`
  44. Index int `json:"index"`
  45. Playurl string `json:"-"`
  46. Status int16 `json:"status"`
  47. FailCode int8 `json:"fail_code"`
  48. XcodeState int8 `json:"xcode_state"`
  49. Attribute int32 `json:"-"`
  50. RejectReason string `json:"reject_reason"`
  51. CTime time.Time `json:"ctime"`
  52. MTime time.Time `json:"-"`
  53. Dimension *Dimension `json:"dimension"`
  54. }
  // Dimension describes an archive video's dimension: its width, height and
  // rotate values, encoded to JSON as "width", "height" and "rotate".
  type Dimension struct {
  	Width  int64 `json:"width"`
  	Height int64 `json:"height"`
  	Rotate int64 `json:"rotate"`
  }
  // SimpleVideo is the reduced video view used for Archive History.
  //
  // Note the JSON keys differ from the field names: Index is encoded as
  // "part_id", Title as "part_name" and MTime as "dm_modified".
  type SimpleVideo struct {
  	Cid    int64     `json:"cid"`
  	Index  int       `json:"part_id"`
  	Title  string    `json:"part_name"`
  	Status int16     `json:"status"`
  	MTime  time.Time `json:"dm_modified"`
  }
  // VideoFn is the row shape used for the Archive Video table filename check:
  // a cid, its filename, and the ctime/mtime values.
  type VideoFn struct {
  	Cid      int64     `json:"cid"`
  	Filename string    `json:"filename"`
  	CTime    time.Time `json:"ctime"`
  	MTime    time.Time `json:"mtime"`
  }
  // param is the per-archive bookkeeping kept by VideosEditor.
  type param struct {
  	// undoneCount is the total number of items still waiting to complete.
  	undoneCount int
  	// failCount is the total number of errors seen so far.
  	failCount int
  	// timeout is the timer armed for this archive's edit.
  	timeout *xtime.Timer
  }
  81. /*VideosEditor 处理超多分p的稿件编辑
  82. * 0.对该稿件的编辑操作加锁
  83. * 1.将所有视频分组分事务更新
  84. * 2.全部更新成功后发送成功信号量
  85. * 3.更新错误的分组多次尝试,超时或超过次数后失败
  86. * 4.收到更新成功信号量进行回调-(1.同步信息给视频云 2.解锁该稿件编辑)
  87. * 5.收到更新失败信号量进行回调- (1.记录错误日志信息 2.推送错误消息 3.解锁该稿件编辑)
  88. */
  89. type VideosEditor struct {
  90. sync.Mutex
  91. failTH int
  92. closeFlag bool
  93. wg sync.WaitGroup
  94. params map[int64]*param
  95. cbSuccess map[int64]func()
  96. cbFail map[int64]func()
  97. sigSuccess chan int64
  98. sigFail chan int64
  99. chanRetry chan func() (int64, int, int, error)
  100. closechan, cbclose chan struct{}
  101. }
  102. // NewEditor new VideosEditor
  103. func NewEditor(failTH int) *VideosEditor {
  104. editor := &VideosEditor{
  105. failTH: failTH,
  106. wg: sync.WaitGroup{},
  107. params: make(map[int64]*param),
  108. cbSuccess: make(map[int64]func()),
  109. cbFail: make(map[int64]func()),
  110. sigSuccess: make(chan int64, 10),
  111. sigFail: make(chan int64, 10),
  112. chanRetry: make(chan func() (int64, int, int, error), 100),
  113. closechan: make(chan struct{}, 1),
  114. cbclose: make(chan struct{}),
  115. }
  116. editor.wg.Add(1)
  117. go editor.consumerRetry(&editor.wg)
  118. editor.wg.Add(1)
  119. go editor.consumercb(&editor.wg)
  120. return editor
  121. }
  122. // Close 等待所有消息消费完才退出
  123. func (m *VideosEditor) Close() {
  124. m.closeFlag = true
  125. m.closechan <- struct{}{}
  126. m.wg.Wait()
  127. }
  128. // Add add to editor
  129. func (m *VideosEditor) Add(aid int64, cbSuccess, cbFail func(), timeout xtime.Duration, retrys ...func() (int64, int, int, error)) {
  130. if m.closeFlag {
  131. log.Warn("VideosEditor closed")
  132. return
  133. }
  134. log.Info("VideosEditor Add(%d) len(%d)", aid, len(retrys))
  135. timer := xtime.AfterFunc(timeout, func() { m.notifyTimeout(aid) })
  136. m.params[aid] = &param{
  137. undoneCount: len(retrys),
  138. failCount: 0,
  139. timeout: timer,
  140. }
  141. m.cbSuccess[aid] = cbSuccess
  142. m.cbFail[aid] = cbFail
  143. for _, ry := range retrys {
  144. m.addRetry(ry, 0)
  145. }
  146. }
  147. // NotifySuccess notify success
  148. func (m *VideosEditor) notifySuccess(aid int64) {
  149. m.Lock()
  150. defer m.Unlock()
  151. param, ok := m.params[aid]
  152. if ok {
  153. param.undoneCount--
  154. if param.undoneCount <= 0 {
  155. log.Info("notifySuccess(%d) undoneCount(%d)", aid, param.undoneCount)
  156. m.sigSuccess <- aid
  157. }
  158. }
  159. }
  160. // NotifyFail 返回值表示达到触发阈值
  161. func (m *VideosEditor) notifyFail(aid int64) (retry bool) {
  162. m.Lock()
  163. defer m.Unlock()
  164. param, ok := m.params[aid]
  165. if ok {
  166. retry = true
  167. param.failCount++
  168. if param.failCount >= m.failTH {
  169. log.Warn("notifyFail(%d) failCount(%d)", aid, param.failCount)
  170. retry = false
  171. if _, ok := m.cbFail[aid]; ok {
  172. m.sigFail <- aid
  173. }
  174. }
  175. }
  176. return
  177. }
  178. func (m *VideosEditor) notifyTimeout(aid int64) {
  179. log.Warn("notifyTimeout(%d)", aid)
  180. m.Lock()
  181. defer m.Unlock()
  182. _, ok := m.params[aid]
  183. if ok {
  184. if _, ok := m.cbFail[aid]; ok {
  185. m.sigFail <- aid
  186. }
  187. }
  188. }
  189. func (m *VideosEditor) consumercb(g *sync.WaitGroup) {
  190. defer g.Done()
  191. for {
  192. select {
  193. case aid, ok := <-m.sigSuccess:
  194. if !ok {
  195. log.Info("consumercb close")
  196. return
  197. }
  198. if f, ok := m.cbSuccess[aid]; ok {
  199. f()
  200. m.release(aid)
  201. }
  202. case aid, ok := <-m.sigFail:
  203. if !ok {
  204. log.Info("consumercb close")
  205. return
  206. }
  207. if f, ok := m.cbFail[aid]; ok {
  208. f()
  209. m.release(aid)
  210. }
  211. case <-m.closechan:
  212. if len(m.cbFail) == 0 && len(m.cbSuccess) == 0 {
  213. m.cbclose <- struct{}{}
  214. log.Info("consumercb closechan")
  215. return
  216. }
  217. xtime.Sleep(50 * xtime.Millisecond)
  218. m.closechan <- struct{}{}
  219. }
  220. }
  221. }
  222. func (m *VideosEditor) consumerRetry(g *sync.WaitGroup) {
  223. defer g.Done()
  224. for {
  225. log.Info("consumerRetry")
  226. select {
  227. case <-m.cbclose:
  228. log.Info("consumerRetry closed")
  229. return
  230. case f, ok := <-m.chanRetry:
  231. if !ok {
  232. log.Info("m.chanRetry closed")
  233. break
  234. }
  235. id, head, tail, err := f()
  236. log.Info("consumerRetry(%d) head(%d) tail(%d) err(%v) ", id, head, tail, err)
  237. if err != nil {
  238. if m.notifyFail(id) {
  239. go func() {
  240. xtime.Sleep(3 * xtime.Second)
  241. m.addRetry(f, 3)
  242. }()
  243. }
  244. } else {
  245. m.notifySuccess(id)
  246. }
  247. }
  248. }
  249. }
  250. func (m *VideosEditor) addRetry(f func() (int64, int, int, error), asynctime int) {
  251. if asynctime == 0 {
  252. m.chanRetry <- f
  253. return
  254. }
  255. go func() {
  256. xtime.Sleep(3 * xtime.Second)
  257. m.chanRetry <- f
  258. }()
  259. }
  260. func (m *VideosEditor) release(aid int64) {
  261. if p, ok := m.params[aid]; ok {
  262. p.timeout.Stop()
  263. }
  264. delete(m.cbFail, aid)
  265. delete(m.cbSuccess, aid)
  266. }