databus.go 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package dao
import (
	"context"
	"encoding/json"
	"errors"

	"go-common/library/log"
)
  7. const (
  8. _pushMsg = "push"
  9. _broadcastMsg = "broadcast"
  10. _broadcastRoomMsg = "broadcast_room"
  11. )
  12. type pushMsg struct {
  13. Type string `json:"type,omitempty"`
  14. Operation int32 `json:"operation,omitempty"`
  15. Server string `json:"server,omitempty"`
  16. Keys []string `json:"keys,omitempty"`
  17. Room string `json:"room,omitempty"`
  18. Speed int32 `json:"speed,omitempty"`
  19. Platform string `json:"platform,omitempty"`
  20. ContentType int32 `json:"content_type,omitempty"`
  21. Message json.RawMessage `json:"message,omitempty"`
  22. }
  23. // PushMsg push a message to databus.
  24. func (d *Dao) PushMsg(c context.Context, op int32, server, msg string, keys []string, contentType int32) (err error) {
  25. pushMsg := &pushMsg{
  26. Type: _pushMsg,
  27. Operation: op,
  28. Server: server,
  29. Keys: keys,
  30. Message: []byte(msg),
  31. ContentType: contentType,
  32. }
  33. if err = d.pushBus.Send(c, keys[0], pushMsg); err != nil {
  34. log.Error("PushMsg.send(server:%v,pushMsg:%v).error(%v)", server, pushMsg, err)
  35. }
  36. return
  37. }
  38. // BroadcastRoomMsg push a message to databus.
  39. func (d *Dao) BroadcastRoomMsg(c context.Context, op int32, room, msg string, contentType int32) (err error) {
  40. pushMsg := &pushMsg{
  41. Type: _broadcastRoomMsg,
  42. Operation: op,
  43. Room: room,
  44. Message: []byte(msg),
  45. ContentType: contentType,
  46. }
  47. if err = d.pushBus.Send(c, room, pushMsg); err != nil {
  48. log.Error("BroadcastRoomMsg.send(room:%v,pushMsg:%v).error(%v)", room, pushMsg, err)
  49. }
  50. return
  51. }
  52. // BroadcastMsg push a message to databus.
  53. func (d *Dao) BroadcastMsg(c context.Context, op, speed int32, msg, platform string, contentType int32) (err error) {
  54. pushMsg := &pushMsg{
  55. Operation: op,
  56. Type: _broadcastMsg,
  57. Speed: speed,
  58. Message: []byte(msg),
  59. Platform: platform,
  60. ContentType: contentType,
  61. }
  62. if err = d.pushBus.Send(c, _broadcastMsg, pushMsg); err != nil {
  63. log.Error("BroadcastMsg.send(_broadcastMsg:%v,pushMsg:%v).error(%v)", _broadcastMsg, pushMsg, err)
  64. }
  65. return
  66. }