redis.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strconv"
  7. "strings"
  8. "time"
  9. v1pb "go-common/app/service/live/dao-anchor/api/grpc/v1"
  10. "go-common/library/cache/redis"
  11. "go-common/library/ecode"
  12. "go-common/library/log"
  13. )
  14. //实时消费缓存设计,异步落地
  15. const VALUE = "value"
  16. const DATE = "date" //是否最新消息,是为1(需要刷新到DB) 否为0(不需要刷新到DB)
  17. const DATE_1 = "1"
  18. type ListIntValueInfo struct {
  19. Value int64 `json:"value"`
  20. Time int64 `json:"time"`
  21. }
  22. //Set 设置实时数据
  23. func (d *Dao) Set(ctx context.Context, redisKey string, value string, timeOut int) (err error) {
  24. conn := d.redis.Get(ctx)
  25. defer conn.Close()
  26. if _, err = conn.Do("HMSET", redisKey, VALUE, value, DATE, DATE_1); err != nil {
  27. log.Error("redis_set_err:key=%s;value=%s;err=%v", redisKey, value, err)
  28. return
  29. }
  30. conn.Do("EXPIRE", redisKey, timeOut)
  31. return
  32. }
  33. //Incr 设置增加数据
  34. func (d *Dao) Incr(ctx context.Context, redisKey string, value int64, timeOut int) (err error) {
  35. conn := d.redis.Get(ctx)
  36. defer conn.Close()
  37. if err = conn.Send("HINCRBY", redisKey, VALUE, value); err != nil {
  38. log.Error("redis_incr_err:key=%s;value=%d;err=%v", redisKey, value, err)
  39. return
  40. }
  41. if err = conn.Send("HSET", redisKey, DATE, DATE_1); err != nil {
  42. log.Error("redis_hset_err:key=%s;date_value=%s;err=%v", redisKey, DATE_1, err)
  43. return
  44. }
  45. conn.Send("EXPIRE", redisKey, timeOut)
  46. if err = conn.Flush(); err != nil {
  47. log.Error("redisIncreRoomInfo conn.Flush error(%v)", err)
  48. return
  49. }
  50. _, err = conn.Receive()
  51. _, err = conn.Receive()
  52. _, err = conn.Receive()
  53. return
  54. }
  55. func (d *Dao) HGet(ctx context.Context, redisKey string) (resp int64, err error) {
  56. conn := d.redis.Get(ctx)
  57. defer conn.Close()
  58. resp, err = redis.Int64(conn.Do("HGET", redisKey, VALUE))
  59. if err != nil {
  60. log.Error("redis_incr_err:key=%s;reply=%d;err=%v", redisKey, resp, err)
  61. return
  62. }
  63. return
  64. }
  65. func (d *Dao) SetList(ctx context.Context, redisKey string, value string, timeOut int) (err error) {
  66. conn := d.redis.Get(ctx)
  67. defer conn.Close()
  68. if _, err = conn.Do("LPUSH", redisKey, value); err != nil {
  69. log.Error("redis_setList_error:key=%s;value=%s;err=%v", redisKey, value, err)
  70. return
  71. }
  72. conn.Do("EXPIRE", redisKey, timeOut)
  73. return
  74. }
  75. func (d *Dao) GetList(ctx context.Context, redisKey string, start int, end int) (resp []*ListIntValueInfo, err error) {
  76. conn := d.redis.Get(ctx)
  77. defer conn.Close()
  78. res, err := redis.Values(conn.Do("LRANGE", redisKey, start, end))
  79. if err != nil {
  80. log.Error("redis_getList_error:key=%s;err=%v", redisKey, err)
  81. return
  82. }
  83. for _, sList := range res {
  84. list := &ListIntValueInfo{}
  85. if err = json.Unmarshal(sList.([]byte), &list); err != nil {
  86. log.Error("GetList_json_error")
  87. continue
  88. }
  89. resp = append(resp, list)
  90. }
  91. return
  92. }
  93. // GetRoomRecordsCurrent return a list of records corresponding to `content`.
  94. func (d *Dao) GetRoomRecordsCurrent(ctx context.Context, content string, roomIds []int64) (list []int64, err error) {
  95. conn := d.redis.Get(ctx)
  96. defer conn.Close()
  97. for _, roomId := range roomIds {
  98. key := d.SRoomRecordCurrent(content, roomId)
  99. if err = conn.Send("HGET", key.RedisKey, VALUE); err != nil {
  100. log.Error("GetRoomRecordsCurrent conn.Send(HGET, %s, %s) error(%v)", key.RedisKey, VALUE, err)
  101. return nil, err
  102. }
  103. }
  104. if err = conn.Flush(); err != nil {
  105. log.Error("GetRoomRecordsCurrent conn.Flush error(%v)", err)
  106. return nil, err
  107. }
  108. for i := 0; i < len(roomIds); i++ {
  109. var data int64
  110. if data, err = redis.Int64(conn.Receive()); err != nil {
  111. if err != redis.ErrNil {
  112. log.Error("GetRoomRecordsCurrent conn.Receive() %d error(%v)", i, err)
  113. return nil, err
  114. }
  115. }
  116. list = append(list, data)
  117. }
  118. return
  119. }
  120. func (d *Dao) DelRoomRecordsCurrent(ctx context.Context, content string, roomIds []int64) (err error) {
  121. conn := d.redis.Get(ctx)
  122. defer conn.Close()
  123. args := make([]interface{}, len(roomIds))
  124. for i, roomId := range roomIds {
  125. args[i] = d.SRoomRecordCurrent(content, roomId)
  126. }
  127. if _, err = conn.Do("DEL", args...); err != nil {
  128. log.Error("DelRoomRecordsCurrent_del_error:%v;roomIds=%v", err, roomIds)
  129. return
  130. }
  131. return
  132. }
  133. func (d *Dao) SetRoomRecordsList(ctx context.Context, roomIds []int64, keys map[int64]interface{}, values map[int64]string) (err error) {
  134. conn := d.redis.Get(ctx)
  135. defer conn.Close()
  136. for _, roomId := range roomIds {
  137. keyInfo := keys[roomId].(*redisKeyResp)
  138. value := values[roomId]
  139. if err = conn.Send("LPUSH", keyInfo.RedisKey, value); err != nil {
  140. log.Error("SetRoomRecordsList conn.Send(LPUSH, %s, %s) error(%v)", keyInfo.RedisKey, value, err)
  141. return err
  142. }
  143. if err = conn.Send("EXPIRE", keyInfo.RedisKey, keyInfo.TimeOut); err != nil {
  144. log.Error("SetRoomRecordsList conn.Send(EXPIRE, %s, %d) error(%v)", keyInfo.RedisKey, keyInfo.TimeOut, err)
  145. return err
  146. }
  147. }
  148. if err = conn.Flush(); err != nil {
  149. log.Error("SetRoomRecordsList conn.Flush() error(%v)", err)
  150. return err
  151. }
  152. for range roomIds {
  153. conn.Receive()
  154. conn.Receive()
  155. }
  156. return
  157. }
  158. // GetRoomLiveRecordsRange can partially succeed, and in this case, err is still nil.
  159. func (d *Dao) GetRoomLiveRecordsRange(ctx context.Context, content string, roomIds []int64, liveTime int64, start, end int) (resp map[int64][]*ListIntValueInfo, err error) {
  160. conn := d.redis.Get(ctx)
  161. defer conn.Close()
  162. okRoomIds := make([]int64, 0, len(roomIds))
  163. for _, roomId := range roomIds {
  164. keyInfo := d.LRoomLiveRecordList(content, roomId, liveTime)
  165. if err = conn.Send("LRANGE", keyInfo.RedisKey, start, end); err != nil {
  166. log.Error("GetRoomLiveRecordsRange conn.Send(LRANGE, %s, %d, %d) error(%v)", keyInfo.RedisKey, start, end, err)
  167. continue
  168. }
  169. okRoomIds = append(okRoomIds, roomId)
  170. }
  171. if err = conn.Flush(); err != nil {
  172. log.Error("GetRoomLiveRecordsRange conn.Flush() error(%v)", err)
  173. return nil, err
  174. }
  175. resp = make(map[int64][]*ListIntValueInfo)
  176. for i := 0; i < len(okRoomIds); i++ {
  177. values, err := redis.Values(conn.Receive())
  178. if err != nil {
  179. log.Error("GetRoomLiveRecordsRange redis.Values(conn.Receive()) error(%v)", err)
  180. continue
  181. }
  182. roomId := okRoomIds[i]
  183. for _, info := range values {
  184. valueInfo := &ListIntValueInfo{}
  185. if err = json.Unmarshal(info.([]byte), &valueInfo); err != nil {
  186. log.Error("GetRoomLiveRecordsRange json unmarshall error(%v)", err)
  187. continue
  188. }
  189. resp[roomId] = append(resp[roomId], valueInfo)
  190. }
  191. }
  192. return resp, nil
  193. }
  194. const (
  195. _roomInfoKey = "room_info_v3:%d"
  196. _anchorInfoKey = "anchor_info:%d"
  197. _onlineListKey = "online_list_v3:%d"
  198. _tagListKey = "tag_list_v3:%d"
  199. _onlineListAllArea = 0
  200. )
  201. var (
  202. _allRoomInfoFields = []string{"uid", "title", "cover", "tags", "background", "description", "live_start_time", "live_status", "live_screen_type", "live_type", "lock_status", "lock_time", "hidden_time", "hidden_status", "area_id", "parent_area_id", "anchor_profile_type", "anchor_round_switch", "anchor_record_switch", "anchor_exp", "popularity_count", "keyframe"}
  203. )
  204. func (d *Dao) filterOutTagList(fields []string) (resp []string, needTagList bool) {
  205. resp = make([]string, 0, len(fields))
  206. for _, f := range fields {
  207. if f == "tag_list" {
  208. needTagList = true
  209. } else {
  210. resp = append(resp, f)
  211. }
  212. }
  213. return
  214. }
  215. func (d *Dao) redisGetRoomInfo(ctx context.Context, roomID int64, fields []string) (data *v1pb.RoomData, err error) {
  216. if len(fields) <= 0 {
  217. return
  218. }
  219. conn := d.redis.Get(ctx)
  220. defer conn.Close()
  221. roomKey := fmt.Sprintf(_roomInfoKey, roomID)
  222. ok, err := redis.Bool(conn.Do("EXPIRE", roomKey, d.c.Common.ExpireTime))
  223. if err != nil && err != redis.ErrNil {
  224. log.Error("redisGetRoomInfo conn.Do(EXPIRE, %s) error(%v)", roomKey, err)
  225. return
  226. }
  227. if !ok {
  228. err = ecode.RoomNotFound
  229. return
  230. }
  231. for _, f := range fields {
  232. if f == "room_id" {
  233. continue
  234. }
  235. if err = conn.Send("HGET", roomKey, f); err != nil {
  236. log.Error("redisGetRoomInfo conn.Send(HGET, %s, %s) error(%v)", roomKey, f, err)
  237. return
  238. }
  239. }
  240. if err = conn.Flush(); err != nil {
  241. log.Error("redisGetRoomInfo conn.Flush error(%v)", err)
  242. return
  243. }
  244. data = &v1pb.RoomData{
  245. RoomId: roomID,
  246. AnchorLevel: new(v1pb.AnchorLevel),
  247. }
  248. for _, f := range fields {
  249. if f == "room_id" {
  250. continue
  251. }
  252. reply, err := conn.Receive()
  253. if e, ok := err.(*redis.Error); ok && strings.Index(e.Error(), "WRONGTYPE") != -1 {
  254. return data, err
  255. }
  256. switch f {
  257. case "uid":
  258. data.Uid, err = redis.Int64(reply, err)
  259. case "title":
  260. data.Title, err = redis.String(reply, err)
  261. case "cover":
  262. data.Cover, err = redis.String(reply, err)
  263. case "tags":
  264. data.Tags, err = redis.String(reply, err)
  265. case "background":
  266. data.Background, err = redis.String(reply, err)
  267. case "description":
  268. data.Description, err = redis.String(reply, err)
  269. case "live_start_time":
  270. data.LiveStartTime, err = redis.Int64(reply, err)
  271. case "live_status":
  272. data.LiveStatus, err = redis.Int64(reply, err)
  273. case "live_screen_type":
  274. data.LiveScreenType, err = redis.Int64(reply, err)
  275. case "live_type":
  276. data.LiveType, err = redis.Int64(reply, err)
  277. case "lock_status":
  278. data.LockStatus, err = redis.Int64(reply, err)
  279. case "lock_time":
  280. data.LockTime, err = redis.Int64(reply, err)
  281. case "hidden_time":
  282. data.HiddenTime, err = redis.Int64(reply, err)
  283. case "hidden_status":
  284. data.HiddenStatus, err = redis.Int64(reply, err)
  285. case "area_id":
  286. data.AreaId, err = redis.Int64(reply, err)
  287. case "parent_area_id":
  288. data.ParentAreaId, err = redis.Int64(reply, err)
  289. case "anchor_san":
  290. data.AnchorSan, err = redis.Int64(reply, err)
  291. case "anchor_profile_type":
  292. data.AnchorProfileType, err = redis.Int64(reply, err)
  293. case "anchor_round_switch":
  294. data.AnchorRoundSwitch, err = redis.Int64(reply, err)
  295. case "anchor_record_switch":
  296. data.AnchorRecordSwitch, err = redis.Int64(reply, err)
  297. case "anchor_exp":
  298. data.AnchorLevel.Score, err = redis.Int64(reply, err)
  299. case "popularity_count":
  300. data.PopularityCount, err = redis.Int64(reply, err)
  301. case "keyframe":
  302. data.Keyframe, err = redis.String(reply, err)
  303. default:
  304. log.Error("redisGetRoomInfo unsupported field(%v), roomID(%d)", f, roomID)
  305. err = ecode.InvalidParam
  306. return nil, err
  307. }
  308. if err != nil {
  309. log.Warn("redisGetRoomInfo conn.Receive() field(%v), error(%v)", f, err)
  310. return nil, err
  311. }
  312. }
  313. return
  314. }
  315. func (d *Dao) redisSetRoomInfo(ctx context.Context, roomID int64, fields []string, data *v1pb.RoomData, fastfail bool) (err error) {
  316. if len(fields) <= 0 {
  317. return
  318. }
  319. conn := d.redis.Get(ctx)
  320. defer conn.Close()
  321. roomKey := fmt.Sprintf(_roomInfoKey, roomID)
  322. ok, err := redis.Bool(conn.Do("EXPIRE", roomKey, d.c.Common.ExpireTime))
  323. if err != nil && err != redis.ErrNil {
  324. log.Error("redisSetRoomInfo conn.Do(EXPIRE, %s) error(%v)", roomKey, err)
  325. return
  326. }
  327. if !ok && fastfail {
  328. return
  329. }
  330. args := make([]interface{}, len(fields)*2+1)
  331. args[0] = roomKey
  332. for i, f := range fields {
  333. var v interface{}
  334. switch f {
  335. case "roomid":
  336. v = data.RoomId
  337. case "uid":
  338. v = data.Uid
  339. case "title":
  340. v = data.Title
  341. case "cover":
  342. v = data.Cover
  343. case "tags":
  344. v = data.Tags
  345. case "background":
  346. v = data.Background
  347. case "description":
  348. v = data.Description
  349. case "live_start_time":
  350. v = data.LiveStartTime
  351. case "live_status":
  352. v = data.LiveStatus
  353. case "live_screen_type":
  354. v = data.LiveScreenType
  355. case "live_type":
  356. v = data.LiveType
  357. case "lock_status":
  358. v = data.LockStatus
  359. case "lock_time":
  360. v = data.LockTime
  361. case "hidden_time":
  362. v = data.HiddenTime
  363. case "hidden_status":
  364. v = data.HiddenStatus
  365. case "area_id":
  366. v = data.AreaId
  367. case "parent_area_id":
  368. v = data.ParentAreaId
  369. case "anchor_san":
  370. v = data.AnchorSan
  371. case "anchor_profile_type":
  372. v = data.AnchorProfileType
  373. case "anchor_round_switch":
  374. v = data.AnchorRoundSwitch
  375. case "anchor_record_switch":
  376. v = data.AnchorRecordSwitch
  377. case "anchor_exp":
  378. v = data.AnchorRecordSwitch
  379. case "popularity_count":
  380. v = data.PopularityCount
  381. case "keyframe":
  382. v = data.Keyframe
  383. default:
  384. log.Error("redisSetRoomInfo unsupported field(%v), roomID(%d)", f, roomID)
  385. return ecode.InvalidParam
  386. }
  387. args[i*2+1] = f
  388. args[i*2+2] = v
  389. }
  390. if _, err = conn.Do("HMSET", args...); err != nil {
  391. log.Error("redisSetRoomInfo conn.Do(HMSET, %v) error(%v)", args, err)
  392. return
  393. }
  394. return
  395. }
  396. func (d *Dao) redisIncreRoomInfo(ctx context.Context, roomID int64, fields []string, data *v1pb.RoomData) (err error) {
  397. if len(fields) <= 0 {
  398. return
  399. }
  400. conn := d.redis.Get(ctx)
  401. defer conn.Close()
  402. roomKey := fmt.Sprintf(_roomInfoKey, roomID)
  403. ok, err := redis.Bool(conn.Do("EXPIRE", roomKey, d.c.Common.ExpireTime))
  404. if err != nil && err != redis.ErrNil {
  405. log.Error("redisIncreRoomInfo conn.Do(EXPIRE, %s) error(%v)", roomKey, err)
  406. return
  407. }
  408. // fast fail if key not exists
  409. if !ok {
  410. return
  411. }
  412. for _, f := range fields {
  413. var v int64
  414. switch f {
  415. case "anchor_san":
  416. v = data.AnchorSan
  417. case "anchor_exp":
  418. v = data.AnchorRecordSwitch
  419. case "popularity_count":
  420. v = data.PopularityCount
  421. default:
  422. log.Error("redisIncreRoomInfo unsupported field(%v), roomID(%d)", f, roomID)
  423. return ecode.InvalidParam
  424. }
  425. if err = conn.Send("HINCRBY", roomKey, f, v); err != nil {
  426. log.Error("redisIncreRoomInfo conn.Send(HINCRBY, %s, %s, %d) error(%v)", roomKey, f, v, err)
  427. return
  428. }
  429. }
  430. if err = conn.Flush(); err != nil {
  431. log.Error("redisIncreRoomInfo conn.Flush error(%v)", err)
  432. return
  433. }
  434. for _, f := range fields {
  435. _, err = conn.Receive()
  436. if err != nil && err != redis.ErrNil {
  437. log.Error("redisIncreRoomInfo conn.Receive() field(%v), error(%v)", f, err)
  438. return
  439. }
  440. }
  441. return
  442. }
  443. func (d *Dao) redisGetOnlineList(ctx context.Context, areaID int64) (list []int64, err error) {
  444. conn := d.redis.Get(ctx)
  445. defer conn.Close()
  446. key := fmt.Sprintf(_onlineListKey, areaID)
  447. ok, err := redis.Bool(conn.Do("EXPIRE", key, d.c.Common.ExpireTime))
  448. if err != nil && err != redis.ErrNil {
  449. log.Error("redisGetOnlineList conn.Do(EXPIRE, %s) error(%v)", key, err)
  450. return
  451. }
  452. if !ok {
  453. // 不存在或者在播列表为空都会重新去DB获取
  454. return
  455. }
  456. list = make([]int64, 0)
  457. roomids, err := redis.Strings(conn.Do("SMEMBERS", key))
  458. if err != nil && err != redis.ErrNil {
  459. if e, ok := err.(*redis.Error); ok && strings.Index(e.Error(), "WRONGTYPE") == -1 {
  460. log.Error("redisGetOnlineList conn.Do(SMEMBERS, %s) error(%v)", key, err)
  461. return
  462. }
  463. return list, nil
  464. }
  465. for _, id := range roomids {
  466. roomid, err := strconv.ParseInt(id, 10, 64)
  467. if err != nil {
  468. log.Warn("redisGetOnlineList ParseInt(%d) error(%v)", roomid, err)
  469. return nil, err
  470. }
  471. list = append(list, roomid)
  472. }
  473. return
  474. }
  475. func (d *Dao) redisSetOnlineList(ctx context.Context, areaID int64, list []int64) (err error) {
  476. conn := d.redis.Get(ctx)
  477. defer conn.Close()
  478. key := fmt.Sprintf(_onlineListKey, areaID)
  479. if len(list) <= 0 {
  480. // 设置哨兵
  481. if _, err = conn.Do("SETEX", key, d.c.Common.ExpireTime, "emptylist"); err != nil {
  482. log.Error("redisSetOnlineList conn.Do(SETEX, %s, %v) error(%v)", key, list, err)
  483. }
  484. return
  485. }
  486. if _, err = conn.Do("DEL", key); err != nil && err != redis.ErrNil {
  487. log.Error("redisSetOnlineList conn.Do(DEL, %s) error(%v)", key, err)
  488. return
  489. }
  490. args := make([]interface{}, len(list)+1)
  491. args[0] = key
  492. for i, id := range list {
  493. args[i+1] = id
  494. }
  495. if _, err = conn.Do("SADD", args...); err != nil {
  496. log.Error("redisSetOnlineList conn.Do(SADD, %s, %v) error(%v)", key, list, err)
  497. return
  498. }
  499. return
  500. }
  501. func (d *Dao) redisAddOnlineList(ctx context.Context, areaID int64, roomID int64) (err error) {
  502. conn := d.redis.Get(ctx)
  503. defer conn.Close()
  504. key := fmt.Sprintf(_onlineListKey, areaID)
  505. typ, err := redis.String(conn.Do("TYPE", key))
  506. if err != nil && err != redis.ErrNil {
  507. log.Error("redisAddOnlineList conn.Do(TYPE, %s) error(%v)", key, err)
  508. return
  509. }
  510. if strings.ToLower(typ) == "none" {
  511. // 不存在就不处理
  512. return
  513. } else if strings.ToLower(typ) != "set" {
  514. if _, err = conn.Do("DEL", key); err != nil && err != redis.ErrNil {
  515. log.Error("redisAddOnlineList conn.Do(DEL, %s) error(%v)", key, err)
  516. return
  517. }
  518. }
  519. if _, err = conn.Do("SADD", key, roomID); err != nil {
  520. log.Error("redisAddOnlineList conn.Do(SADD, %s, %d) error(%v)", key, roomID, err)
  521. return
  522. }
  523. return
  524. }
  525. func (d *Dao) redisDelOnlineList(ctx context.Context, areaID int64, roomID int64) (err error) {
  526. conn := d.redis.Get(ctx)
  527. defer conn.Close()
  528. key := fmt.Sprintf(_onlineListKey, areaID)
  529. typ, err := redis.String(conn.Do("TYPE", key))
  530. if err != nil && err != redis.ErrNil {
  531. log.Error("redisDelOnlineList conn.Do(TYPE, %s) error(%v)", key, err)
  532. return
  533. }
  534. if strings.ToLower(typ) != "set" {
  535. // 不存在就不处理
  536. return
  537. }
  538. if _, err = conn.Do("SREM", key, roomID); err != nil {
  539. log.Error("redisDelOnlineList conn.Do(SADD, %s, %d) error(%v)", key, roomID, err)
  540. return
  541. }
  542. return
  543. }
  544. func (d *Dao) redisGetTagList(ctx context.Context, roomID int64) (list []*v1pb.TagData, err error) {
  545. conn := d.redis.Get(ctx)
  546. defer conn.Close()
  547. key := fmt.Sprintf(_tagListKey, roomID)
  548. ok, err := redis.Bool(conn.Do("EXPIRE", key, d.c.Common.ExpireTime))
  549. if err != nil && err != redis.ErrNil {
  550. log.Error("redisGetTagList conn.Do(EXPIRE, %s) error(%v)", key, err)
  551. return nil, err
  552. }
  553. if !ok {
  554. err = ecode.RoomNotFound
  555. return
  556. }
  557. list = make([]*v1pb.TagData, 0)
  558. tags, err := redis.Strings(conn.Do("SMEMBERS", key))
  559. if err != nil && err != redis.ErrNil {
  560. if e, ok := err.(*redis.Error); ok && strings.Index(e.Error(), "WRONGTYPE") == -1 {
  561. log.Error("redisGetTagList conn.Do(SMEMBERS, %s) error(%v)", key, err)
  562. return
  563. }
  564. return list, nil
  565. }
  566. for _, tag := range tags {
  567. seg := strings.Split(tag, ":")
  568. if len(seg) < 5 {
  569. log.Error("redisGetTagList Split(%s) error(%v)", tag, err)
  570. return nil, err
  571. }
  572. data := &v1pb.TagData{
  573. TagExt: strings.Join(seg[4:], ":"),
  574. }
  575. data.TagExpireAt, _ = strconv.ParseInt(seg[3], 10, 64)
  576. if data.TagExpireAt > time.Now().Unix() {
  577. data.TagId, _ = strconv.ParseInt(seg[0], 10, 64)
  578. data.TagSubId, _ = strconv.ParseInt(seg[1], 10, 64)
  579. data.TagValue, _ = strconv.ParseInt(seg[2], 10, 64)
  580. list = append(list, data)
  581. }
  582. }
  583. return
  584. }
  585. func (d *Dao) redisAddTag(ctx context.Context, roomID int64, tag *v1pb.TagData) (err error) {
  586. conn := d.redis.Get(ctx)
  587. defer conn.Close()
  588. key := fmt.Sprintf(_tagListKey, roomID)
  589. typ, err := redis.String(conn.Do("TYPE", key))
  590. if err != nil && err != redis.ErrNil {
  591. log.Error("redisAddTag conn.Do(TYPE, %s) error(%v)", key, err)
  592. return
  593. }
  594. if strings.ToLower(typ) == "none" {
  595. // 不存在就不处理
  596. return
  597. } else if strings.ToLower(typ) != "set" {
  598. if _, err = conn.Do("DEL", key); err != nil && err != redis.ErrNil {
  599. log.Error("redisAddTag conn.Do(DEL, %s) error(%v)", key, err)
  600. return
  601. }
  602. }
  603. tagVal := fmt.Sprintf("%d:%d:%d:%d:%s", tag.TagId, tag.TagSubId, tag.TagValue, tag.TagExpireAt, tag.TagExt)
  604. if _, err = conn.Do("SADD", key, tagVal); err != nil {
  605. log.Error("redisAddTag conn.Do(SADD, %s, %s) error(%v)", key, tagVal, err)
  606. return
  607. }
  608. return
  609. }
  610. func (d *Dao) redisSetTagList(ctx context.Context, roomID int64, list []*v1pb.TagData) (err error) {
  611. conn := d.redis.Get(ctx)
  612. defer conn.Close()
  613. key := fmt.Sprintf(_tagListKey, roomID)
  614. if len(list) <= 0 {
  615. // 设置哨兵
  616. if _, err = conn.Do("SETEX", key, d.c.Common.ExpireTime, "emptylist"); err != nil {
  617. log.Error("redisSetTagList conn.Do(SETEX, %s, %v) error(%v)", key, list, err)
  618. }
  619. return
  620. }
  621. if _, err = conn.Do("DEL", key); err != nil && err != redis.ErrNil {
  622. log.Error("redisSetTagList conn.Do(DEL, %s) error(%v)", key, err)
  623. return
  624. }
  625. args := make([]interface{}, len(list)+1)
  626. args[0] = key
  627. for i, tag := range list {
  628. args[i+1] = fmt.Sprintf("%d:%d:%d:%d:%s", tag.TagId, tag.TagSubId, tag.TagValue, tag.TagExpireAt, tag.TagExt)
  629. }
  630. if _, err = conn.Do("SADD", args...); err != nil {
  631. log.Error("redisSetTagList conn.Do(SADD, %s, %v) error(%v)", key, list, err)
  632. return
  633. }
  634. return
  635. }