databus.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "strconv"
  6. "go-common/app/interface/main/dm2/model"
  7. "go-common/library/log"
  8. )
  9. // PubDatabus pub cache update message to databus.
  10. func (d *Dao) PubDatabus(c context.Context, tp int32, pid, oid, cnt, n, duration int64) (err error) {
  11. var (
  12. jobParams = &model.JobParam{
  13. Type: tp,
  14. Oid: oid,
  15. Pid: pid,
  16. Cnt: cnt,
  17. Num: n,
  18. Duration: duration,
  19. }
  20. )
  21. value, err := json.Marshal(jobParams)
  22. if err != nil {
  23. log.Error("json.Marshal(%v) error(%v)", jobParams, err)
  24. return
  25. }
  26. msg := model.Action{Action: model.ActionIdx, Data: value}
  27. if err = d.databus.Send(c, strconv.FormatInt(oid, 10), msg); err != nil {
  28. log.Error("databus.Send(%v) error(%v)", msg, err)
  29. }
  30. return
  31. }
  32. // SendAction send action to job.
  33. func (d *Dao) SendAction(c context.Context, k string, act *model.Action) (err error) {
  34. if err = d.actionPub.Send(c, k, act); err != nil {
  35. log.Error("actionPub.Send(action:%s,data:%s) error(%v)", act.Action, act.Data, err)
  36. } else {
  37. log.Info("actionPub.Send(action:%s,data:%s) success", act.Action, act.Data)
  38. }
  39. return
  40. }
  41. // SendSubtitleCheck .
  42. func (d *Dao) SendSubtitleCheck(c context.Context, key string, msg *model.SubtitleCheckMsg) (err error) {
  43. if err = d.subtitleCheckPub.Send(c, key, msg); err != nil {
  44. log.Error("actionPub.Send(key:%s,msg:%+v) error(%v)", key, msg, err)
  45. } else {
  46. log.Error("actionPub.Send(key:%s,msg:%+v) success", key, msg)
  47. }
  48. return
  49. }