service.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. package service
  2. import (
  3. "context"
  4. "sync"
  5. "go-common/app/job/main/workflow/conf"
  6. "go-common/app/job/main/workflow/dao"
  7. "go-common/app/job/main/workflow/model"
  8. "go-common/library/sync/pipeline/fanout"
  9. )
  10. // Service struct of service.
  11. type Service struct {
  12. c *conf.Config
  13. dao *dao.Dao
  14. wg *sync.WaitGroup
  15. closeCh chan struct{}
  16. businessAttr []*model.BusinessAttr
  17. // cache
  18. cache *fanout.Fanout
  19. }
  20. // New create service instance and return.
  21. func New(c *conf.Config) (s *Service) {
  22. s = &Service{
  23. c: c,
  24. dao: dao.New(c),
  25. wg: &sync.WaitGroup{},
  26. closeCh: make(chan struct{}),
  27. cache: fanout.New("cache", fanout.Worker(1), fanout.Buffer(1024)),
  28. }
  29. var err error
  30. if s.businessAttr, err = s.dao.BusinessAttr(context.Background()); err != nil {
  31. panic(err)
  32. }
  33. //s.wg.Add(1)
  34. //go s.expireproc(context.Background())
  35. go s.queueproc(context.Background(), _feedbackDealType)
  36. go s.taskExpireproc(context.Background(), _feedbackDealType)
  37. go s.repairQueueproc(context.Background(), _feedbackDealType)
  38. // push
  39. go s.notifyproc(context.Background())
  40. // 单条申诉过期
  41. go s.singleExpireproc()
  42. // 整体申诉过期
  43. go s.overallExpireproc()
  44. // 释放用户未评价反馈
  45. go s.releaseExpireproc()
  46. // 刷新权重值
  47. go s.refreshWeightproc()
  48. // 进任务池
  49. go s.enterPoolproc()
  50. return
  51. }
  52. // Ping check service health.
  53. func (s *Service) Ping(c context.Context) error {
  54. return s.dao.Ping(c)
  55. }
  56. // Close related backend.
  57. func (s *Service) Close() (err error) {
  58. err = s.dao.Close()
  59. close(s.closeCh)
  60. s.wg.Wait()
  61. return
  62. }