mysql.go 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "go-common/app/service/live/dao-anchor/model"
  7. v1pb "go-common/app/service/live/dao-anchor/api/grpc/v1"
  8. "go-common/library/ecode"
  9. "go-common/library/log"
  10. "github.com/pkg/errors"
  11. )
  // Live status of a room, as written to RoomData.LiveStatus by dbDealWithStatus.
  // The numeric values are part of the API contract; append only, never reorder.
  const (
  	// LIVE_CLOSE (0): the room is neither live nor in round play.
  	LIVE_CLOSE = iota
  	// LIVE_OPEN (1): the room has a live start time, i.e. it is broadcasting.
  	LIVE_OPEN
  	// LIVE_ROUND (2): the room is not live but its round-play switch is on.
  	LIVE_ROUND
  )
  // Constants for the attr table: attribute ids.
  // Values start at 1; append only, never reorder.
  const (
  	// ATTRID_POPULARITY (1) is the popularity attribute.
  	ATTRID_POPULARITY = iota + 1
  	// ATTRID_REVENUE (2) is the revenue attribute.
  	ATTRID_REVENUE
  	// ATTRID_DANMU (3) is the danmu attribute.
  	ATTRID_DANMU
  	// ATTRID_RANK_LIST (4) is the rank-list attribute.
  	ATTRID_RANK_LIST
  	// ATTRID_VALID_LIVE_DAYS (5) is the valid-live-days attribute.
  	ATTRID_VALID_LIVE_DAYS
  )
  // ATTRSUBID_RANK_HOUR is the sub id of the hourly rank list.
  const ATTRSUBID_RANK_HOUR = 1
  // Sub ids of the popularity attribute. Values start at 1; append only.
  const (
  	// ATTRSUBID_POPULARITY_REALTIME (1) is the real-time popularity value.
  	ATTRSUBID_POPULARITY_REALTIME = iota + 1
  	// ATTRSUBID_POPULARITY_MAX_TO_ARG_7 (2) is the mean of the popularity peaks over 7 days.
  	ATTRSUBID_POPULARITY_MAX_TO_ARG_7
  	// ATTRSUBID_POPULARITY_MAX_TO_ARG_30 (3) is the mean of the popularity peaks over 30 days.
  	ATTRSUBID_POPULARITY_MAX_TO_ARG_30
  )
  // Sub ids of the revenue attribute, one per time window. Values start at 1; append only.
  const (
  	// ATTRSUBID_REVENUE_MINUTE_NUM_15 (1) is the revenue of 15 minutes.
  	ATTRSUBID_REVENUE_MINUTE_NUM_15 = iota + 1
  	// ATTRSUBID_REVENUE_MINUTE_NUM_30 (2) is the revenue of 30 minutes.
  	ATTRSUBID_REVENUE_MINUTE_NUM_30
  	// ATTRSUBID_REVENUE_MINUTE_NUM_45 (3) is the revenue of 45 minutes.
  	ATTRSUBID_REVENUE_MINUTE_NUM_45
  	// ATTRSUBID_REVENUE_MINUTE_NUM_60 (4) is the revenue of 60 minutes.
  	ATTRSUBID_REVENUE_MINUTE_NUM_60
  )
  // Sub ids of the danmu attribute, one per time window. Values start at 1; append only.
  const (
  	// ATTRSUBID_DANMU_MINUTE_NUM_15 (1) is the danmu of 15 minutes.
  	ATTRSUBID_DANMU_MINUTE_NUM_15 = iota + 1
  	// ATTRSUBID_DANMU_MINUTE_NUM_30 (2) is the danmu of 30 minutes.
  	ATTRSUBID_DANMU_MINUTE_NUM_30
  	// ATTRSUBID_DANMU_MINUTE_NUM_45 (3) is the danmu of 45 minutes.
  	ATTRSUBID_DANMU_MINUTE_NUM_45
  	// ATTRSUBID_DANMU_MINUTE_NUM_60 (4) is the danmu of 60 minutes.
  	ATTRSUBID_DANMU_MINUTE_NUM_60
  )
  // Definitions of a "valid live day".
  const (
  	// VALID_LIVE_DAYS_TYPE_1: a single broadcast longer than 5 minutes.
  	VALID_LIVE_DAYS_TYPE_1 = 1
  	// VALID_LIVE_DAYS_TYPE_2: accumulated broadcast time of at least 120 minutes.
  	VALID_LIVE_DAYS_TYPE_2 = 2
  )
  // Sub ids of the valid-live-days attribute. Values start at 1; append only.
  const (
  	// ATTRSUBID_VALID_LIVE_DAYS_TYPE_1_DAY_7 (1): valid days within the last 7 days
  	// (valid day: a single broadcast longer than 5 minutes).
  	ATTRSUBID_VALID_LIVE_DAYS_TYPE_1_DAY_7 = iota + 1
  	// ATTRSUBID_VALID_LIVE_DAYS_TYPE_1_DAY_14 (2): valid days within the last 14 days
  	// (valid day: a single broadcast longer than 5 minutes).
  	ATTRSUBID_VALID_LIVE_DAYS_TYPE_1_DAY_14
  	// ATTRSUBID_VALID_LIVE_DAYS_TYPE_2_DAY_7 (3): valid days within the last 7 days
  	// (valid day: accumulated broadcast time of at least 120 minutes).
  	ATTRSUBID_VALID_LIVE_DAYS_TYPE_2_DAY_7
  	// ATTRSUBID_VALID_LIVE_DAYS_TYPE_2_DAY_30 (4): valid days within the last 30 days
  	// (valid day: accumulated broadcast time of at least 120 minutes).
  	ATTRSUBID_VALID_LIVE_DAYS_TYPE_2_DAY_30
  )
  // Constants for the tag table: tag ids.
  const (
  	TAGID_PK      = 1
  	TAGID_LOTTERY = 2
  )

  const (
  	// ROOM_EXT_SHARDING is the modulus applied to a room id to pick the
  	// room_extend_<n> table that holds its extension row.
  	ROOM_EXT_SHARDING = 10
  	// EXP_2_SCORE_RATE is the divisor that turns an anchor's exp into a score.
  	EXP_2_SCORE_RATE = 100
  )
  // Table names and SQL templates used by this file.
  //
  // Table names are substituted with fmt verbs (%s, or %s_%d for the sharded
  // room_extend tables), "in (%s)" lists and "set %s" clauses are rendered by
  // the caller, and row values are bound through ? placeholders.
  const (
  	_roomTable          = "room"
  	_roomExtTablePrefix = "room_extend"
  	_anchorTable        = "anchor"
  	_tagTable           = "tag"
  	_attrTable          = "attr"
  	_shortTable         = "ap_short_room"
  	_subAreaTable       = "ap_room_area_v2"

  	// _roomSelectColumns is the column list shared by the room queries below.
  	_roomSelectColumns = "`room_id`,`uid`,`title`,`description`,`tags`,`background`,`cover`,`lock_status`,`lock_time`,`hidden_time`,`record_switch`,`round_switch`,`live_start_time`,`live_screen_type`,`live_area_id`,`live_area_parent_id`,`live_type`"

  	// Insert a room row; the room id is assigned by the database.
  	_addRoomInfo1 = "insert into `%s` (`uid`) values (?)"
  	// Insert a room row with an explicit room id.
  	_addRoomInfo2 = "insert into `%s` (`uid`,`room_id`) values (?,?)"
  	// Insert a room extension row into its shard.
  	_addRoomExtInfo = "insert into `%s_%d` (`room_id`) values (?)"
  	// Insert an anchor row; san_score starts at 12.
  	_addAnchorInfo = "insert into `%s` (`uid`,`room_id`,`san_score`) values (?,?,12)"
  	// Update room columns by room id.
  	_updateRoomInfo = "update `%s` set %s where `room_id`=?"
  	// Update room extension columns by room id.
  	_updateRoomExtInfo = "update `%s_%d` set %s where `room_id`=?"
  	// Update anchor columns by uid.
  	_updateAnchorInfo = "update `%s` set %s where `uid`=?"
  	// Create a tag row, or refresh value/ext/expiry when the key already exists.
  	_tagCreateInfo = "insert ignore into `%s` (`room_id`,`tag_id`,`tag_sub_id`,`tag_value`,`tag_ext`,`tag_expire_at`) values (?,?,?,?,?,?) on duplicate key update `tag_value`=?,`tag_ext`=?,`tag_expire_at`=?"
  	// Create an attr row, or refresh value/ext when the key already exists.
  	_attrCreateInfo = "insert ignore into `%s` (`room_id`,`attr_id`,`attr_sub_id`,`attr_value`,`attr_ext`) values (?,?,?,?,?) on duplicate key update `attr_value`=?,`attr_ext`=?"
  	// Re-point the attr row with a given value to another room.
  	_attrSetRoomId = "update `%s` set `room_id`=? where `attr_id`=? and `attr_sub_id`=? and `attr_value`=?"
  	// Set the attr value of one room.
  	_attrSetValue = "update `%s` set `attr_value`=? where `room_id`=? and `attr_id`=? and `attr_sub_id`=?"
  	// Select the room id that holds a given attr value.
  	_attrSelectRoomId = "select `room_id` from `%s` where `attr_id`=? and `attr_sub_id`=? and `attr_value`=?"
  	// Select the attr value of one room.
  	_attrSelectValue = "select `attr_value` from `%s` where `room_id`=? and `attr_id`=? and `attr_sub_id`=?"
  	// Page through rooms whose live_start_time is set, ordered by room id.
  	_queryOnlineRoomInfo = "select " + _roomSelectColumns + " from `room` where `live_start_time`!=0 order by `room_id` limit ?,?"
  	// TODO: decide whether the on-air room queries must handle the round-play case.
  	// List on-air room ids, newest start first; %s takes the optional area condition.
  	_queryOnlineRoomByAreaInfo = "select `room_id` from `room` where `live_start_time`!=0 %s order by `live_start_time` desc"
  	// Area condition matching either the area itself or its parent area.
  	_queryOnlineRoomByAreaCond = "and (`live_area_id`=%d or `live_area_parent_id`=%d)"
  	// Select rooms by a list of room ids.
  	_queryRoomInfo = "select " + _roomSelectColumns + " from `room` where `room_id` in (%s)"
  	// Select anchors by a list of uids.
  	_queryAnchorInfo = "select `room_id`,`san_score`,`profile_type`,`round_status`,`record_status`,`exp` from `%s` where `uid` in (%s)"
  	// Select the tags of a list of rooms that expire after the bound time.
  	_queryTagInfo = "select `room_id`,`tag_id`,`tag_sub_id`,`tag_value`,`tag_ext`,`tag_expire_at` from `%s` where `room_id` in (%s) and `tag_expire_at`>?"
  	// Select room extension rows from one shard by a list of room ids.
  	_queryRoomExtInfo = "select `room_id`,`keyframe`,`popularity_count` from `%s_%d` where `room_id` in (%s)"
  	// Select the parent id of an area.
  	_queryParentAreaID = "select `parent_id` from `%s` where `id`=?"
  	// Select the short ids of a list of room ids.
  	_queryShortID = "select `short_id`,`roomid` from `%s` where `roomid` in (%s)"
  	// Filter out short room ids (status=1) and their corresponding room ids.
  	_filterShortID = "select `short_id`,`roomid` from `%s` where `short_id` in (%s) and `status`=1"
  	// Select one attr value for a list of rooms.
  	_queryAttrInfo = "select `room_id`,`attr_value` from `%s` where `attr_id`=? and `attr_sub_id`=? and `room_id` in (%s)"
  	// Select every row of one attr id, ordered by sub id.
  	_queryAttrInfo2 = "select `room_id`,`attr_sub_id`,`attr_value` from `%s` where `attr_id`=? order by `attr_sub_id`"
  	// Delete every row of one attr id / sub id pair.
  	_deleteAttrInfo = "delete from `%s` where `attr_id`=? and `attr_sub_id`=?"
  	// Select one area by id.
  	_queryAreaInfo = "select `id`,`name`,`parent_id` from `%s` where `id`=?"
  	// Select the sub-areas of a given area.
  	_querySubAreaInfo = "select `id`,`name` from `%s` where `parent_id`=?"
  )
  145. // Turns short-id, if any, into room-id
  146. func (d *Dao) dbNormalizeRoomIDs(ctx context.Context, roomIDs []int64) (resp []int64, err error) {
  147. if len(roomIDs) <= 0 {
  148. return
  149. }
  150. resp = make([]int64, len(roomIDs))
  151. // Resort to cache first, and for miss roomid, we save pair (roomid, index) to preserve order
  152. // of our final result
  153. agnosticRoomIds := make(map[int64]int)
  154. for i, roomID := range roomIDs {
  155. xid, ok := d.shortIDMapping.Get(fmt.Sprintf("%d", roomID))
  156. if ok {
  157. resp[i] = xid.(int64)
  158. } else {
  159. agnosticRoomIds[roomID] = i
  160. }
  161. }
  162. // We are done.
  163. if len(agnosticRoomIds) <= 0 {
  164. return
  165. }
  166. var queryValues string
  167. for roomID := range agnosticRoomIds {
  168. if len(queryValues) > 0 {
  169. queryValues += ","
  170. }
  171. queryValues += fmt.Sprintf("%d", roomID)
  172. }
  173. sql := fmt.Sprintf(_filterShortID, _shortTable, queryValues)
  174. rows, err := d.dbLiveApp.Query(ctx, sql)
  175. if err != nil {
  176. log.Error("[dao.dao-anchor.mysql|dbNormalizeRoomIDs] query short id record error(%v), sql(%s)", err, sql)
  177. return nil, err
  178. }
  179. defer rows.Close()
  180. shortIDMapping := make(map[int64]int64)
  181. for rows.Next() {
  182. var shortID int64
  183. var roomID int64
  184. err = rows.Scan(&shortID, &roomID)
  185. if err != nil {
  186. log.Error("[dao.dao-anchor.mysql|dbNormalizeRoomIDs] scan short id record error(%v), roomIDs(%v)",
  187. err, roomIDs)
  188. return nil, err
  189. }
  190. shortIDMapping[shortID] = roomID
  191. }
  192. for roomID, index := range agnosticRoomIds {
  193. xid, ok := shortIDMapping[roomID]
  194. if ok {
  195. resp[index] = xid
  196. d.shortIDMapping.Put(fmt.Sprintf("%d", roomID), xid)
  197. } else {
  198. resp[index] = roomID
  199. d.shortIDMapping.Put(fmt.Sprintf("%d", roomID), roomID)
  200. }
  201. }
  202. return
  203. }
  204. func (d *Dao) dbDealWithStatus(ctx context.Context, data *v1pb.RoomData) (err error) {
  205. if data == nil {
  206. return
  207. }
  208. // 处理开播状态
  209. if data.LiveStartTime > 0 {
  210. // 如果开播
  211. data.LiveStatus = LIVE_OPEN
  212. } else if data.AnchorRoundSwitch == 1 {
  213. // 如果轮播
  214. data.LiveStatus = LIVE_ROUND
  215. } else {
  216. data.LiveStatus = LIVE_CLOSE
  217. }
  218. // 处理隐藏状态
  219. if data.HiddenTime > time.Now().Unix() {
  220. data.HiddenStatus = 1
  221. }
  222. // 获取分区名称
  223. areaInfo := &model.AreaInfo{}
  224. if data.AreaId > 0 {
  225. areaInfo, err = d.dbFetchAreaInfo(ctx, data.AreaId)
  226. if err != nil {
  227. log.Error("[dao.dao-anchor.mysql|dbDealWithStatus] fetch area info error(%v), data(%v), areaid(%v)", err, data, data.AreaId)
  228. return
  229. }
  230. data.AreaName = areaInfo.AreaName
  231. }
  232. if areaInfo.ParentAreaID > 0 {
  233. areaInfo, err = d.dbFetchAreaInfo(ctx, areaInfo.ParentAreaID)
  234. if err != nil {
  235. log.Error("[dao.dao-anchor.mysql|dbDealWithStatus] fetch area info error(%v), data(%v), parent areaid(%v)", err, data, areaInfo.ParentAreaID)
  236. return
  237. }
  238. data.ParentAreaName = areaInfo.AreaName
  239. }
  240. return
  241. }
  242. // dbFetchRoomIDByUID implementation
  243. // dbFetchRoomIDByUID 查询主播房间号
  244. func (d *Dao) dbFetchRoomIDByUID(ctx context.Context, uid int64) (roomID int64) {
  245. uids := []int64{uid}
  246. res := make(map[int64]*v1pb.RoomData)
  247. err := d.dbFetchAnchorInfo(ctx, uids, res, false)
  248. if err != nil {
  249. log.Error("[dao.dao-anchor.mysql|dbFetchRoomIDByUID] get room ID error(%v), uid(%v)", err, uid)
  250. return
  251. }
  252. if len(res) <= 1 {
  253. return
  254. }
  255. for _, v := range res {
  256. if v.Uid == uid {
  257. roomID = v.RoomId
  258. }
  259. }
  260. return
  261. }
  262. // dbFetchAreaInfo implementation
  263. // dbFetchAreaInfo 查询分区信息
  264. func (d *Dao) dbFetchAreaInfo(ctx context.Context, areaID int64) (info *model.AreaInfo, err error) {
  265. if areaID <= 0 {
  266. return
  267. }
  268. key := fmt.Sprintf("%d", areaID)
  269. res, ok := d.areaInfoMapping.Get(key)
  270. if ok {
  271. return res.(*model.AreaInfo), nil
  272. }
  273. info = &model.AreaInfo{}
  274. sql := fmt.Sprintf(_queryAreaInfo, _subAreaTable)
  275. err = d.dbLiveApp.QueryRow(ctx, sql, areaID).Scan(&info.AreaID, &info.AreaName, &info.ParentAreaID)
  276. if err == nil {
  277. d.areaInfoMapping.Put(key, info)
  278. } else {
  279. // 尝试从配置中心拿到一级分区名称
  280. if confInfo, ok := d.c.FirstAreas[key]; ok {
  281. info.AreaID = areaID
  282. info.AreaName = confInfo.Name
  283. info.ParentAreaID = 0
  284. err = nil
  285. }
  286. }
  287. return
  288. }
  289. func (d *Dao) dbFetchAnchorInfo(ctx context.Context, uids []int64, resp map[int64]*v1pb.RoomData, overwrite bool) (err error) {
  290. if len(uids) <= 0 {
  291. return
  292. }
  293. valueList := ""
  294. for i, id := range uids {
  295. if i > 0 {
  296. valueList += ","
  297. }
  298. valueList += fmt.Sprintf("%d", id)
  299. }
  300. sql := fmt.Sprintf(_queryAnchorInfo, _anchorTable, valueList)
  301. rows, err := d.db.Query(ctx, sql)
  302. if err != nil {
  303. log.Error("[dao.dao-anchor.mysql|dbFetchRoomByIDs] get anchor record error(%v), uids(%v)", err, uids)
  304. return err
  305. }
  306. defer rows.Close()
  307. for rows.Next() {
  308. data := &v1pb.RoomData{
  309. AnchorLevel: new(v1pb.AnchorLevel),
  310. }
  311. var exp int64
  312. err = rows.Scan(&data.RoomId, &data.AnchorSan, &data.AnchorProfileType, &data.AnchorRoundStatus, &data.AnchorRecordStatus, &exp)
  313. if err != nil {
  314. log.Error("[dao.dao-anchor.mysql|dbFetchRoomByIDs] scan anchor record error(%v), uids(%v)", err, uids)
  315. return err
  316. }
  317. data.AnchorLevel.Score = exp / EXP_2_SCORE_RATE
  318. data.AnchorLevel.Level, err = model.GetAnchorLevel(data.AnchorLevel.Score)
  319. if err != nil {
  320. log.Error("[dao.dao-anchor.mysql|dbFetchRoomByIDs] Failed to get anchor level (%v), level(%d)", err, data.AnchorLevel.Score)
  321. return err
  322. }
  323. data.AnchorLevel.Left, data.AnchorLevel.Right, err = model.GetLevelScoreInfo(data.AnchorLevel.Level)
  324. if err != nil {
  325. log.Error("[dao.dao-anchor.mysql|dbFetchRoomByIDs] Failed to get anchor level score (%v), level(%d)", err, data.AnchorLevel.Level)
  326. return err
  327. }
  328. data.AnchorLevel.Color, err = model.GetAnchorLevelColor(data.AnchorLevel.Level)
  329. if err != nil {
  330. log.Error("[dao.dao-anchor.mysql|dbFetchRoomByIDs] Failed to get anchor level color (%v), level(%d)", err, data.AnchorLevel.Level)
  331. return err
  332. }
  333. data.AnchorLevel.MaxLevel = model.MaxAnchorLevel
  334. if overwrite {
  335. d := resp[data.RoomId]
  336. d.AnchorSan = data.AnchorSan
  337. d.AnchorProfileType = data.AnchorProfileType
  338. d.AnchorRoundStatus = data.AnchorRoundStatus
  339. d.AnchorRecordStatus = data.AnchorRecordStatus
  340. d.AnchorLevel = data.AnchorLevel
  341. } else {
  342. resp[data.RoomId] = data
  343. }
  344. }
  345. return
  346. }
  347. func (d *Dao) dbFetchRoomInfo(ctx context.Context, roomIDs []int64, resp map[int64]*v1pb.RoomData, overwrite bool) (err error) {
  348. if len(roomIDs) <= 0 {
  349. return
  350. }
  351. valueList := ""
  352. for i, id := range roomIDs {
  353. if i > 0 {
  354. valueList += ","
  355. }
  356. valueList += fmt.Sprintf("%d", id)
  357. }
  358. sql := fmt.Sprintf(_queryRoomInfo, valueList)
  359. rows, err := d.db.Query(ctx, sql)
  360. if err != nil {
  361. log.Error("[dao.dao-anchor.mysql|dbFetchRoomInfo] get room record error(%v), roomIDs(%v)", err, roomIDs)
  362. return err
  363. }
  364. defer rows.Close()
  365. for rows.Next() {
  366. data := &v1pb.RoomData{}
  367. err = rows.Scan(&data.RoomId, &data.Uid, &data.Title, &data.Description, &data.Tags, &data.Background, &data.Cover, &data.LockStatus, &data.LockTime, &data.HiddenTime, &data.AnchorRecordSwitch, &data.AnchorRoundSwitch, &data.LiveStartTime, &data.LiveScreenType, &data.AreaId, &data.ParentAreaId, &data.LiveType)
  368. if err != nil {
  369. log.Error("[dao.dao-anchor.mysql|dbFetchRoomInfo] scan room record error(%v), roomIDs(%v)", err, roomIDs)
  370. return err
  371. }
  372. if overwrite {
  373. d := resp[data.RoomId]
  374. d.Uid = data.Uid
  375. d.Title = data.Title
  376. d.Description = data.Description
  377. d.Tags = data.Tags
  378. d.Background = data.Background
  379. d.Cover = data.Cover
  380. d.LockStatus = data.LockStatus
  381. d.LockTime = data.LockTime
  382. d.HiddenTime = data.HiddenTime
  383. d.AnchorRecordSwitch = data.AnchorRecordSwitch
  384. d.AnchorRoundSwitch = data.AnchorRoundSwitch
  385. d.LiveStartTime = data.LiveStartTime
  386. d.LiveScreenType = data.LiveScreenType
  387. d.AreaId = data.AreaId
  388. d.ParentAreaId = data.ParentAreaId
  389. d.LiveType = data.LiveType
  390. } else {
  391. resp[data.RoomId] = data
  392. }
  393. d.dbDealWithStatus(ctx, resp[data.RoomId])
  394. }
  395. return
  396. }
  397. func (d *Dao) dbFetchTagInfo(ctx context.Context, roomIDs []int64, resp map[int64]*v1pb.RoomData) (err error) {
  398. if len(roomIDs) <= 0 {
  399. return
  400. }
  401. valueList := ""
  402. for i, id := range roomIDs {
  403. if i > 0 {
  404. valueList += ","
  405. }
  406. valueList += fmt.Sprintf("%d", id)
  407. }
  408. sql := fmt.Sprintf(_queryTagInfo, _tagTable, valueList)
  409. rows, err := d.db.Query(ctx, sql, time.Now().Unix())
  410. if err != nil {
  411. log.Error("[dao.dao-anchor.mysql|dbFetchTagInfo] get room record error(%v), roomIDs(%v)", err, roomIDs)
  412. return err
  413. }
  414. defer rows.Close()
  415. for rows.Next() {
  416. var roomID int64
  417. data := &v1pb.TagData{}
  418. err = rows.Scan(&roomID, &data.TagId, &data.TagSubId, &data.TagValue, &data.TagExt, &data.TagExpireAt)
  419. if err != nil {
  420. log.Error("[dao.dao-anchor.mysql|dbFetchTagInfo] scan room record error(%v), roomIDs(%v)", err, roomIDs)
  421. return err
  422. }
  423. if resp[roomID] == nil {
  424. resp[roomID] = &v1pb.RoomData{}
  425. }
  426. if resp[roomID].TagList == nil {
  427. resp[roomID].TagList = make([]*v1pb.TagData, 0)
  428. }
  429. resp[roomID].TagList = append(resp[roomID].TagList, data)
  430. }
  431. return
  432. }
  433. func (d *Dao) dbFetchExtInfo(ctx context.Context, roomIDs []int64, resp map[int64]*v1pb.RoomData) (err error) {
  434. if len(roomIDs) <= 0 {
  435. return
  436. }
  437. sharding := make(map[int64][]int64)
  438. for i := 0; i < 10; i++ {
  439. sharding[int64(i)] = make([]int64, 0, len(roomIDs))
  440. }
  441. for _, id := range roomIDs {
  442. sharding[id%ROOM_EXT_SHARDING] = append(sharding[id%ROOM_EXT_SHARDING], id)
  443. }
  444. for shard, ids := range sharding {
  445. if len(ids) <= 0 {
  446. continue
  447. }
  448. valueList := ""
  449. for i, id := range ids {
  450. if i > 0 {
  451. valueList += ","
  452. }
  453. valueList += fmt.Sprintf("%d", id)
  454. }
  455. sql := fmt.Sprintf(_queryRoomExtInfo, _roomExtTablePrefix, shard, valueList)
  456. rows, err := d.db.Query(ctx, sql)
  457. if err != nil {
  458. log.Error("[dao.dao-anchor.mysql|dbFetchExtInfo] get room ext record error(%v), roomIDs(%v)", err, ids)
  459. return err
  460. }
  461. defer rows.Close()
  462. for rows.Next() {
  463. var roomID int64
  464. var keyframe string
  465. var popularityCount int64
  466. err = rows.Scan(&roomID, &keyframe, &popularityCount)
  467. if err != nil {
  468. log.Error("[dao.dao-anchor.mysql|dbFetchExtInfo] scan room ext record error(%v), roomIDs(%v)", err, ids)
  469. return err
  470. }
  471. resp[roomID].Keyframe = keyframe
  472. resp[roomID].PopularityCount = popularityCount
  473. }
  474. }
  475. return
  476. }
  477. // dbFetchRoomByIDs implementation
  478. // dbFetchRoomByIDs 查询房间信息
  479. func (d *Dao) dbFetchRoomByIDs(ctx context.Context, req *v1pb.RoomByIDsReq) (resp *v1pb.RoomByIDsResp, err error) {
  480. resp = &v1pb.RoomByIDsResp{
  481. RoomDataSet: make(map[int64]*v1pb.RoomData),
  482. }
  483. roomIDs := make([]int64, 0, len(req.Uids))
  484. if len(req.RoomIds) > 0 {
  485. for _, id := range req.RoomIds {
  486. roomIDs = append(roomIDs, id)
  487. }
  488. // 先查room表
  489. err = d.dbFetchRoomInfo(ctx, roomIDs, resp.RoomDataSet, false)
  490. if err != nil {
  491. log.Error("[dao.dao-anchor.mysql|dbFetchRoomByIDs] get room record error(%v), req(%v)", err, req)
  492. return nil, err
  493. }
  494. uids := make([]int64, 0, len(req.RoomIds))
  495. for _, v := range resp.RoomDataSet {
  496. uids = append(uids, v.Uid)
  497. }
  498. err = d.dbFetchAnchorInfo(ctx, uids, resp.RoomDataSet, true)
  499. if err != nil {
  500. log.Error("[dao.dao-anchor.mysql|dbFetchRoomByIDs] get anchor record error(%v), req(%v)", err, req)
  501. return nil, err
  502. }
  503. } else if len(req.Uids) > 0 {
  504. // 先查anchor表
  505. uids := make([]int64, 0, len(req.Uids))
  506. for _, id := range req.Uids {
  507. uids = append(uids, id)
  508. }
  509. err = d.dbFetchAnchorInfo(ctx, uids, resp.RoomDataSet, false)
  510. if err != nil {
  511. log.Error("[dao.dao-anchor.mysql|dbFetchRoomByIDs] get anchor record error(%v), req(%v)", err, req)
  512. return nil, err
  513. }
  514. for _, v := range resp.RoomDataSet {
  515. roomIDs = append(roomIDs, v.RoomId)
  516. }
  517. err = d.dbFetchRoomInfo(ctx, roomIDs, resp.RoomDataSet, true)
  518. if err != nil {
  519. log.Error("[dao.dao-anchor.mysql|dbFetchRoomByIDs] get room record error(%v), req(%v)", err, req)
  520. return nil, err
  521. }
  522. }
  523. // Ext Info
  524. err = d.dbFetchExtInfo(ctx, roomIDs, resp.RoomDataSet)
  525. if err != nil {
  526. log.Error("[dao.dao-anchor.mysql|dbFetchRoomByIDs] get room ext record error(%v), req(%v)", err, req)
  527. return nil, err
  528. }
  529. // Tag List
  530. err = d.dbFetchTagInfo(ctx, roomIDs, resp.RoomDataSet)
  531. if err != nil {
  532. log.Error("[dao.dao-anchor.mysql|dbFetchRoomByIDs] get tag record error(%v), req(%v)", err, req)
  533. return nil, err
  534. }
  535. // TODO: @wangyao需要处理short_id
  536. return
  537. }
  538. // dbOnlineListByArea implementation
  539. // dbOnlineListByArea 分区在线房间列表
  540. func (d *Dao) dbOnlineListByArea(ctx context.Context, areaId int64) (roomIDs []int64, err error) {
  541. cond := ""
  542. if areaId > 0 {
  543. cond = fmt.Sprintf(_queryOnlineRoomByAreaCond, areaId, areaId)
  544. }
  545. sql := fmt.Sprintf(_queryOnlineRoomByAreaInfo, cond)
  546. rows, err := d.db.Query(ctx, sql)
  547. if err != nil {
  548. log.Error("[dao.dao-anchor.mysql|dbOnlineListByArea] get online room list by area error(%v), areaId(%v)", err, areaId)
  549. return nil, err
  550. }
  551. defer rows.Close()
  552. roomIDs = make([]int64, 0)
  553. for rows.Next() {
  554. var roomid int64
  555. err = rows.Scan(&roomid)
  556. if err != nil {
  557. log.Error("[dao.dao-anchor.mysql|dbOnlineListByArea] scan online room list by area error(%v), areaId(%v)", err, areaId)
  558. return nil, err
  559. }
  560. roomIDs = append(roomIDs, roomid)
  561. }
  562. return
  563. }
  564. // roomCreate implementation
  565. // roomCreate 房间创建
  566. func (d *Dao) roomCreate(ctx context.Context, req *v1pb.RoomCreateReq) (resp *v1pb.RoomCreateResp, err error) {
  567. resp = &v1pb.RoomCreateResp{}
  568. tx, err := d.db.Begin(ctx)
  569. if err != nil {
  570. err = errors.WithStack(err)
  571. return resp, err
  572. }
  573. if req.RoomId != 0 {
  574. resp.RoomId = req.RoomId
  575. sql := fmt.Sprintf(_addRoomInfo2, _roomTable)
  576. _, err = tx.Exec(sql, req.Uid, req.RoomId)
  577. if err != nil {
  578. if e := tx.Rollback(); e != nil {
  579. log.Error("[dao.dao-anchor.mysql|roomCreate] create room record rollback error(%v), req(%v)", e, req)
  580. }
  581. // unique key exists error
  582. log.Error("[dao.dao-anchor.mysql|roomCreate] create room record error(%v), req(%v)", err, req)
  583. return resp, err
  584. }
  585. } else {
  586. sql := fmt.Sprintf(_addRoomInfo1, _roomTable)
  587. res, err := tx.Exec(sql, req.Uid)
  588. if err != nil {
  589. if e := tx.Rollback(); e != nil {
  590. log.Error("[dao.dao-anchor.mysql|roomCreate] create room record rollback error(%v), req(%v)", e, req)
  591. }
  592. // unique key exists error
  593. log.Error("[dao.dao-anchor.mysql|roomCreate] create room record error(%v), req(%v)", err, req)
  594. return resp, err
  595. }
  596. if resp.RoomId, err = res.LastInsertId(); err != nil {
  597. err = errors.WithStack(err)
  598. log.Error("[dao.dao-anchor.mysql|AddGuard] get last insert id error(%v), req(%v)", err, req)
  599. }
  600. }
  601. sql := fmt.Sprintf(_addRoomExtInfo, _roomExtTablePrefix, resp.RoomId%ROOM_EXT_SHARDING)
  602. if _, err = tx.Exec(sql, resp.RoomId); err != nil {
  603. if e := tx.Rollback(); e != nil {
  604. log.Error("[dao.dao-anchor.mysql|roomCreate] create room extend record rollback error(%v), req(%v)", e, req)
  605. }
  606. log.Error("[dao.dao-anchor.mysql|roomCreate] create room extend record error(%v), req(%v)", err, req)
  607. return resp, err
  608. }
  609. sql = fmt.Sprintf(_addAnchorInfo, _anchorTable)
  610. if _, err = tx.Exec(sql, req.Uid, resp.RoomId); err != nil {
  611. if e := tx.Rollback(); e != nil {
  612. log.Error("[dao.dao-anchor.mysql|roomCreate] create anchor record rollback error(%v), req(%v)", e, req)
  613. }
  614. log.Error("[dao.dao-anchor.mysql|roomCreate] create anchor record error(%v), req(%v)", err, req)
  615. return resp, err
  616. }
  617. if err = tx.Commit(); err != nil {
  618. log.Error("[dao.dao-anchor.mysql|roomCreate] commit error(%v), req(%v)", err, req)
  619. return resp, err
  620. }
  621. return
  622. }
  623. // roomUpdate implementation
  624. // roomUpdate 房间信息更新
  625. func (d *Dao) roomUpdate(ctx context.Context, req *v1pb.RoomUpdateReq) (resp *v1pb.UpdateResp, err error) {
  626. updateSub := ""
  627. args := make([]interface{}, len(req.Fields)+1)
  628. args[len(req.Fields)] = req.RoomId
  629. for i, f := range req.Fields {
  630. switch f {
  631. case "title":
  632. args[i] = req.Title
  633. case "cover":
  634. args[i] = req.Cover
  635. case "tags":
  636. args[i] = req.Tags
  637. case "background":
  638. args[i] = req.Background
  639. case "description":
  640. args[i] = req.Description
  641. case "live_start_time":
  642. args[i] = req.LiveStartTime
  643. if req.LiveStartTime > 0 {
  644. if i > 0 {
  645. updateSub += ","
  646. }
  647. updateSub += "`live_mark`=1"
  648. if i == 0 {
  649. updateSub += ","
  650. }
  651. }
  652. case "live_screen_type":
  653. args[i] = req.LiveScreenType
  654. case "live_type":
  655. args[i] = req.LiveType
  656. case "lock_status":
  657. args[i] = req.LockStatus
  658. case "lock_time":
  659. args[i] = req.LockTime
  660. case "hidden_time":
  661. args[i] = req.HiddenTime
  662. case "area_id":
  663. f = "live_area_id"
  664. args[i] = req.AreaId
  665. areaInfo := &model.AreaInfo{}
  666. // 审核后台会设置成无分区
  667. if req.AreaId > 0 {
  668. areaInfo, err = d.dbFetchAreaInfo(ctx, req.AreaId)
  669. if err != nil {
  670. log.Error("[dao.dao-anchor.mysql|roomUpdate] fetch area info error(%v), req(%v), areaid(%v)", err, req, req.AreaId)
  671. return
  672. }
  673. }
  674. if i > 0 {
  675. updateSub += ","
  676. }
  677. updateSub += fmt.Sprintf("`live_area_parent_id`=%d", areaInfo.ParentAreaID)
  678. if i == 0 {
  679. updateSub += ","
  680. }
  681. case "anchor_round_switch":
  682. f = "round_switch"
  683. args[i] = req.AnchorRoundSwitch
  684. case "anchor_record_switch":
  685. f = "record_switch"
  686. args[i] = req.AnchorRecordSwitch
  687. default:
  688. log.Error("[dao.dao-anchor.mysql|roomUpdate] unsupported field(%v), req(%v)", f, req)
  689. err = ecode.InvalidParam
  690. return
  691. }
  692. if i > 0 {
  693. updateSub += ","
  694. }
  695. updateSub += fmt.Sprintf("`%s`=?", f)
  696. }
  697. resp = &v1pb.UpdateResp{}
  698. sql := fmt.Sprintf(_updateRoomInfo, _roomTable, updateSub)
  699. res, err := d.db.Exec(ctx, sql, args...)
  700. if err != nil {
  701. log.Error("[dao.dao-anchor.mysql|roomUpdate] update room record error(%v), req(%v)", err, req)
  702. return
  703. }
  704. resp.AffectedRows, err = res.RowsAffected()
  705. return
  706. }
  707. // roomExtendUpdate implementation
  708. // roomExtendUpdate 房间扩展信息更新
  709. func (d *Dao) roomExtendUpdate(ctx context.Context, req *v1pb.RoomExtendUpdateReq) (resp *v1pb.UpdateResp, err error) {
  710. if len(req.Fields) <= 0 {
  711. log.Error("[dao.dao-anchor.mysql|roomExtendUpdate] no fields, req(%v)", req)
  712. err = ecode.InvalidParam
  713. return
  714. }
  715. updateSub := ""
  716. args := make([]interface{}, len(req.Fields)+1)
  717. args[len(req.Fields)] = req.RoomId
  718. for i, f := range req.Fields {
  719. switch f {
  720. case "keyframe":
  721. args[i] = req.Keyframe
  722. case "danmu_count":
  723. args[i] = req.DanmuCount
  724. case "popularity_count":
  725. args[i] = req.PopularityCount
  726. case "audience_count":
  727. args[i] = req.AudienceCount
  728. case "gift_count":
  729. args[i] = req.GiftCount
  730. case "gift_gold_amount":
  731. args[i] = req.GiftGoldAmount
  732. case "gift_gold_count":
  733. args[i] = req.GiftGoldCount
  734. default:
  735. log.Error("[dao.dao-anchor.mysql|roomExtendUpdate] unsupported field(%v), req(%v)", f, req)
  736. err = ecode.InvalidParam
  737. return
  738. }
  739. if i > 0 {
  740. updateSub += ","
  741. }
  742. updateSub += fmt.Sprintf("`%s`=?", f)
  743. }
  744. resp = &v1pb.UpdateResp{}
  745. sql := fmt.Sprintf(_updateRoomExtInfo, _roomExtTablePrefix, req.RoomId%10, updateSub)
  746. res, err := d.db.Exec(ctx, sql, args...)
  747. if err != nil {
  748. log.Error("[dao.dao-anchor.mysql|roomExtendUpdate] update room extend record error(%v), req(%v)", err, req)
  749. return
  750. }
  751. resp.AffectedRows, err = res.RowsAffected()
  752. return
  753. }
  754. // roomExtendIncre implementation
  755. // roomExtendIncre 房间扩展信息增量更新
  756. func (d *Dao) roomExtendIncre(ctx context.Context, req *v1pb.RoomExtendIncreReq) (resp *v1pb.UpdateResp, err error) {
  757. if len(req.Fields) <= 0 {
  758. log.Error("[dao.dao-anchor.mysql|roomExtendIncre] no fields, req(%v)", req)
  759. err = ecode.InvalidParam
  760. return
  761. }
  762. // TODO: req_id
  763. updateSub := ""
  764. args := make([]interface{}, len(req.Fields)+1)
  765. args[len(req.Fields)] = req.RoomId
  766. for i, f := range req.Fields {
  767. switch f {
  768. case "danmu_count":
  769. args[i] = req.DanmuCount
  770. case "popularity_count":
  771. args[i] = req.PopularityCount
  772. case "audience_count":
  773. args[i] = req.AudienceCount
  774. case "gift_count":
  775. args[i] = req.GiftCount
  776. case "gift_gold_amount":
  777. args[i] = req.GiftGoldAmount
  778. case "gift_gold_count":
  779. args[i] = req.GiftGoldCount
  780. default:
  781. log.Error("[dao.dao-anchor.mysql|roomExtendIncre] unsupported field(%v), req(%v)", f, req)
  782. err = ecode.InvalidParam
  783. return
  784. }
  785. if i > 0 {
  786. updateSub += ","
  787. }
  788. updateSub += fmt.Sprintf("`%s`=`%s`+(?)", f, f)
  789. }
  790. resp = &v1pb.UpdateResp{}
  791. sql := fmt.Sprintf(_updateRoomExtInfo, _roomExtTablePrefix, req.RoomId%10, updateSub)
  792. res, err := d.db.Exec(ctx, sql, args...)
  793. if err != nil {
  794. log.Error("[dao.dao-anchor.mysql|roomExtendIncre] update room extend increment record error(%v), req(%v)", err, req)
  795. return
  796. }
  797. resp.AffectedRows, err = res.RowsAffected()
  798. return
  799. }
  800. // roomTagCreate implementation
  801. // roomTagCreate 房间Tag创建
  802. func (d *Dao) roomTagCreate(ctx context.Context, req *v1pb.RoomTagCreateReq) (resp *v1pb.UpdateResp, err error) {
  803. resp = &v1pb.UpdateResp{}
  804. sql := fmt.Sprintf(_tagCreateInfo, _tagTable)
  805. _, err = d.db.Exec(ctx, sql, req.RoomId, req.TagId, req.TagSubId, req.TagValue, req.TagExt, req.TagExpireAt, req.TagValue, req.TagExt, req.TagExpireAt)
  806. if err != nil {
  807. log.Error("[dao.dao-anchor.mysql|roomTagCreate] create room tag error(%v), req(%v)", err, req)
  808. return
  809. }
  810. resp.AffectedRows = 1
  811. return
  812. }
  813. // roomAttrCreate implementation
  814. // roomAttrCreate 房间Attr创建
  815. func (d *Dao) roomAttrCreate(ctx context.Context, req *v1pb.RoomAttrCreateReq) (resp *v1pb.UpdateResp, err error) {
  816. resp = &v1pb.UpdateResp{}
  817. sql := fmt.Sprintf(_attrCreateInfo, _attrTable)
  818. _, err = d.db.Exec(ctx, sql, req.RoomId, req.AttrId, req.AttrSubId, req.AttrValue, req.AttrExt, req.AttrValue, req.AttrExt)
  819. if err != nil {
  820. log.Error("[dao.dao-anchor.mysql|roomAttrCreate] create room attr error(%v), req(%v)", err, req)
  821. return
  822. }
  823. resp.AffectedRows = 1
  824. return
  825. }
  826. // roomAttrSetEx implementation
  827. // roomAttrSetEx 房间Attr更新/插入
  828. func (d *Dao) roomAttrSetEx(ctx context.Context, req *v1pb.RoomAttrSetExReq) (resp *v1pb.UpdateResp, err error) {
  829. needSet, err := RoomAttrNeedSet(d, ctx, req)
  830. if err != nil {
  831. log.Error("[dao.dao-anchor.mysql|roomAttrSetEx] set room attr error(%v), req(%v)", err, req)
  832. return
  833. }
  834. if needSet <= 0 {
  835. return
  836. }
  837. if needSet == NEED_INSERT {
  838. reqCreate := &v1pb.RoomAttrCreateReq{}
  839. reqCreate.RoomId = req.RoomId
  840. reqCreate.AttrId = req.AttrId
  841. reqCreate.AttrSubId = req.AttrSubId
  842. reqCreate.AttrValue = req.AttrValue
  843. reqCreate.AttrExt = req.AttrExt
  844. resp, err = d.roomAttrCreate(ctx, reqCreate)
  845. } else {
  846. if req.AttrId == ATTRID_RANK_LIST {
  847. resp, err = d.roomAttrSetRoomId(ctx, req)
  848. } else {
  849. resp, err = d.roomAttrSetValue(ctx, req)
  850. }
  851. }
  852. return
  853. }
