config_offset.go 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "go-common/app/job/main/search/model"
  9. "go-common/library/database/sql"
  10. "go-common/library/log"
  11. xtime "go-common/library/time"
  12. )
  13. const (
  14. _getOffsetSQL = "SELECT offset_incr_id,offset_incr_time,review_incr_id,review_icnr_time FROM digger_offset WHERE project=? AND table_name=?"
  15. //TODO 有问题,字段少了
  16. _updateOffsetSQL = "UPDATE digger_offset SET offset_incr_id=?,offset_incr_time=?,mtime=? WHERE project=? AND table_name=?"
  17. //_initOffsetSQL = "INSERT INTO digger_offset(project,table_name,offset_incr_time,offset_recover_id,offset_recover_time) VALUES(?,?,?,?,?,?) " +
  18. // "ON DUPLICATE KEY UPDATE offset_recover_id=?, offset_recover_time=?"
  19. )
  20. // Offset get offset
  21. func (d *Dao) Offset(c context.Context, appid, tableName string) (res *model.Offset, err error) {
  22. res = new(model.Offset)
  23. row := d.SearchDB.QueryRow(c, _getOffsetSQL, appid, tableName)
  24. if err = row.Scan(&res.OffID, &res.OffTime, &res.ReviewID, &res.ReviewTime); err != nil {
  25. log.Error("OffsetID row.Scan error(%v)", err)
  26. if err == sql.ErrNoRows {
  27. err = nil
  28. res.OffID = 1
  29. res.OffTime = xtime.Time(time.Now().Unix())
  30. return
  31. }
  32. log.Error("offset row.Scan error(%v)", err)
  33. }
  34. return
  35. }
  36. // updateOffset update offset
  37. func (d *Dao) updateOffset(c context.Context, offset *model.LoopOffset, appid, tableName string) (err error) {
  38. nowFormat := time.Now().Format("2006-01-02 15:04:05")
  39. if _, err = d.SearchDB.Exec(c, _updateOffsetSQL, offset.OffsetID, offset.OffsetTime, nowFormat, appid, tableName); err != nil {
  40. log.Error("updateOffset Exec() error(%v)", err)
  41. }
  42. return
  43. }
  44. // bulkInitOffset .
  45. func (d *Dao) bulkInitOffset(c context.Context, offset *model.LoopOffset, attrs *model.Attrs, arr []string) (err error) {
  46. var (
  47. values = []string{}
  48. nowFormat = time.Now().Format("2006-01-02 15:04:05")
  49. insertOffsetSQL = "INSERT INTO digger_offset(project,table_name,table_suffix,offset_incr_time,offset_recover_id,offset_recover_time) VALUES"
  50. )
  51. if len(arr) == 0 {
  52. for i := attrs.Table.TableFrom; i <= attrs.Table.TableTo; i++ {
  53. if attrs.Table.TableTo == 0 {
  54. arr = append(arr, attrs.Table.TablePrefix)
  55. } else {
  56. arr = append(arr, fmt.Sprintf("%s%0"+attrs.Table.TableZero+"d", attrs.Table.TablePrefix, i))
  57. }
  58. }
  59. }
  60. for _, v := range arr {
  61. // TODO why???
  62. // table := attrs.Table.TablePrefix + v
  63. // value := "('" + attrs.AppID + "','" + table + "','" + v + "','" + nowFormat + "'," + strconv.FormatInt(offset.RecoverID, 10) + ",'" + offset.RecoverTime + "')"
  64. value := "('" + attrs.AppID + "','" + v + "','" + attrs.Table.TablePrefix + "','" + nowFormat + "'," + strconv.FormatInt(offset.RecoverID, 10) + ",'" + offset.RecoverTime + "')"
  65. values = append(values, value)
  66. }
  67. valueStr := strings.Join(values, ",") + " ON DUPLICATE KEY UPDATE offset_recover_id=VALUES(offset_recover_id),offset_recover_time=VALUES(offset_recover_time)"
  68. bulkInserSQL := insertOffsetSQL + valueStr
  69. _, err = d.SearchDB.Exec(c, bulkInserSQL)
  70. return
  71. }