mysql.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. package dao
  2. import (
  3. "context"
  4. "database/sql"
  5. "time"
  6. "go-common/app/service/main/sms/model"
  7. xsql "go-common/library/database/sql"
  8. "go-common/library/log"
  9. )
  10. const (
  11. _templateByStatusSQL = "SELECT id,code,stype,template,status,submitter,approver FROM sms_template_new WHERE status=?"
  12. _taskSQL = "SELECT id,type,business_id,template_code,`desc`,file_path,file_name,send_time,status,ctime,mtime FROM sms_tasks WHERE status=? AND send_time<=? LIMIT 1 FOR UPDATE"
  13. _upadteTaskStatusSQL = "UPDATE sms_tasks SET status=? WHERE id=?"
  14. )
  15. // TemplateByStatus select template by status
  16. func (d *Dao) TemplateByStatus(ctx context.Context, status int) (res []*model.ModelTemplate, err error) {
  17. var rows *xsql.Rows
  18. if rows, err = d.db.Query(ctx, _templateByStatusSQL, status); err != nil {
  19. log.Error("d.TemplateByStatus.Query(%d) error(%v)", status, err)
  20. return
  21. }
  22. defer rows.Close()
  23. for rows.Next() {
  24. r := new(model.ModelTemplate)
  25. if err = rows.Scan(&r.ID, &r.Code, &r.Stype, &r.Template, &r.Status, &r.Submitter, &r.Approver); err != nil {
  26. log.Error("row.Scan() error(%v)", err)
  27. res = nil
  28. return
  29. }
  30. res = append(res, r)
  31. }
  32. err = rows.Err()
  33. return
  34. }
  35. // BeginTx begin transaction.
  36. func (d *Dao) BeginTx(c context.Context) (*xsql.Tx, error) {
  37. return d.db.Begin(c)
  38. }
  39. // TxTask gets prepared task.
  40. func (d *Dao) TxTask(tx *xsql.Tx) (t *model.ModelTask, err error) {
  41. t = new(model.ModelTask)
  42. if err = tx.QueryRow(_taskSQL, model.TaskStatusPrepared, time.Now()).Scan(&t.ID, &t.Type, &t.BusinessID,
  43. &t.TemplateCode, &t.Desc, &t.FilePath, &t.FileName, &t.SendTime, &t.Status, &t.Ctime, &t.Mtime); err != nil {
  44. t = nil
  45. if err == sql.ErrNoRows {
  46. err = nil
  47. return
  48. }
  49. log.Error("d.TxTask() QueryRow() error(%v)", err)
  50. }
  51. return
  52. }
  53. // TxUpdateTaskStatus updates task status by tx.
  54. func (d *Dao) TxUpdateTaskStatus(tx *xsql.Tx, taskID int64, status int32) (err error) {
  55. if _, err = tx.Exec(_upadteTaskStatusSQL, status, taskID); err != nil {
  56. log.Error("d.TxUpdateTaskStatus() Exec(%s,%d) error(%v)", taskID, status, err)
  57. }
  58. return
  59. }
  60. // UpdateTaskStatus updates task status.
  61. func (d *Dao) UpdateTaskStatus(ctx context.Context, taskID int64, status int32) (err error) {
  62. if _, err = d.db.Exec(ctx, _upadteTaskStatusSQL, status, taskID); err != nil {
  63. log.Error("d.UpdateTaskStatus() Exec(%s,%d) error(%v)", taskID, status, err)
  64. }
  65. return
  66. }