// Non-zero outcomes reported by RoomAttrNeedSet; a result of 0 means the
// stored record already matches the request and nothing needs to be written.
const (
	// NEED_INSERT means no matching record was found and one must be created.
	NEED_INSERT = 1
	// NEED_UPDATE means a record was found but differs from the request.
	NEED_UPDATE = 2
)
  858. // roomAttrNeedSet 内部函数 0 不需要set 1 需要insert 2 需要update
  859. func RoomAttrNeedSet(d *Dao, ctx context.Context, req *v1pb.RoomAttrSetExReq) (resp int, err error) {
  860. attrId := req.AttrId
  861. //排行榜设计:更新roomID
  862. resp = 0
  863. if attrId == ATTRID_RANK_LIST {
  864. roomID := 0
  865. sql := fmt.Sprintf(_attrSelectRoomId, _attrTable)
  866. err = d.db.QueryRow(ctx, sql, req.AttrId, req.AttrSubId, req.AttrValue).Scan(&roomID)
  867. if err != nil && err.Error() == "sql: no rows in result set" {
  868. err = nil
  869. resp = NEED_INSERT
  870. return
  871. }
  872. if err != nil {
  873. log.Error("[dao.dao-anchor.mysql|RoomAttrNeedSet] set room attr_rank error(%v), req(%v)", err, req)
  874. return
  875. }
  876. if int64(roomID) != req.RoomId {
  877. resp = NEED_UPDATE
  878. }
  879. } else {
  880. value := 0
  881. sql := fmt.Sprintf(_attrSelectValue, _attrTable)
  882. err = d.db.QueryRow(ctx, sql, req.AttrId, req.AttrSubId, req.AttrValue).Scan(value)
  883. if err != nil && err.Error() == "sql: no rows in result set" {
  884. err = nil
  885. resp = NEED_INSERT
  886. return
  887. }
  888. if err != nil {
  889. log.Error("[dao.dao-anchor.mysql|RoomAttrNeedSet] set room attr error(%v), req(%v)", err, req)
  890. return
  891. }
  892. if int64(value) != req.AttrValue {
  893. resp = NEED_UPDATE
  894. }
  895. }
  896. return
  897. }
  898. //roomAttrSet 更新value
  899. func (d *Dao) roomAttrSetValue(ctx context.Context, req *v1pb.RoomAttrSetExReq) (resp *v1pb.UpdateResp, err error) {
  900. resp = &v1pb.UpdateResp{}
  901. sql := fmt.Sprintf(_attrSetValue, _attrTable)
  902. res, err := d.db.Exec(ctx, sql, req.AttrValue, req.RoomId, req.AttrId, req.AttrSubId)
  903. if err != nil {
  904. log.Error("[dao.dao-anchor.mysql|roomAttrSetEx] set room attr error(%v), req(%v)", err, req)
  905. return
  906. }
  907. resp.AffectedRows, err = res.RowsAffected()
  908. return
  909. }
  910. //roomAttrSetRoomId 更新roomID
  911. func (d *Dao) roomAttrSetRoomId(ctx context.Context, req *v1pb.RoomAttrSetExReq) (resp *v1pb.UpdateResp, err error) {
  912. resp = &v1pb.UpdateResp{}
  913. sql := fmt.Sprintf(_attrSetRoomId, _attrTable)
  914. res, err := d.db.Exec(ctx, sql, req.RoomId, req.AttrId, req.AttrSubId, req.AttrValue)
  915. if err != nil {
  916. log.Error("[dao.dao-anchor.mysql|roomAttrSetEx] set room attr error(%v), req(%v)", err, req)
  917. return
  918. }
  919. resp.AffectedRows, err = res.RowsAffected()
  920. return
  921. }
  922. // anchorUpdate implementation
  923. // anchorUpdate 主播信息更新
  924. func (d *Dao) anchorUpdate(ctx context.Context, req *v1pb.AnchorUpdateReq) (resp *v1pb.UpdateResp, err error) {
  925. updateSub := ""
  926. args := make([]interface{}, len(req.Fields)+1)
  927. args[len(req.Fields)] = req.Uid
  928. for i, f := range req.Fields {
  929. switch f {
  930. case "profile_type":
  931. args[i] = req.ProfileType
  932. case "san_score":
  933. args[i] = req.SanScore
  934. case "round_status":
  935. args[i] = req.RoundStatus
  936. case "record_status":
  937. args[i] = req.RecordStatus
  938. case "exp":
  939. args[i] = req.Exp
  940. default:
  941. log.Error("[dao.dao-anchor.mysql|anchorUpdate] unsupported field(%v), req(%v)", f, req)
  942. err = ecode.InvalidParam
  943. return
  944. }
  945. if i > 0 {
  946. updateSub += ","
  947. }
  948. updateSub += fmt.Sprintf("`%s`=?", f)
  949. }
  950. resp = &v1pb.UpdateResp{}
  951. sql := fmt.Sprintf(_updateAnchorInfo, _anchorTable, updateSub)
  952. res, err := d.db.Exec(ctx, sql, args...)
  953. if err != nil {
  954. log.Error("[dao.dao-anchor.mysql|anchorUpdate] update anchor record error(%v), req(%v)", err, req)
  955. return
  956. }
  957. resp.AffectedRows, err = res.RowsAffected()
  958. return
  959. }
  960. // anchorIncre implementation
  961. // anchorIncre 主播信息增量更新
  962. func (d *Dao) anchorIncre(ctx context.Context, req *v1pb.AnchorIncreReq) (resp *v1pb.UpdateResp, err error) {
  963. // TODO: req_id
  964. updateSub := ""
  965. args := make([]interface{}, len(req.Fields)+1)
  966. args[len(req.Fields)] = req.Uid
  967. for i, f := range req.Fields {
  968. switch f {
  969. case "san_score":
  970. args[i] = req.SanScore
  971. case "exp":
  972. args[i] = req.Exp
  973. default:
  974. log.Error("[dao.dao-anchor.mysql|anchorIncre] unsupported field(%v), req(%v)", f, req)
  975. err = ecode.InvalidParam
  976. return
  977. }
  978. if i > 0 {
  979. updateSub += ","
  980. }
  981. updateSub += fmt.Sprintf("`%s`=`%s`+(?)", f, f)
  982. }
  983. resp = &v1pb.UpdateResp{}
  984. sql := fmt.Sprintf(_updateAnchorInfo, _anchorTable, updateSub)
  985. res, err := d.db.Exec(ctx, sql, args...)
  986. if err != nil {
  987. log.Error("[dao.dao-anchor.mysql|anchorUpdate] update anchor increment record error(%v), req(%v)", err, req)
  988. return
  989. }
  990. resp.AffectedRows, err = res.RowsAffected()
  991. return
  992. }
  993. // fetchAreas implementation
  994. // fetchAreas 根据父分区号查询子分区
  995. // If the request area-id does not exist, the function returns with the `err` being set.
  996. func (d *Dao) fetchAreas(ctx context.Context, req *v1pb.FetchAreasReq) (resp *v1pb.FetchAreasResp, err error) {
  997. // Query parent area info first and fail fast in case the area doesn't exist.
  998. areaInfo, err := d.dbFetchAreaInfo(ctx, req.AreaId)
  999. if err != nil {
  1000. log.Error("[dao.dao-anchor.mysql|fetchAreas] fetch main area info error(%v), req area(%d)", err, req.AreaId)
  1001. return nil, err
  1002. }
  1003. resp = &v1pb.FetchAreasResp{
  1004. Info: &v1pb.AreaInfo{
  1005. AreaId: req.AreaId,
  1006. AreaName: areaInfo.AreaName,
  1007. },
  1008. }
  1009. sql := fmt.Sprintf(_querySubAreaInfo, _subAreaTable)
  1010. rows, err := d.dbLiveApp.Query(ctx, sql, req.AreaId)
  1011. if err != nil {
  1012. log.Error("[dao.dao-anchor.mysql|fetchAreas] fetch area records error(%v), req area(%d)", err, req.AreaId)
  1013. return nil, err
  1014. }
  1015. defer rows.Close()
  1016. for rows.Next() {
  1017. var subAreaID int64
  1018. var subAreaName string
  1019. if err = rows.Scan(&subAreaID, &subAreaName); err != nil {
  1020. log.Error("[dao.dao-anchor.mysql|fetchAreas] fetch subarea info error(%v), req area(%d)", err, req.AreaId)
  1021. return nil, err
  1022. }
  1023. resp.Areas = append(resp.Areas, &v1pb.AreaInfo{
  1024. AreaId: subAreaID,
  1025. AreaName: subAreaName,
  1026. })
  1027. }
  1028. return
  1029. }
  1030. // fetchAttrByIDs implementation
  1031. // fetchAttrByIDs 批量根据房间号查询指标
  1032. func (d *Dao) fetchAttrByIDs(ctx context.Context, req *v1pb.FetchAttrByIDsReq) (resp *v1pb.FetchAttrByIDsResp, err error) {
  1033. if len(req.RoomIds) <= 0 {
  1034. return
  1035. }
  1036. resp = &v1pb.FetchAttrByIDsResp{
  1037. Attrs: make(map[int64]*v1pb.AttrData),
  1038. }
  1039. valueList := ""
  1040. for i, id := range req.RoomIds {
  1041. if i > 0 {
  1042. valueList += ","
  1043. }
  1044. valueList += fmt.Sprintf("%d", id)
  1045. }
  1046. sql := fmt.Sprintf(_queryAttrInfo, _attrTable, valueList)
  1047. rows, err := d.db.Query(ctx, sql, req.AttrId, req.AttrSubId)
  1048. if err != nil {
  1049. log.Error("[dao.dao-anchor.mysql|fetchAttrByIDs] get attr record error(%v), req(%v)", err, req)
  1050. return nil, err
  1051. }
  1052. defer rows.Close()
  1053. for rows.Next() {
  1054. data := &v1pb.AttrData{
  1055. AttrId: req.AttrId,
  1056. AttrSubId: req.AttrSubId,
  1057. }
  1058. err = rows.Scan(&data.RoomId, &data.AttrValue)
  1059. if err != nil {
  1060. log.Error("[dao.dao-anchor.mysql|fetchAttrByIDs] scan attr record error(%v), req(%v)", err, req)
  1061. return nil, err
  1062. }
  1063. resp.Attrs[data.RoomId] = data
  1064. }
  1065. return
  1066. }
  1067. // deleteAttr implementation
  1068. // deleteAttr 删除一个指标
  1069. func (d *Dao) deleteAttr(ctx context.Context, req *v1pb.DeleteAttrReq) (resp *v1pb.UpdateResp, err error) {
  1070. sql := fmt.Sprintf(_deleteAttrInfo, _attrTable)
  1071. res, err := d.db.Exec(ctx, sql, req.AttrId, req.AttrSubId)
  1072. if err != nil {
  1073. log.Error("[dao.dao-anchor.mysql|roomExtendIncre] update room extend increment record error(%v), req(%v)", err, req)
  1074. return
  1075. }
  1076. resp = &v1pb.UpdateResp{}
  1077. resp.AffectedRows, err = res.RowsAffected()
  1078. return
  1079. }