room.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package service
  2. import (
  3. "time"
  4. "go-common/app/service/main/broadcast/libs/bytes"
  5. "go-common/app/service/main/broadcast/model"
  6. "go-common/library/log"
  7. )
  8. // RoomOptions room options.
  9. type RoomOptions struct {
  10. BatchNum int
  11. SignalTime time.Duration
  12. }
  13. // Room room.
  14. type Room struct {
  15. s *Service
  16. id string
  17. proto chan *model.Proto
  18. }
  19. var (
  20. roomReadyProto = new(model.Proto)
  21. )
  22. // NewRoom new a room struct, store channel room info.
  23. func NewRoom(s *Service, id string, options RoomOptions) (r *Room) {
  24. r = &Room{
  25. s: s,
  26. id: id,
  27. proto: make(chan *model.Proto, options.BatchNum*2),
  28. }
  29. go r.pushproc(options.BatchNum, options.SignalTime)
  30. return
  31. }
  32. // Push push msg to the room, if chan full discard it.
  33. func (r *Room) Push(op int32, msg []byte, contentType int32) (err error) {
  34. var p = &model.Proto{
  35. Ver: 1,
  36. Operation: op,
  37. ContentType: contentType,
  38. Body: msg,
  39. }
  40. select {
  41. case r.proto <- p:
  42. default:
  43. err = ErrRoomFull
  44. }
  45. return
  46. }
  47. // pushproc merge proto and push msgs in batch.
  48. func (r *Room) pushproc(batch int, sigTime time.Duration) {
  49. var (
  50. n int
  51. last time.Time
  52. p *model.Proto
  53. buf = bytes.NewWriterSize(int(model.MaxBodySize))
  54. )
  55. log.Info("start room:%s goroutine", r.id)
  56. td := time.AfterFunc(sigTime, func() {
  57. select {
  58. case r.proto <- roomReadyProto:
  59. default:
  60. }
  61. })
  62. defer td.Stop()
  63. for {
  64. if p = <-r.proto; p == nil {
  65. break // exit
  66. } else if p != roomReadyProto {
  67. // merge buffer ignore error, always nil
  68. p.WriteTo(buf)
  69. if n++; n == 1 {
  70. last = time.Now()
  71. td.Reset(sigTime)
  72. continue
  73. } else if n < batch {
  74. if sigTime > time.Since(last) {
  75. continue
  76. }
  77. }
  78. } else {
  79. if n == 0 {
  80. break
  81. }
  82. }
  83. r.s.broadcastRoomRawBytes(r.id, buf.Buffer())
  84. // TODO use reset buffer
  85. // after push to room channel, renew a buffer, let old buffer gc
  86. buf = bytes.NewWriterSize(buf.Size())
  87. n = 0
  88. if r.s.conf.Room.Idle != 0 {
  89. td.Reset(time.Duration(r.s.conf.Room.Idle))
  90. } else {
  91. td.Reset(time.Minute)
  92. }
  93. }
  94. r.s.roomsMutex.Lock()
  95. delete(r.s.rooms, r.id)
  96. r.s.roomsMutex.Unlock()
  97. log.Info("room:%s goroutine exit", r.id)
  98. }