tidb.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "go-common/app/job/main/thumbup/model"
  7. sql "go-common/library/database/tidb"
  8. "go-common/library/log"
  9. xtime "go-common/library/time"
  10. )
  // SQL statements used by the TiDB-backed dao, grouped by the table they touch.
  const (
  	// business table: every business row whose dtime still holds the zero timestamp.
  	_businessesSQL = "SELECT id, name, message_list_type, user_likes_limit, message_likes_limit, enable_originid, user_list_type FROM business WHERE dtime = '0000-00-00 00:00:00'"

  	// counts table: read the stored counters plus their *_change columns for one message.
  	_statSQL = "SELECT message_id, likes_count, dislikes_count, origin_id, likes_change, dislikes_change FROM counts WHERE business_id = ? AND origin_id = ? AND message_id = ?"
  	// counts table: upsert prefix; the caller appends the column assignments
  	// after "ON DUPLICATE KEY UPDATE " (note the trailing space).
  	_updateCountsSQL = "INSERT INTO counts (business_id, origin_id, message_id, likes_count, dislikes_count, up_mid) VALUES (?,?,?,?,?,?) ON DUPLICATE KEY UPDATE "

  	// likes table: newest-first list of users that reacted to one message with the given type.
  	_itemLikesSQL = "SELECT mid, mtime FROM likes WHERE business_id =? AND origin_id =? AND message_id =? AND type = ? ORDER BY mtime desc LIMIT ?"
  	// likes table: newest-first list of messages one user reacted to with the given type.
  	_userLikeListSQL = "SELECT message_id, mtime FROM likes WHERE mid = ? AND business_id =? AND type = ? ORDER BY mtime desc LIMIT ?"
  	// likes table: the reaction type one user currently holds on one message.
  	_likeStateSQL = "SELECT type FROM likes WHERE mid=? AND business_id=? AND origin_id=? AND message_id=?"
  	// likes table: upsert of one user's reaction type and its modification time.
  	_updateLikeStateSQL = "INSERT INTO likes (business_id, origin_id, message_id, mid, type, mtime) VALUES (?,?,?,?,?,?) ON DUPLICATE KEY UPDATE type=?, mtime=?"
  )
  20. // Business .
  21. func (d *Dao) Business(c context.Context) (res []*model.Business, err error) {
  22. var rows *sql.Rows
  23. if rows, err = d.businessesStmt.Query(c); err != nil {
  24. return
  25. }
  26. defer rows.Close()
  27. for rows.Next() {
  28. b := &model.Business{}
  29. if err = rows.Scan(&b.ID, &b.Name, &b.MessageListType, &b.UserLikesLimit, &b.MessageLikesLimit, &b.EnableOriginID, &b.UserListType); err != nil {
  30. return
  31. }
  32. res = append(res, b)
  33. }
  34. err = rows.Err()
  35. return
  36. }
  37. // UserLikes .
  38. func (d *Dao) UserLikes(c context.Context, mid, businessID int64, typ int8, limit int) (res []*model.ItemLikeRecord, err error) {
  39. var rows *sql.Rows
  40. if rows, err = d.userLikesStmt.Query(c, mid, businessID, typ, limit); err != nil {
  41. log.Error("d.tidb.userList error(%v)", err)
  42. return
  43. }
  44. defer rows.Close()
  45. for rows.Next() {
  46. b := &model.ItemLikeRecord{}
  47. var t time.Time
  48. if err = rows.Scan(&b.MessageID, &t); err != nil {
  49. log.Error("tidb.rows.userList error(%v)", err)
  50. return
  51. }
  52. b.Time = xtime.Time(t.Unix())
  53. res = append(res, b)
  54. }
  55. err = rows.Err()
  56. return
  57. }
  58. // ItemLikes .
  59. func (d *Dao) ItemLikes(c context.Context, businessID, originID, messageID int64, typ int8, limit int) (res []*model.UserLikeRecord, err error) {
  60. var rows *sql.Rows
  61. if rows, err = d.itemLikesStmt.Query(c, businessID, originID, messageID, typ, limit); err != nil {
  62. return
  63. }
  64. defer rows.Close()
  65. for rows.Next() {
  66. b := &model.UserLikeRecord{}
  67. var t time.Time
  68. if err = rows.Scan(&b.Mid, &t); err != nil {
  69. return
  70. }
  71. b.Time = xtime.Time(t.Unix())
  72. res = append(res, b)
  73. }
  74. err = rows.Err()
  75. return
  76. }
  77. // LikeState .
  78. func (d *Dao) LikeState(c context.Context, mid, businessID, originID, messageID int64) (res int8, err error) {
  79. if err = d.likeStateStmt.QueryRow(c, mid, businessID, originID, messageID).Scan(&res); err != nil {
  80. if err == sql.ErrNoRows {
  81. err = nil
  82. return
  83. }
  84. log.Error("tidbLikeState(%d,%d,%d,%d) error(%v)", mid, businessID, originID, messageID, err)
  85. }
  86. return
  87. }
  88. // UpdateLikeState .
  89. func (d *Dao) UpdateLikeState(c context.Context, mid, businessID, originID, messageID int64, state int8, likeTime time.Time) (res model.Stats, err error) {
  90. var tx *sql.Tx
  91. if tx, err = d.tidb.Begin(c); err != nil {
  92. log.Error("d.tidb.Begin() error(%v)", err)
  93. return
  94. }
  95. if _, err = tx.Stmts(d.updateLikeStateStmt).Exec(c, businessID, originID, messageID, mid, state, likeTime, state, likeTime); err != nil {
  96. log.Error("d.tidbUpdateLikeState error(%v)", err)
  97. if err1 := tx.Rollback(); err1 != nil {
  98. log.Error("tx.Rollback error(%v)", err)
  99. }
  100. return
  101. }
  102. if res, err = d.tidbTxStat(c, tx, businessID, originID, messageID); err != nil {
  103. if err1 := tx.Rollback(); err1 != nil {
  104. log.Error("tx.Rollback() error(%v)", err1)
  105. }
  106. return
  107. }
  108. if err = tx.Commit(); err != nil {
  109. log.Error("tx.Commit() error(%v)", err)
  110. }
  111. return
  112. }
  113. // UpdateCounts .
  114. func (d *Dao) UpdateCounts(c context.Context, businessID, originID, messageID, likeCounts, dislikeCounts, upMid int64) (err error) {
  115. var tx *sql.Tx
  116. if tx, err = d.tidb.Begin(c); err != nil {
  117. log.Error("tx.tidb.Begin() error(%v)", err)
  118. return
  119. }
  120. var stat model.Stats
  121. if stat, err = d.tidbTxStat(c, tx, businessID, originID, messageID); err != nil {
  122. if err1 := tx.Rollback(); err1 != nil {
  123. log.Error("tx.Rollback() error(%v)", err1)
  124. }
  125. return
  126. }
  127. if stat.Likes+likeCounts < 0 {
  128. likeCounts = -stat.Likes
  129. }
  130. if stat.Dislikes+dislikeCounts < 0 {
  131. dislikeCounts = -stat.Dislikes
  132. }
  133. if err = d.tidbTxUpdateCounts(c, tx, businessID, originID, messageID, likeCounts, dislikeCounts, upMid); err != nil {
  134. if err1 := tx.Rollback(); err1 != nil {
  135. log.Error("tx.Rollback() error(%v)", err1)
  136. }
  137. return
  138. }
  139. if err = tx.Commit(); err != nil {
  140. log.Error("tx.Commit() error(%v)", err)
  141. }
  142. return
  143. }
  144. // tidbTxStat .
  145. func (d *Dao) tidbTxStat(c context.Context, tx *sql.Tx, businessID, originID, messageID int64) (res model.Stats, err error) {
  146. var likeChange, dislikeChange int64
  147. if err = tx.Stmts(d.statStmt).QueryRow(c, businessID, originID, messageID).Scan(&res.ID, &res.Likes, &res.Dislikes, &res.OriginID, &likeChange, &dislikeChange); err != nil {
  148. if err == sql.ErrNoRows {
  149. err = nil
  150. res.ID = messageID
  151. res.OriginID = originID
  152. return
  153. }
  154. log.Error("tidb: Tx db.Stat.Query error(%v,%v,%v,%v)", businessID, originID, messageID, err)
  155. return
  156. }
  157. res.Likes += likeChange
  158. res.Dislikes += dislikeChange
  159. return
  160. }
  161. // TxUpdateCounts .
  162. func (d *Dao) tidbTxUpdateCounts(c context.Context, tx *sql.Tx, businessID, originID, messageID int64, likesCount, dislikesCount, upMid int64) (err error) {
  163. likeSQL := _updateCountsSQL
  164. if (likesCount == 0) && (dislikesCount == 0) {
  165. return
  166. }
  167. if (likesCount != 0) && (dislikesCount == 0) {
  168. if likesCount > 0 {
  169. likeSQL += fmt.Sprintf("likes_count = likes_count + %v", likesCount)
  170. } else {
  171. likeSQL += fmt.Sprintf("likes_count = likes_count %v", likesCount)
  172. }
  173. } else if (likesCount == 0) && (dislikesCount != 0) {
  174. if dislikesCount > 0 {
  175. likeSQL += fmt.Sprintf("dislikes_count = dislikes_count + %v", dislikesCount)
  176. } else {
  177. likeSQL += fmt.Sprintf("dislikes_count = dislikes_count %v", dislikesCount)
  178. }
  179. } else {
  180. if likesCount > 0 {
  181. likeSQL += fmt.Sprintf("likes_count = likes_count + %v", likesCount)
  182. } else {
  183. likeSQL += fmt.Sprintf("likes_count = likes_count %v", likesCount)
  184. }
  185. if dislikesCount > 0 {
  186. likeSQL += fmt.Sprintf(", dislikes_count = dislikes_count + %v", dislikesCount)
  187. } else {
  188. likeSQL += fmt.Sprintf(", dislikes_count = dislikes_count %v", dislikesCount)
  189. }
  190. }
  191. if upMid > 0 {
  192. likeSQL += fmt.Sprintf(", up_mid = %d", upMid)
  193. }
  194. if _, err = tx.Exec(likeSQL, businessID, originID, messageID, likesCount, dislikesCount, upMid); err != nil {
  195. log.Error("dao.tidbTxUpdateCounts tx.Exec(%s, %v,%v,%v) error(%v)", likeSQL, businessID, originID, messageID, err)
  196. }
  197. return
  198. }
  199. // Stat .
  200. func (d *Dao) Stat(c context.Context, businessID, originID, messageID int64) (res model.Stats, err error) {
  201. var likeChange, dislikeChange int64
  202. if err = d.statStmt.QueryRow(c, businessID, originID, messageID).Scan(&res.ID, &res.Likes, &res.Dislikes, &res.OriginID, &likeChange, &dislikeChange); err != nil {
  203. if err == sql.ErrNoRows {
  204. err = nil
  205. res.ID = messageID
  206. res.OriginID = originID
  207. return
  208. }
  209. log.Error("tidb.Stat.Query error(%v)", err)
  210. }
  211. res.Likes += likeChange
  212. res.Dislikes += dislikeChange
  213. return
  214. }