mysql.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. package fav
import (
	"context"
	"database/sql"
	"fmt"
	"strings"

	favmdl "go-common/app/service/main/favorite/model"
	xsql "go-common/library/database/sql"
	"go-common/library/log"
	xtime "go-common/library/time"
	"go-common/library/xstr"
)
  12. const (
  13. _countSharding int64 = 50 // count by oid
  14. _folderSharding int64 = 100 // folder by mid
  15. _relationSharding int64 = 500 // relations in folder by mid
  16. _usersSharding int64 = 500 // users by oid
  17. // folder
  18. _folderSQL = "SELECT id,type,mid,name,cover,description,count,attr,state,ctime,mtime FROM fav_folder_%s WHERE id=? AND type=? AND mid=? AND state=0"
  19. _upFolderCntSQL = "UPDATE fav_folder_%s SET count=?,mtime=? WHERE id=?"
  20. // relation
  21. _relationSQL = "SELECT id,type,oid,mid,fid,state,ctime,mtime,sequence FROM fav_relation_%s WHERE type=? AND mid=? AND fid=? AND oid=? AND state=0"
  22. _relationsSQL = "SELECT id,type,oid,mid,fid,state,ctime,mtime,sequence FROM fav_relation_%s FORCE INDEX(ix_fid_state_type_mtime) WHERE fid=? AND mtime>=? AND state=0 AND type=? ORDER BY mtime ASC LIMIT ?"
  23. _allRelationsSQL = "SELECT id,type,oid,mid,fid,state,ctime,mtime,sequence FROM fav_relation_%s FORCE INDEX(ix_fid_state_mtime) WHERE fid=? AND mtime>=? AND state=0 ORDER BY mtime ASC LIMIT ?"
  24. _relationOidsSQL = "SELECT oid FROM fav_relation_%s FORCE INDEX(ix_fid_state_type_mtime) WHERE fid=? AND state=0 AND type=? ORDER BY mtime DESC LIMIT ?,?"
  25. _relationFidsByOidSQL = "SELECT fid FROM fav_relation_%s WHERE type=? AND mid=? AND oid=? and state=0"
  26. _relationFidsSQL = "SELECT oid,fid FROM fav_relation_%s WHERE type=? AND mid=? AND state=0"
  27. _cntRelationSQL = "SELECT COUNT(id) FROM fav_relation_%s FORCE INDEX(ix_fid_state_sequence) WHERE fid=? and state=0"
  28. _addRelationSQL = "INSERT INTO fav_relation_%s (type,oid,mid,fid,state,ctime,mtime,sequence) VALUES(?,?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE state=?,mtime=?,sequence=?"
  29. _delRelationSQL = "UPDATE fav_relation_%s SET state=1,mtime=? WHERE type=? AND fid=? AND oid=?"
  30. _delRelationsByOidsSQL = "UPDATE fav_relation_%s SET state=1,mtime=? WHERE type=? AND fid=? AND oid in (%s)"
  31. _selRelationsByOidsSql = "SELECT id,type,oid,mid,fid,state,ctime,mtime,sequence FROM fav_relation_%s WHERE type=? AND fid=? AND oid in (%s)"
  32. _updateAllRelationSeq = "UPDATE fav_relation_%s SET mtime=mtime,sequence=(case id %s end) WHERE id in (%s)"
  33. _updateRelationSeq = "UPDATE fav_relation_%s SET sequence=?,mtime=? WHERE fid=? and oid=? and type=?"
  34. _selectMaxSequence = "SELECT id,type,oid,mid,fid,state,ctime,mtime,sequence FROM fav_relation_%s FORCE INDEX(ix_fid_state_sequence) WHERE fid=? AND state=0 order BY sequence desc limit 1"
  35. _recentOidsSQL = "SELECT oid,type FROM fav_relation_%s FORCE INDEX(ix_fid_state_mtime) WHERE fid=? AND state=0 ORDER BY mtime DESC LIMIT 3"
  36. // users
  37. _addUserSQL = "INSERT INTO fav_users_%s (type,oid,mid,state,ctime,mtime) VALUES(?,?,?,?,?,?) ON DUPLICATE KEY UPDATE state=?,mtime=?"
  38. _delUserSQL = "UPDATE fav_users_%s SET state=1,mtime=? WHERE type=? AND oid=? AND mid=?"
  39. // stat
  40. _statCntSQL = "SELECT count from fav_count_%s WHERE type=? AND oid=?"
  41. _upStatCntSQL = "INSERT INTO fav_count_%s (type,oid,count,ctime,mtime) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE count=count+?,mtime=?"
  42. )
  43. func cntHit(mid int64) string {
  44. return fmt.Sprintf("%02d", mid%_countSharding)
  45. }
  46. func folderHit(mid int64) string {
  47. return fmt.Sprintf("%02d", mid%_folderSharding)
  48. }
  49. func relationHit(mid int64) string {
  50. return fmt.Sprintf("%03d", mid%_relationSharding)
  51. }
  52. func usersHit(oid int64) string {
  53. return fmt.Sprintf("%03d", oid%_usersSharding)
  54. }
  55. // pingMySQL check mysql connection.
  56. func (d *Dao) pingMySQL(c context.Context) error {
  57. return d.db.Ping(c)
  58. }
  59. func (d *Dao) TxUpdateFavSequence(tx *xsql.Tx, mid int64, fid, oid int64, typ int8, sequence uint64, mtime xtime.Time) (rows int64, err error) {
  60. res, err := tx.Exec(fmt.Sprintf(_updateRelationSeq, relationHit(mid)), sequence, mtime, fid, oid, typ)
  61. if err != nil {
  62. log.Error("d.db.Exec(%s,%d,%d,%d,%d,%d) error(%v)", _updateRelationSeq, sequence, mtime, fid, oid, typ, err)
  63. return
  64. }
  65. return res.RowsAffected()
  66. }
  67. func (d *Dao) BatchUpdateSeq(c context.Context, mid int64, favs []*favmdl.Favorite) (int64, error) {
  68. var caseStr string
  69. var ids []int64
  70. for _, fav := range favs {
  71. ids = append(ids, fav.ID)
  72. caseStr += fmt.Sprintf("when %d then %d ", fav.ID, fav.Sequence)
  73. }
  74. if len(ids) <= 0 {
  75. return 0, nil
  76. }
  77. res, err := d.db.Exec(c, fmt.Sprintf(_updateAllRelationSeq, relationHit(mid), caseStr, xstr.JoinInts(ids)))
  78. if err != nil {
  79. return 0, err
  80. }
  81. return res.RowsAffected()
  82. }
  83. // RecentRes return user's three newest fav from a folder.
  84. func (d *Dao) RecentRes(c context.Context, mid, fid int64) (res []*favmdl.Resource, err error) {
  85. rows, err := d.db.Query(c, fmt.Sprintf(_recentOidsSQL, relationHit(mid)), fid)
  86. if err != nil {
  87. log.Error("d.db.Query(%d,%d) error(%v)", mid, fid, err)
  88. return
  89. }
  90. defer rows.Close()
  91. for rows.Next() {
  92. var oid int64
  93. var typ int32
  94. if err = rows.Scan(&oid, &typ); err != nil {
  95. log.Error("rows.Scan error(%v)", err)
  96. return
  97. }
  98. res = append(res, &favmdl.Resource{Oid: oid, Typ: typ})
  99. }
  100. err = rows.Err()
  101. return
  102. }
  103. // Folder get a Folder by fid from mysql.
  104. func (d *Dao) Folder(c context.Context, tp int8, mid, fid int64) (f *favmdl.Folder, err error) {
  105. f = &favmdl.Folder{}
  106. row := d.db.QueryRow(c, fmt.Sprintf(_folderSQL, folderHit(mid)), fid, tp, mid)
  107. if err = row.Scan(&f.ID, &f.Type, &f.Mid, &f.Name, &f.Cover, &f.Description, &f.Count, &f.Attr, &f.State, &f.CTime, &f.MTime); err != nil {
  108. if err == sql.ErrNoRows {
  109. f = nil
  110. err = nil
  111. } else {
  112. log.Error("row.Scan error(%v)", err)
  113. }
  114. }
  115. return
  116. }
  117. // UpFolderCnt update folder count to mysql.
  118. func (d *Dao) UpFolderCnt(c context.Context, mid, fid int64, cnt int, now xtime.Time) (rows int64, err error) {
  119. res, err := d.db.Exec(c, fmt.Sprintf(_upFolderCntSQL, folderHit(mid)), cnt, now, fid)
  120. if err != nil {
  121. log.Error("db.Exec error(%v)", err)
  122. return
  123. }
  124. return res.RowsAffected()
  125. }
  126. // MaxRelation get a max sequence relation from mysql.
  127. func (d *Dao) MaxRelation(c context.Context, mid, fid int64) (m *favmdl.Favorite, err error) {
  128. m = &favmdl.Favorite{}
  129. row := d.db.QueryRow(c, fmt.Sprintf(_selectMaxSequence, relationHit(mid)), fid)
  130. if err = row.Scan(&m.ID, &m.Type, &m.Oid, &m.Mid, &m.Fid, &m.State, &m.CTime, &m.MTime, &m.Sequence); err != nil {
  131. if err == sql.ErrNoRows {
  132. err = nil
  133. m = nil
  134. } else {
  135. log.Error("row.Scan error(%v)", err)
  136. }
  137. }
  138. return
  139. }
  140. // Relation get a relation from mysql.
  141. func (d *Dao) Relation(c context.Context, tp int8, mid, fid, oid int64) (m *favmdl.Favorite, err error) {
  142. m = &favmdl.Favorite{}
  143. row := d.db.QueryRow(c, fmt.Sprintf(_relationSQL, relationHit(mid)), tp, mid, fid, oid)
  144. if err = row.Scan(&m.ID, &m.Type, &m.Oid, &m.Mid, &m.Fid, &m.State, &m.CTime, &m.MTime, &m.Sequence); err != nil {
  145. if err == sql.ErrNoRows {
  146. err = nil
  147. m = nil
  148. } else {
  149. log.Error("row.Scan error(%v)", err)
  150. }
  151. }
  152. return
  153. }
  154. // AllRelations get favorite relations from mysql.
  155. func (d *Dao) AllRelations(c context.Context, mid, fid int64, mtime xtime.Time, limit int) (fr []*favmdl.Favorite, err error) {
  156. rows, err := d.db.Query(c, fmt.Sprintf(_allRelationsSQL, relationHit(mid)), fid, mtime, limit)
  157. if err != nil {
  158. log.Error("d.db.Query(%s,%d,%d,%d,%d) error(%v)", fmt.Sprintf(_allRelationsSQL, relationHit(mid)), mid, fid, mtime, limit, err)
  159. return
  160. }
  161. defer rows.Close()
  162. fr = make([]*favmdl.Favorite, 0)
  163. for rows.Next() {
  164. var r = &favmdl.Favorite{}
  165. if err = rows.Scan(&r.ID, &r.Type, &r.Oid, &r.Mid, &r.Fid, &r.State, &r.CTime, &r.MTime, &r.Sequence); err != nil {
  166. log.Error("rows.Scan error(%v)", err)
  167. return
  168. }
  169. if r.Mid != mid {
  170. log.Error("dirty data relations(%d,%d)", mid, fid)
  171. continue
  172. }
  173. fr = append(fr, r)
  174. }
  175. err = rows.Err()
  176. return
  177. }
  178. func (d *Dao) RelationsByOids(c context.Context, typ int8, mid, fid int64, oids []int64) (fr []*favmdl.Favorite, err error) {
  179. if len(oids) <= 0 {
  180. return
  181. }
  182. rows, err := d.db.Query(c, fmt.Sprintf(_selRelationsByOidsSql, relationHit(mid), xstr.JoinInts(oids)), typ, fid)
  183. if err != nil {
  184. log.Error("d.db.Query(%s,%d,%d,%v) error(%v)", fmt.Sprintf(_selRelationsByOidsSql, relationHit(mid), xstr.JoinInts(oids)), typ, fid, err)
  185. return
  186. }
  187. defer rows.Close()
  188. fr = make([]*favmdl.Favorite, 0)
  189. for rows.Next() {
  190. var r = &favmdl.Favorite{}
  191. if err = rows.Scan(&r.ID, &r.Type, &r.Oid, &r.Mid, &r.Fid, &r.State, &r.CTime, &r.MTime, &r.Sequence); err != nil {
  192. log.Error("rows.Scan error(%v)", err)
  193. return
  194. }
  195. if r.Type != typ || r.Mid != mid {
  196. log.Error("dirty data relations(%d,%d,%d)", typ, mid, fid)
  197. continue
  198. }
  199. fr = append(fr, r)
  200. }
  201. err = rows.Err()
  202. return
  203. }
  204. // Relations get favorite relations from mysql.
  205. func (d *Dao) Relations(c context.Context, typ int8, mid, fid int64, mtime xtime.Time, limit int) (fr []*favmdl.Favorite, err error) {
  206. rows, err := d.db.Query(c, fmt.Sprintf(_relationsSQL, relationHit(mid)), fid, mtime, typ, limit)
  207. if err != nil {
  208. log.Error("d.db.Query(%s,%d,%d,%d,%d,%d) error(%v)", fmt.Sprintf(_relationsSQL, relationHit(mid)), typ, mid, fid, mtime, limit, err)
  209. return
  210. }
  211. defer rows.Close()
  212. fr = make([]*favmdl.Favorite, 0)
  213. for rows.Next() {
  214. var r = &favmdl.Favorite{}
  215. if err = rows.Scan(&r.ID, &r.Type, &r.Oid, &r.Mid, &r.Fid, &r.State, &r.CTime, &r.MTime, &r.Sequence); err != nil {
  216. log.Error("rows.Scan error(%v)", err)
  217. return
  218. }
  219. if r.Mid != mid {
  220. log.Error("dirty data relations(%d,%d,%d)", typ, mid, fid)
  221. continue
  222. }
  223. fr = append(fr, r)
  224. }
  225. err = rows.Err()
  226. return
  227. }
  228. // RelationFidsByOid get favortied folders in relations by oid from mysql.
  229. func (d *Dao) RelationFidsByOid(c context.Context, tp int8, mid, oid int64) (fids []int64, err error) {
  230. var (
  231. fid int64
  232. idx = relationHit(mid)
  233. )
  234. rows, err := d.db.Query(c, fmt.Sprintf(_relationFidsByOidSQL, idx), tp, mid, oid)
  235. if err != nil {
  236. log.Error("d.db.QueryRow(%d,%d,%d) error(%v)", mid, mid, oid, err)
  237. return
  238. }
  239. defer rows.Close()
  240. for rows.Next() {
  241. if err = rows.Scan(&fid); err != nil {
  242. log.Error("rows.Scan error(%v)", err)
  243. return
  244. }
  245. fids = append(fids, fid)
  246. }
  247. err = rows.Err()
  248. return
  249. }
  250. // RelationCnt get favoried folders count in relation from mysql.
  251. func (d *Dao) RelationCnt(c context.Context, mid, fid int64) (cnt int, err error) {
  252. row := d.db.QueryRow(c, fmt.Sprintf(_cntRelationSQL, relationHit(mid)), fid)
  253. if err = row.Scan(&cnt); err != nil {
  254. if err == sql.ErrNoRows {
  255. err = nil
  256. } else {
  257. log.Error("row.Scan error(%v)", err)
  258. }
  259. }
  260. return
  261. }
  262. // AddRelation add a favorite relation to mysql.
  263. func (d *Dao) AddRelation(c context.Context, fr *favmdl.Favorite) (rows int64, err error) {
  264. res, err := d.db.Exec(c, fmt.Sprintf(_addRelationSQL, relationHit(fr.Mid)), fr.Type, fr.Oid, fr.Mid, fr.Fid, fr.State, fr.CTime, fr.MTime, fr.Sequence, fr.State, fr.MTime, fr.Sequence)
  265. if err != nil {
  266. log.Error("db.Exec error(%v)", err)
  267. return
  268. }
  269. return res.LastInsertId()
  270. }
  271. // DelRelation delete a favorite relation to mysql.
  272. func (d *Dao) DelRelation(c context.Context, tp int8, mid, fid, oid int64, now xtime.Time) (rows int64, err error) {
  273. res, err := d.db.Exec(c, fmt.Sprintf(_delRelationSQL, relationHit(mid)), now, tp, fid, oid)
  274. if err != nil {
  275. log.Error("d.db.Exec(%d,%d,%d,%d) error(%v)", mid, tp, fid, oid, err)
  276. return
  277. }
  278. return res.RowsAffected()
  279. }
  280. // UpStatCnt update stat count to mysql.
  281. func (d *Dao) UpStatCnt(c context.Context, tp int8, oid int64, incr int, now xtime.Time) (rows int64, err error) {
  282. res, err := d.db.Exec(c, fmt.Sprintf(_upStatCntSQL, cntHit(oid)), tp, oid, incr, now, now, incr, now)
  283. if err != nil {
  284. log.Error("db.Exec error(%v)", err)
  285. return
  286. }
  287. return res.RowsAffected()
  288. }
  289. // StatCnt return stat count from mysql.
  290. func (d *Dao) StatCnt(c context.Context, tp int8, oid int64) (cnt int, err error) {
  291. row := d.db.QueryRow(c, fmt.Sprintf(_statCntSQL, cntHit(oid)), tp, oid)
  292. if err = row.Scan(&cnt); err != nil {
  293. if err == sql.ErrNoRows {
  294. err = nil
  295. } else {
  296. log.Error("row.Scan error(%v)", err)
  297. }
  298. }
  299. return
  300. }
  301. // RelationFids get the map [oid -> "fid,fid"] for user.
  302. func (d *Dao) RelationFids(c context.Context, tp int8, mid int64) (rfids map[int64][]int64, err error) {
  303. rows, err := d.db.Query(c, fmt.Sprintf(_relationFidsSQL, relationHit(mid)), tp, mid)
  304. if err != nil {
  305. log.Error("db.Query(%d) error(%v)", mid, err)
  306. return
  307. }
  308. defer rows.Close()
  309. rfids = make(map[int64][]int64, 128)
  310. for rows.Next() {
  311. var oid, fid int64
  312. if err = rows.Scan(&oid, &fid); err != nil {
  313. log.Error("rows.Scan error(%v)", err)
  314. return
  315. }
  316. rfids[oid] = append(rfids[oid], fid)
  317. }
  318. err = rows.Err()
  319. return
  320. }
  321. // OidsByFid get oids by fid
  322. func (d *Dao) OidsByFid(c context.Context, typ int8, mid, fid int64, offset, limit int) (oids []int64, err error) {
  323. rows, err := d.db.Query(c, fmt.Sprintf(_relationOidsSQL, relationHit(mid)), fid, typ, offset, limit)
  324. if err != nil {
  325. log.Error("d.db.Query(%d,%d,%d,%d,%d) error(%v)", typ, mid, fid, offset, limit, err)
  326. return
  327. }
  328. defer rows.Close()
  329. oid := int64(0)
  330. oids = make([]int64, 0, limit)
  331. for rows.Next() {
  332. if err = rows.Scan(&oid); err != nil {
  333. log.Error("rows.Scan error(%v)", err)
  334. return
  335. }
  336. oids = append(oids, oid)
  337. }
  338. err = rows.Err()
  339. return
  340. }
  341. // DelRelationsByOids update del state by fid and oids.
  342. func (d *Dao) DelRelationsByOids(c context.Context, typ int8, mid, fid int64, oids []int64, now xtime.Time) (rows int64, err error) {
  343. if len(oids) == 0 {
  344. return
  345. }
  346. oidsStr := xstr.JoinInts(oids)
  347. res, err := d.db.Exec(c, fmt.Sprintf(_delRelationsByOidsSQL, relationHit(mid), oidsStr), now, typ, fid)
  348. if err != nil {
  349. log.Error("d.db.Exec(%d,%d,%d,%s) error(%v)", typ, mid, fid, oidsStr, now, err)
  350. return
  351. }
  352. return res.RowsAffected()
  353. }
  354. // AddUser add a favorite user to mysql.
  355. func (d *Dao) AddUser(c context.Context, u *favmdl.User) (rows int64, err error) {
  356. res, err := d.db.Exec(c, fmt.Sprintf(_addUserSQL, usersHit(u.Oid)), u.Type, u.Oid, u.Mid, u.State, u.CTime, u.MTime, u.State, u.MTime)
  357. if err != nil {
  358. log.Error("db.Exec error(%v)", err)
  359. return
  360. }
  361. return res.LastInsertId()
  362. }
  363. // DelUser delete a favorite user.
  364. func (d *Dao) DelUser(c context.Context, u *favmdl.User) (rows int64, err error) {
  365. res, err := d.db.Exec(c, fmt.Sprintf(_delUserSQL, usersHit(u.Oid)), u.MTime, u.Type, u.Oid, u.Mid)
  366. if err != nil {
  367. log.Error("d.db.Exec(%d,%d,%d,%d) error(%v)", u.MTime, u.Type, u.Oid, u.Mid, err)
  368. return
  369. }
  370. return res.RowsAffected()
  371. }