1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- package databusutil
- import (
- "github.com/pkg/errors"
- "sync"
- "go-common/library/log"
- "go-common/library/queue/databus"
- )
- //NewDatabusHandler new handler
- func NewDatabusHandler() *DatabusHandler {
- return &DatabusHandler{
- closeCh: make(chan struct{}),
- databusMap: make(map[*databus.Databus]int, 10),
- }
- }
- //DatabusHandler handler
- type DatabusHandler struct {
- closeCh chan struct{}
- wg sync.WaitGroup
- databusMap map[*databus.Databus]int
- lock sync.Mutex
- }
- //Close close
- func (s *DatabusHandler) Close() {
- // close all databus
- s.lock.Lock()
- for k, v := range s.databusMap {
- log.Info("closing databus, %v, routine=%d", k, v)
- k.Close()
- }
- s.lock.Unlock()
- close(s.closeCh)
- s.wg.Wait()
- }
- func (s *DatabusHandler) incWatch(bus *databus.Databus) {
- s.lock.Lock()
- s.databusMap[bus] = s.databusMap[bus] + 1
- s.lock.Unlock()
- }
- func (s *DatabusHandler) decWatch(bus *databus.Databus) {
- s.lock.Lock()
- s.databusMap[bus] = s.databusMap[bus] - 1
- s.lock.Unlock()
- }
- //Watch watch without goroutine
- func (s *DatabusHandler) Watch(bus *databus.Databus, handler func(msg *databus.Message) error) {
- defer func() {
- s.wg.Done()
- s.decWatch(bus)
- if r := recover(); r != nil {
- r = errors.WithStack(r.(error))
- log.Error("Runtime error caught, try recover: %+v", r)
- s.wg.Add(1)
- go s.Watch(bus, handler)
- }
- }()
- var msgs = bus.Messages()
- s.incWatch(bus)
- log.Info("start watch databus, %+v", bus)
- for {
- var (
- msg *databus.Message
- ok bool
- err error
- )
- select {
- case msg, ok = <-msgs:
- if !ok {
- log.Error("s.archiveNotifyT.messages closed")
- return
- }
- case <-s.closeCh:
- log.Info("server close")
- return
- }
- msg.Commit()
- if err = handler(msg); err != nil {
- log.Error("handle databus error topic(%s) key(%s) value(%s) err(%v)", msg.Topic, msg.Key, msg.Value, err)
- continue
- }
- log.Info("handle databus topic(%s) key(%s) value(%s) finish", msg.Topic, msg.Key, msg.Value)
- }
- }
- //GoWatch watch with goroutine
- func (s *DatabusHandler) GoWatch(bus *databus.Databus, handler func(msg *databus.Message) error) {
- go s.Watch(bus, handler)
- }
|