service.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package service
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "sync"
  7. "time"
  8. "go-common/app/job/main/broadcast/conf"
  9. "go-common/library/conf/env"
  10. "go-common/library/log"
  11. "go-common/library/naming"
  12. "go-common/library/naming/discovery"
  13. "go-common/library/queue/databus"
  14. xtime "go-common/library/time"
  15. )
  // broadcastAppID is the application ID handed to the discovery builder when
  // resolving comet instances.
  const broadcastAppID = "push.interface.broadcast"
  // ErrComet reports that the comet RPC is not available.
  var ErrComet = errors.New("comet rpc is not available")

  // ErrCometFull reports that the comet proto channel is full.
  var ErrCometFull = errors.New("comet proto chan full")

  // ErrRoomFull reports that the room proto channel is full.
  var ErrRoomFull = errors.New("room proto chan full")
  27. // Service is a service.
  28. type Service struct {
  29. conf *conf.Config
  30. consumer *databus.Databus
  31. cometServers map[string]*Comet
  32. rooms map[string]*Room
  33. roomsMutex sync.RWMutex
  34. options RoomOptions
  35. }
  36. // New new a service and return.
  37. func New(c *conf.Config) *Service {
  38. if c.Room.Refresh <= 0 {
  39. c.Room.Refresh = xtime.Duration(time.Second)
  40. }
  41. s := &Service{
  42. conf: c,
  43. consumer: databus.New(c.Databus),
  44. cometServers: make(map[string]*Comet),
  45. rooms: make(map[string]*Room, 1024),
  46. roomsMutex: sync.RWMutex{},
  47. options: RoomOptions{
  48. BatchNum: c.Room.Batch,
  49. SignalTime: time.Duration(c.Room.Signal),
  50. },
  51. }
  52. dis := discovery.New(c.Discovery)
  53. s.watchComet(dis.Build(broadcastAppID))
  54. go s.consume()
  55. return s
  56. }
  57. func (s *Service) consume() {
  58. msgs := s.consumer.Messages()
  59. for {
  60. msg, ok := <-msgs
  61. if !ok {
  62. log.Warn("[job] consumer has been closed")
  63. return
  64. }
  65. if msg.Topic != s.conf.Databus.Topic {
  66. log.Error("unknown message:%v", msg)
  67. continue
  68. }
  69. s.pushMsg(msg.Value)
  70. msg.Commit()
  71. }
  72. }
  73. // Close close the resources.
  74. func (s *Service) Close() error {
  75. if err := s.consumer.Close(); err != nil {
  76. return err
  77. }
  78. for _, c := range s.cometServers {
  79. if err := c.Close(); err != nil {
  80. log.Error("c.Close() error(%v)", err)
  81. }
  82. }
  83. return nil
  84. }
  85. func (s *Service) watchComet(resolver naming.Resolver) {
  86. event := resolver.Watch()
  87. select {
  88. case _, ok := <-event:
  89. if !ok {
  90. panic("watchComet init failed")
  91. }
  92. if ins, ok := resolver.Fetch(context.Background()); ok {
  93. if err := s.newAddress(ins); err != nil {
  94. panic(err)
  95. }
  96. log.Info("watchComet init newAddress:%+v", ins)
  97. }
  98. case <-time.After(10 * time.Second):
  99. log.Error("watchComet init instances timeout")
  100. }
  101. go func() {
  102. for {
  103. if _, ok := <-event; !ok {
  104. log.Info("watchComet exit")
  105. return
  106. }
  107. ins, ok := resolver.Fetch(context.Background())
  108. if ok {
  109. if err := s.newAddress(ins); err != nil {
  110. log.Error("watchComet newAddress(%+v) error(%+v)", ins, err)
  111. continue
  112. }
  113. log.Info("watchComet change newAddress:%+v", ins)
  114. }
  115. }
  116. }()
  117. }
  118. func (s *Service) newAddress(insMap map[string][]*naming.Instance) error {
  119. ins := insMap[env.Zone]
  120. if len(ins) == 0 {
  121. return fmt.Errorf("watchComet instance is empty")
  122. }
  123. comets := map[string]*Comet{}
  124. options := CometOptions{
  125. RoutineSize: s.conf.Routine.Size,
  126. RoutineChan: s.conf.Routine.Chan,
  127. }
  128. for _, data := range ins {
  129. if old, ok := s.cometServers[data.Hostname]; ok {
  130. comets[data.Hostname] = old
  131. continue
  132. }
  133. c, err := NewComet(data, s.conf, options)
  134. if err != nil {
  135. log.Error("watchComet NewComet(%+v) error(%v)", data, err)
  136. return err
  137. }
  138. comets[data.Hostname] = c
  139. log.Info("watchComet AddComet grpc:%+v", data)
  140. }
  141. for key, old := range s.cometServers {
  142. if _, ok := comets[key]; !ok {
  143. old.cancel()
  144. log.Info("watchComet DelComet:%s", key)
  145. }
  146. }
  147. s.cometServers = comets
  148. return nil
  149. }
  150. func (s *Service) room(roomID string) *Room {
  151. s.roomsMutex.RLock()
  152. room, ok := s.rooms[roomID]
  153. s.roomsMutex.RUnlock()
  154. if !ok {
  155. s.roomsMutex.Lock()
  156. if room, ok = s.rooms[roomID]; !ok {
  157. room = NewRoom(s, roomID, s.options)
  158. s.rooms[roomID] = room
  159. }
  160. s.roomsMutex.Unlock()
  161. log.Info("new a room:%s active:%d", roomID, len(s.rooms))
  162. }
  163. return room
  164. }