main.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package main
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "math/rand"
  7. "net"
  8. "os"
  9. "os/signal"
  10. "runtime"
  11. "syscall"
  12. "time"
  13. "go-common/app/interface/main/broadcast/conf"
  14. "go-common/app/interface/main/broadcast/server"
  15. wardenServer "go-common/app/interface/main/broadcast/server/grpc"
  16. "go-common/app/interface/main/broadcast/server/http"
  17. "go-common/library/conf/env"
  18. "go-common/library/conf/paladin"
  19. "go-common/library/log"
  20. "go-common/library/naming"
  21. "go-common/library/naming/discovery"
  22. "go-common/library/net/ip"
  23. "go-common/library/net/rpc/warden/resolver"
  24. )
  25. const (
  26. ver = "v1.2.0"
  27. )
  28. func main() {
  29. flag.Parse()
  30. if err := paladin.Init(); err != nil {
  31. panic(err)
  32. }
  33. if err := paladin.Watch("broadcast.toml", conf.Conf); err != nil {
  34. panic(err)
  35. }
  36. runtime.GOMAXPROCS(conf.Conf.Broadcast.MaxProc)
  37. log.Init(conf.Conf.Log)
  38. defer log.Close()
  39. log.Info("broadcast %s start", ver)
  40. rand.Seed(time.Now().UTC().UnixNano())
  41. dis := discovery.New(conf.Conf.Discovery)
  42. resolver.Register(dis)
  43. // server
  44. srv := server.NewServer(conf.Conf)
  45. if err := server.InitWhitelist(conf.Conf.Whitelist); err != nil {
  46. panic(err)
  47. }
  48. if err := server.InitTCP(srv, conf.Conf.TCP.Bind, conf.Conf.Broadcast.MaxProc); err != nil {
  49. panic(err)
  50. }
  51. if err := server.InitWebsocket(srv, conf.Conf.WebSocket.Bind, conf.Conf.Broadcast.MaxProc); err != nil {
  52. panic(err)
  53. }
  54. if conf.Conf.WebSocket.TLSOpen {
  55. if err := server.InitWebsocketWithTLS(srv, conf.Conf.WebSocket.TLSBind, conf.Conf.WebSocket.CertFile, conf.Conf.WebSocket.PrivateFile, runtime.NumCPU()); err != nil {
  56. panic(err)
  57. }
  58. }
  59. // v1
  60. if conf.Conf.Broadcast.OpenPortV1 {
  61. if err := server.InitTCPV1(srv, conf.Conf.TCP.BindV1, conf.Conf.Broadcast.MaxProc); err != nil {
  62. panic(err)
  63. }
  64. if err := server.InitWebsocketV1(srv, conf.Conf.WebSocket.BindV1, conf.Conf.Broadcast.MaxProc); err != nil {
  65. panic(err)
  66. }
  67. if conf.Conf.WebSocket.TLSOpen {
  68. if err := server.InitWebsocketWithTLSV1(srv, conf.Conf.WebSocket.TLSBindV1, conf.Conf.WebSocket.CertFile, conf.Conf.WebSocket.PrivateFile, runtime.NumCPU()); err != nil {
  69. panic(err)
  70. }
  71. }
  72. }
  73. // http & grpc
  74. http.Init(conf.Conf)
  75. rpcSrv := wardenServer.New(conf.Conf, srv)
  76. var (
  77. err error
  78. disCancel context.CancelFunc
  79. )
  80. if env.IP == "" {
  81. addr := ip.InternalIP()
  82. _, port, _ := net.SplitHostPort(conf.Conf.WardenServer.Addr)
  83. ins := &naming.Instance{
  84. Region: env.Region,
  85. Zone: env.Zone,
  86. Env: env.DeployEnv,
  87. Hostname: env.Hostname,
  88. Status: 1,
  89. AppID: "push.interface.broadcast",
  90. Addrs: []string{
  91. "grpc://" + addr + ":" + port,
  92. },
  93. Metadata: map[string]string{
  94. "weight": os.Getenv("WEIGHT"),
  95. "ip_addrs": os.Getenv("IP_ADDRS"),
  96. "ip_addrs_v6": os.Getenv("IP_ADDRS_V6"),
  97. "offline": os.Getenv("OFFLINE"),
  98. "overseas": os.Getenv("OVERSEAS"),
  99. },
  100. }
  101. disCancel, err = dis.Register(context.Background(), ins)
  102. if err != nil {
  103. panic(err)
  104. }
  105. go func() {
  106. for {
  107. var (
  108. err error
  109. conns int
  110. ips = make(map[string]struct{})
  111. roomIPs = make(map[string]struct{})
  112. )
  113. for _, bucket := range srv.Buckets() {
  114. for ip := range bucket.IPCount() {
  115. ips[ip] = struct{}{}
  116. }
  117. for ip := range bucket.RoomIPCount() {
  118. roomIPs[ip] = struct{}{}
  119. }
  120. conns += bucket.ChannelCount()
  121. }
  122. ins.Metadata["conns"] = fmt.Sprint(conns)
  123. ins.Metadata["ips"] = fmt.Sprint(len(ips))
  124. ins.Metadata["room_ips"] = fmt.Sprint(len(roomIPs))
  125. if err = dis.Set(ins); err != nil {
  126. log.Error("dis.Set(%+v) error(%v)", ins, err)
  127. time.Sleep(time.Second)
  128. continue
  129. }
  130. time.Sleep(time.Duration(conf.Conf.Broadcast.ServerTick))
  131. }
  132. }()
  133. }
  134. c := make(chan os.Signal, 1)
  135. signal.Notify(c, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
  136. for {
  137. s := <-c
  138. log.Info("broadcast get a signal %s", s.String())
  139. switch s {
  140. case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
  141. log.Info("broadcast %s exit", ver)
  142. if disCancel != nil {
  143. disCancel()
  144. }
  145. srv.Close()
  146. rpcSrv.Shutdown(context.Background())
  147. return
  148. case syscall.SIGHUP:
  149. default:
  150. return
  151. }
  152. }
  153. }