databus.go 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. package archive
  2. import (
  3. "context"
  4. "go-common/app/job/main/videoup/model/archive"
  5. "go-common/library/database/sql"
  6. "go-common/library/log"
  7. )
  8. const (
  9. _dbusSQL = "SELECT id,gp,topic,part,last_offset FROM archive_databus WHERE gp=? AND topic=? AND part=?"
  10. _inDBusSQL = "INSERT INTO archive_databus(gp,topic,part,last_offset) VALUES(?,?,?,?)"
  11. _upDBusSQL = "UPDATE archive_databus SET last_offset=? WHERE gp=? AND topic=? AND part=?"
  12. )
  13. // DBus get DBus by group+topic+partition
  14. func (d *Dao) DBus(c context.Context, group, topic string, partition int32) (dbus *archive.Databus, err error) {
  15. row := d.db.QueryRow(c, _dbusSQL, group, topic, partition)
  16. dbus = &archive.Databus{}
  17. if err = row.Scan(&dbus.ID, &dbus.Group, &dbus.Topic, &dbus.Partition, &dbus.Offset); err != nil {
  18. if err == sql.ErrNoRows {
  19. dbus = nil
  20. err = nil
  21. } else {
  22. log.Error("row.Scan error(%v)", err)
  23. }
  24. }
  25. return
  26. }
  27. // AddDBus add databus
  28. func (d *Dao) AddDBus(c context.Context, group, topic string, partition int32, offset int64) (rows int64, err error) {
  29. res, err := d.db.Exec(c, _inDBusSQL, group, topic, partition, offset)
  30. if err != nil {
  31. log.Error("d.db.Exec(%s, %s, %d, %d) error(%v)", group, topic, partition, offset, err)
  32. return
  33. }
  34. return res.RowsAffected()
  35. }
  36. // UpDBus update databus offset
  37. func (d *Dao) UpDBus(c context.Context, group, topic string, partition int32, offset int64) (rows int64, err error) {
  38. res, err := d.db.Exec(c, _upDBusSQL, offset, group, topic, partition)
  39. if err != nil {
  40. log.Error("d.db.Exec(%d, %s, %s, %d) error(%v)", offset, group, topic, partition, err)
  41. return
  42. }
  43. return res.RowsAffected()
  44. }