service.go 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package service
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "go-common/app/admin/main/usersuit/conf"
  7. "go-common/app/admin/main/usersuit/dao"
  8. "go-common/library/log"
  9. "go-common/library/queue/databus"
  10. account "go-common/app/service/main/account/api"
  11. )
  // Service holds the dependencies and background-task state of the service layer.
  type Service struct {
  	// d is the data-access object created by dao.New.
  	d *dao.Dao
  	// wg tracks the asynproc worker so that Close can wait for it to finish.
  	wg sync.WaitGroup
  	// c is the configuration the service was built from.
  	c *conf.Config
  	// accountClient is the client returned by account.NewClient.
  	accountClient account.AccountClient
  	// accountNotifyPub is the databus publisher built from c.AccountNotify.
  	accountNotifyPub *databus.Databus
  	// Managers is the map returned by the dao's Managers call. It is loaded
  	// once in New and then replaced every hour by loadmanagerproc.
  	// NOTE(review): presumably manager ID -> manager name; confirm against the dao.
  	// NOTE(review): the field is reassigned from a background goroutine without
  	// any lock; verify that readers elsewhere tolerate this.
  	Managers map[int64]string
  	// asynch queues closures for asynproc to execute.
  	asynch chan func()
  }
  25. // New create service instance and return.
  26. func New(c *conf.Config) (s *Service) {
  27. s = &Service{
  28. c: c,
  29. d: dao.New(c),
  30. asynch: make(chan func(), 102400),
  31. accountNotifyPub: databus.New(c.AccountNotify),
  32. }
  33. var err error
  34. if s.accountClient, err = account.NewClient(c.AccountGRPC); err != nil {
  35. panic(err)
  36. }
  37. s.loadManager()
  38. s.wg.Add(1)
  39. go s.asynproc()
  40. go s.loadmanagerproc()
  41. return
  42. }
  43. func (s *Service) loadmanagerproc() {
  44. for {
  45. time.Sleep(1 * time.Hour)
  46. s.loadManager()
  47. }
  48. }
  49. func (s *Service) loadManager() {
  50. managers, err := s.d.Managers(context.TODO())
  51. if err != nil {
  52. log.Error("s.Managers error(%v)", err)
  53. return
  54. }
  55. s.Managers = managers
  56. }
  57. // Close dao.
  58. func (s *Service) Close() {
  59. s.d.Close()
  60. close(s.asynch)
  61. time.Sleep(1 * time.Second)
  62. s.wg.Wait()
  63. }
  64. // Ping check server ok.
  65. func (s *Service) Ping(c context.Context) (err error) {
  66. err = s.d.Ping(c)
  67. return
  68. }
  69. func (s *Service) addAsyn(f func()) {
  70. select {
  71. case s.asynch <- f:
  72. default:
  73. log.Warn("asynproc chan full")
  74. }
  75. }
  76. // cacheproc is a routine for executing closure.
  77. func (s *Service) asynproc() {
  78. defer s.wg.Done()
  79. for {
  80. f, ok := <-s.asynch
  81. if !ok {
  82. return
  83. }
  84. f()
  85. }
  86. }