comet.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "net/url"
  6. "sync/atomic"
  7. "time"
  8. client "go-common/app/interface/main/broadcast/api/grpc/v1"
  9. "go-common/app/job/main/broadcast/conf"
  10. "go-common/library/log"
  11. "go-common/library/naming"
  12. )
  13. // CometOptions comet options.
  14. type CometOptions struct {
  15. RoutineSize uint64
  16. RoutineChan uint64
  17. }
  18. // Comet is a broadcast comet.
  19. type Comet struct {
  20. serverID string
  21. broadcastClient client.ZergClient
  22. pushChan []chan *client.PushMsgReq
  23. roomChan []chan *client.BroadcastRoomReq
  24. broadcastChan chan *client.BroadcastReq
  25. pushChanNum uint64
  26. roomChanNum uint64
  27. options CometOptions
  28. ctx context.Context
  29. cancel context.CancelFunc
  30. }
  31. // Push push a user message.
  32. func (c *Comet) Push(arg *client.PushMsgReq) (err error) {
  33. idx := atomic.AddUint64(&c.pushChanNum, 1) % c.options.RoutineSize
  34. c.pushChan[idx] <- arg
  35. return
  36. }
  37. // BroadcastRoom broadcast a room message.
  38. func (c *Comet) BroadcastRoom(arg *client.BroadcastRoomReq) (err error) {
  39. idx := atomic.AddUint64(&c.roomChanNum, 1) % c.options.RoutineSize
  40. c.roomChan[idx] <- arg
  41. return
  42. }
  43. // Broadcast broadcast a message.
  44. func (c *Comet) Broadcast(arg *client.BroadcastReq) (err error) {
  45. c.broadcastChan <- arg
  46. return
  47. }
  48. // process
  49. func (c *Comet) process(pushChan chan *client.PushMsgReq, roomChan chan *client.BroadcastRoomReq, broadcastChan chan *client.BroadcastReq) {
  50. var err error
  51. for {
  52. select {
  53. case broadcastArg := <-broadcastChan:
  54. _, err = c.broadcastClient.Broadcast(context.Background(), &client.BroadcastReq{
  55. Proto: broadcastArg.Proto,
  56. ProtoOp: broadcastArg.ProtoOp,
  57. Speed: broadcastArg.Speed,
  58. Platform: broadcastArg.Platform,
  59. })
  60. if err != nil {
  61. log.Error("c.broadcastClient.Broadcast(%s, %v, reply) serverId:%d error(%v)", broadcastArg, c.serverID, err)
  62. }
  63. case roomArg := <-roomChan:
  64. _, err = c.broadcastClient.BroadcastRoom(context.Background(), &client.BroadcastRoomReq{
  65. RoomID: roomArg.RoomID,
  66. Proto: roomArg.Proto,
  67. })
  68. if err != nil {
  69. log.Error("c.broadcastClient.BroadcastRoom(%s, %v, reply) serverId:%d error(%v)", roomArg, c.serverID, err)
  70. }
  71. case pushArg := <-pushChan:
  72. _, err = c.broadcastClient.PushMsg(context.Background(), &client.PushMsgReq{
  73. Keys: pushArg.Keys,
  74. Proto: pushArg.Proto,
  75. ProtoOp: pushArg.ProtoOp,
  76. })
  77. if err != nil {
  78. log.Error("c.broadcastClient.PushMsg(%s, %v, reply) serverId:%d error(%v)", pushArg, c.serverID, err)
  79. }
  80. case <-c.ctx.Done():
  81. return
  82. }
  83. }
  84. }
  85. // Close close the resouces.
  86. func (c *Comet) Close() (err error) {
  87. finish := make(chan bool)
  88. go func() {
  89. for {
  90. n := len(c.broadcastChan)
  91. for _, ch := range c.pushChan {
  92. n += len(ch)
  93. }
  94. for _, ch := range c.roomChan {
  95. n += len(ch)
  96. }
  97. if n == 0 {
  98. finish <- true
  99. return
  100. }
  101. time.Sleep(time.Second)
  102. }
  103. }()
  104. select {
  105. case <-finish:
  106. log.Info("close comet finish")
  107. case <-time.After(5 * time.Second):
  108. err = fmt.Errorf("close comet(server:%s push:%d room:%d broadcast:%d) timeout", c.serverID, len(c.pushChan), len(c.roomChan), len(c.broadcastChan))
  109. }
  110. c.cancel()
  111. return
  112. }
  113. // NewComet new a comet.
  114. func NewComet(data *naming.Instance, conf *conf.Config, options CometOptions) (*Comet, error) {
  115. c := &Comet{
  116. serverID: data.Hostname,
  117. pushChan: make([]chan *client.PushMsgReq, options.RoutineSize),
  118. roomChan: make([]chan *client.BroadcastRoomReq, options.RoutineSize),
  119. broadcastChan: make(chan *client.BroadcastReq, options.RoutineSize),
  120. options: options,
  121. }
  122. var grpcAddr string
  123. for _, addrs := range data.Addrs {
  124. u, err := url.Parse(addrs)
  125. if err == nil && u.Scheme == "grpc" {
  126. grpcAddr = u.Host
  127. }
  128. }
  129. if grpcAddr == "" {
  130. return nil, fmt.Errorf("invalid grpc address:%v", data.Addrs)
  131. }
  132. var err error
  133. if c.broadcastClient, err = client.NewClient(grpcAddr, conf.RPC); err != nil {
  134. return nil, err
  135. }
  136. c.ctx, c.cancel = context.WithCancel(context.Background())
  137. for i := uint64(0); i < options.RoutineSize; i++ {
  138. c.pushChan[i] = make(chan *client.PushMsgReq, options.RoutineChan)
  139. c.roomChan[i] = make(chan *client.BroadcastRoomReq, options.RoutineChan)
  140. go c.process(c.pushChan[i], c.roomChan[i], c.broadcastChan)
  141. }
  142. return c, nil
  143. }