redis.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strconv"
  7. "time"
  8. "go-common/app/service/main/broadcast/model"
  9. "go-common/library/cache/redis"
  10. "go-common/library/log"
  11. )
  12. const (
  13. _prefixMidServer = "mid_%d" // mid -> key:server
  14. _prefixKeyServer = "key_%s" // key -> server
  15. _prefixServerOnline = "ol_%s" // server -> online
  16. _keyServers = "servers"
  17. _keyMigrateRooms = "migrate_rooms"
  18. _keyMigrateServers = "migrate_servers"
  19. )
  20. var (
  21. _redisExpire = int32(time.Minute * 30 / time.Second)
  22. )
  23. func keyMidServer(mid int64) string {
  24. return fmt.Sprintf(_prefixMidServer, mid)
  25. }
  26. func keyKeyServer(key string) string {
  27. return fmt.Sprintf(_prefixKeyServer, key)
  28. }
  29. func keyServerOnline(key string) string {
  30. return fmt.Sprintf(_prefixServerOnline, key)
  31. }
  32. // pingRedis check redis connection.
  33. func (d *Dao) pingRedis(c context.Context) (err error) {
  34. conn := d.redis.Get(c)
  35. _, err = conn.Do("SET", "PING", "PONG")
  36. conn.Close()
  37. return
  38. }
  39. // AddMapping add a mapping.
  40. // Mapping:
  41. // mid -> key_server
  42. // key -> server
  43. func (d *Dao) AddMapping(c context.Context, mid int64, key, server string) (err error) {
  44. conn := d.redis.Get(c)
  45. defer conn.Close()
  46. var n = 2
  47. if mid > 0 {
  48. if err = conn.Send("HSET", keyMidServer(mid), key, server); err != nil {
  49. log.Error("conn.Send(HSET %d,%s,%s) error(%v)", mid, server, key, err)
  50. return
  51. }
  52. if err = conn.Send("EXPIRE", keyMidServer(mid), _redisExpire); err != nil {
  53. log.Error("conn.Send(EXPIRE %d,%s,%s) error(%v)", mid, key, server, err)
  54. return
  55. }
  56. n += 2
  57. }
  58. if err = conn.Send("SET", keyKeyServer(key), server); err != nil {
  59. log.Error("conn.Send(HSET %d,%s,%s) error(%v)", mid, server, key, err)
  60. return
  61. }
  62. if err = conn.Send("EXPIRE", keyKeyServer(key), _redisExpire); err != nil {
  63. log.Error("conn.Send(EXPIRE %d,%s,%s) error(%v)", mid, key, server, err)
  64. return
  65. }
  66. if err = conn.Flush(); err != nil {
  67. log.Error("conn.Flush() error(%v)", err)
  68. return
  69. }
  70. for i := 0; i < n; i++ {
  71. if _, err = conn.Receive(); err != nil {
  72. log.Error("conn.Receive() error(%v)", err)
  73. return
  74. }
  75. }
  76. return
  77. }
  78. // ExpireMapping expire a mapping.
  79. func (d *Dao) ExpireMapping(c context.Context, mid int64, key string) (has bool, err error) {
  80. conn := d.redis.Get(c)
  81. defer conn.Close()
  82. var n = 1
  83. if mid > 0 {
  84. if err = conn.Send("EXPIRE", keyMidServer(mid), _redisExpire); err != nil {
  85. log.Error("conn.Send(EXPIRE %d,%s) error(%v)", mid, key, err)
  86. return
  87. }
  88. n++
  89. }
  90. if err = conn.Send("EXPIRE", keyKeyServer(key), _redisExpire); err != nil {
  91. log.Error("conn.Send(EXPIRE %d,%s) error(%v)", mid, key, err)
  92. return
  93. }
  94. if err = conn.Flush(); err != nil {
  95. log.Error("conn.Flush() error(%v)", err)
  96. return
  97. }
  98. for i := 0; i < n; i++ {
  99. if has, err = redis.Bool(conn.Receive()); err != nil {
  100. log.Error("conn.Receive() error(%v)", err)
  101. return
  102. }
  103. }
  104. return
  105. }
  106. // DelMapping del a mapping.
  107. func (d *Dao) DelMapping(c context.Context, mid int64, key, server string) (has bool, err error) {
  108. conn := d.redis.Get(c)
  109. defer conn.Close()
  110. n := 1
  111. if mid > 0 {
  112. if err = conn.Send("HDEL", keyMidServer(mid), key); err != nil {
  113. log.Error("conn.Send(HDEL %d,%s,%s) error(%v)", mid, key, server, err)
  114. return
  115. }
  116. n++
  117. }
  118. if err = conn.Send("DEL", keyKeyServer(key)); err != nil {
  119. log.Error("conn.Send(HDEL %d,%s,%s) error(%v)", mid, key, server, err)
  120. return
  121. }
  122. if err = conn.Flush(); err != nil {
  123. log.Error("conn.Flush() error(%v)", err)
  124. return
  125. }
  126. for i := 0; i < n; i++ {
  127. if has, err = redis.Bool(conn.Receive()); err != nil {
  128. log.Error("conn.Receive() error(%v)", err)
  129. return
  130. }
  131. }
  132. return
  133. }
  134. // ServersByKeys get a server by key.
  135. func (d *Dao) ServersByKeys(c context.Context, keys []string) (res []string, err error) {
  136. conn := d.redis.Get(c)
  137. defer conn.Close()
  138. var args []interface{}
  139. for _, key := range keys {
  140. args = append(args, keyKeyServer(key))
  141. }
  142. if res, err = redis.Strings(conn.Do("MGET", args...)); err != nil {
  143. log.Error("conn.Do(MGET %v) error(%v)", args, err)
  144. }
  145. return
  146. }
  147. // KeysByMids get a key server by mid.
  148. func (d *Dao) KeysByMids(c context.Context, mids []int64) (ress map[string]string, olMids []int64, err error) {
  149. conn := d.redis.Get(c)
  150. defer conn.Close()
  151. ress = make(map[string]string)
  152. for _, mid := range mids {
  153. if err = conn.Send("HGETALL", keyMidServer(mid)); err != nil {
  154. log.Error("conn.Do(HGETALL %d) error(%v)", mid, err)
  155. return
  156. }
  157. }
  158. if err = conn.Flush(); err != nil {
  159. log.Error("conn.Flush() error(%v)", err)
  160. return
  161. }
  162. for idx := 0; idx < len(mids); idx++ {
  163. var (
  164. res map[string]string
  165. )
  166. if res, err = redis.StringMap(conn.Receive()); err != nil {
  167. log.Error("conn.Receive() error(%v)", err)
  168. return
  169. }
  170. if len(res) > 0 {
  171. olMids = append(olMids, mids[idx])
  172. }
  173. for k, v := range res {
  174. ress[k] = v
  175. }
  176. }
  177. return
  178. }
  179. // AddServerOnline add server online.
  180. func (d *Dao) AddServerOnline(c context.Context, server string, sharding int32, online *model.Online) (err error) {
  181. conn := d.redis.Get(c)
  182. defer conn.Close()
  183. b, _ := json.Marshal(online)
  184. key := keyServerOnline(server)
  185. if err = conn.Send("HSET", key, strconv.FormatInt(int64(sharding), 10), b); err != nil {
  186. log.Error("conn.Send(SET %s,%d) error(%v)", key, sharding, err)
  187. return
  188. }
  189. if err = conn.Send("EXPIRE", key, _redisExpire); err != nil {
  190. log.Error("conn.Send(EXPIRE %s) error(%v)", key, err)
  191. return
  192. }
  193. if err = conn.Flush(); err != nil {
  194. log.Error("conn.Flush() error(%v)", err)
  195. return
  196. }
  197. for i := 0; i < 2; i++ {
  198. if _, err = conn.Receive(); err != nil {
  199. log.Error("conn.Receive() error(%v)", err)
  200. return
  201. }
  202. }
  203. return
  204. }
  205. // ServerOnline get a server online.
  206. func (d *Dao) ServerOnline(c context.Context, server string, shard int) (online *model.Online, err error) {
  207. conn := d.redis.Get(c)
  208. defer conn.Close()
  209. key := keyServerOnline(server)
  210. hashKey := fmt.Sprint(shard)
  211. b, err := redis.Bytes(conn.Do("HGET", key, hashKey))
  212. if err != nil {
  213. if err != redis.ErrNil {
  214. log.Error("conn.Do(HGET %s %s) error(%v)", key, hashKey, err)
  215. } else {
  216. err = nil
  217. }
  218. return
  219. }
  220. online = new(model.Online)
  221. if err = json.Unmarshal(b, online); err != nil {
  222. log.Error("serverOnline json.Unmarshal(%s) error(%v)", b, err)
  223. }
  224. return
  225. }
  226. // DelServerOnline del a server online.
  227. func (d *Dao) DelServerOnline(c context.Context, server string, shard int) (err error) {
  228. conn := d.redis.Get(c)
  229. defer conn.Close()
  230. key := keyServerOnline(server)
  231. hashKey := fmt.Sprint(shard)
  232. if _, err = conn.Do("HDEL", key, hashKey); err != nil {
  233. log.Error("conn.Do(DEL %s) error(%v)", key, err)
  234. }
  235. return
  236. }
  237. // SetServers set servers info.
  238. func (d *Dao) SetServers(c context.Context, srvs []*model.ServerInfo) (err error) {
  239. conn := d.redis.Get(c)
  240. defer conn.Close()
  241. b, _ := json.Marshal(srvs)
  242. if _, err = conn.Do("SET", _keyServers, b); err != nil {
  243. log.Error("conn.Do(SET %s,%s) error(%v)", _keyServers, b, err)
  244. }
  245. return
  246. }
  247. // Servers return servers.
  248. func (d *Dao) Servers(c context.Context) (srvs []*model.ServerInfo, err error) {
  249. conn := d.redis.Get(c)
  250. defer conn.Close()
  251. b, err := redis.Bytes(conn.Do("GET", _keyServers))
  252. if err != nil {
  253. if err != redis.ErrNil {
  254. log.Error("conn.Do(GET %s) error(%v)", _keyServers, err)
  255. } else {
  256. err = nil
  257. }
  258. return
  259. }
  260. if err = json.Unmarshal(b, &srvs); err != nil {
  261. log.Error("MigrateServers json.Unmarshal(%s) error(%v)", b, err)
  262. }
  263. return
  264. }
  265. // MigrateServers migrate servers.
  266. func (d *Dao) MigrateServers(c context.Context) (conns, ips int64, err error) {
  267. var servers struct {
  268. Conns int64 `json:"conn_count"`
  269. IPs int64 `json:"ip_count"`
  270. }
  271. conn := d.redis.Get(c)
  272. defer conn.Close()
  273. b, err := redis.Bytes(conn.Do("GET", _keyMigrateServers))
  274. if err != nil {
  275. if err != redis.ErrNil {
  276. log.Error("conn.Do(GET %s) error(%v)", _keyMigrateServers, err)
  277. } else {
  278. err = nil
  279. }
  280. return
  281. }
  282. if err = json.Unmarshal(b, &servers); err != nil {
  283. log.Error("MigrateServers json.Unmarshal(%s) error(%v)", b, err)
  284. return
  285. }
  286. conns = servers.Conns
  287. ips = servers.IPs
  288. return
  289. }
  290. // MigrateRooms migrate rooms.
  291. func (d *Dao) MigrateRooms(c context.Context, shard int) (rooms map[string]int32, err error) {
  292. conn := d.redis.Get(c)
  293. defer conn.Close()
  294. b, err := redis.Bytes(conn.Do("HGET", _keyMigrateRooms, fmt.Sprint(shard)))
  295. if err != nil {
  296. if err != redis.ErrNil {
  297. log.Error("conn.Do(HGET %s,%d) error(%v)", _keyMigrateRooms, shard, err)
  298. } else {
  299. err = nil
  300. }
  301. return
  302. }
  303. rooms = make(map[string]int32)
  304. if err = json.Unmarshal(b, &rooms); err != nil {
  305. log.Error("migrateRooms json.Unmarshal() error(%v)", err)
  306. }
  307. return
  308. }