service.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. package service
  2. import (
  3. "context"
  4. "math"
  5. "sync"
  6. "time"
  7. "go-common/app/job/main/videoup/conf"
  8. "go-common/app/job/main/videoup/dao/activity"
  9. "go-common/app/job/main/videoup/dao/archive"
  10. "go-common/app/job/main/videoup/dao/bvc"
  11. "go-common/app/job/main/videoup/dao/manager"
  12. "go-common/app/job/main/videoup/dao/message"
  13. "go-common/app/job/main/videoup/dao/monitor"
  14. "go-common/app/job/main/videoup/dao/redis"
  15. mngmdl "go-common/app/job/main/videoup/model/manager"
  16. accApi "go-common/app/service/main/account/api"
  17. "go-common/library/conf/env"
  18. "go-common/library/log"
  19. "go-common/library/queue/databus"
  20. "go-common/library/stat/prom"
  21. "github.com/pkg/errors"
  22. )
  23. // Service is service.
  24. type Service struct {
  25. c *conf.Config
  26. // wait group
  27. wg sync.WaitGroup
  28. // acc rpc
  29. accRPC accApi.AccountClient
  30. // dao
  31. arc *archive.Dao
  32. mng *manager.Dao
  33. msg *message.Dao
  34. redis *redis.Dao
  35. monitor *monitor.Dao
  36. activity *activity.Dao
  37. bvc *bvc.Dao
  38. // databus sub
  39. bvc2VuSub *databus.Databus
  40. videoupSub *databus.Databus
  41. arcResultSub *databus.Databus
  42. videoshotSub2 *databus.Databus
  43. // videoupSub 幂等判断
  44. videoupSubIdempotent map[int32]int64
  45. statSub *databus.Databus
  46. // databus pub
  47. videoupPub *databus.Databus
  48. blogPub *databus.Databus
  49. // cache: type, upper
  50. sfTpsCache map[int16]int16
  51. TypeMap map[int16]string
  52. adtTpsCache map[int16]struct{}
  53. thrTpsCache map[int16]int
  54. thrMin, thrMax int
  55. upperCache map[int8]map[int64]struct{}
  56. fansCache int
  57. roundTpsCache map[int16]struct{}
  58. roundDelayCache int64
  59. delayRoundMinTime time.Time
  60. specialUp map[int64]struct{}
  61. // monitor
  62. bvc2VuMo int64
  63. bvc2VuDelayMo int64
  64. videoupMo int64
  65. arcResultMo int64
  66. statMo int64
  67. //prom moni
  68. promDatabus *prom.Prom
  69. promRetry *prom.Prom
  70. //统计差值
  71. promVideoS *prom.Prom
  72. promVideoE *prom.Prom
  73. promPanic *prom.Prom
  74. // closed
  75. closed bool
  76. }
  77. // New is videoup service implementation.
  78. func New(c *conf.Config) (s *Service) {
  79. s = &Service{
  80. c: c,
  81. // dao
  82. arc: archive.New(c),
  83. mng: manager.New(c),
  84. msg: message.New(c),
  85. redis: redis.New(c),
  86. monitor: monitor.New(c),
  87. activity: activity.New(c),
  88. bvc: bvc.New(c),
  89. bvc2VuSub: databus.New(c.Bvc2VuSub),
  90. videoupSubIdempotent: make(map[int32]int64),
  91. videoupSub: databus.New(c.VideoupSub),
  92. arcResultSub: databus.New(c.ArcResultSub),
  93. statSub: databus.New(c.StatSub),
  94. // databus pub
  95. videoupPub: databus.New(c.VideoupPub),
  96. blogPub: databus.New(c.BlogPub),
  97. promDatabus: prom.BusinessInfoCount,
  98. promVideoS: prom.CacheHit,
  99. promVideoE: prom.CacheMiss,
  100. promPanic: prom.CacheMiss,
  101. promRetry: prom.BusinessErrCount,
  102. }
  103. var err error
  104. if s.accRPC, err = accApi.NewClient(c.AccRPC); err != nil {
  105. panic(err)
  106. }
  107. s.specialUp = make(map[int64]struct{}, len(c.SpecialUp))
  108. for _, mid := range c.SpecialUp {
  109. s.specialUp[mid] = struct{}{}
  110. }
  111. // load cache
  112. s.loadType()
  113. s.loadUpper()
  114. s.loadConf()
  115. s.wg.Add(1)
  116. go s.bvc2VuConsumer()
  117. s.wg.Add(1)
  118. go s.videoupConsumer()
  119. s.wg.Add(1)
  120. go s.statConsumer()
  121. s.wg.Add(1)
  122. go s.arcResultConsumer()
  123. if env.DeployEnv == env.DeployEnvProd {
  124. s.videoshotSub2 = databus.New(c.VideoshotSub2)
  125. s.wg.Add(1)
  126. go s.videoshotSHConsumer()
  127. }
  128. s.wg.Add(1)
  129. go s.retryproc()
  130. s.wg.Add(1)
  131. go s.QueueProc()
  132. s.wg.Add(1)
  133. go s.delayproc()
  134. s.wg.Add(1)
  135. go s.roundproc()
  136. go s.cacheproc()
  137. go s.monitorConsume()
  138. go s.edithistoryproc()
  139. return s
  140. }
  141. // Ping ping service.
  142. func (s *Service) Ping(c context.Context) (err error) {
  143. return s.arc.Ping(c)
  144. }
  145. //Rescue runtime panic rescue
  146. func (s *Service) Rescue(data interface{}) {
  147. r := recover()
  148. if r != nil {
  149. r = errors.WithStack(r.(error))
  150. log.Error("Runtime error caught: %+v and data is %+v", r, data)
  151. s.promPanic.Incr("panic")
  152. }
  153. }
  154. func (s *Service) edithistoryproc() {
  155. for {
  156. time.Sleep(nextDay(5))
  157. for {
  158. rows, _ := s.delArcEditHistory(100)
  159. time.Sleep(1 * time.Second)
  160. if rows == 0 {
  161. break
  162. }
  163. }
  164. for {
  165. rows, _ := s.delArcVideoEditHistory(100)
  166. time.Sleep(1 * time.Second)
  167. if rows == 0 {
  168. break
  169. }
  170. }
  171. }
  172. }
  173. // Until next day x hours
  174. func nextDay(hour int) time.Duration {
  175. n := time.Now().Add(24 * time.Hour)
  176. d := time.Date(n.Year(), n.Month(), n.Day(), hour, 0, 0, 0, n.Location())
  177. return time.Until(d)
  178. }
  179. // Close consumer close.
  180. func (s *Service) Close() {
  181. s.bvc2VuSub.Close()
  182. s.videoupSub.Close()
  183. s.arcResultSub.Close()
  184. s.statSub.Close()
  185. if env.DeployEnv == env.DeployEnvProd {
  186. s.videoshotSub2.Close()
  187. }
  188. s.closed = true
  189. s.wg.Wait()
  190. s.redis.Close()
  191. }
  192. func (s *Service) isMission(c context.Context, aid int64) bool {
  193. if addit, _ := s.arc.Addit(c, aid); addit != nil && addit.MissionID > 0 {
  194. return true
  195. }
  196. return false
  197. }
  198. func (s *Service) isWhite(mid int64) bool {
  199. if ups, ok := s.upperCache[mngmdl.UpperTypeWhite]; ok {
  200. _, isWhite := ups[mid]
  201. return isWhite
  202. }
  203. return false
  204. }
  205. func (s *Service) isBlack(mid int64) bool {
  206. if ups, ok := s.upperCache[mngmdl.UpperTypeBlack]; ok {
  207. _, isBlack := ups[mid]
  208. return isBlack
  209. }
  210. return false
  211. }
  212. func (s *Service) isAuditType(tpID int16) bool {
  213. _, isAt := s.adtTpsCache[tpID]
  214. return isAt
  215. }
  216. func (s *Service) loadType() {
  217. tpMap, err := s.arc.TypeMapping(context.TODO())
  218. if err != nil {
  219. log.Error("s.arc.TypeMapping error(%v)", err)
  220. return
  221. }
  222. s.sfTpsCache = tpMap
  223. log.Info("s.sfTpsCache Data is (%+v)", s.sfTpsCache)
  224. tpNaming, err := s.arc.TypeNaming(context.TODO())
  225. if err != nil {
  226. log.Error("s.arc.TypeNaming error(%v)", err)
  227. return
  228. }
  229. s.TypeMap = tpNaming
  230. log.Info("s.TypeMap Data is (%+v)", s.TypeMap)
  231. // audit types
  232. adt, err := s.arc.AuditTypesConf(context.TODO())
  233. if err != nil {
  234. log.Error("s.arc.AuditTypesConf error(%v)", err)
  235. return
  236. }
  237. s.adtTpsCache = adt
  238. log.Info("s.adtTpsCache Data is (%+v)", s.adtTpsCache)
  239. // threshold
  240. thr, err := s.arc.ThresholdConf(context.TODO())
  241. if err != nil {
  242. log.Error("s.arc.ThresholdConf error(%v)", err)
  243. return
  244. }
  245. s.thrTpsCache = thr
  246. log.Info("s.thrTpsCache Data is (%+v)", s.thrTpsCache)
  247. var min, max = math.MaxInt32, 0
  248. for _, t := range thr {
  249. if min > t {
  250. min = t
  251. }
  252. if max < t {
  253. max = t
  254. }
  255. }
  256. s.thrMin = min
  257. s.thrMax = max
  258. }
  259. func (s *Service) loadUpper() {
  260. var (
  261. c = context.TODO()
  262. )
  263. upm, err := s.mng.Uppers(c)
  264. if err != nil {
  265. log.Error("s.mng.Uppers error(%v)", err)
  266. return
  267. }
  268. s.upperCache = upm
  269. }
  270. func (s *Service) isRoundType(tpID int16) bool {
  271. _, in := s.roundTpsCache[tpID]
  272. return in
  273. }
  274. func (s *Service) loadConf() {
  275. var (
  276. fans int64
  277. days int64
  278. err error
  279. roundTypes map[int16]struct{}
  280. )
  281. if fans, err = s.arc.FansConf(context.TODO()); err != nil {
  282. log.Error("s.arc.FansConf error(%v)", err)
  283. return
  284. }
  285. s.fansCache = int(fans)
  286. if roundTypes, err = s.arc.RoundTypeConf(context.TODO()); err != nil {
  287. log.Error("s.arc.RoundTypeConf error(%v)", err)
  288. return
  289. }
  290. s.roundTpsCache = roundTypes
  291. if days, err = s.arc.RoundEndConf(context.TODO()); err != nil {
  292. log.Error("s.arc.RoundEndConf")
  293. return
  294. }
  295. s.roundDelayCache = days
  296. }
  297. func (s *Service) cacheproc() {
  298. for {
  299. time.Sleep(1 * time.Minute)
  300. s.loadType()
  301. s.loadUpper()
  302. s.loadConf()
  303. }
  304. }
  305. func (s *Service) monitorConsume() {
  306. if env.DeployEnv != env.DeployEnvProd {
  307. return
  308. }
  309. var bvc2Vu, videoup, arcResult, stat, bvc2VuDelay int64
  310. for {
  311. time.Sleep(1 * time.Minute)
  312. if s.bvc2VuMo-bvc2Vu == 0 {
  313. s.monitor.Send(context.TODO(), "video-job bvc2Video did not consume within a minute")
  314. }
  315. if s.videoupMo-videoup == 0 {
  316. s.monitor.Send(context.TODO(), "video-job videoup did not consume within a minute")
  317. }
  318. if s.arcResultMo-arcResult == 0 {
  319. s.monitor.Send(context.TODO(), "video-job arcResult did not consume within a minute")
  320. }
  321. if s.statMo-stat == 0 {
  322. s.monitor.Send(context.TODO(), "video-job stat did not consume within a minute")
  323. }
  324. if s.bvc2VuDelayMo-bvc2VuDelay > 0 {
  325. s.monitor.Send(context.TODO(), "video-job bvc2videoup consume delayed.")
  326. }
  327. bvc2Vu = s.bvc2VuMo
  328. videoup = s.videoupMo
  329. arcResult = s.arcResultMo
  330. stat = s.statMo
  331. bvc2VuDelay = s.bvc2VuDelayMo
  332. }
  333. }