bucket.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. package server
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "go-common/app/interface/main/broadcast/conf"
  6. "go-common/app/service/main/broadcast/model"
  7. )
  8. // ProtoRoom room proto.
  9. type ProtoRoom struct {
  10. RoomID string
  11. Proto *model.Proto
  12. }
  13. // Bucket is a channel holder.
  14. type Bucket struct {
  15. c *conf.Bucket
  16. cLock sync.RWMutex // protect the channels for chs
  17. chs map[string]*Channel // map sub key to a channel
  18. // room
  19. rooms map[string]*Room // bucket room channels
  20. routines []chan ProtoRoom
  21. routinesNum uint64
  22. ipCnts map[string]int32
  23. roomIPCnts map[string]int32
  24. }
  25. // NewBucket new a bucket struct. store the key with im channel.
  26. func NewBucket(c *conf.Bucket) (b *Bucket) {
  27. b = new(Bucket)
  28. b.chs = make(map[string]*Channel, c.Channel)
  29. b.ipCnts = make(map[string]int32)
  30. b.roomIPCnts = make(map[string]int32)
  31. b.c = c
  32. b.rooms = make(map[string]*Room, c.Room)
  33. b.routines = make([]chan ProtoRoom, c.RoutineAmount)
  34. for i := uint64(0); i < c.RoutineAmount; i++ {
  35. c := make(chan ProtoRoom, c.RoutineSize)
  36. b.routines[i] = c
  37. go b.roomproc(c)
  38. }
  39. return
  40. }
  41. // ChannelCount channel count in the bucket
  42. func (b *Bucket) ChannelCount() int {
  43. return len(b.chs)
  44. }
  45. // RoomCount room count in the bucket
  46. func (b *Bucket) RoomCount() int {
  47. return len(b.rooms)
  48. }
  49. // RoomsCount get all room id where online number > 0.
  50. func (b *Bucket) RoomsCount() (res map[string]int32) {
  51. var (
  52. roomID string
  53. room *Room
  54. )
  55. b.cLock.RLock()
  56. res = make(map[string]int32)
  57. for roomID, room = range b.rooms {
  58. if room.Online > 0 {
  59. res[roomID] = room.Online
  60. }
  61. }
  62. b.cLock.RUnlock()
  63. return
  64. }
  65. // ChangeRoom change ro room
  66. func (b *Bucket) ChangeRoom(nrid string, ch *Channel) (err error) {
  67. var (
  68. nroom *Room
  69. ok bool
  70. oroom = ch.Room
  71. )
  72. // change to no room
  73. if nrid == model.NoRoom {
  74. if oroom != nil && oroom.Del(ch) {
  75. b.DelRoom(oroom)
  76. }
  77. ch.Room = nil
  78. return
  79. }
  80. b.cLock.Lock()
  81. if nroom, ok = b.rooms[nrid]; !ok {
  82. nroom = NewRoom(nrid)
  83. b.rooms[nrid] = nroom
  84. }
  85. b.cLock.Unlock()
  86. if err = nroom.Put(ch); err != nil {
  87. return
  88. }
  89. ch.Room = nroom
  90. if oroom != nil && oroom.Del(ch) {
  91. b.DelRoom(oroom)
  92. }
  93. return
  94. }
  95. // Put put a channel according with sub key.
  96. func (b *Bucket) Put(rid string, ch *Channel) (err error) {
  97. var (
  98. room *Room
  99. ok bool
  100. )
  101. b.cLock.Lock()
  102. // close old channel
  103. if dch := b.chs[ch.Key]; dch != nil {
  104. dch.Close()
  105. }
  106. b.chs[ch.Key] = ch
  107. if rid != model.NoRoom && rid != "" {
  108. if room, ok = b.rooms[rid]; !ok {
  109. room = NewRoom(rid)
  110. b.rooms[rid] = room
  111. }
  112. ch.Room = room
  113. b.roomIPCnts[ch.IP]++
  114. }
  115. b.ipCnts[ch.IP]++
  116. b.cLock.Unlock()
  117. if room != nil {
  118. err = room.Put(ch)
  119. }
  120. return
  121. }
  122. // Del delete the channel by sub key.
  123. func (b *Bucket) Del(dch *Channel) {
  124. var (
  125. ok bool
  126. ch *Channel
  127. room *Room
  128. )
  129. b.cLock.Lock()
  130. if ch, ok = b.chs[dch.Key]; ok {
  131. if room = ch.Room; room != nil {
  132. if b.roomIPCnts[ch.IP] > 1 {
  133. b.roomIPCnts[ch.IP]--
  134. } else {
  135. delete(b.roomIPCnts, ch.IP)
  136. }
  137. }
  138. if ch == dch {
  139. delete(b.chs, ch.Key)
  140. }
  141. // ip counter
  142. if b.ipCnts[ch.IP] > 1 {
  143. b.ipCnts[ch.IP]--
  144. } else {
  145. delete(b.ipCnts, ch.IP)
  146. }
  147. }
  148. b.cLock.Unlock()
  149. if room != nil && room.Del(ch) {
  150. // if empty room, must delete from bucket
  151. b.DelRoom(room)
  152. }
  153. }
  154. // Channel get a channel by sub key.
  155. func (b *Bucket) Channel(key string) (ch *Channel) {
  156. b.cLock.RLock()
  157. ch = b.chs[key]
  158. b.cLock.RUnlock()
  159. return
  160. }
  161. // Broadcast push msgs to all channels in the bucket.
  162. func (b *Bucket) Broadcast(p *model.Proto, op int32, platform string) {
  163. var ch *Channel
  164. b.cLock.RLock()
  165. for _, ch = range b.chs {
  166. if !ch.NeedPush(op, platform) {
  167. continue
  168. }
  169. ch.Push(p)
  170. }
  171. b.cLock.RUnlock()
  172. }
  173. // Room get a room by roomid.
  174. func (b *Bucket) Room(rid string) (room *Room) {
  175. b.cLock.RLock()
  176. room = b.rooms[rid]
  177. b.cLock.RUnlock()
  178. return
  179. }
  180. // DelRoom delete a room by roomid.
  181. func (b *Bucket) DelRoom(room *Room) {
  182. b.cLock.Lock()
  183. delete(b.rooms, room.ID)
  184. b.cLock.Unlock()
  185. room.Close()
  186. }
  187. // BroadcastRoom broadcast a message to specified room
  188. func (b *Bucket) BroadcastRoom(arg ProtoRoom) {
  189. num := atomic.AddUint64(&b.routinesNum, 1) % b.c.RoutineAmount
  190. b.routines[num] <- arg
  191. }
  192. // Rooms get all room id where online number > 0.
  193. func (b *Bucket) Rooms() (res map[string]struct{}) {
  194. var (
  195. roomID string
  196. room *Room
  197. )
  198. res = make(map[string]struct{})
  199. b.cLock.RLock()
  200. for roomID, room = range b.rooms {
  201. if room.Online > 0 {
  202. res[roomID] = struct{}{}
  203. }
  204. }
  205. b.cLock.RUnlock()
  206. return
  207. }
  208. // IPCount get ip count.
  209. func (b *Bucket) IPCount() (res map[string]struct{}) {
  210. var (
  211. ip string
  212. )
  213. b.cLock.RLock()
  214. res = make(map[string]struct{}, len(b.ipCnts))
  215. for ip = range b.ipCnts {
  216. res[ip] = struct{}{}
  217. }
  218. b.cLock.RUnlock()
  219. return
  220. }
  221. // RoomIPCount get ip count.
  222. func (b *Bucket) RoomIPCount() (res map[string]struct{}) {
  223. var (
  224. ip string
  225. )
  226. b.cLock.RLock()
  227. res = make(map[string]struct{}, len(b.roomIPCnts))
  228. for ip = range b.roomIPCnts {
  229. res[ip] = struct{}{}
  230. }
  231. b.cLock.RUnlock()
  232. return
  233. }
  234. // UpRoomsCount update all room count
  235. func (b *Bucket) UpRoomsCount(roomCountMap map[string]int32) {
  236. var (
  237. roomID string
  238. room *Room
  239. )
  240. b.cLock.RLock()
  241. for roomID, room = range b.rooms {
  242. room.AllOnline = roomCountMap[roomID]
  243. }
  244. b.cLock.RUnlock()
  245. }
  246. // roomproc
  247. func (b *Bucket) roomproc(c chan ProtoRoom) {
  248. for {
  249. arg := <-c
  250. if room := b.Room(arg.RoomID); room != nil {
  251. room.Push(arg.Proto)
  252. }
  253. }
  254. }