reply.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "go-common/app/admin/main/reply/model"
  7. xsql "go-common/library/database/sql"
  8. "go-common/library/log"
  9. "go-common/library/xstr"
  10. )
  11. const (
  12. _inSQL = "INSERT IGNORE INTO reply_%d (id,oid,type,mid,root,parent,floor,state,attr,ctime,mtime) VALUES(?,?,?,?,?,?,?,?,?,?,?)"
  13. _selReplySQL = "SELECT id,oid,type,mid,root,parent,dialog,count,rcount,`like`,hate,floor,state,attr,ctime,mtime FROM reply_%d WHERE id=?"
  14. _txSelReplySQLForUpdate = "SELECT id,oid,type,mid,root,parent,dialog,count,rcount,`like`,hate,floor,state,attr,ctime,mtime FROM reply_%d WHERE id=? for update"
  15. _selRepliesSQL = "SELECT id,oid,type,mid,root,parent,dialog,count,rcount,`like`,floor,state,attr,ctime,mtime FROM reply_%d WHERE id IN (%s)"
  16. _incrRCountSQL = "UPDATE reply_%d SET rcount=rcount+1,mtime=? WHERE id=?"
  17. _upReplyStateSQL = "UPDATE reply_%d SET state=?,mtime=? WHERE id=?"
  18. _decrCntSQL = "UPDATE reply_%d SET rcount=rcount-1,mtime=? WHERE id=? AND rcount > 0"
  19. _upAttrSQL = "UPDATE reply_%d SET attr=?,mtime=? WHERE id=?"
  20. _selExportRepliesSQL = "SELECT id,oid,type,mid,root,parent,count,rcount,`like`,hate,floor,state,attr,T1.ctime,message from reply_%d as T1 inner join reply_content_%d as T2 on id=rpid where oid=? and type=? and T1.ctime>=? and T2.ctime<=?"
  21. )
  22. // InsertReply insert reply by transaction.
  23. func (d *Dao) InsertReply(c context.Context, r *model.Reply) (id int64, err error) {
  24. res, err := d.db.Exec(c, fmt.Sprintf(_inSQL, hit(r.Oid)), r.ID, r.Oid, r.Type, r.Mid, r.Root, r.Parent, r.Floor, r.State, r.Attr, r.CTime, r.MTime)
  25. if err != nil {
  26. log.Error("mysqlDB.Exec error(%v)", err)
  27. return
  28. }
  29. return res.LastInsertId()
  30. }
  31. // Reply get a reply from database.
  32. func (d *Dao) Reply(c context.Context, oid, rpID int64) (r *model.Reply, err error) {
  33. r = new(model.Reply)
  34. row := d.db.QueryRow(c, fmt.Sprintf(_selReplySQL, hit(oid)), rpID)
  35. if err = row.Scan(&r.ID, &r.Oid, &r.Type, &r.Mid, &r.Root, &r.Parent, &r.Dialog, &r.Count, &r.RCount, &r.Like, &r.Hate, &r.Floor, &r.State, &r.Attr, &r.CTime, &r.MTime); err != nil {
  36. if err == xsql.ErrNoRows {
  37. r = nil
  38. err = nil
  39. }
  40. }
  41. return
  42. }
  43. // Replies get replies by reply ids.
  44. func (d *Dao) Replies(c context.Context, oids []int64, rpIds []int64) (rpMap map[int64]*model.Reply, err error) {
  45. hitMap := make(map[int64][]int64)
  46. for i, oid := range oids {
  47. hitMap[hit(oid)] = append(hitMap[hit(oid)], rpIds[i])
  48. }
  49. rpMap = make(map[int64]*model.Reply, len(rpIds))
  50. for hit, ids := range hitMap {
  51. var rows *xsql.Rows
  52. rows, err = d.db.Query(c, fmt.Sprintf(_selRepliesSQL, hit, xstr.JoinInts(ids)))
  53. if err != nil {
  54. return
  55. }
  56. for rows.Next() {
  57. r := &model.Reply{}
  58. if err = rows.Scan(&r.ID, &r.Oid, &r.Type, &r.Mid, &r.Root, &r.Parent, &r.Dialog, &r.Count, &r.RCount, &r.Like, &r.Floor, &r.State, &r.Attr, &r.CTime, &r.MTime); err != nil {
  59. rows.Close()
  60. return
  61. }
  62. rpMap[r.ID] = r
  63. }
  64. if err = rows.Err(); err != nil {
  65. rows.Close()
  66. return
  67. }
  68. rows.Close()
  69. }
  70. return
  71. }
  72. // UpdateReplyState update reply state.
  73. func (d *Dao) UpdateReplyState(c context.Context, oid, rpID int64, state int32) (rows int64, err error) {
  74. now := time.Now()
  75. res, err := d.db.Exec(c, fmt.Sprintf(_upReplyStateSQL, hit(oid)), state, now, rpID)
  76. if err != nil {
  77. return
  78. }
  79. return res.RowsAffected()
  80. }
  81. // TxUpdateReplyState tx update reply state.
  82. func (d *Dao) TxUpdateReplyState(tx *xsql.Tx, oid, rpID int64, state int32, mtime time.Time) (rows int64, err error) {
  83. res, err := tx.Exec(fmt.Sprintf(_upReplyStateSQL, hit(oid)), state, mtime, rpID)
  84. if err != nil {
  85. return
  86. }
  87. return res.RowsAffected()
  88. }
  89. // TxIncrReplyRCount incr rcount of reply by transaction.
  90. func (d *Dao) TxIncrReplyRCount(tx *xsql.Tx, oid, rpID int64, now time.Time) (rows int64, err error) {
  91. res, err := tx.Exec(fmt.Sprintf(_incrRCountSQL, hit(oid)), now, rpID)
  92. if err != nil {
  93. return
  94. }
  95. return res.RowsAffected()
  96. }
  97. // TxDecrReplyRCount decr rcount of reply by transaction.
  98. func (d *Dao) TxDecrReplyRCount(tx *xsql.Tx, oid, rpID int64, now time.Time) (rows int64, err error) {
  99. res, err := tx.Exec(fmt.Sprintf(_decrCntSQL, hit(oid)), now, rpID)
  100. if err != nil {
  101. log.Error("mysqlDB.Exec error(%v)", err)
  102. return
  103. }
  104. return res.RowsAffected()
  105. }
  106. // TxUpReplyAttr update subject attr.
  107. func (d *Dao) TxUpReplyAttr(tx *xsql.Tx, oid, rpID int64, attr uint32, now time.Time) (rows int64, err error) {
  108. res, err := tx.Exec(fmt.Sprintf(_upAttrSQL, hit(oid)), attr, now, rpID)
  109. if err != nil {
  110. log.Error("mysqlDB.Exec error(%v)", err)
  111. return
  112. }
  113. return res.RowsAffected()
  114. }
  115. // TxReplyForUpdate get a reply from database.
  116. func (d *Dao) TxReplyForUpdate(tx *xsql.Tx, oid, rpID int64) (r *model.Reply, err error) {
  117. r = new(model.Reply)
  118. row := tx.QueryRow(fmt.Sprintf(_txSelReplySQLForUpdate, hit(oid)), rpID)
  119. if err = row.Scan(&r.ID, &r.Oid, &r.Type, &r.Mid, &r.Root, &r.Parent, &r.Dialog, &r.Count, &r.RCount, &r.Like, &r.Hate, &r.Floor, &r.State, &r.Attr, &r.CTime, &r.MTime); err != nil {
  120. if err == xsql.ErrNoRows {
  121. r = nil
  122. err = nil
  123. }
  124. }
  125. return
  126. }
  127. // TxReply get a reply from database.
  128. func (d *Dao) TxReply(tx *xsql.Tx, oid, rpID int64) (r *model.Reply, err error) {
  129. r = new(model.Reply)
  130. row := tx.QueryRow(fmt.Sprintf(_selReplySQL, hit(oid)), rpID)
  131. if err = row.Scan(&r.ID, &r.Oid, &r.Type, &r.Mid, &r.Root, &r.Parent, &r.Dialog, &r.Count, &r.RCount, &r.Like, &r.Hate, &r.Floor, &r.State, &r.Attr, &r.CTime, &r.MTime); err != nil {
  132. if err == xsql.ErrNoRows {
  133. r = nil
  134. err = nil
  135. }
  136. }
  137. return
  138. }
  139. // ExportReplies export replies
  140. func (d *Dao) ExportReplies(c context.Context, oid, mid int64, tp int8, state string, startTime, endTime time.Time) (data [][]string, err error) {
  141. var rows *xsql.Rows
  142. title := []string{"id", "oid", "type", "mid", "root", "parent", "count", "rcount", "like", "hate", "floor", "state", "attr", "ctime", "message"}
  143. data = append(data, title)
  144. query := fmt.Sprintf(_selExportRepliesSQL, hit(oid), hit(oid))
  145. if state != "" {
  146. query += fmt.Sprintf(" and state in (%s)", state)
  147. }
  148. if mid != 0 {
  149. query += fmt.Sprintf(" and mid=%d", mid)
  150. }
  151. rows, err = d.dbSlave.Query(c, query, oid, tp, startTime, endTime)
  152. if err != nil {
  153. return
  154. }
  155. defer rows.Close()
  156. for rows.Next() {
  157. r := &model.ExportedReply{}
  158. if err = rows.Scan(&r.ID, &r.Oid, &r.Type, &r.Mid, &r.Root, &r.Parent, &r.Count, &r.RCount, &r.Like, &r.Hate, &r.Floor, &r.State, &r.Attr, &r.CTime, &r.Message); err != nil {
  159. rows.Close()
  160. return
  161. }
  162. data = append(data, r.String())
  163. }
  164. if err = rows.Err(); err != nil {
  165. return
  166. }
  167. return
  168. }