tidb_instance.go

package service

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	"go-common/app/infra/canal/conf"
	"go-common/app/infra/canal/infoc"
	"go-common/app/infra/canal/model"
	"go-common/app/infra/canal/service/reader"
	"go-common/library/log"
	"go-common/library/queue/databus"

	pb "github.com/pingcap/tidb-tools/tidb_binlog/slave_binlog_proto/go-binlog"
)
// tidbInstance consumes a TiDB binlog stream and fans mutations out to the
// configured targets.
type tidbInstance struct {
	canal  *Canal
	config *conf.TiDBInsConf
	// one instance can have many targets, differing by schema and table
	targets      map[string][]producer
	latestOffset int64
	// latest scanned commit timestamp, used to check replication delay
	latestTimestamp int64
	reader          *reader.Reader
	err             error
	closed          bool
	tables          map[string]map[string]*Table
	ignoreTables    map[string]map[string]bool
	waitConsume     sync.WaitGroup
	waitTable       sync.WaitGroup
}
// Table describes a synced database table and its filtering rules.
type Table struct {
	PrimaryKey []string        // primary key columns, used as the Kafka message key
	OmitField  map[string]bool // fields to ignore in this table
	OmitAction map[string]bool // mutation actions to ignore in this table
	name       string
	ch         chan *msg
}
// msg is a single row mutation dispatched to a table channel.
type msg struct {
	db          string
	table       string
	tableRegexp string
	mu          *pb.TableMutation
	ignore      map[string]bool
	keys        []string
	columns     []*pb.ColumnInfo
}
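
// The dispatch workers themselves live elsewhere in this package, but Table.ch
// together with waitTable implies one consumer goroutine per table. A minimal
// sketch of that shape, assuming a hypothetical producer method send (this is
// illustrative, not the actual implementation):
//
//	ins.waitTable.Add(1)
//	go func(tb *Table) {
//		defer ins.waitTable.Done()
//		for m := range tb.ch {
//			for _, p := range ins.targets[m.db] {
//				p.send(m) // hypothetical: fan one mutation out to every producer
//			}
//		}
//	}(tb)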
// newTiDBInstance creates a new TiDB canal instance.
func newTiDBInstance(cl *Canal, c *conf.TiDBInsConf) (ins *tidbInstance, err error) {
	// new instance
	ins = &tidbInstance{
		config:       c,
		canal:        cl,
		targets:      make(map[string][]producer, len(c.Databases)),
		tables:       make(map[string]map[string]*Table),
		ignoreTables: make(map[string]map[string]bool),
	}
	cfg := &reader.Config{
		Name:      c.Name,
		KafkaAddr: c.Addrs,
		Offset:    c.Offset,
		CommitTS:  c.CommitTS,
		ClusterID: c.ClusterID,
	}
	if err = ins.check(); err != nil {
		return
	}
	// a previously persisted position takes precedence over the configured one
	position, err := cl.dao.TiDBPosition(context.Background(), c.Name)
	if err == nil && position != nil {
		cfg.Offset = position.Offset
		cfg.CommitTS = position.CommitTS
	}
	ins.latestOffset = cfg.Offset
	ins.latestTimestamp = cfg.CommitTS
	for _, db := range c.Databases {
		if db.Databus != nil {
			ins.targets[db.Schema] = append(ins.targets[db.Schema], &databusP{group: db.Databus.Group, topic: db.Databus.Topic, Databus: databus.New(db.Databus)})
		}
		if db.Infoc != nil {
			ins.targets[db.Schema] = append(ins.targets[db.Schema], &infocP{taskID: db.Infoc.TaskID, Infoc: infoc.New(db.Infoc)})
		}
	}
	ins.reader, ins.err = reader.NewReader(cfg)
	return ins, ins.err
}
// start begins receiving binlog messages; it blocks until the reader is closed.
func (ins *tidbInstance) start() {
	// one Done for this consume loop, one for syncproc
	ins.waitConsume.Add(2)
	defer ins.waitConsume.Done()
	go ins.reader.Run()
	go ins.syncproc()
	for msg := range ins.reader.Messages() {
		ins.process(msg)
	}
}
// close stops the instance: it shuts down the reader, waits for in-flight
// messages, drains the table channels, and persists the final position.
func (ins *tidbInstance) close() {
	if ins.closed {
		return
	}
	ins.closed = true
	ins.reader.Close()
	ins.waitConsume.Wait()
	for _, tables := range ins.tables {
		for _, table := range tables {
			close(table.ch)
		}
	}
	ins.waitTable.Wait()
	ins.sync()
}
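
// A lifecycle sketch, assuming the surrounding service owns cl (*Canal) and
// c (*conf.TiDBInsConf); only names defined in this file are used:
//
//	ins, err := newTiDBInstance(cl, c)
//	if err != nil {
//		log.Error("newTiDBInstance(%s) error(%v)", c.Name, err)
//		return
//	}
//	go ins.start() // consumes until the reader is closed
//	// ... on shutdown:
//	ins.close() // drains channels, then persists the final position via sync()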
// String implements fmt.Stringer.
func (ins *tidbInstance) String() string {
	return fmt.Sprintf("%s-%s", ins.config.Name, ins.config.ClusterID)
}
// process dispatches one binlog message: DDL is only logged, DML is routed to
// the matching table channels.
func (ins *tidbInstance) process(m *reader.Message) (err error) {
	if m.Binlog.Type == pb.BinlogType_DDL {
		log.Info("tidb %s got ddl: %s", ins.String(), m.Binlog.DdlData.String())
		return
	}
	for _, table := range m.Binlog.DmlData.Tables {
		tb := ins.getTable(table.GetSchemaName(), table.GetTableName())
		if tb == nil {
			continue
		}
		for _, mu := range table.Mutations {
			action := strings.ToLower(mu.GetType().String())
			if tb.OmitAction[action] {
				continue
			}
			tb.ch <- &msg{
				db:          table.GetSchemaName(),
				table:       table.GetTableName(),
				mu:          mu,
				ignore:      tb.OmitField,
				keys:        tb.PrimaryKey,
				columns:     table.ColumnInfo,
				tableRegexp: tb.name,
			}
			if stats != nil {
				stats.Incr("syncer_counter", ins.String(), table.GetSchemaName(), tb.name, action)
				stats.State("delay_syncer", ins.delay(), ins.String(), tb.name, "", "")
			}
		}
	}
	ins.latestOffset = m.Offset
	ins.latestTimestamp = m.Binlog.CommitTs
	return nil
}
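
// Since action above is pb.MutationType lowercased, the filter keys are
// "insert", "update" and "delete"; for example, a Table built with
// OmitAction["delete"] = true silently drops delete mutations here.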
// Error returns the instance error, if any.
func (ins *tidbInstance) Error() string {
	if ins.err == nil {
		return ""
	}
	return fmt.Sprintf("%+v", ins.err)
}
// delay returns the approximate replication delay in seconds.
func (ins *tidbInstance) delay() int64 {
	return time.Now().Unix() - ins.latestTimestamp
}
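
// For example, assuming latestTimestamp holds Unix seconds (as the subtraction
// above requires): a mutation committed at 10:00:00 and observed at 10:00:05
// yields delay() == 5.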
// sync persists the current binlog position so a restart can resume from it;
// close calls it last, after all table channels have drained.
func (ins *tidbInstance) sync() {
	info := &model.TiDBInfo{
		Name:      ins.config.Name,
		ClusterID: ins.config.ClusterID,
		Offset:    ins.latestOffset,
		CommitTS:  ins.latestTimestamp,
	}
	ins.canal.dao.UpdateTiDBPosition(context.Background(), info)
}