service.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package service
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "go-common/library/conf/env"
  7. "go-common/library/queue/databus"
  8. "reflect"
  9. "time"
  10. "go-common/app/job/bbq/video/conf"
  11. "go-common/app/job/bbq/video/dao"
  12. "go-common/library/log"
  13. topic "go-common/app/service/bbq/topic/api"
  14. "github.com/robfig/cron"
  15. )
  16. var (
  17. srvName string
  18. )
  19. // Service struct
  20. type Service struct {
  21. c *conf.Config
  22. dao *dao.Dao
  23. searchChan chan string
  24. videoSub *databus.Databus
  25. videoRep *databus.Databus
  26. bvcSub *databus.Databus
  27. scheFunc map[string]func()
  28. topicClient topic.TopicClient
  29. }
  30. func init() {
  31. flag.StringVar(&srvName, "srv", "", "service name")
  32. }
  33. func newTopicClient() topic.TopicClient {
  34. topicClient, err := topic.NewClient(nil)
  35. if err != nil {
  36. log.Errorw(context.Background(), "log", "get topic client fail")
  37. panic(err)
  38. }
  39. return topicClient
  40. }
  41. // New init
  42. func New(c *conf.Config) (s *Service) {
  43. s = &Service{
  44. c: c,
  45. dao: dao.New(c),
  46. searchChan: make(chan string, 1),
  47. topicClient: newTopicClient(),
  48. }
  49. s.scheFunc = s.initScheduleFunc()
  50. if srvName != "" {
  51. switch srvName {
  52. case "test":
  53. s.Test()
  54. case "syncsv2es":
  55. s.taskSyncVideo2ES()
  56. case "syncuserdmg":
  57. s.taskSyncUserDmg()
  58. case "rminvalides":
  59. s.taskRmInvalidES()
  60. case "regcmtall":
  61. s.AutoRegAll(context.Background())
  62. case "syncuserbase":
  63. s.taskSyncUsrBaseFromVideo(context.Background())
  64. case "syncuserbasic":
  65. s.taskSyncPegasusUserBasic()
  66. case "syncsearchvideo":
  67. s.SyncVideo2Search()
  68. case "syncsearchuser":
  69. s.SyncUser2Search()
  70. case "syncsearchsug":
  71. s.SyncSug2Search()
  72. case "upubface":
  73. s.UpdateUsrBaseFace()
  74. case "SysMsgTask":
  75. s.SysMsgTask()
  76. case "UserProfile":
  77. s.UserProfileUpdate()
  78. case "pushbvc":
  79. s.commitCID()
  80. case "cmscheckback":
  81. s.TransToCheckBack()
  82. }
  83. return s
  84. }
  85. //初始化databus
  86. s.initDatabus()
  87. //启动相关rountine
  88. s.launchCor()
  89. //定时任务启动
  90. if env.DeployEnv == env.DeployEnvProd {
  91. s.runScheduler(c.Scheduler)
  92. }
  93. return s
  94. }
  95. //runScheduler .1
  96. func (s *Service) runScheduler(c *conf.Scheduler) {
  97. sche := cron.New()
  98. t := reflect.TypeOf(*c)
  99. v := reflect.ValueOf(*c)
  100. for i := 0; i < v.NumField(); i++ {
  101. //排除配置为空的任务
  102. if job := v.Field(i).String(); job != "" {
  103. fn := t.Field(i).Name
  104. //从映射集中取出对应函数
  105. if f, ok := s.scheFunc[fn]; !ok {
  106. fmt.Printf("skip[%s]\n", fn)
  107. continue
  108. } else {
  109. fmt.Printf("run[%s]\n", fn)
  110. if err := sche.AddFunc(job, f); err != nil {
  111. panic(err)
  112. }
  113. }
  114. }
  115. }
  116. sche.Start()
  117. }
  118. //Test 测试
  119. func (s *Service) Test() {
  120. log.Info("HeartBeat:%s", time.Now())
  121. }
  122. // Ping Service
  123. func (s *Service) Ping(c context.Context) (err error) {
  124. return s.dao.Ping(c)
  125. }
  126. // Close Service
  127. func (s *Service) Close() {
  128. s.dao.Close()
  129. }
  130. // initScheduleFunc 任务结构与函数映射关系
  131. // map {key: conf.Scheduler结构体中字段名 value: 对应执行的函数}
  132. func (s *Service) initScheduleFunc() map[string]func() {
  133. return map[string]func(){
  134. "Test": s.Test,
  135. "CheckVideo2ES": s.taskSyncVideo2ES,
  136. "SyncUserDmg": s.taskSyncUserDmg,
  137. "SyncUpUserDmg": s.taskSyncUpUserDmg,
  138. "CheckVideo": s.taskCheckVideo,
  139. "CheckVideoSt": s.taskCheckVideoStatistics,
  140. "CheckVideoStHv": s.taskCheckVideoStatisticsHive,
  141. "CheckVideoTag": s.taskCheckVideoTag,
  142. "CheckTag": s.taskCheckTag,
  143. "SyncUsrSta": s.taskSyncUsrStaFromHive,
  144. "SysMsgTask": s.SysMsgTask,
  145. "UserProfileBbq": s.UserProfileUpdate,
  146. "TransToReview": s.TransToReview,
  147. "TransToCheckBack": s.TransToCheckBack,
  148. }
  149. }
  150. func (s *Service) launchCor() {
  151. time.Sleep(time.Second * 3)
  152. if env.DeployEnv == env.DeployEnvProd {
  153. go s.SyncSearch()
  154. }
  155. go s.videoBinlogSub()
  156. go s.videoRepositoryBinlogSub()
  157. go s.BvcTransSub()
  158. }
  159. func (s *Service) initDatabus() {
  160. s.videoSub = databus.New(conf.Conf.Databus["videosub"])
  161. s.videoRep = databus.New(conf.Conf.Databus["videorep"])
  162. s.bvcSub = databus.New(conf.Conf.Databus["bvcsub"])
  163. }