123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- package service
- import (
- "context"
- "errors"
- "fmt"
- "sync"
- "time"
- "go-common/app/job/main/broadcast/conf"
- "go-common/library/conf/env"
- "go-common/library/log"
- "go-common/library/naming"
- "go-common/library/naming/discovery"
- "go-common/library/queue/databus"
- xtime "go-common/library/time"
- )
- const (
- broadcastAppID = "push.interface.broadcast"
- )
- var (
- // ErrComet commet error.
- ErrComet = errors.New("comet rpc is not available")
- // ErrCometFull comet chan full.
- ErrCometFull = errors.New("comet proto chan full")
- // ErrRoomFull room chan full.
- ErrRoomFull = errors.New("room proto chan full")
- )
- // Service is a service.
- type Service struct {
- conf *conf.Config
- consumer *databus.Databus
- cometServers map[string]*Comet
- rooms map[string]*Room
- roomsMutex sync.RWMutex
- options RoomOptions
- }
- // New new a service and return.
- func New(c *conf.Config) *Service {
- if c.Room.Refresh <= 0 {
- c.Room.Refresh = xtime.Duration(time.Second)
- }
- s := &Service{
- conf: c,
- consumer: databus.New(c.Databus),
- cometServers: make(map[string]*Comet),
- rooms: make(map[string]*Room, 1024),
- roomsMutex: sync.RWMutex{},
- options: RoomOptions{
- BatchNum: c.Room.Batch,
- SignalTime: time.Duration(c.Room.Signal),
- },
- }
- dis := discovery.New(c.Discovery)
- s.watchComet(dis.Build(broadcastAppID))
- go s.consume()
- return s
- }
- func (s *Service) consume() {
- msgs := s.consumer.Messages()
- for {
- msg, ok := <-msgs
- if !ok {
- log.Warn("[job] consumer has been closed")
- return
- }
- if msg.Topic != s.conf.Databus.Topic {
- log.Error("unknown message:%v", msg)
- continue
- }
- s.pushMsg(msg.Value)
- msg.Commit()
- }
- }
- // Close close the resources.
- func (s *Service) Close() error {
- if err := s.consumer.Close(); err != nil {
- return err
- }
- for _, c := range s.cometServers {
- if err := c.Close(); err != nil {
- log.Error("c.Close() error(%v)", err)
- }
- }
- return nil
- }
- func (s *Service) watchComet(resolver naming.Resolver) {
- event := resolver.Watch()
- select {
- case _, ok := <-event:
- if !ok {
- panic("watchComet init failed")
- }
- if ins, ok := resolver.Fetch(context.Background()); ok {
- if err := s.newAddress(ins); err != nil {
- panic(err)
- }
- log.Info("watchComet init newAddress:%+v", ins)
- }
- case <-time.After(10 * time.Second):
- log.Error("watchComet init instances timeout")
- }
- go func() {
- for {
- if _, ok := <-event; !ok {
- log.Info("watchComet exit")
- return
- }
- ins, ok := resolver.Fetch(context.Background())
- if ok {
- if err := s.newAddress(ins); err != nil {
- log.Error("watchComet newAddress(%+v) error(%+v)", ins, err)
- continue
- }
- log.Info("watchComet change newAddress:%+v", ins)
- }
- }
- }()
- }
- func (s *Service) newAddress(insMap map[string][]*naming.Instance) error {
- ins := insMap[env.Zone]
- if len(ins) == 0 {
- return fmt.Errorf("watchComet instance is empty")
- }
- comets := map[string]*Comet{}
- options := CometOptions{
- RoutineSize: s.conf.Routine.Size,
- RoutineChan: s.conf.Routine.Chan,
- }
- for _, data := range ins {
- if old, ok := s.cometServers[data.Hostname]; ok {
- comets[data.Hostname] = old
- continue
- }
- c, err := NewComet(data, s.conf, options)
- if err != nil {
- log.Error("watchComet NewComet(%+v) error(%v)", data, err)
- return err
- }
- comets[data.Hostname] = c
- log.Info("watchComet AddComet grpc:%+v", data)
- }
- for key, old := range s.cometServers {
- if _, ok := comets[key]; !ok {
- old.cancel()
- log.Info("watchComet DelComet:%s", key)
- }
- }
- s.cometServers = comets
- return nil
- }
- func (s *Service) room(roomID string) *Room {
- s.roomsMutex.RLock()
- room, ok := s.rooms[roomID]
- s.roomsMutex.RUnlock()
- if !ok {
- s.roomsMutex.Lock()
- if room, ok = s.rooms[roomID]; !ok {
- room = NewRoom(s, roomID, s.options)
- s.rooms[roomID] = room
- }
- s.roomsMutex.Unlock()
- log.Info("new a room:%s active:%d", roomID, len(s.rooms))
- }
- return room
- }
|