1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
- package archive
- import (
- "context"
- "database/sql"
- "encoding/json"
- "time"
- "go-common/app/job/main/videoup-report/model/task"
- xsql "go-common/library/database/sql"
- "go-common/library/log"
- )
- const (
- _getTaskWeight = "SELECT t.id,t.state,a.mid,t.ctime,t.upspecial,t.ptime,e.description FROM `task_dispatch` AS t " +
- "LEFT JOIN `task_dispatch_extend` AS e ON t.id=e.task_id INNER JOIN archive as a ON a.id=t.aid WHERE t.state=0 AND t.id>? LIMIT 1000"
- _inDispatchExtendSQL = "INSERT INTO task_dispatch_extend(task_id,description) VALUE (?,?)"
- _delTaskExtendSQL = "DELETE FROM task_dispatch_extend WHERE mtime < ? LIMIT ?"
- )
- // GetTaskWeight 从数据库读取权重配置
- func (d *Dao) GetTaskWeight(c context.Context, lastid int64) (mcases map[int64]*task.WeightParams, err error) {
- var (
- rows *xsql.Rows
- desc sql.NullString
- )
- if rows, err = d.db.Query(c, _getTaskWeight, lastid); err != nil {
- log.Error("d.db.Query(%s, %d) error(%v)", _getTaskWeight, lastid, err)
- return
- }
- defer rows.Close()
- mcases = make(map[int64]*task.WeightParams)
- for rows.Next() {
- tp := new(task.WeightParams)
- if err = rows.Scan(&tp.TaskID, &tp.State, &tp.Mid, &tp.Ctime, &tp.Special, &tp.Ptime, &desc); err != nil {
- log.Error("rows.Scan error(%v)", err)
- return
- }
- if desc.Valid && len(desc.String) > 0 {
- arr := []*task.ConfigItem{}
- if err = json.Unmarshal([]byte(desc.String), &arr); err != nil {
- arr = nil
- log.Error("json.Unmarshal error(%v)", err)
- continue
- }
- tp.CfItems = arr
- }
- mcases[tp.TaskID] = tp
- }
- return
- }
- // InDispatchExtend 扩展表,记录权重配置信息
- func (d *Dao) InDispatchExtend(c context.Context, taskid int64, desc string) (lastid int64, err error) {
- res, err := d.db.Exec(c, _inDispatchExtendSQL, taskid, desc)
- if err != nil {
- log.Error("tx.Exec(%s, %d, %v) error(%v)", _inDispatchExtendSQL, taskid, desc, err)
- return
- }
- return res.LastInsertId()
- }
- // DelTaskExtend del task_dispatch_extend
- func (d *Dao) DelTaskExtend(c context.Context, before time.Time, limit int64) (rows int64, err error) {
- res, err := d.db.Exec(c, _delTaskExtendSQL, before.Format("2006-01-02 15:04:05"), limit)
- if err != nil {
- log.Error("d.db.Exec(%s, %s, %d) error(%v)", _delTaskExtendSQL, before, limit, err)
- return
- }
- return res.RowsAffected()
- }
|