flow_design.go 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. package archive
  2. import (
  3. "context"
  4. "go-common/app/job/main/videoup-report/model/archive"
  5. "go-common/library/database/sql"
  6. "go-common/library/log"
  7. )
  // SQL statements used by the flow_design DAO methods in this file.
  const (
  	// _oidFlowCount counts flow_design rows with state = 0 for a given pool, group_id and oid.
  	_oidFlowCount = "SELECT COUNT(*) as count FROM flow_design WHERE state = 0 AND pool = ? AND group_id = ? AND oid = ?"
  	// _inFlowSQL inserts one row into flow_design.
  	_inFlowSQL = "INSERT into flow_design(pool,oid,group_id,uid,remark) VALUES (?,?,?,?,?)"
  	// _inFlowLogSQL inserts one row into flow_design_log.
  	_inFlowLogSQL = "INSERT into flow_design_log(pool,oid,group_id,uid,action,remark) VALUES (?,?,?,?,?,?)"
  	// _upFlowStateSQL sets the state column of the flow_design row with the given id.
  	_upFlowStateSQL = "UPDATE flow_design SET state=? WHERE id=?"
  	// _flowUniqueSQL selects at most one flow_design row matching oid, pool and group_id.
  	_flowUniqueSQL = "SELECT id,pool,oid,group_id,parent,state FROM flow_design WHERE oid=? AND pool=? AND group_id=? LIMIT 1"
  )
  15. // HasFlowGroup check if has flow group record
  16. func (d *Dao) HasFlowGroup(c context.Context, pool int, gid, oid int64) (has bool, err error) {
  17. var (
  18. count int
  19. )
  20. row := d.db.QueryRow(c, _oidFlowCount, pool, gid, oid)
  21. if err = row.Scan(&count); err != nil {
  22. log.Error("d.hasFlowGroup err(%v)", err)
  23. return
  24. }
  25. has = count > 0
  26. return
  27. }
  28. // TxAddFlow tx add flow_design.
  29. func (d *Dao) TxAddFlow(tx *sql.Tx, pool int8, oid, uid, groupID int64, remark string) (id int64, err error) {
  30. res, err := tx.Exec(_inFlowSQL, pool, oid, groupID, uid, remark)
  31. if err != nil {
  32. log.Error("d.TxAddFlow.Exec() error(%v)", err)
  33. return
  34. }
  35. id, err = res.LastInsertId()
  36. return
  37. }
  38. // TxAddFlowLog tx add flow_design log.
  39. func (d *Dao) TxAddFlowLog(tx *sql.Tx, pool, action int8, oid, uid, groupID int64, remark string) (id int64, err error) {
  40. res, err := tx.Exec(_inFlowLogSQL, pool, oid, groupID, uid, action, remark)
  41. if err != nil {
  42. log.Error("d._inFlowLog.Exec() error(%v)", err)
  43. return
  44. }
  45. id, err = res.LastInsertId()
  46. return
  47. }
  48. // TxUpFlowState 更新pool!=1的流量套餐资源的状态
  49. // return int64, error/nil
  50. func (d *Dao) TxUpFlowState(tx *sql.Tx, id int64, state int8) (rows int64, err error) {
  51. res, err := tx.Exec(_upFlowStateSQL, state, id)
  52. if err != nil {
  53. log.Error("TxUpFlowState.Exec() error(%v)", err)
  54. return
  55. }
  56. rows, err = res.RowsAffected()
  57. return
  58. }
  59. // FlowUnique 获取命中 指定流量套餐的记录
  60. // return *archive.FlowData/nil, error/nil
  61. func (d *Dao) FlowUnique(c context.Context, oid, groupID int64, pool int8) (f *archive.FlowData, err error) {
  62. f = &archive.FlowData{}
  63. if err = d.db.QueryRow(context.TODO(), _flowUniqueSQL, oid, pool, groupID).Scan(&f.ID, &f.Pool, &f.OID, &f.GroupID, &f.Parent, &f.State); err != nil {
  64. if err == sql.ErrNoRows {
  65. err = nil
  66. f = nil
  67. } else {
  68. log.Error("row.Scan error(%v)", err)
  69. }
  70. }
  71. return
  72. }