databus.go 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. package stat
  2. import (
  3. "context"
  4. "strconv"
  5. "time"
  6. "go-common/app/job/main/reply/conf"
  7. "go-common/library/log"
  8. "go-common/library/queue/databus"
  9. )
  // statMsg is the payload Send hands to the stats databus client for one
  // stat update. The json tags fix the field names used when it is encoded.
  type statMsg struct {
  	// ID is the object id the update refers to (Send stores its oid here).
  	ID int64 `json:"id"`
  	// Timestamp is the Unix time, in seconds, at which the message was built.
  	Timestamp int64 `json:"timestamp"`
  	// Count is the counter value supplied by the caller of Send.
  	Count int `json:"count"`
  	// Type is the stat name looked up from the numeric stat type.
  	Type string `json:"type"`
  }
  16. // Dao stat dao.
  17. type Dao struct {
  18. // new databus stats
  19. types map[int8]string
  20. databus *databus.Databus
  21. }
  22. // New new a stat dao and return.
  23. func New(c *conf.Config) *Dao {
  24. d := new(Dao)
  25. // new databus stats
  26. d.types = make(map[int8]string)
  27. for name, typ := range c.StatTypes {
  28. d.types[typ] = name
  29. }
  30. d.databus = databus.New(c.Databus.Stats)
  31. return d
  32. }
  33. // Send update stat.
  34. func (d *Dao) Send(c context.Context, typ int8, oid int64, cnt int) (err error) {
  35. // new databus stats
  36. if name, ok := d.types[typ]; ok {
  37. m := &statMsg{
  38. ID: oid,
  39. Type: name,
  40. Count: cnt,
  41. Timestamp: time.Now().Unix(),
  42. }
  43. if err = d.databus.Send(c, strconv.FormatInt(oid, 10), m); err != nil {
  44. log.Error("d.databus.Send(%d,%d,%d) error(%v)", typ, oid, cnt, err)
  45. }
  46. }
  47. return
  48. }