123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- package service
- import (
- "time"
- "go-common/app/service/main/broadcast/libs/bytes"
- "go-common/app/service/main/broadcast/model"
- "go-common/library/log"
- )
- // RoomOptions room options.
- type RoomOptions struct {
- BatchNum int
- SignalTime time.Duration
- }
- // Room room.
- type Room struct {
- s *Service
- id string
- proto chan *model.Proto
- }
- var (
- roomReadyProto = new(model.Proto)
- )
- // NewRoom new a room struct, store channel room info.
- func NewRoom(s *Service, id string, options RoomOptions) (r *Room) {
- r = &Room{
- s: s,
- id: id,
- proto: make(chan *model.Proto, options.BatchNum*2),
- }
- go r.pushproc(options.BatchNum, options.SignalTime)
- return
- }
- // Push push msg to the room, if chan full discard it.
- func (r *Room) Push(op int32, msg []byte, contentType int32) (err error) {
- var p = &model.Proto{
- Ver: 1,
- Operation: op,
- ContentType: contentType,
- Body: msg,
- }
- select {
- case r.proto <- p:
- default:
- err = ErrRoomFull
- }
- return
- }
- // pushproc merge proto and push msgs in batch.
- func (r *Room) pushproc(batch int, sigTime time.Duration) {
- var (
- n int
- last time.Time
- p *model.Proto
- buf = bytes.NewWriterSize(int(model.MaxBodySize))
- )
- log.Info("start room:%s goroutine", r.id)
- td := time.AfterFunc(sigTime, func() {
- select {
- case r.proto <- roomReadyProto:
- default:
- }
- })
- defer td.Stop()
- for {
- if p = <-r.proto; p == nil {
- break // exit
- } else if p != roomReadyProto {
- // merge buffer ignore error, always nil
- p.WriteTo(buf)
- if n++; n == 1 {
- last = time.Now()
- td.Reset(sigTime)
- continue
- } else if n < batch {
- if sigTime > time.Since(last) {
- continue
- }
- }
- } else {
- if n == 0 {
- break
- }
- }
- r.s.broadcastRoomRawBytes(r.id, buf.Buffer())
- // TODO use reset buffer
- // after push to room channel, renew a buffer, let old buffer gc
- buf = bytes.NewWriterSize(buf.Size())
- n = 0
- if r.s.conf.Room.Idle != 0 {
- td.Reset(time.Duration(r.s.conf.Room.Idle))
- } else {
- td.Reset(time.Minute)
- }
- }
- r.s.roomsMutex.Lock()
- delete(r.s.rooms, r.id)
- r.s.roomsMutex.Unlock()
- log.Info("room:%s goroutine exit", r.id)
- }
|