dataplatform.go 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. package dao
  2. import (
  3. "context"
  4. "go-common/app/admin/main/push/model"
  5. "go-common/library/database/sql"
  6. "go-common/library/ecode"
  7. "go-common/library/log"
  8. )
  9. const (
  10. _dpConditionSQL = `select id,job,task,conditions,sql_stmt,status,status_url,file from push_dataplatform_conditions where job=?`
  11. _addDpConditionSQL = `insert into push_dataplatform_conditions (job,task,conditions,sql_stmt,status,status_url,file) values (?,?,?,?,?,?,?) on duplicate key update task=?,conditions=?,sql_stmt=?,status=?,status_url=?,file=?`
  12. _updateDpConditionStatusSQL = `update push_dataplatform_conditions set status=? where job=?`
  13. )
  14. // AddDPCondition add data platform task
  15. func (d *Dao) AddDPCondition(ctx context.Context, cond *model.DPCondition) (id int64, err error) {
  16. res, err := d.db.Exec(ctx, _addDpConditionSQL, cond.Job, cond.Task, cond.Condition, cond.SQL, cond.Status, cond.StatusURL, cond.File,
  17. cond.Task, cond.Condition, cond.SQL, cond.Status, cond.StatusURL, cond.File)
  18. if err != nil {
  19. log.Error("d.AddDPCondition(%+v) error(%v)", cond, err)
  20. return
  21. }
  22. id, err = res.LastInsertId()
  23. return
  24. }
  25. // DPCondition .
  26. func (d *Dao) DPCondition(ctx context.Context, job string) (c *model.DPCondition, err error) {
  27. c = new(model.DPCondition)
  28. if err = d.db.QueryRow(ctx, _dpConditionSQL, job).Scan(&c.ID, &c.Job, &c.Task, &c.Condition, &c.SQL, &c.Status, &c.StatusURL, &c.File); err != nil {
  29. if err == sql.ErrNoRows {
  30. c = nil
  31. err = nil
  32. }
  33. return
  34. }
  35. return
  36. }
  37. // UpdateDpCondtionStatus .
  38. func (d *Dao) UpdateDpCondtionStatus(ctx context.Context, job string, status int) (err error) {
  39. _, err = d.db.Exec(ctx, _updateDpConditionStatusSQL, status, job)
  40. return
  41. }
  42. // Partitions 获取一级分区数据
  43. func (d *Dao) Partitions(ctx context.Context) (m map[int]string, err error) {
  44. var res = struct {
  45. Code int `json:"code"`
  46. Data map[int]struct {
  47. ID int `json:"id"`
  48. Pid int `json:"pid"`
  49. Name string `json:"name"`
  50. } `json:"data"`
  51. }{}
  52. if err = d.httpClient.Get(ctx, d.c.Cfg.PartitionsURL, "", nil, &res); err != nil {
  53. return
  54. }
  55. if !ecode.Int(res.Code).Equal(ecode.OK) {
  56. err = ecode.Int(res.Code)
  57. return
  58. }
  59. m = make(map[int]string)
  60. for _, v := range res.Data {
  61. if v.Pid != 0 {
  62. continue
  63. }
  64. m[v.ID] = v.Name
  65. }
  66. return
  67. }