service.go 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. package service
  2. import (
  3. "context"
  4. "sync"
  5. "go-common/app/job/main/passport-encrypt/conf"
  6. "go-common/app/job/main/passport-encrypt/dao"
  7. "go-common/library/log"
  8. "go-common/library/queue/databus"
  9. )
  // _asoAccountTable is the table name "aso_account".
  // NOTE(review): presumably matched against the table field of binlog
  // messages elsewhere in this package — confirm against the consumers.
  const _asoAccountTable = "aso_account"

  // Row-change action names.
  // NOTE(review): presumably compared with the action field of binlog
  // messages elsewhere in this package — confirm against the consumers.
  const (
  	_insertAction = "insert"
  	_updateAction = "update"
  	_deleteAction = "delete"
  )
  17. // Service service.
  18. type Service struct {
  19. c *conf.Config
  20. d *dao.Dao
  21. // aso binlog databus
  22. dsAsoBinLogSub *databus.Databus
  23. merges []chan *message
  24. done chan []*message
  25. // proc
  26. head, last *message
  27. mu sync.Mutex
  28. }
  29. type message struct {
  30. next *message
  31. data *databus.Message
  32. object interface{}
  33. done bool
  34. }
  35. // New new a service instance.
  36. func New(c *conf.Config) (s *Service) {
  37. s = &Service{
  38. c: c,
  39. d: dao.New(c),
  40. dsAsoBinLogSub: databus.New(c.DataBus.AsoBinLogSub),
  41. merges: make([]chan *message, c.Group.AsoBinLog.Num),
  42. done: make(chan []*message, c.Group.AsoBinLog.Chan),
  43. }
  44. if c.DataSwitch.Full {
  45. go s.fullMigration(c.StepGroup.Group1.Start, c.StepGroup.Group1.End, c.StepGroup.Group1.Inc, c.StepGroup.Group1.Limit, "group1")
  46. go s.fullMigration(c.StepGroup.Group2.Start, c.StepGroup.Group2.End, c.StepGroup.Group2.Inc, c.StepGroup.Group2.Limit, "group2")
  47. go s.fullMigration(c.StepGroup.Group3.Start, c.StepGroup.Group3.End, c.StepGroup.Group3.Inc, c.StepGroup.Group3.Limit, "group3")
  48. go s.fullMigration(c.StepGroup.Group4.Start, c.StepGroup.Group4.End, c.StepGroup.Group4.Inc, c.StepGroup.Group4.Limit, "group4")
  49. }
  50. if c.DataSwitch.Inc {
  51. go s.asobinlogcommitproc()
  52. for i := 0; i < c.Group.AsoBinLog.Num; i++ {
  53. ch := make(chan *message, c.Group.AsoBinLog.Chan)
  54. s.merges[i] = ch
  55. go s.asobinlogmergeproc(ch)
  56. }
  57. go s.asobinlogconsumeproc()
  58. }
  59. return
  60. }
  61. // Ping check server ok.
  62. func (s *Service) Ping(c context.Context) (err error) {
  63. return
  64. }
  65. // Close close service, including databus and outer service.
  66. func (s *Service) Close() (err error) {
  67. if err = s.dsAsoBinLogSub.Close(); err != nil {
  68. log.Error("srv.asoBinLog.Close() error(%v)", err)
  69. }
  70. return
  71. }