mysql.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. v1pb "go-common/app/service/live/xanchor/api/grpc/v1"
  6. "go-common/library/ecode"
  7. "go-common/library/log"
  8. "github.com/pkg/errors"
  9. )
const (
	// Table names. _roomExtTablePrefix is only a prefix: the room-extend
	// statements below append "_<n>" to it to address one concrete table.
	_roomTable = "room"
	_roomExtTablePrefix = "room_extend"
	_anchorTable = "anchor"
	_tagTable = "tag"
	_areaTable = "ap_room_area_v2"
	// _addRoomInfo1 inserts a room row giving only `uid`.
	// Format verbs: %s = table name. Bind args: uid.
	_addRoomInfo1 = "insert into `%s` (`uid`) values (?)"
	// _addRoomInfo2 inserts a room row with an explicit `room_id`.
	// Format verbs: %s = table name. Bind args: uid, room_id.
	_addRoomInfo2 = "insert into `%s` (`uid`,`room_id`) values (?,?)"
	// _addRoomExtInfo inserts a room-extend row.
	// Format verbs: %s = table prefix, %d = table suffix. Bind args: room_id.
	_addRoomExtInfo = "insert into `%s_%d` (`room_id`) values (?)"
	// _addAnchorInfo inserts an anchor row; `san_score` is hard-coded to 12.
	// Format verbs: %s = table name. Bind args: uid, room_id.
	_addAnchorInfo = "insert into `%s` (`uid`,`room_id`,`san_score`) values (?,?,12)"
	// _updateRoomInfo updates one room row selected by `room_id`.
	// Format verbs: %s = table name, %s = SET list. Bind args: SET values, room_id.
	_updateRoomInfo = "update `%s` set %s where `room_id`=?"
	// _updateRoomExtInfo updates one room-extend row selected by `room_id`.
	// Format verbs: %s = table prefix, %d = table suffix, %s = SET list.
	// Bind args: SET values, room_id.
	_updateRoomExtInfo = "update `%s_%d` set %s where `room_id`=?"
	// _updateAnchorInfo updates one anchor row selected by `uid`.
	// Format verbs: %s = table name, %s = SET list. Bind args: SET values, uid.
	_updateAnchorInfo = "update `%s` set %s where `uid`=?"
	// _tagSetInfo inserts a tag row or, on a duplicate key, updates it.
	// Format verbs: %s = table name, %d = target_type, %s = SET list for the
	// insert, %s = SET list for the update. The template itself contains no
	// `?`; every placeholder comes from the two SET lists.
	_tagSetInfo = "insert into `%s` set `target_type`=%d,%s on duplicate key update %s"
	// _queryRoomInfo selects 22 columns from `room` joined with `anchor` on
	// `uid`. Format verbs: `%s` = filter column (wrapped in backticks by the
	// template), (%s) = comma-separated value list. Every Scan over this result
	// set must provide one destination per selected column.
	_queryRoomInfo = "select `room`.`room_id`,`room`.`uid`,`room`.`title`,`room`.`description`,`room`.`tags`,`room`.`background`,`room`.`cover`,`room`.`lock_status`,`room`.`lock_time`,`room`.`hidden_time`,`room`.`record_switch`,`room`.`round_switch`,`room`.`live_start_time`,`room`.`live_screen_type`,`room`.`live_area_id`,`room`.`live_area_parent_id`,`room`.`live_type`,`anchor`.`san_score`,`anchor`.`profile_type`,`anchor`.`round_status`,`anchor`.`record_status`,`anchor`.`exp` from `room`,`anchor` where `room`.`uid`=`anchor`.`uid` and `%s` in (%s)"
	// _queryOnlineRoomInfo selects the same 22 columns for rows whose
	// `live_start_time` is non-zero, paged with `limit ?,?`.
	// Bind args: offset, row count.
	// NOTE(review): `live_start_time` and `id` are not table-qualified here;
	// confirm neither is ambiguous between `room` and `anchor`.
	_queryOnlineRoomInfo = "select `room`.`room_id`,`room`.`uid`,`room`.`title`,`room`.`description`,`room`.`tags`,`room`.`background`,`room`.`cover`,`room`.`lock_status`,`room`.`lock_time`,`room`.`hidden_time`,`room`.`record_switch`,`room`.`round_switch`,`room`.`live_start_time`,`room`.`live_screen_type`,`room`.`live_area_id`,`room`.`live_area_parent_id`,`room`.`live_type`,`anchor`.`san_score`,`anchor`.`profile_type`,`anchor`.`round_status`,`anchor`.`record_status`,`anchor`.`exp` from `room`,`anchor` where `room`.`uid`=`anchor`.`uid` and `live_start_time`!=0 order by id limit ?,?"
	// _queryParentAreaID selects `parent_id` of one area row.
	// Format verbs: %s = table name. Bind args: id.
	_queryParentAreaID = "select `parent_id` from `%s` where `id`=?"
)
  39. // fetchParentAreaID implementation
  40. // fetchParentAreaID 查询房间信息
  41. func (d *Dao) fetchParentAreaID(ctx context.Context, areaID int64) (parentAreaID int64, err error) {
  42. sql := fmt.Sprintf(_queryParentAreaID, _areaTable)
  43. err = d.dbLiveApp.QueryRow(ctx, sql, areaID).Scan(&parentAreaID)
  44. return
  45. }
  46. // fetchRoomByIDs implementation
  47. // fetchRoomByIDs 查询房间信息
  48. func (d *Dao) fetchRoomByIDs(ctx context.Context, req *v1pb.RoomByIDsReq) (resp *v1pb.RoomByIDsResp, err error) {
  49. condField := ""
  50. valueList := ""
  51. if len(req.RoomIds) > 0 {
  52. condField = "room`.`room_id"
  53. for i, id := range req.RoomIds {
  54. if i > 0 {
  55. valueList += ","
  56. }
  57. valueList += fmt.Sprintf("%d", id)
  58. }
  59. } else if len(req.Uids) > 0 {
  60. condField = "room`.`uid"
  61. for i, id := range req.Uids {
  62. if i > 0 {
  63. valueList += ","
  64. }
  65. valueList += fmt.Sprintf("%d", id)
  66. }
  67. }
  68. sql := fmt.Sprintf(_queryRoomInfo, condField, valueList)
  69. rows, err := d.db.Query(ctx, sql)
  70. if err != nil {
  71. log.Error("[dao.xanchor.mysql|fetchRoomByIDs] get room record error(%v), req(%v)", err, req)
  72. return nil, err
  73. }
  74. defer rows.Close()
  75. resp = &v1pb.RoomByIDsResp{}
  76. for rows.Next() {
  77. var data v1pb.RoomData
  78. err = rows.Scan(&data.RoomId, &data.Uid, &data.Title, &data.Description, &data.Tags, &data.Background, &data.Cover, &data.LockStatus, &data.LockTime, &data.HiddenTime, &data.AnchorRecordSwitch, &data.AnchorRoundSwitch, &data.LiveStartTime, &data.LiveScreenType, &data.AreaId, &data.ParentAreaId, &data.AnchorSan, &data.AnchorProfileType, &data.AnchorRoundStatus, &data.AnchorRecordStatus, &data.AnchorExp)
  79. // TODO area name
  80. if err != nil {
  81. log.Error("[dao.xanchor.mysql|fetchRoomByIDs] scan room record error(%v), req(%v)", err, req)
  82. return nil, err
  83. }
  84. resp.RoomDataSet[data.RoomId] = &data
  85. // TODO short-id
  86. }
  87. // TODO Tag List
  88. return
  89. }
  90. // roomOnlineList implementation
  91. // roomOnlineList 在线房间列表
  92. func (d *Dao) roomOnlineList(ctx context.Context, req *v1pb.RoomOnlineListReq) (resp *v1pb.RoomOnlineListResp, err error) {
  93. rows, err := d.db.Query(ctx, _queryOnlineRoomInfo, req.Page*req.PageSize, req.PageSize)
  94. if err != nil {
  95. log.Error("[dao.xanchor.mysql|roomOnlineList] get online room list error(%v), req(%v)", err, req)
  96. return nil, err
  97. }
  98. defer rows.Close()
  99. resp = &v1pb.RoomOnlineListResp{}
  100. for rows.Next() {
  101. var data v1pb.RoomData
  102. err = rows.Scan(&data.RoomId, &data.Uid, &data.Title, &data.Description, &data.Tags, &data.Background, &data.Cover, &data.LockStatus, &data.LockTime, &data.HiddenTime, &data.AnchorRecordSwitch, &data.AnchorRoundSwitch, &data.LiveStartTime, &data.LiveScreenType, &data.AreaId, &data.ParentAreaId, &data.AnchorSan, &data.AnchorProfileType, &data.AnchorRoundStatus, &data.AnchorRecordStatus, &data.AnchorExp)
  103. // TODO area name
  104. if err != nil {
  105. log.Error("[dao.xanchor.mysql|roomOnlineList] scan online room list error(%v), req(%v)", err, req)
  106. return nil, err
  107. }
  108. resp.RoomDataList[data.RoomId] = &data
  109. // TODO short-id
  110. }
  111. // TODO Tag List
  112. return
  113. }
  114. // roomCreate implementation
  115. // roomCreate 房间创建
  116. func (d *Dao) roomCreate(ctx context.Context, req *v1pb.RoomCreateReq) (resp *v1pb.RoomCreateResp, err error) {
  117. resp = &v1pb.RoomCreateResp{}
  118. tx, err := d.db.Begin(ctx)
  119. if err != nil {
  120. err = errors.WithStack(err)
  121. return resp, err
  122. }
  123. if req.RoomId != 0 {
  124. resp.RoomId = req.RoomId
  125. sql := fmt.Sprintf(_addRoomInfo2, _roomTable)
  126. _, err = tx.Exec(sql, req.Uid, req.RoomId)
  127. if err != nil {
  128. if e := tx.Rollback(); e != nil {
  129. log.Error("[dao.xanchor.mysql|roomCreate] create room record rollback error(%v), req(%v)", e, req)
  130. }
  131. // unique key exists error
  132. log.Error("[dao.xanchor.mysql|roomCreate] create room record error(%v), req(%v)", err, req)
  133. return resp, err
  134. }
  135. } else {
  136. sql := fmt.Sprintf(_addRoomInfo1, _roomTable)
  137. res, err := tx.Exec(sql, req.Uid)
  138. if err != nil {
  139. if e := tx.Rollback(); e != nil {
  140. log.Error("[dao.xanchor.mysql|roomCreate] create room record rollback error(%v), req(%v)", e, req)
  141. }
  142. // unique key exists error
  143. log.Error("[dao.xanchor.mysql|roomCreate] create room record error(%v), req(%v)", err, req)
  144. return resp, err
  145. }
  146. if resp.RoomId, err = res.LastInsertId(); err != nil {
  147. err = errors.WithStack(err)
  148. log.Error("[dao.xanchor.mysql|AddGuard] get last insert id error(%v), req(%v)", err, req)
  149. }
  150. }
  151. sql := fmt.Sprintf(_addRoomExtInfo, _roomExtTablePrefix, resp.RoomId%10)
  152. if _, err = tx.Exec(sql, resp.RoomId); err != nil {
  153. if e := tx.Rollback(); e != nil {
  154. log.Error("[dao.xanchor.mysql|roomCreate] create room extend record rollback error(%v), req(%v)", e, req)
  155. }
  156. log.Error("[dao.xanchor.mysql|roomCreate] create room extend record error(%v), req(%v)", err, req)
  157. return resp, err
  158. }
  159. sql = fmt.Sprintf(_addAnchorInfo, _anchorTable)
  160. if _, err = tx.Exec(sql, req.Uid, resp.RoomId); err != nil {
  161. if e := tx.Rollback(); e != nil {
  162. log.Error("[dao.xanchor.mysql|roomCreate] create anchor record rollback error(%v), req(%v)", e, req)
  163. }
  164. log.Error("[dao.xanchor.mysql|roomCreate] create anchor record error(%v), req(%v)", err, req)
  165. return resp, err
  166. }
  167. if err = tx.Commit(); err != nil {
  168. log.Error("[dao.xanchor.mysql|roomCreate] commit error(%v), req(%v)", err, req)
  169. return resp, err
  170. }
  171. return
  172. }
  173. // roomUpdate implementation
  174. // roomUpdate 房间信息更新
  175. func (d *Dao) roomUpdate(ctx context.Context, req *v1pb.RoomUpdateReq) (resp *v1pb.UpdateResp, err error) {
  176. updateSub := ""
  177. args := make([]interface{}, len(req.Fields)+1)
  178. args[len(req.Fields)] = req.RoomId
  179. for i, f := range req.Fields {
  180. switch f {
  181. case "title":
  182. args[i] = req.Title
  183. case "cover":
  184. args[i] = req.Cover
  185. case "tags":
  186. args[i] = req.Tags
  187. case "background":
  188. args[i] = req.Background
  189. case "description":
  190. args[i] = req.Description
  191. case "live_start_time":
  192. args[i] = req.LiveStartTime
  193. if req.LiveStartTime > 0 {
  194. if i > 0 {
  195. updateSub += ","
  196. }
  197. updateSub += "`live_mark`=1"
  198. if i == 0 {
  199. updateSub += ","
  200. }
  201. }
  202. case "live_screen_type":
  203. args[i] = req.LiveScreenType
  204. case "lock_status":
  205. args[i] = req.LockStatus
  206. case "lock_time":
  207. args[i] = req.LockTime
  208. case "hidden_time":
  209. args[i] = req.HiddenTime
  210. case "area_id":
  211. f = "live_area_id"
  212. args[i] = req.AreaId
  213. var parentAreaID int64
  214. parentAreaID, err = d.fetchParentAreaID(ctx, req.AreaId)
  215. if err != nil {
  216. log.Error("[dao.xanchor.mysql|roomUpdate] fetch parent area ID error(%v), req(%v)", err, req)
  217. err = ecode.InvalidParam
  218. return
  219. }
  220. if i > 0 {
  221. updateSub += ","
  222. }
  223. updateSub += fmt.Sprintf("`live_area_parent_id`=%d", parentAreaID)
  224. if i == 0 {
  225. updateSub += ","
  226. }
  227. case "anchor_round_switch":
  228. f = "round_switch"
  229. args[i] = req.AnchorRoundSwitch
  230. case "anchor_record_switch":
  231. f = "record_switch"
  232. args[i] = req.AnchorRecordSwitch
  233. default:
  234. log.Error("[dao.xanchor.mysql|roomUpdate] unsupported field(%v), req(%s)", f, req)
  235. err = ecode.InvalidParam
  236. return
  237. }
  238. if i > 0 {
  239. updateSub += ","
  240. }
  241. updateSub += fmt.Sprintf("`%s`=?", f)
  242. }
  243. resp = &v1pb.UpdateResp{}
  244. sql := fmt.Sprintf(_updateRoomInfo, _roomTable, updateSub)
  245. res, err := d.db.Exec(ctx, sql, args...)
  246. if err != nil {
  247. log.Error("[dao.xanchor.mysql|roomUpdate] update room record error(%v), req(%v)", err, req)
  248. return
  249. }
  250. resp.AffectedRows, err = res.RowsAffected()
  251. return
  252. }
  253. // roomBatchUpdate implementation
  254. // roomBatchUpdate 房间信息批量更新
  255. func (d *Dao) roomBatchUpdate(ctx context.Context, req *v1pb.RoomBatchUpdateReq) (resp *v1pb.UpdateResp, err error) {
  256. resp = &v1pb.UpdateResp{}
  257. for _, r := range req.Reqs {
  258. res, err := d.roomUpdate(ctx, r)
  259. if err != nil {
  260. log.Error("[dao.xanchor.mysql|roomBatchUpdate] update room record error(%v), req(%v)", err, r)
  261. return nil, err
  262. }
  263. resp.AffectedRows += res.AffectedRows
  264. }
  265. return
  266. }
  267. // roomExtendUpdate implementation
  268. // roomExtendUpdate 房间扩展信息更新
  269. func (d *Dao) roomExtendUpdate(ctx context.Context, req *v1pb.RoomExtendUpdateReq) (resp *v1pb.UpdateResp, err error) {
  270. updateSub := ""
  271. args := make([]interface{}, len(req.Fields)+1)
  272. args[len(req.Fields)] = req.RoomId
  273. for i, f := range req.Fields {
  274. switch f {
  275. case "key_frame":
  276. args[i] = req.KeyFrame
  277. case "danmu_count":
  278. args[i] = req.DanmuCount
  279. case "popularity_count":
  280. args[i] = req.PopularityCount
  281. case "audience_count":
  282. args[i] = req.AudienceCount
  283. case "gift_count":
  284. args[i] = req.GiftCount
  285. case "gift_gold_amount":
  286. args[i] = req.GiftGoldAmount
  287. case "gift_gold_count":
  288. args[i] = req.GiftGoldCount
  289. default:
  290. log.Error("[dao.xanchor.mysql|roomExtendUpdate] unsupported field(%v), req(%s)", f, req)
  291. err = ecode.InvalidParam
  292. return
  293. }
  294. if i > 0 {
  295. updateSub += ","
  296. }
  297. updateSub += fmt.Sprintf("`%s`=?", f)
  298. }
  299. resp = &v1pb.UpdateResp{}
  300. sql := fmt.Sprintf(_updateRoomExtInfo, _roomExtTablePrefix, req.RoomId%10, updateSub)
  301. res, err := d.db.Exec(ctx, sql, args...)
  302. if err != nil {
  303. log.Error("[dao.xanchor.mysql|roomExtendUpdate] update room extend record error(%v), req(%v)", err, req)
  304. return
  305. }
  306. resp.AffectedRows, err = res.RowsAffected()
  307. return
  308. }
  309. // roomExtendBatchUpdate implementation
  310. // roomExtendBatchUpdate 房间扩展信息批量更新
  311. func (d *Dao) roomExtendBatchUpdate(ctx context.Context, req *v1pb.RoomExtendBatchUpdateReq) (resp *v1pb.UpdateResp, err error) {
  312. resp = &v1pb.UpdateResp{}
  313. for _, r := range req.Reqs {
  314. res, err := d.roomExtendUpdate(ctx, r)
  315. if err != nil {
  316. log.Error("[dao.xanchor.mysql|roomExtendBatchUpdate] update room extend record error(%v), req(%v)", err, r)
  317. return nil, err
  318. }
  319. resp.AffectedRows += res.AffectedRows
  320. }
  321. return
  322. }
  323. // roomExtendIncre implementation
  324. // roomExtendIncre 房间扩展信息增量更新
  325. func (d *Dao) roomExtendIncre(ctx context.Context, req *v1pb.RoomExtendIncreReq) (resp *v1pb.UpdateResp, err error) {
  326. // TODO: req_id
  327. updateSub := ""
  328. args := make([]interface{}, len(req.Fields)+1)
  329. args[len(req.Fields)] = req.RoomId
  330. for i, f := range req.Fields {
  331. switch f {
  332. case "danmu_count":
  333. args[i] = req.DanmuCount
  334. case "popularity_count":
  335. args[i] = req.PopularityCount
  336. case "audience_count":
  337. args[i] = req.AudienceCount
  338. case "gift_count":
  339. args[i] = req.GiftCount
  340. case "gift_gold_amount":
  341. args[i] = req.GiftGoldAmount
  342. case "gift_gold_count":
  343. args[i] = req.GiftGoldCount
  344. default:
  345. log.Error("[dao.xanchor.mysql|roomExtendIncre] unsupported field(%v), req(%s)", f, req)
  346. err = ecode.InvalidParam
  347. return
  348. }
  349. if i > 0 {
  350. updateSub += ","
  351. }
  352. updateSub += fmt.Sprintf("`%s`=`%s`+(?)", f, f)
  353. }
  354. resp = &v1pb.UpdateResp{}
  355. sql := fmt.Sprintf(_updateRoomExtInfo, _roomExtTablePrefix, req.RoomId%10, updateSub)
  356. res, err := d.db.Exec(ctx, sql, args...)
  357. if err != nil {
  358. log.Error("[dao.xanchor.mysql|roomExtendIncre] update room extend increment record error(%v), req(%v)", err, req)
  359. return
  360. }
  361. resp.AffectedRows, err = res.RowsAffected()
  362. return
  363. }
  364. // roomExtendBatchIncre implementation
  365. // roomExtendBatchIncre 房间扩展信息批量更新
  366. func (d *Dao) roomExtendBatchIncre(ctx context.Context, req *v1pb.RoomExtendBatchIncreReq) (resp *v1pb.UpdateResp, err error) {
  367. resp = &v1pb.UpdateResp{}
  368. for _, r := range req.Reqs {
  369. res, err := d.roomExtendIncre(ctx, r)
  370. if err != nil {
  371. log.Error("[dao.xanchor.mysql|roomExtendBatchIncre] update room extend increment record error(%v), req(%v)", err, r)
  372. return nil, err
  373. }
  374. resp.AffectedRows += res.AffectedRows
  375. }
  376. return
  377. }
