index.go 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "go-common/app/job/main/dm/model"
  6. "go-common/library/log"
  7. "go-common/library/xstr"
  8. )
  9. const (
  10. _selAllIdxSQL = "SELECT id,type,oid,mid,progress,state,pool,attr,ctime,mtime FROM dm_index_%03d WHERE type=? AND oid=? AND state IN(0,6)"
  11. _selIdxHidesSQL = "SELECT id,type,oid,mid,progress,state,pool,attr,ctime,mtime FROM dm_index_%03d FORCE INDEX(ix_oid_state) WHERE type=? AND oid=? AND state=2 ORDER BY id DESC limit ?" // NOTE slow query
  12. _upIdxSQL = "UPDATE dm_index_%03d SET mid=?,progress=?,state=?,pool=?,attr=? WHERE id=?"
  13. _upIdxStatesSQL = "UPDATE dm_index_%03d SET state=? WHERE id IN(%s)"
  14. )
  15. // DMInfos get indexs of oid.
  16. func (d *Dao) DMInfos(c context.Context, tp int32, oid int64) (dms []*model.DM, err error) {
  17. rows, err := d.dmReader.Query(c, fmt.Sprintf(_selAllIdxSQL, d.hitIndex(oid)), tp, oid)
  18. if err != nil {
  19. log.Error("db.Query(%d %d) error(%v)", tp, oid, err)
  20. return
  21. }
  22. defer rows.Close()
  23. for rows.Next() {
  24. dm := &model.DM{}
  25. if err = rows.Scan(&dm.ID, &dm.Type, &dm.Oid, &dm.Mid, &dm.Progress, &dm.State, &dm.Pool, &dm.Attr, &dm.Ctime, &dm.Mtime); err != nil {
  26. log.Error("rows.Scan() error(%v)", err)
  27. return
  28. }
  29. dms = append(dms, dm)
  30. }
  31. return
  32. }
  33. // DMHides get hide index info from db by oid and state.
  34. func (d *Dao) DMHides(c context.Context, typ int32, oid, limit int64) (res []*model.DM, err error) {
  35. rows, err := d.dmReader.Query(c, fmt.Sprintf(_selIdxHidesSQL, d.hitIndex(oid)), typ, oid, limit)
  36. if err != nil {
  37. log.Error("db.Query(%d %d) error(%v)", typ, oid, err)
  38. return
  39. }
  40. defer rows.Close()
  41. for rows.Next() {
  42. dm := &model.DM{}
  43. if err = rows.Scan(&dm.ID, &dm.Type, &dm.Oid, &dm.Mid, &dm.Progress, &dm.State, &dm.Pool, &dm.Attr, &dm.Ctime, &dm.Mtime); err != nil {
  44. log.Error("row.Scan() error(%v)", err)
  45. return
  46. }
  47. res = append(res, dm)
  48. }
  49. return
  50. }
  51. // UpdateDM update index of dm.
  52. func (d *Dao) UpdateDM(c context.Context, m *model.DM) (affect int64, err error) {
  53. res, err := d.dmWriter.Exec(c, fmt.Sprintf(_upIdxSQL, d.hitIndex(m.Oid)), m.Mid, m.Progress, m.State, m.Pool, m.Attr, m.ID)
  54. if err != nil {
  55. log.Error("tx.Exec error(%v)", err)
  56. return
  57. }
  58. return res.RowsAffected()
  59. }
  60. // UpdateDMStates multi update index state of dm.
  61. func (d *Dao) UpdateDMStates(c context.Context, oid int64, dmids []int64, state int32) (affect int64, err error) {
  62. upSQL := fmt.Sprintf(_upIdxStatesSQL, d.hitIndex(oid), xstr.JoinInts(dmids))
  63. res, err := d.dmWriter.Exec(c, upSQL, state)
  64. if err != nil {
  65. log.Error("db.Exec(%s) error(%v)", upSQL, err)
  66. return
  67. }
  68. return res.RowsAffected()
  69. }