subject.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "go-common/app/admin/main/reply/model"
  7. xsql "go-common/library/database/sql"
  8. "go-common/library/log"
  9. "go-common/library/sync/errgroup"
  10. "go-common/library/xstr"
  11. "sync"
  12. )
  13. const _subShard int64 = 50
  14. const (
  15. _selSubjectSQL = "SELECT id,oid,type,mid,count,rcount,acount,state,attr,ctime,mtime FROM reply_subject_%d WHERE oid=? AND type=?"
  16. _selSubjectForUpdateSQL = "SELECT id,oid,type,mid,count,rcount,acount,state,attr,ctime,mtime FROM reply_subject_%d WHERE oid=? AND type=? FOR UPDATE"
  17. _selSubjectsSQL = "SELECT id,oid,type,mid,count,rcount,acount,state,attr,ctime,mtime FROM reply_subject_%d WHERE oid IN (%s) AND type=?"
  18. _inSubjectSQL = "INSERT INTO reply_subject_%d (oid,type,mid,state,ctime,mtime) VALUES(?,?,?,?,?,?) ON DUPLICATE KEY UPDATE state=?,mid=?,mtime=?"
  19. _selSubjectMCountSQL = "SELECT oid, mcount FROM reply_subject_%d WHERE oid IN(%s) AND type=?"
  20. _upSubStateSQL = "UPDATE reply_subject_%d SET state=?,mtime=? WHERE oid=? AND type=?"
  21. _upSubAttrSQL = "UPDATE reply_subject_%d SET attr=?,mtime=? WHERE oid=? AND type=?"
  22. _upSubStateAndAttrSQL = "UPDATE reply_subject_%d SET state=?,attr=?,mtime=? WHERE oid=? AND type=?"
  23. _upSubMetaSQL = "UPDATE reply_subject_%d SET meta=?,mtime=? WHERE oid=? AND type=?"
  24. _incrSubCountSQL = "UPDATE reply_subject_%d SET count=count+1,rcount=rcount+1,acount=acount+1,mtime=? WHERE oid=? AND type=?"
  25. _incrSubFCountSQL = "UPDATE reply_subject_%d SET count=count+1,mtime=? WHERE oid=? AND type=?"
  26. _incrSubRCountSQL = "UPDATE reply_subject_%d SET rcount=rcount+1,mtime=? WHERE oid=? AND type=?"
  27. _incrSubACountSQL = "UPDATE reply_subject_%d SET acount=acount+?,mtime=? WHERE oid=? AND type=?"
  28. _decrSubRCountSQL = "UPDATE reply_subject_%d SET rcount=rcount-1,mtime=? WHERE oid=? AND type=?"
  29. _decrSubACountSQL = "UPDATE reply_subject_%d SET acount=acount-?,mtime=? WHERE oid=? AND type=?"
  30. _decrSubMCountSQL = "UPDATE reply_subject_%d SET mcount=mcount-1,mtime=? WHERE oid=? AND type=? AND mcount>0"
  31. )
  32. func subHit(id int64) int64 {
  33. return id % _subShard
  34. }
  35. // SubMCount get subject mcount from mysql
  36. func (d *Dao) SubMCount(c context.Context, oids []int64, typ int32) (res map[int64]int32, err error) {
  37. hits := make(map[int64][]int64)
  38. for _, oid := range oids {
  39. hit := subHit(oid)
  40. hits[hit] = append(hits[hit], oid)
  41. }
  42. res = make(map[int64]int32, len(oids))
  43. wg, ctx := errgroup.WithContext(c)
  44. var lock = sync.RWMutex{}
  45. for idx, oids := range hits {
  46. o := oids
  47. i := idx
  48. wg.Go(func() (err error) {
  49. var rows *xsql.Rows
  50. if rows, err = d.db.Query(ctx, fmt.Sprintf(_selSubjectMCountSQL, i, xstr.JoinInts(o)), typ); err != nil {
  51. log.Error("dao.db.Query error(%v)", err)
  52. return
  53. }
  54. var mcount int32
  55. var oid int64
  56. for rows.Next() {
  57. if err = rows.Scan(&oid, &mcount); err != nil {
  58. if err == xsql.ErrNoRows {
  59. mcount = 0
  60. oid = 0
  61. err = nil
  62. continue
  63. } else {
  64. log.Error("row.Scan error(%v)", err)
  65. rows.Close()
  66. return
  67. }
  68. }
  69. lock.Lock()
  70. res[oid] = mcount
  71. lock.Unlock()
  72. }
  73. if err = rows.Err(); err != nil {
  74. log.Error("rows.err error(%v)", err)
  75. rows.Close()
  76. return
  77. }
  78. rows.Close()
  79. return
  80. })
  81. }
  82. if err = wg.Wait(); err != nil {
  83. return
  84. }
  85. return
  86. }
  87. // Subjects get subjects from mysql.
  88. func (d *Dao) Subjects(c context.Context, oids []int64, typ int32) (subMap map[int64]*model.Subject, err error) {
  89. hitMap := make(map[int64][]int64)
  90. for _, oid := range oids {
  91. hitMap[subHit(oid)] = append(hitMap[subHit(oid)], oid)
  92. }
  93. subMap = make(map[int64]*model.Subject)
  94. for hit, ids := range hitMap {
  95. var rows *xsql.Rows
  96. rows, err = d.db.Query(c, fmt.Sprintf(_selSubjectsSQL, hit, xstr.JoinInts(ids)), typ)
  97. if err != nil {
  98. return
  99. }
  100. for rows.Next() {
  101. m := new(model.Subject)
  102. if err = rows.Scan(&m.ID, &m.Oid, &m.Type, &m.Mid, &m.Count, &m.RCount, &m.ACount, &m.State, &m.Attr, &m.CTime, &m.MTime); err != nil {
  103. rows.Close()
  104. return
  105. }
  106. subMap[m.Oid] = m
  107. }
  108. if err = rows.Err(); err != nil {
  109. rows.Close()
  110. return
  111. }
  112. rows.Close()
  113. }
  114. return
  115. }
  116. // Subject get a subject from mysql.
  117. func (d *Dao) Subject(c context.Context, oid int64, typ int32) (m *model.Subject, err error) {
  118. m = new(model.Subject)
  119. row := d.db.QueryRow(c, fmt.Sprintf(_selSubjectSQL, subHit(oid)), oid, typ)
  120. if err = row.Scan(&m.ID, &m.Oid, &m.Type, &m.Mid, &m.Count, &m.RCount, &m.ACount, &m.State, &m.Attr, &m.CTime, &m.MTime); err != nil {
  121. if err == xsql.ErrNoRows {
  122. m = nil
  123. err = nil
  124. }
  125. }
  126. return
  127. }
  128. // TxSubject get a subject from mysql.
  129. func (d *Dao) TxSubject(tx *xsql.Tx, oid int64, typ int32) (m *model.Subject, err error) {
  130. m = new(model.Subject)
  131. row := tx.QueryRow(fmt.Sprintf(_selSubjectSQL, subHit(oid)), oid, typ)
  132. if err = row.Scan(&m.ID, &m.Oid, &m.Type, &m.Mid, &m.Count, &m.RCount, &m.ACount, &m.State, &m.Attr, &m.CTime, &m.MTime); err != nil {
  133. if err == xsql.ErrNoRows {
  134. m = nil
  135. err = nil
  136. }
  137. }
  138. return
  139. }
  140. // TxSubjectForUpdate get a subject from mysql for update.
  141. func (d *Dao) TxSubjectForUpdate(tx *xsql.Tx, oid int64, typ int32) (m *model.Subject, err error) {
  142. m = new(model.Subject)
  143. row := tx.QueryRow(fmt.Sprintf(_selSubjectForUpdateSQL, subHit(oid)), oid, typ)
  144. if err = row.Scan(&m.ID, &m.Oid, &m.Type, &m.Mid, &m.Count, &m.RCount, &m.ACount, &m.State, &m.Attr, &m.CTime, &m.MTime); err != nil {
  145. if err == xsql.ErrNoRows {
  146. m = nil
  147. err = nil
  148. }
  149. }
  150. return
  151. }
  152. // AddSubject insert or update subject state.
  153. func (d *Dao) AddSubject(c context.Context, mid, oid int64, typ, state int32, now time.Time) (id int64, err error) {
  154. res, err := d.db.Exec(c, fmt.Sprintf(_inSubjectSQL, subHit(oid)), oid, typ, mid, state, now, now, state, mid, now)
  155. if err != nil {
  156. return
  157. }
  158. return res.LastInsertId()
  159. }
  160. // UpSubjectState update subject state.
  161. func (d *Dao) UpSubjectState(c context.Context, oid int64, typ, state int32, now time.Time) (rows int64, err error) {
  162. res, err := d.db.Exec(c, fmt.Sprintf(_upSubStateSQL, subHit(oid)), state, now, oid, typ)
  163. if err != nil {
  164. return
  165. }
  166. return res.RowsAffected()
  167. }
  168. // UpSubjectAttr update subject attr.
  169. func (d *Dao) UpSubjectAttr(c context.Context, oid int64, typ int32, attr uint32, now time.Time) (rows int64, err error) {
  170. res, err := d.db.Exec(c, fmt.Sprintf(_upSubAttrSQL, subHit(oid)), attr, now, oid, typ)
  171. if err != nil {
  172. return
  173. }
  174. return res.RowsAffected()
  175. }
  176. // UpStateAndAttr update subject state and attr
  177. func (d *Dao) UpStateAndAttr(c context.Context, oid int64, typ, state int32, attr uint32, now time.Time) (rows int64, err error) {
  178. res, err := d.db.Exec(c, fmt.Sprintf(_upSubStateAndAttrSQL, subHit(oid)), state, attr, now, oid, typ)
  179. if err != nil {
  180. return
  181. }
  182. return res.RowsAffected()
  183. }
  184. // TxIncrSubCount incr subject count and rcount by transaction.
  185. func (d *Dao) TxIncrSubCount(tx *xsql.Tx, oid int64, typ int32, now time.Time) (rows int64, err error) {
  186. res, err := tx.Exec(fmt.Sprintf(_incrSubCountSQL, subHit(oid)), now, oid, typ)
  187. if err != nil {
  188. return
  189. }
  190. return res.RowsAffected()
  191. }
  192. // TxIncrSubFCount incr subject count and rcount by transaction.
  193. func (d *Dao) TxIncrSubFCount(tx *xsql.Tx, oid int64, typ int32, now time.Time) (rows int64, err error) {
  194. res, err := tx.Exec(fmt.Sprintf(_incrSubFCountSQL, subHit(oid)), now, oid, typ)
  195. if err != nil {
  196. return
  197. }
  198. return res.RowsAffected()
  199. }
  200. // TxIncrSubRCount incr subject rcount by transaction
  201. func (d *Dao) TxIncrSubRCount(tx *xsql.Tx, oid int64, typ int32, now time.Time) (rows int64, err error) {
  202. res, err := tx.Exec(fmt.Sprintf(_incrSubRCountSQL, subHit(oid)), now, oid, typ)
  203. if err != nil {
  204. return
  205. }
  206. return res.RowsAffected()
  207. }
  208. // TxDecrSubRCount decr subject count by transaction.
  209. func (d *Dao) TxDecrSubRCount(tx *xsql.Tx, oid int64, typ int32, now time.Time) (rows int64, err error) {
  210. res, err := tx.Exec(fmt.Sprintf(_decrSubRCountSQL, subHit(oid)), now, oid, typ)
  211. if err != nil {
  212. return
  213. }
  214. return res.RowsAffected()
  215. }
  216. // TxIncrSubACount incr subject acount by transaction.
  217. func (d *Dao) TxIncrSubACount(tx *xsql.Tx, oid int64, typ int32, count int32, now time.Time) (rows int64, err error) {
  218. res, err := tx.Exec(fmt.Sprintf(_incrSubACountSQL, subHit(oid)), count, now, oid, typ)
  219. if err != nil {
  220. return
  221. }
  222. return res.RowsAffected()
  223. }
  224. // TxSubDecrACount decr subject rcount by transaction.
  225. func (d *Dao) TxSubDecrACount(tx *xsql.Tx, oid int64, typ int32, count int32, now time.Time) (rows int64, err error) {
  226. res, err := tx.Exec(fmt.Sprintf(_decrSubACountSQL, subHit(oid)), count, now, oid, typ)
  227. if err != nil {
  228. return
  229. }
  230. return res.RowsAffected()
  231. }
  232. // TxSubDecrMCount decr subject mcount by transaction.
  233. func (d *Dao) TxSubDecrMCount(tx *xsql.Tx, oid int64, typ int32, now time.Time) (rows int64, err error) {
  234. res, err := tx.Exec(fmt.Sprintf(_decrSubMCountSQL, subHit(oid)), now, oid, typ)
  235. if err != nil {
  236. return
  237. }
  238. return res.RowsAffected()
  239. }
  240. // TxUpSubAttr update subject attr.
  241. func (d *Dao) TxUpSubAttr(tx *xsql.Tx, oid int64, tp int32, attr uint32, now time.Time) (rows int64, err error) {
  242. res, err := tx.Exec(fmt.Sprintf(_upSubAttrSQL, subHit(oid)), attr, now, oid, tp)
  243. if err != nil {
  244. log.Error("mysqlDB.Exec error(%v)", err)
  245. return
  246. }
  247. return res.RowsAffected()
  248. }
  249. // TxUpSubMeta update subject meta.
  250. func (d *Dao) TxUpSubMeta(tx *xsql.Tx, oid int64, tp int32, meta string, now time.Time) (rows int64, err error) {
  251. res, err := tx.Exec(fmt.Sprintf(_upSubMetaSQL, subHit(oid)), meta, now, oid, tp)
  252. if err != nil {
  253. log.Error("mysqlDB.Exec error(%v)", err)
  254. return
  255. }
  256. return res.RowsAffected()
  257. }