// roomTagSet builds and executes the _tagSetInfo upsert against the tag table
// with target_type 2, setting the columns named in req.Fields, and reports the
// number of affected rows.
//
// Supported fields: tag_value, tag_attribute, tag_expire_at. An unsupported
// field name yields ecode.InvalidParam with a nil response. An Exec or
// RowsAffected error is returned together with a non-nil response.
//
// NOTE(review): the bind-argument layout below does not match the statement:
//   - args has 2*len(Fields)+1 slots, while the statement contains updateSub
//     twice and therefore 2*len(Fields) placeholders;
//   - slot len(Fields) first receives req.RoomId and is then overwritten by
//     the i == 0 iteration (index len(Fields)+0);
//   - the last slot (index 2*len(Fields)) is never assigned.
// Presumably the room id was meant to bind to a target-id column that
// _tagSetInfo does not currently contain — confirm against the tag table
// schema before changing either side.
func (d *Dao) roomTagSet(ctx context.Context, req *v1pb.RoomTagSetReq) (resp *v1pb.UpdateResp, err error) {
	updateSub := ""
	args := make([]interface{}, len(req.Fields)*2+1)
	args[len(req.Fields)] = req.RoomId
	for i, f := range req.Fields {
		switch f {
		case "tag_value":
			// Each value is stored twice: once for the insert SET list and once
			// for the "on duplicate key update" list.
			args[i] = req.TagValue
			args[len(req.Fields)+i] = req.TagValue
		case "tag_attribute":
			args[i] = req.TagAttribute
			args[len(req.Fields)+i] = req.TagAttribute
		case "tag_expire_at":
			args[i] = req.TagExpireAt
			args[len(req.Fields)+i] = req.TagExpireAt
		default:
			log.Error("[dao.xanchor.mysql|roomTagSet] unsupported field(%v), req(%s)", f, req)
			err = ecode.InvalidParam
			return
		}
		if i > 0 {
			updateSub += ","
		}
		updateSub += fmt.Sprintf("`%s`=?", f)
	}
	resp = &v1pb.UpdateResp{}
	// The same SET list is used for both the insert and the update part.
	sql := fmt.Sprintf(_tagSetInfo, _tagTable, 2, updateSub, updateSub)
	res, err := d.db.Exec(ctx, sql, args...)
	if err != nil {
		log.Error("[dao.xanchor.mysql|roomTagSet] set room tag error(%v), req(%v)", err, req)
		return
	}
	resp.AffectedRows, err = res.RowsAffected()
	return
}
  415. // anchorUpdate implementation
  416. // anchorUpdate 主播信息更新
  417. func (d *Dao) anchorUpdate(ctx context.Context, req *v1pb.AnchorUpdateReq) (resp *v1pb.UpdateResp, err error) {
  418. updateSub := ""
  419. args := make([]interface{}, len(req.Fields)+1)
  420. args[len(req.Fields)] = req.Uid
  421. for i, f := range req.Fields {
  422. switch f {
  423. case "profile_type":
  424. args[i] = req.ProfileType
  425. case "san_score":
  426. args[i] = req.SanScore
  427. case "round_status":
  428. args[i] = req.RoundStatus
  429. case "record_status":
  430. args[i] = req.RecordStatus
  431. case "exp":
  432. args[i] = req.Exp
  433. default:
  434. log.Error("[dao.xanchor.mysql|anchorUpdate] unsupported field(%v), req(%s)", f, req)
  435. err = ecode.InvalidParam
  436. return
  437. }
  438. if i > 0 {
  439. updateSub += ","
  440. }
  441. updateSub += fmt.Sprintf("`%s`=?", f)
  442. }
  443. resp = &v1pb.UpdateResp{}
  444. sql := fmt.Sprintf(_updateAnchorInfo, _anchorTable, updateSub)
  445. res, err := d.db.Exec(ctx, sql, args...)
  446. if err != nil {
  447. log.Error("[dao.xanchor.mysql|anchorUpdate] update anchor record error(%v), req(%v)", err, req)
  448. return
  449. }
  450. resp.AffectedRows, err = res.RowsAffected()
  451. return
  452. }
  453. // anchorBatchUpdate implementation
  454. // anchorBatchUpdate 主播信息批量更新
  455. func (d *Dao) anchorBatchUpdate(ctx context.Context, req *v1pb.AnchorBatchUpdateReq) (resp *v1pb.UpdateResp, err error) {
  456. resp = &v1pb.UpdateResp{}
  457. for _, r := range req.Reqs {
  458. res, err := d.anchorUpdate(ctx, r)
  459. if err != nil {
  460. log.Error("[dao.xanchor.mysql|anchorBatchUpdate] update anchor record error(%v), req(%v)", err, r)
  461. return nil, err
  462. }
  463. resp.AffectedRows += res.AffectedRows
  464. }
  465. return
  466. }
  467. // anchorIncre implementation
  468. // anchorIncre 主播信息增量更新
  469. func (d *Dao) anchorIncre(ctx context.Context, req *v1pb.AnchorIncreReq) (resp *v1pb.UpdateResp, err error) {
  470. // TODO: req_id
  471. updateSub := ""
  472. args := make([]interface{}, len(req.Fields)+1)
  473. args[len(req.Fields)] = req.Uid
  474. for i, f := range req.Fields {
  475. switch f {
  476. case "san_score":
  477. args[i] = req.SanScore
  478. case "exp":
  479. args[i] = req.Exp
  480. default:
  481. log.Error("[dao.xanchor.mysql|anchorIncre] unsupported field(%v), req(%s)", f, req)
  482. err = ecode.InvalidParam
  483. return
  484. }
  485. if i > 0 {
  486. updateSub += ","
  487. }
  488. updateSub += fmt.Sprintf("`%s`=`%s`+(?)", f, f)
  489. }
  490. resp = &v1pb.UpdateResp{}
  491. sql := fmt.Sprintf(_updateAnchorInfo, _anchorTable, updateSub)
  492. res, err := d.db.Exec(ctx, sql, args...)
  493. if err != nil {
  494. log.Error("[dao.xanchor.mysql|anchorUpdate] update anchor increment record error(%v), req(%v)", err, req)
  495. return
  496. }
  497. resp.AffectedRows, err = res.RowsAffected()
  498. return
  499. }
  500. // anchorBatchIncre implementation
  501. // anchorBatchIncre 主播信息批量增量更新
  502. func (d *Dao) anchorBatchIncre(ctx context.Context, req *v1pb.AnchorBatchIncreReq) (resp *v1pb.UpdateResp, err error) {
  503. resp = &v1pb.UpdateResp{}
  504. for _, r := range req.Reqs {
  505. res, err := d.anchorIncre(ctx, r)
  506. if err != nil {
  507. log.Error("[dao.xanchor.mysql|anchorBatchIncre] update anchor increment record error(%v), req(%v)", err, r)
  508. return nil, err
  509. }
  510. resp.AffectedRows += res.AffectedRows
  511. }
  512. return
  513. }
  514. // anchorTagSet implementation
  515. // anchorTagSet 主播Tag更新
  516. func (d *Dao) anchorTagSet(ctx context.Context, req *v1pb.AnchorTagSetReq) (resp *v1pb.UpdateResp, err error) {
  517. updateSub := ""
  518. args := make([]interface{}, len(req.Fields)*2+1)
  519. args[len(req.Fields)] = req.AnchorId
  520. for i, f := range req.Fields {
  521. switch f {
  522. case "tag_value":
  523. args[i] = req.TagValue
  524. args[len(req.Fields)+i] = req.TagValue
  525. case "tag_attribute":
  526. args[i] = req.TagAttribute
  527. args[len(req.Fields)+i] = req.TagAttribute
  528. case "tag_expire_at":
  529. args[i] = req.TagExpireAt
  530. args[len(req.Fields)+i] = req.TagExpireAt
  531. default:
  532. log.Error("[dao.xanchor.mysql|roomTagSet] unsupported field(%v), req(%s)", f, req)
  533. err = ecode.InvalidParam
  534. return
  535. }
  536. if i > 0 {
  537. updateSub += ","
  538. }
  539. updateSub += fmt.Sprintf("`%s`=?", f)
  540. }
  541. resp = &v1pb.UpdateResp{}
  542. sql := fmt.Sprintf(_tagSetInfo, _tagTable, 1, updateSub, updateSub)
  543. res, err := d.db.Exec(ctx, sql, args...)
  544. if err != nil {
  545. log.Error("[dao.xanchor.mysql|roomTagSet] set anchor tag error(%v), req(%v)", err, req)
  546. return
  547. }
  548. resp.AffectedRows, err = res.RowsAffected()
  549. return
  550. }