service.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package service
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "go-common/app/admin/main/workflow/dao"
  7. "go-common/app/admin/main/workflow/model"
  8. "go-common/library/conf/paladin"
  9. "go-common/library/log"
  10. )
  11. // Service is service.
  12. type Service struct {
  13. closed bool
  14. // dao
  15. dao *dao.Dao
  16. wg sync.WaitGroup
  17. jobCh chan func()
  18. // cache
  19. callbackCache map[int8]*model.Callback
  20. busAttrCache map[int8]*model.BusinessAttr
  21. reviewTypeName map[int64]string
  22. businessName map[string]int8
  23. tagListCache map[int8]map[int64]*model.TagMeta //map[bid]map[tid]*model.TagMeta
  24. roleCache map[int8]map[int8]string //map[bid]map[rid]name
  25. c *paladin.Map // application.toml conf
  26. }
  27. // New is workflow-admin service implementation.
  28. func New() (s *Service) {
  29. var ac = new(paladin.TOML)
  30. if err := paladin.Watch("application.toml", ac); err != nil {
  31. panic(err)
  32. }
  33. s = &Service{
  34. dao: dao.New(),
  35. wg: sync.WaitGroup{},
  36. callbackCache: make(map[int8]*model.Callback),
  37. c: ac,
  38. }
  39. s.jobCh = make(chan func(), paladin.Int(s.c.Get("chanSize"), 1024))
  40. go s.cacheproc()
  41. s.wg.Add(1)
  42. go s.jobproc()
  43. s.loadReviewTypeName()
  44. return s
  45. }
  46. // Ping check server ok.
  47. func (s *Service) Ping(c context.Context) (err error) {
  48. err = s.dao.Ping(c)
  49. return
  50. }
  51. // Close consumer close.
  52. func (s *Service) Close() {
  53. s.dao.Close()
  54. close(s.jobCh)
  55. s.closed = true
  56. s.wg.Wait()
  57. }
  58. func (s *Service) task(f func()) {
  59. select {
  60. case s.jobCh <- f:
  61. default:
  62. log.Warn("Failed to enqueue a task due to job channel is full")
  63. }
  64. }
  65. // jobproc is a job queue for executing closure.
  66. func (s *Service) jobproc() {
  67. defer s.wg.Done()
  68. for {
  69. f, ok := <-s.jobCh
  70. if !ok {
  71. log.Info("Stop job proc due to job channel is closed")
  72. return
  73. }
  74. f()
  75. }
  76. }
  77. // cacheproc goroutine
  78. func (s *Service) cacheproc() {
  79. for {
  80. s.loadCallbacks()
  81. s.loadBusAttrs()
  82. s.loadTagList()
  83. s.loadBusinessRole()
  84. time.Sleep(5 * time.Minute)
  85. }
  86. }
  87. func (s *Service) loadCallbacks() {
  88. cbs, err := s.dao.AllCallbacks(context.Background())
  89. if err != nil {
  90. log.Error("s.dao.AllCallbacks() error(%v)", err)
  91. return
  92. }
  93. cbMap := make(map[int8]*model.Callback, len(cbs))
  94. for _, cb := range cbs {
  95. cbMap[cb.Business] = cb
  96. }
  97. s.callbackCache = cbMap
  98. }
  99. // loadBusAttrs returns attributes of business
  100. func (s *Service) loadBusAttrs() (err error) {
  101. var busAttrs []*model.BusinessAttr
  102. if err = s.dao.ORM.Table("workflow_business_attr").Find(&busAttrs).Error; err != nil {
  103. log.Error("init business attr failed(%v)!", err)
  104. return
  105. }
  106. busAttrsMap := make(map[int8]*model.BusinessAttr, len(busAttrs))
  107. busName := make(map[string]int8)
  108. for _, attr := range busAttrs {
  109. busAttrsMap[int8(attr.BID)] = attr
  110. busName[attr.BusinessName] = int8(attr.BID)
  111. }
  112. s.busAttrCache = busAttrsMap
  113. s.businessName = busName
  114. return
  115. }
  116. func (s *Service) loadReviewTypeName() {
  117. s.reviewTypeName = make(map[int64]string)
  118. s.reviewTypeName[1] = "动画"
  119. s.reviewTypeName[2] = "电影"
  120. s.reviewTypeName[3] = "纪录片"
  121. s.reviewTypeName[4] = "国产动画"
  122. s.reviewTypeName[5] = "连续剧"
  123. }
  124. func (s *Service) loadTagList() (err error) {
  125. var tlc map[int8]map[int64]*model.TagMeta
  126. if tlc, err = s.dao.TagList(context.Background()); err != nil {
  127. log.Error("init manager tag failed(%v)!", err)
  128. return
  129. }
  130. s.tagListCache = tlc
  131. return
  132. }
  133. func (s *Service) loadBusinessRole() (err error) {
  134. var rc map[int8]map[int8]string
  135. if rc, err = s.dao.LoadRole(context.Background()); err != nil {
  136. log.Error("init business role failed(%v)!", err)
  137. return
  138. }
  139. s.roleCache = rc
  140. return
  141. }