server.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package server
  2. import (
  3. "context"
  4. "fmt"
  5. "math/rand"
  6. "sync/atomic"
  7. "time"
  8. "go-common/app/interface/main/broadcast/conf"
  9. pb "go-common/app/service/main/broadcast/api/grpc/v1"
  10. "go-common/library/conf/env"
  11. "go-common/library/log"
  12. bm "go-common/library/net/http/blademaster"
  13. "go-common/library/net/netutil"
  14. "github.com/google/uuid"
  15. "github.com/zhenjl/cityhash"
  16. )
  17. var (
  18. _maxInt = 1<<31 - 1
  19. _roomOnlineValue atomic.Value
  20. )
  21. const (
  22. _clientHeartbeat = time.Second * 90
  23. _minHeartbeatSecond = 600 // 10m
  24. _maxHeartbeatSecond = 1200 // 20m
  25. _roomsShard = 32
  26. )
  27. // Server .
  28. type Server struct {
  29. c *conf.Config
  30. round *Round // accept round store
  31. buckets []*Bucket // subkey bucket
  32. bucketIdx uint32
  33. serverID string
  34. rpcClient pb.ZergClient
  35. httpCli *bm.Client
  36. backoff *netutil.BackoffConfig
  37. }
  38. // NewServer returns a new Server.
  39. func NewServer(c *conf.Config) *Server {
  40. var err error
  41. s := new(Server)
  42. s.c = c
  43. s.serverID = env.Hostname
  44. s.rpcClient, err = pb.NewClient(c.WardenClient)
  45. if err != nil {
  46. panic(err)
  47. }
  48. s.httpCli = bm.NewClient(c.HTTPClient)
  49. s.round = NewRound(conf.Conf)
  50. // init bucket
  51. s.buckets = make([]*Bucket, c.Bucket.Size)
  52. s.bucketIdx = uint32(c.Bucket.Size)
  53. for i := 0; i < c.Bucket.Size; i++ {
  54. s.buckets[i] = NewBucket(c.Bucket)
  55. }
  56. s.backoff = &netutil.BackoffConfig{
  57. MaxDelay: 5 * time.Second,
  58. BaseDelay: 1.0 * time.Second,
  59. Factor: 1.6,
  60. Jitter: 0.2,
  61. }
  62. s.loadOnline()
  63. go s.onlineproc()
  64. return s
  65. }
  66. // Buckets return all buckets.
  67. func (s *Server) Buckets() []*Bucket {
  68. return s.buckets
  69. }
  70. // Bucket get the bucket by subkey.
  71. func (s *Server) Bucket(subKey string) *Bucket {
  72. idx := cityhash.CityHash32([]byte(subKey), uint32(len(subKey))) % s.bucketIdx
  73. if s.c.Broadcast.Debug {
  74. log.Info("%s hit channel bucket index: %d use cityhash", subKey, idx)
  75. }
  76. return s.buckets[idx]
  77. }
  78. // NextKey generate a server key.
  79. func (s *Server) NextKey() string {
  80. u, err := uuid.NewRandom()
  81. if err == nil {
  82. return u.String()
  83. }
  84. return fmt.Sprintf("%s-%d", s.serverID, time.Now().UnixNano())
  85. }
  86. // RandServerHearbeat rand server heartbeat.
  87. func (s *Server) RandServerHearbeat() time.Duration {
  88. return time.Duration(_minHeartbeatSecond+rand.Intn(_maxHeartbeatSecond-_minHeartbeatSecond)) * time.Second
  89. }
  90. // Close close the server.
  91. func (s *Server) Close() (err error) {
  92. return
  93. }
  94. func (s *Server) onlineproc() {
  95. var retry int
  96. for {
  97. if err := s.loadOnline(); err != nil {
  98. retry++
  99. time.Sleep(s.backoff.Backoff(retry))
  100. continue
  101. }
  102. retry = 0
  103. time.Sleep(time.Duration(s.c.Broadcast.OnlineTick))
  104. }
  105. }
  106. func (s *Server) loadOnline() (err error) {
  107. roomCountShard := make(map[uint32]map[string]int32)
  108. for _, bucket := range s.buckets {
  109. for roomID, count := range bucket.RoomsCount() {
  110. hash := cityhash.CityHash32([]byte(roomID), uint32(len(roomID))) % _roomsShard
  111. roomCount, ok := roomCountShard[hash]
  112. if !ok {
  113. roomCount = make(map[string]int32)
  114. roomCountShard[hash] = roomCount
  115. }
  116. roomCount[roomID] += count
  117. }
  118. }
  119. allRoomsCount := make(map[string]int32)
  120. for i := uint32(0); i < _roomsShard; i++ {
  121. var mergedRoomsCount map[string]int32
  122. mergedRoomsCount, err = s.RenewOnline(context.Background(), s.serverID, int32(i), roomCountShard[i])
  123. if err != nil {
  124. log.Error("s.RenewOnline(%s, %d, %d) error(%v)", s.serverID, i, len(roomCountShard[i]), err)
  125. return
  126. }
  127. for roomID, count := range mergedRoomsCount {
  128. allRoomsCount[roomID] = count
  129. }
  130. }
  131. for _, bucket := range s.buckets {
  132. bucket.UpRoomsCount(allRoomsCount)
  133. }
  134. _roomOnlineValue.Store(allRoomsCount)
  135. return
  136. }
  137. func roomOnline(rid string) int32 {
  138. online, ok := _roomOnlineValue.Load().(map[string]int32)
  139. if !ok {
  140. return 0
  141. }
  142. return online[rid]
  143. }