tidb_data.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. package service
  2. import (
  3. "encoding/base64"
  4. "fmt"
  5. "strings"
  6. "go-common/app/infra/canal/model"
  7. pb "github.com/pingcap/tidb-tools/tidb_binlog/slave_binlog_proto/go-binlog"
  8. )
  9. // lower case column field type in mysql
  10. // https://dev.mysql.com/doc/refman/8.0/en/data-types.html
  11. // for numeric type: int bigint smallint tinyint float double decimal bit
  12. // for string type: text longtext mediumtext char tinytext varchar
  13. // blob longblob mediumblob binary tinyblob varbinary
  14. // enum set
  15. // for json type: json
  16. // for text and char type, string_value is set
  17. // for blob and binary type, bytes_value is set
  18. // for enum, set, uint64_value is set
  19. // for json, bytes_value is set
  20. func tidbMakeData(m *msg) (data *model.Data, err error) {
  21. action := m.mu.GetType()
  22. if (action != pb.MutationType_Insert) && (action != pb.MutationType_Delete) && (action != pb.MutationType_Update) {
  23. err = errInvalidAction
  24. return
  25. }
  26. data = &model.Data{
  27. Action: strings.ToLower(action.String()),
  28. Table: m.table,
  29. }
  30. var keys []string
  31. switch action {
  32. case pb.MutationType_Insert, pb.MutationType_Delete:
  33. var values = m.mu.GetRow().GetColumns()
  34. for i, c := range m.columns {
  35. for _, key := range m.keys {
  36. if c.Name == key {
  37. keys = append(keys, columnToString(values[i]))
  38. break
  39. }
  40. }
  41. if m.ignore[c.Name] {
  42. continue
  43. }
  44. if data.New == nil {
  45. data.New = make(map[string]interface{}, len(m.columns))
  46. }
  47. if strings.Contains(c.GetMysqlType(), "binary") {
  48. data.New[c.Name] = base64.StdEncoding.EncodeToString(values[i].GetBytesValue())
  49. continue
  50. }
  51. data.New[c.Name] = columnToValue(values[i])
  52. }
  53. case pb.MutationType_Update:
  54. if m.mu.Row == nil || m.mu.ChangeRow == nil {
  55. err = errInvalidUpdate
  56. return
  57. }
  58. var oldValues = m.mu.GetChangeRow().GetColumns()
  59. var newValues = m.mu.GetRow().GetColumns()
  60. for i, c := range m.columns {
  61. for _, key := range m.keys {
  62. if c.Name == key {
  63. keys = append(keys, columnToString(newValues[i]))
  64. break
  65. }
  66. }
  67. if m.ignore[c.Name] {
  68. continue
  69. }
  70. if data.New == nil {
  71. data.New = make(map[string]interface{}, len(m.columns))
  72. }
  73. if data.Old == nil {
  74. data.Old = make(map[string]interface{}, len(m.columns))
  75. }
  76. if strings.Contains(c.GetMysqlType(), "binary") {
  77. data.Old[c.Name] = base64.StdEncoding.EncodeToString(oldValues[i].GetBytesValue())
  78. data.New[c.Name] = base64.StdEncoding.EncodeToString(newValues[i].GetBytesValue())
  79. continue
  80. }
  81. data.Old[c.Name] = columnToValue(oldValues[i])
  82. data.New[c.Name] = columnToValue(newValues[i])
  83. }
  84. }
  85. if len(keys) == 0 {
  86. data.Key = columnToString(m.mu.GetRow().GetColumns()[0])
  87. } else {
  88. data.Key = strings.Join(keys, ",")
  89. }
  90. if data.New == nil && data.Old == nil {
  91. data = nil
  92. }
  93. return
  94. }
  95. func columnToValue(c *pb.Column) interface{} {
  96. if c.GetIsNull() {
  97. return nil
  98. }
  99. if c.Int64Value != nil {
  100. return c.GetInt64Value()
  101. }
  102. if c.Uint64Value != nil {
  103. return c.GetUint64Value()
  104. }
  105. if c.DoubleValue != nil {
  106. return c.GetDoubleValue()
  107. }
  108. if c.StringValue != nil {
  109. return c.GetStringValue()
  110. }
  111. return c.GetBytesValue()
  112. }
  113. func columnToString(c *pb.Column) string {
  114. return fmt.Sprint(columnToValue(c))
  115. }