reader.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. // Copyright 2018 PingCAP, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. package reader
  14. import (
  15. "fmt"
  16. "go-common/library/log"
  17. "github.com/Shopify/sarama"
  18. pb "github.com/pingcap/tidb-tools/tidb_binlog/slave_binlog_proto/go-binlog"
  19. pkgerr "github.com/pkg/errors"
  20. )
  21. func init() {
  22. // log.SetLevel(log.LOG_LEVEL_NONE)
  23. sarama.MaxResponseSize = 1 << 30
  24. }
// Config for Reader
type Config struct {
	// KafkaAddr lists the Kafka broker addresses handed to sarama when the
	// reader (and its offset seeker) create their clients.
	KafkaAddr []string
	// the CommitTs of binlog return by reader will bigger than the config CommitTs
	// When positive, Run drops every binlog whose CommitTs is <= CommitTS, and
	// NewReader uses it to look up a start offset if Offset is 0.
	CommitTS int64
	// Offset is the Kafka offset to start consuming at. When it is 0 and
	// CommitTS > 0, NewReader resolves it from CommitTS and writes it back here.
	Offset int64 // start at kafka offset
	// ClusterID selects the topic: the reader consumes "<ClusterID>_obinlog".
	ClusterID string
	// Name is combined with ClusterID ("<Name>-<ClusterID>") to label log lines.
	Name string
}
// Message read from reader
// Message pairs a decoded binlog with the Kafka offset it was read from.
type Message struct {
	// Binlog is the value of the Kafka message, unmarshalled into pb.Binlog.
	Binlog *pb.Binlog
	Offset int64 // kafka offset
}
// Reader to read binlog from kafka
// A Reader is created with NewReader, driven by Run, consumed through
// Messages, and shut down with Close.
type Reader struct {
	// cfg is the caller's Config pointer (not a copy); NewReader may update
	// cfg.Offset in place.
	cfg *Config
	// client is the sarama client that Run builds its consumer from.
	client sarama.Client
	// msgs carries decoded binlogs from Run to the caller (buffer of 1024
	// allocated in NewReader).
	msgs chan *Message
	// stop is closed by Close to tell Run to exit.
	stop chan struct{}
	// clusterID copies cfg.ClusterID.
	// NOTE(review): not read by any code visible in this file — confirm before removing.
	clusterID string
}
  47. func (r *Reader) getTopic() (string, int32) {
  48. return r.cfg.ClusterID + "_obinlog", 0
  49. }
  50. func (r *Reader) name() string {
  51. return fmt.Sprintf("%s-%s", r.cfg.Name, r.cfg.ClusterID)
  52. }
  53. // NewReader creates an instance of Reader
  54. func NewReader(cfg *Config) (r *Reader, err error) {
  55. r = &Reader{
  56. cfg: cfg,
  57. stop: make(chan struct{}),
  58. msgs: make(chan *Message, 1024),
  59. clusterID: cfg.ClusterID,
  60. }
  61. r.client, err = sarama.NewClient(r.cfg.KafkaAddr, nil)
  62. if err != nil {
  63. err = pkgerr.WithStack(err)
  64. r = nil
  65. return
  66. }
  67. if (r.cfg.Offset == 0) && (r.cfg.CommitTS > 0) {
  68. r.cfg.Offset, err = r.getOffsetByTS(r.cfg.CommitTS)
  69. if err != nil {
  70. err = pkgerr.WithStack(err)
  71. r = nil
  72. return
  73. }
  74. log.Info("tidb %s: set offset to: %v", r.name(), r.cfg.Offset)
  75. }
  76. return
  77. }
  78. // Close shuts down the reader
  79. func (r *Reader) Close() {
  80. close(r.stop)
  81. r.client.Close()
  82. }
  83. // Messages returns a chan that contains unread buffered message
  84. func (r *Reader) Messages() (msgs <-chan *Message) {
  85. return r.msgs
  86. }
  87. func (r *Reader) getOffsetByTS(ts int64) (offset int64, err error) {
  88. seeker, err := NewKafkaSeeker(r.cfg.KafkaAddr, nil)
  89. if err != nil {
  90. err = pkgerr.WithStack(err)
  91. return
  92. }
  93. topic, partition := r.getTopic()
  94. offsets, err := seeker.Seek(topic, ts, []int32{partition})
  95. if err != nil {
  96. err = pkgerr.WithStack(err)
  97. return
  98. }
  99. offset = offsets[0]
  100. return
  101. }
  102. // Run start consume msg
  103. func (r *Reader) Run() {
  104. offset := r.cfg.Offset
  105. log.Info("tidb %s start at offset: %v", r.name(), offset)
  106. consumer, err := sarama.NewConsumerFromClient(r.client)
  107. if err != nil {
  108. log.Error("tidb %s NewConsumerFromClient err: %v", r.name(), err)
  109. return
  110. }
  111. defer consumer.Close()
  112. topic, partition := r.getTopic()
  113. partitionConsumer, err := consumer.ConsumePartition(topic, partition, offset)
  114. if err != nil {
  115. log.Error("tidb %s ConsumePartition err: %v", r.name(), err)
  116. return
  117. }
  118. defer partitionConsumer.Close()
  119. for {
  120. select {
  121. case <-r.stop:
  122. partitionConsumer.Close()
  123. close(r.msgs)
  124. log.Info("tidb %s reader stop to run", r.name())
  125. return
  126. case kmsg, ok := <-partitionConsumer.Messages():
  127. if !ok {
  128. close(r.msgs)
  129. log.Info("tidb %s reader stop to run because partitionConsumer close", r.name())
  130. return
  131. }
  132. if kmsg == nil {
  133. continue
  134. }
  135. log.Info("tidb %s get kmsg offset: %v", r.name(), kmsg.Offset)
  136. binlog := new(pb.Binlog)
  137. err := binlog.Unmarshal(kmsg.Value)
  138. if err != nil {
  139. log.Warn("%s unmarshal err %+v", r.name(), err)
  140. continue
  141. }
  142. if r.cfg.CommitTS > 0 && binlog.CommitTs <= r.cfg.CommitTS {
  143. log.Warn("%s skip binlog CommitTs: ", r.name(), binlog.CommitTs)
  144. continue
  145. }
  146. r.msgs <- &Message{
  147. Binlog: binlog,
  148. Offset: kmsg.Offset,
  149. }
  150. }
  151. }
  152. }