channel.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package server
  2. import (
  3. "sync"
  4. "go-common/app/service/main/broadcast/libs/bufio"
  5. "go-common/app/service/main/broadcast/model"
  6. )
  7. // Channel used by message pusher send msg to write goroutine.
  8. type Channel struct {
  9. Room *Room
  10. CliProto Ring
  11. signal chan *model.Proto
  12. Writer bufio.Writer
  13. Reader bufio.Reader
  14. Next *Channel
  15. Prev *Channel
  16. Mid int64
  17. Key string
  18. IP string
  19. Platform string
  20. watchOps map[int32]struct{}
  21. mutex sync.RWMutex
  22. V1 bool
  23. }
  24. // NewChannel new a channel.
  25. func NewChannel(cli, svr int) *Channel {
  26. c := new(Channel)
  27. c.CliProto.Init(cli)
  28. c.signal = make(chan *model.Proto, svr)
  29. c.watchOps = make(map[int32]struct{})
  30. return c
  31. }
  32. // Watch watch a operation.
  33. func (c *Channel) Watch(accepts ...int32) {
  34. c.mutex.Lock()
  35. for _, op := range accepts {
  36. if op >= model.MinBusinessOp && op <= model.MaxBusinessOp {
  37. c.watchOps[op] = struct{}{}
  38. }
  39. }
  40. c.mutex.Unlock()
  41. }
  42. // UnWatch unwatch an operation
  43. func (c *Channel) UnWatch(accepts ...int32) {
  44. c.mutex.Lock()
  45. for _, op := range accepts {
  46. delete(c.watchOps, op)
  47. }
  48. c.mutex.Unlock()
  49. }
  50. // NeedPush verify if in watch.
  51. func (c *Channel) NeedPush(op int32, platform string) bool {
  52. if c.Platform != platform && platform != "" {
  53. return false
  54. }
  55. if op >= 0 && op < model.MinBusinessOp {
  56. return true
  57. }
  58. c.mutex.RLock()
  59. if _, ok := c.watchOps[op]; ok {
  60. c.mutex.RUnlock()
  61. return true
  62. }
  63. c.mutex.RUnlock()
  64. return false
  65. }
  66. // Push server push message.
  67. func (c *Channel) Push(p *model.Proto) (err error) {
  68. // NOTE: 兼容v1弹幕推送,等播放器接后可以去掉
  69. if c.V1 && p.Operation != 5 {
  70. return
  71. }
  72. select {
  73. case c.signal <- p:
  74. default:
  75. }
  76. return
  77. }
  78. // Ready check the channel ready or close?
  79. func (c *Channel) Ready() *model.Proto {
  80. return <-c.signal
  81. }
  82. // Signal send signal to the channel, protocol ready.
  83. func (c *Channel) Signal() {
  84. c.signal <- model.ProtoReady
  85. }
  86. // Close close the channel.
  87. func (c *Channel) Close() {
  88. c.signal <- model.ProtoFinish
  89. }