service.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. package block
  2. import (
  3. "context"
  4. "runtime/debug"
  5. "go-common/app/admin/main/member/conf"
  6. "go-common/app/admin/main/member/dao/block"
  7. account "go-common/app/service/main/account/api"
  8. rpcfigure "go-common/app/service/main/figure/rpc/client"
  9. rpcspy "go-common/app/service/main/spy/rpc/client"
  10. "go-common/library/log"
  11. "go-common/library/queue/databus"
  12. "go-common/library/sync/pipeline/fanout"
  13. )
  14. // Service struct
  15. type Service struct {
  16. conf *conf.Config
  17. dao *block.Dao
  18. cache *fanout.Fanout
  19. spyRPC *rpcspy.Service
  20. figureRPC *rpcfigure.Service
  21. accountClient account.AccountClient
  22. missch chan func()
  23. accountNotifyPub *databus.Databus
  24. }
  25. // New init
  26. func New(conf *conf.Config, dao *block.Dao, spyRPC *rpcspy.Service, figureRPC *rpcfigure.Service,
  27. accountClient account.AccountClient, accountNotifyPub *databus.Databus) (s *Service) {
  28. s = &Service{
  29. conf: conf,
  30. dao: dao,
  31. cache: fanout.New("memberAdminCache", fanout.Worker(1), fanout.Buffer(10240)),
  32. missch: make(chan func(), 10240),
  33. accountNotifyPub: accountNotifyPub,
  34. spyRPC: spyRPC,
  35. figureRPC: figureRPC,
  36. accountClient: accountClient,
  37. }
  38. go s.missproc()
  39. return s
  40. }
  41. func (s *Service) missproc() {
  42. defer func() {
  43. if x := recover(); x != nil {
  44. log.Error("service.missproc panic(%+v) : %s", x, debug.Stack())
  45. go s.missproc()
  46. }
  47. }()
  48. for {
  49. f := <-s.missch
  50. f()
  51. }
  52. }
  53. func (s *Service) mission(f func()) {
  54. select {
  55. case s.missch <- f:
  56. default:
  57. log.Error("s.missch full")
  58. }
  59. }
  60. // Ping Service
  61. func (s *Service) Ping(c context.Context) (err error) {
  62. return s.dao.Ping(c)
  63. }
  64. // Close Service
  65. func (s *Service) Close() {
  66. s.dao.Close()
  67. }