service.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "go-common/app/job/main/relation-cache/conf"
  7. "go-common/app/job/main/relation-cache/dao"
  8. "go-common/library/queue/databus"
  9. )
  10. // Service struct
  11. type Service struct {
  12. c *conf.Config
  13. dao *dao.Dao
  14. relationBinLog *databus.Databus
  15. waiter *sync.WaitGroup
  16. }
  17. // New init
  18. func New(c *conf.Config) (s *Service) {
  19. s = &Service{
  20. c: c,
  21. dao: dao.New(c),
  22. relationBinLog: databus.New(c.RelationBinLog),
  23. waiter: &sync.WaitGroup{},
  24. }
  25. s.Start(context.Background())
  26. return s
  27. }
  28. // Start to handle requests
  29. func (s *Service) Start(ctx context.Context) {
  30. for i := 0; i < 50; i++ {
  31. go s.relationBinLogproc(context.Background())
  32. }
  33. }
  34. // Ping Service
  35. func (s *Service) Ping(c context.Context) (err error) {
  36. return s.dao.Ping(c)
  37. }
  38. // Close Service
  39. func (s *Service) Close() {
  40. s.dao.Close()
  41. }
  42. // BeautifyMessage is
  43. func BeautifyMessage(msg *databus.Message) string {
  44. pmsg := struct {
  45. Key string `json:"key"`
  46. Value string `json:"value"`
  47. Topic string `json:"topic"`
  48. Partition int32 `json:"partition"`
  49. Offset int64 `json:"offset"`
  50. Timestamp int64 `json:"timestamp"`
  51. }{
  52. Key: msg.Key,
  53. Value: string(msg.Value),
  54. Topic: msg.Topic,
  55. Partition: msg.Partition,
  56. Offset: msg.Offset,
  57. Timestamp: msg.Timestamp,
  58. }
  59. return fmt.Sprintf("%+v", pmsg)
  60. }