transfer.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "go-common/app/job/main/dm2/model"
  6. "go-common/library/log"
  7. )
  8. const (
  9. _selectTransfer = "SELECT id,from_cid,to_cid,mid,offset,state,ctime FROM dm_transfer_job WHERE state=? limit 1"
  10. _updateTransfer = "UPDATE dm_transfer_job SET state=?,dmid=? WHERE id=?"
  11. _idxsSQL = "SELECT id,type,oid,mid,progress,state,pool,attr,ctime,mtime FROM dm_index_%03d FORCE INDEX(ix_oid_state) WHERE type=? AND oid=? AND id >? ORDER BY id limit ?"
  12. )
  13. // Transfers get all transfer job
  14. func (d *Dao) Transfers(c context.Context, state int8) (trans []*model.Transfer, err error) {
  15. rows, err := d.biliDMWriter.Query(c, _selectTransfer, model.StatInit)
  16. if err != nil {
  17. log.Error("d.biliDMWriter.Query(sql:%s) error(%v)", _selectTransfer, err)
  18. return
  19. }
  20. defer rows.Close()
  21. for rows.Next() {
  22. t := &model.Transfer{}
  23. if err = rows.Scan(&t.ID, &t.FromCid, &t.ToCid, &t.Mid, &t.Offset, &t.State, &t.Ctime); err != nil {
  24. log.Error("rows.Scan() error(%v)", err)
  25. return
  26. }
  27. trans = append(trans, t)
  28. }
  29. return
  30. }
  31. // UpdateTransfer change transfer job state.
  32. func (d *Dao) UpdateTransfer(c context.Context, t *model.Transfer) (affect int64, err error) {
  33. row, err := d.biliDMWriter.Exec(c, _updateTransfer, t.State, t.Dmid, t.ID)
  34. if err != nil {
  35. log.Error("d.biliDMWriter.Exec(%+v) error(%v)", t, err)
  36. return
  37. }
  38. return row.RowsAffected()
  39. }
  40. // DMIndexs get dm indexs info
  41. func (d *Dao) DMIndexs(c context.Context, tp int32, oid, minID, limit int64) (idxMap map[int64]*model.DM, dmids, special []int64, err error) {
  42. query := fmt.Sprintf(_idxsSQL, d.hitIndex(oid))
  43. rows, err := d.dmReader.Query(c, query, tp, oid, minID, limit)
  44. if err != nil {
  45. log.Error("db.Query() error(%v)", err)
  46. return
  47. }
  48. defer rows.Close()
  49. idxMap = make(map[int64]*model.DM)
  50. for rows.Next() {
  51. idx := new(model.DM)
  52. if err = rows.Scan(&idx.ID, &idx.Type, &idx.Oid, &idx.Mid, &idx.Progress, &idx.State, &idx.Pool, &idx.Attr, &idx.Ctime, &idx.Mtime); err != nil {
  53. log.Error("row.Scan() error(%v)", err)
  54. return
  55. }
  56. idxMap[idx.ID] = idx
  57. dmids = append(dmids, idx.ID)
  58. if idx.Pool == model.PoolSpecial {
  59. special = append(special, idx.ID)
  60. }
  61. }
  62. return
  63. }