rows.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package canal
  2. import (
  3. "fmt"
  4. "github.com/juju/errors"
  5. "github.com/siddontang/go-mysql/schema"
  6. )
  7. const (
  8. UpdateAction = "update"
  9. InsertAction = "insert"
  10. DeleteAction = "delete"
  11. )
  12. type RowsEvent struct {
  13. Table *schema.Table
  14. Action string
  15. // changed row list
  16. // binlog has three update event version, v0, v1 and v2.
  17. // for v1 and v2, the rows number must be even.
  18. // Two rows for one event, format is [before update row, after update row]
  19. // for update v0, only one row for a event, and we don't support this version.
  20. Rows [][]interface{}
  21. }
  22. func newRowsEvent(table *schema.Table, action string, rows [][]interface{}) *RowsEvent {
  23. e := new(RowsEvent)
  24. e.Table = table
  25. e.Action = action
  26. e.Rows = rows
  27. return e
  28. }
  29. // Get primary keys in one row for a table, a table may use multi fields as the PK
  30. func GetPKValues(table *schema.Table, row []interface{}) ([]interface{}, error) {
  31. indexes := table.PKColumns
  32. if len(indexes) == 0 {
  33. return nil, errors.Errorf("table %s has no PK", table)
  34. } else if len(table.Columns) != len(row) {
  35. return nil, errors.Errorf("table %s has %d columns, but row data %v len is %d", table,
  36. len(table.Columns), row, len(row))
  37. }
  38. values := make([]interface{}, 0, len(indexes))
  39. for _, index := range indexes {
  40. values = append(values, row[index])
  41. }
  42. return values, nil
  43. }
  44. // Get term column's value
  45. func GetColumnValue(table *schema.Table, column string, row []interface{}) (interface{}, error) {
  46. index := table.FindColumn(column)
  47. if index == -1 {
  48. return nil, errors.Errorf("table %s has no column name %s", table, column)
  49. }
  50. return row[index], nil
  51. }
  52. // String implements fmt.Stringer interface.
  53. func (r *RowsEvent) String() string {
  54. return fmt.Sprintf("%s %s %v", r.Action, r.Table, r.Rows)
  55. }