dao.go 701 B

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. package dao
  2. import (
  3. "bytes"
  4. "sync"
  5. "go-common/app/interface/main/report-click/conf"
  6. "go-common/library/queue/databus"
  7. )
  8. // Dao report-click dao
  9. type Dao struct {
  10. c *conf.Config
  11. merge *databus.Databus
  12. msgs chan []byte
  13. spliter []byte
  14. bfp sync.Pool
  15. }
  16. // New dao.
  17. func New(c *conf.Config) (d *Dao) {
  18. d = &Dao{
  19. c: c,
  20. merge: databus.New(c.DataBus.Merge),
  21. msgs: make(chan []byte, 1024),
  22. spliter: []byte("\001"),
  23. bfp: sync.Pool{
  24. New: func() interface{} {
  25. return bytes.NewBuffer([]byte{})
  26. },
  27. },
  28. }
  29. go d.pubproc()
  30. return
  31. }
  32. // Close close kafka connection.
  33. func (d *Dao) Close() {
  34. d.msgs <- d.spliter
  35. if d.merge != nil {
  36. d.merge.Close()
  37. }
  38. }