dispatch.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package server
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/ipipdotnet/ipdb-go"
  6. "go-common/app/service/live/broadcast-proxy/conf"
  7. "go-common/app/service/live/broadcast-proxy/dispatch"
  8. "go-common/app/service/live/broadcast-proxy/grocery"
  9. "go-common/library/log"
  10. "sync"
  11. )
  12. type CometDispatcher struct {
  13. sven *grocery.SvenClient
  14. stopper chan struct{}
  15. wg sync.WaitGroup
  16. locker sync.RWMutex
  17. ipDataV4 *ipdb.City
  18. ipDataV6 *ipdb.City
  19. matcher *dispatch.Matcher
  20. config *conf.DispatchConfig
  21. }
  22. func NewCometDispatcher(ipipConfig *conf.IpipConfig,
  23. dispatchConfig *conf.DispatchConfig, svenConfig *conf.SvenConfig) (*CometDispatcher, error) {
  24. sven, err := grocery.NewSvenClient(svenConfig.TreeID, svenConfig.Zone, svenConfig.Env, svenConfig.Build,
  25. svenConfig.Token)
  26. if err != nil {
  27. return nil, err
  28. }
  29. ipDataV4, err := ipdb.NewCity(ipipConfig.V4)
  30. if err != nil {
  31. return nil, err
  32. }
  33. ipDataV6, err := ipdb.NewCity(ipipConfig.V6)
  34. if err != nil {
  35. return nil, err
  36. }
  37. dispatcher := &CometDispatcher{
  38. sven: sven,
  39. stopper: make(chan struct{}),
  40. ipDataV4: ipDataV4,
  41. ipDataV6: ipDataV6,
  42. config: dispatchConfig,
  43. }
  44. config := sven.Config()
  45. if data, ok := config.Config[dispatchConfig.FileName]; ok {
  46. dispatcher.updateDispatchConfig(data)
  47. } else {
  48. return nil, errors.New(fmt.Sprintf("cannot find %s in sven config", dispatchConfig.FileName))
  49. }
  50. dispatcher.wg.Add(1)
  51. go func() {
  52. defer dispatcher.wg.Done()
  53. dispatcher.configWatcherProcess(dispatchConfig.FileName)
  54. }()
  55. return dispatcher, nil
  56. }
  57. func (dispatcher *CometDispatcher) Close() {
  58. close(dispatcher.stopper)
  59. dispatcher.wg.Wait()
  60. dispatcher.sven.Close()
  61. }
  62. func (dispatcher *CometDispatcher) updateDispatchConfig(config string) error {
  63. matcher, err := dispatch.NewMatcher([]byte(config), dispatcher.ipDataV4, dispatcher.ipDataV6, dispatcher.config)
  64. if err != nil {
  65. log.Error("parse rule config error:%v, data:%s", err, config)
  66. return err
  67. }
  68. dispatcher.locker.Lock()
  69. dispatcher.matcher = matcher
  70. dispatcher.locker.Unlock()
  71. log.Info("parse rule config ok, data:%s", config)
  72. return nil
  73. }
  74. func (dispatcher *CometDispatcher) configWatcherProcess(filename string) {
  75. var wg sync.WaitGroup
  76. wg.Add(1)
  77. go func() {
  78. defer wg.Done()
  79. for config := range dispatcher.sven.ConfigNotify() {
  80. log.Info("[sven]New version:%d", config.Version)
  81. log.Info("[sven]New config: %v", config.Config)
  82. if data, ok := config.Config[filename]; ok {
  83. dispatcher.updateDispatchConfig(data)
  84. }
  85. }
  86. }()
  87. wg.Add(1)
  88. go func() {
  89. defer wg.Done()
  90. for e := range dispatcher.sven.LogNotify() {
  91. log.Info("[sven]log level:%v, message:%v", e.Level, e.Message)
  92. }
  93. }()
  94. wg.Wait()
  95. }
  96. func (dispatcher *CometDispatcher) Dispatch(ip string, uid int64) ([]string, []string) {
  97. var matcher *dispatch.Matcher
  98. dispatcher.locker.RLock()
  99. matcher = dispatcher.matcher
  100. dispatcher.locker.RUnlock()
  101. if matcher == nil {
  102. return []string{dispatcher.config.DefaultDomain}, []string{dispatcher.config.DefaultDomain}
  103. }
  104. return matcher.Dispatch(ip, uid)
  105. }