service.go 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. package service
  2. import (
  3. "context"
  4. "sync"
  5. "go-common/app/job/main/passport-game-local/conf"
  6. "go-common/library/log"
  7. "go-common/library/queue/databus"
  8. )
  9. const (
  10. // table and duration
  11. _asoAccountTable = "aso_account"
  12. )
  13. // Service service.
  14. type Service struct {
  15. c *conf.Config
  16. // aso encrypt trans pub databus
  17. dsAsoEncryptTransPub *databus.Databus
  18. // aso binlog databus
  19. dsAsoBinLogSub *databus.Databus
  20. merges []chan *message
  21. done chan []*message
  22. // proc
  23. head, last *message
  24. mu sync.Mutex
  25. }
  26. type message struct {
  27. next *message
  28. data *databus.Message
  29. object interface{}
  30. done bool
  31. }
  32. // New new a service instance.
  33. func New(c *conf.Config) (s *Service) {
  34. s = &Service{
  35. c: c,
  36. dsAsoEncryptTransPub: databus.New(c.DataBus.EncryptTransPub),
  37. dsAsoBinLogSub: databus.New(c.DataBus.AsoBinLogSub),
  38. merges: make([]chan *message, c.Group.AsoBinLog.Num),
  39. done: make(chan []*message, c.Group.AsoBinLog.Chan),
  40. }
  41. go s.asobinlogcommitproc()
  42. for i := 0; i < c.Group.AsoBinLog.Num; i++ {
  43. ch := make(chan *message, c.Group.AsoBinLog.Chan)
  44. s.merges[i] = ch
  45. go s.asobinlogmergeproc(ch)
  46. }
  47. go s.asobinlogconsumeproc()
  48. return
  49. }
  50. // Ping check server ok.
  51. func (s *Service) Ping(c context.Context) (err error) {
  52. return
  53. }
  54. // Close close service, including databus and outer service.
  55. func (s *Service) Close() (err error) {
  56. if err = s.dsAsoBinLogSub.Close(); err != nil {
  57. log.Error("srv.asoBinLog.Close() error(%v)", err)
  58. }
  59. if err = s.dsAsoEncryptTransPub.Close(); err != nil {
  60. log.Error("srv.dsAsoEncryptTransPub.Close() error(%v)", err)
  61. }
  62. return
  63. }