databus.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. package dao
  2. import (
  3. "context"
  4. "strconv"
  5. "time"
  6. "go-common/library/log"
  7. )
  8. // PubBigData pub msg into databus.
  9. func (d *Dao) PubBigData(c context.Context, aid int64, msg interface{}) (err error) {
  10. key := strconv.FormatInt(aid, 10)
  11. if err = d.dbBigData.Send(c, key, msg); err != nil {
  12. log.Error("dbBigData.Pub(%s, %v) error (%v)", key, msg, err)
  13. PromError("dbus:PubBigData")
  14. }
  15. return
  16. }
  17. // PubCoinJob pub job msg into databus.
  18. func (d *Dao) PubCoinJob(c context.Context, aid int64, msg interface{}) (err error) {
  19. key := strconv.FormatInt(aid, 10)
  20. for i := 0; i < 3; i++ {
  21. if err = d.dbCoinJob.Send(c, key, msg); err != nil {
  22. log.Error("d.dbCoinJob.Pub(%s, %v) error (%v) times: %v", key, msg, err, i+1)
  23. PromError("dbus:PubCoinJob")
  24. time.Sleep(time.Millisecond * 50)
  25. continue
  26. }
  27. break
  28. }
  29. return
  30. }
  31. // PubStat pub stat msg into databus.
  32. func (d *Dao) PubStat(c context.Context, aid, tp, count int64) (err error) {
  33. var s = &struct {
  34. Type string `json:"type"`
  35. ID int64 `json:"id"`
  36. Count int64 `json:"count"`
  37. Timestamp int64 `json:"timestamp"`
  38. }{
  39. ID: aid,
  40. Count: count,
  41. Timestamp: time.Now().Unix(),
  42. }
  43. // double write new databus.
  44. if b, ok := d.Businesses[tp]; ok {
  45. s.Type = b.Name
  46. if err = d.stat.Send(c, strconv.FormatInt(aid, 10), s); err != nil {
  47. log.Error("d.stat.Pub(%+v) error(%v)", s, err)
  48. PromError("dbus:stat")
  49. }
  50. }
  51. return
  52. }