databushandler.go 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. package databusutil
  2. import (
  3. "github.com/pkg/errors"
  4. "sync"
  5. "go-common/library/log"
  6. "go-common/library/queue/databus"
  7. )
  8. //NewDatabusHandler new handler
  9. func NewDatabusHandler() *DatabusHandler {
  10. return &DatabusHandler{
  11. closeCh: make(chan struct{}),
  12. databusMap: make(map[*databus.Databus]int, 10),
  13. }
  14. }
  15. //DatabusHandler handler
  16. type DatabusHandler struct {
  17. closeCh chan struct{}
  18. wg sync.WaitGroup
  19. databusMap map[*databus.Databus]int
  20. lock sync.Mutex
  21. }
  22. //Close close
  23. func (s *DatabusHandler) Close() {
  24. // close all databus
  25. s.lock.Lock()
  26. for k, v := range s.databusMap {
  27. log.Info("closing databus, %v, routine=%d", k, v)
  28. k.Close()
  29. }
  30. s.lock.Unlock()
  31. close(s.closeCh)
  32. s.wg.Wait()
  33. }
  34. func (s *DatabusHandler) incWatch(bus *databus.Databus) {
  35. s.lock.Lock()
  36. s.databusMap[bus] = s.databusMap[bus] + 1
  37. s.lock.Unlock()
  38. }
  39. func (s *DatabusHandler) decWatch(bus *databus.Databus) {
  40. s.lock.Lock()
  41. s.databusMap[bus] = s.databusMap[bus] - 1
  42. s.lock.Unlock()
  43. }
  44. //Watch watch without goroutine
  45. func (s *DatabusHandler) Watch(bus *databus.Databus, handler func(msg *databus.Message) error) {
  46. defer func() {
  47. s.wg.Done()
  48. s.decWatch(bus)
  49. if r := recover(); r != nil {
  50. r = errors.WithStack(r.(error))
  51. log.Error("Runtime error caught, try recover: %+v", r)
  52. s.wg.Add(1)
  53. go s.Watch(bus, handler)
  54. }
  55. }()
  56. var msgs = bus.Messages()
  57. s.incWatch(bus)
  58. log.Info("start watch databus, %+v", bus)
  59. for {
  60. var (
  61. msg *databus.Message
  62. ok bool
  63. err error
  64. )
  65. select {
  66. case msg, ok = <-msgs:
  67. if !ok {
  68. log.Error("s.archiveNotifyT.messages closed")
  69. return
  70. }
  71. case <-s.closeCh:
  72. log.Info("server close")
  73. return
  74. }
  75. msg.Commit()
  76. if err = handler(msg); err != nil {
  77. log.Error("handle databus error topic(%s) key(%s) value(%s) err(%v)", msg.Topic, msg.Key, msg.Value, err)
  78. continue
  79. }
  80. log.Info("handle databus topic(%s) key(%s) value(%s) finish", msg.Topic, msg.Key, msg.Value)
  81. }
  82. }
  83. //GoWatch watch with goroutine
  84. func (s *DatabusHandler) GoWatch(bus *databus.Databus, handler func(msg *databus.Message) error) {
  85. go s.Watch(bus, handler)
  86. }