123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- // Copyright 2018 PingCAP, Inc.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package reader
- import (
- "fmt"
- "go-common/library/log"
- "github.com/Shopify/sarama"
- pb "github.com/pingcap/tidb-tools/tidb_binlog/slave_binlog_proto/go-binlog"
- pkgerr "github.com/pkg/errors"
- )
- func init() {
- // log.SetLevel(log.LOG_LEVEL_NONE)
- sarama.MaxResponseSize = 1 << 30
- }
- // Config for Reader
- type Config struct {
- KafkaAddr []string
- // the CommitTs of binlog return by reader will bigger than the config CommitTs
- CommitTS int64
- Offset int64 // start at kafka offset
- ClusterID string
- Name string
- }
- // Message read from reader
- type Message struct {
- Binlog *pb.Binlog
- Offset int64 // kafka offset
- }
- // Reader to read binlog from kafka
- type Reader struct {
- cfg *Config
- client sarama.Client
- msgs chan *Message
- stop chan struct{}
- clusterID string
- }
- func (r *Reader) getTopic() (string, int32) {
- return r.cfg.ClusterID + "_obinlog", 0
- }
- func (r *Reader) name() string {
- return fmt.Sprintf("%s-%s", r.cfg.Name, r.cfg.ClusterID)
- }
- // NewReader creates an instance of Reader
- func NewReader(cfg *Config) (r *Reader, err error) {
- r = &Reader{
- cfg: cfg,
- stop: make(chan struct{}),
- msgs: make(chan *Message, 1024),
- clusterID: cfg.ClusterID,
- }
- r.client, err = sarama.NewClient(r.cfg.KafkaAddr, nil)
- if err != nil {
- err = pkgerr.WithStack(err)
- r = nil
- return
- }
- if (r.cfg.Offset == 0) && (r.cfg.CommitTS > 0) {
- r.cfg.Offset, err = r.getOffsetByTS(r.cfg.CommitTS)
- if err != nil {
- err = pkgerr.WithStack(err)
- r = nil
- return
- }
- log.Info("tidb %s: set offset to: %v", r.name(), r.cfg.Offset)
- }
- return
- }
- // Close shuts down the reader
- func (r *Reader) Close() {
- close(r.stop)
- r.client.Close()
- }
- // Messages returns a chan that contains unread buffered message
- func (r *Reader) Messages() (msgs <-chan *Message) {
- return r.msgs
- }
- func (r *Reader) getOffsetByTS(ts int64) (offset int64, err error) {
- seeker, err := NewKafkaSeeker(r.cfg.KafkaAddr, nil)
- if err != nil {
- err = pkgerr.WithStack(err)
- return
- }
- topic, partition := r.getTopic()
- offsets, err := seeker.Seek(topic, ts, []int32{partition})
- if err != nil {
- err = pkgerr.WithStack(err)
- return
- }
- offset = offsets[0]
- return
- }
- // Run start consume msg
- func (r *Reader) Run() {
- offset := r.cfg.Offset
- log.Info("tidb %s start at offset: %v", r.name(), offset)
- consumer, err := sarama.NewConsumerFromClient(r.client)
- if err != nil {
- log.Error("tidb %s NewConsumerFromClient err: %v", r.name(), err)
- return
- }
- defer consumer.Close()
- topic, partition := r.getTopic()
- partitionConsumer, err := consumer.ConsumePartition(topic, partition, offset)
- if err != nil {
- log.Error("tidb %s ConsumePartition err: %v", r.name(), err)
- return
- }
- defer partitionConsumer.Close()
- for {
- select {
- case <-r.stop:
- partitionConsumer.Close()
- close(r.msgs)
- log.Info("tidb %s reader stop to run", r.name())
- return
- case kmsg, ok := <-partitionConsumer.Messages():
- if !ok {
- close(r.msgs)
- log.Info("tidb %s reader stop to run because partitionConsumer close", r.name())
- return
- }
- if kmsg == nil {
- continue
- }
- log.Info("tidb %s get kmsg offset: %v", r.name(), kmsg.Offset)
- binlog := new(pb.Binlog)
- err := binlog.Unmarshal(kmsg.Value)
- if err != nil {
- log.Warn("%s unmarshal err %+v", r.name(), err)
- continue
- }
- if r.cfg.CommitTS > 0 && binlog.CommitTs <= r.cfg.CommitTS {
- log.Warn("%s skip binlog CommitTs: ", r.name(), binlog.CommitTs)
- continue
- }
- r.msgs <- &Message{
- Binlog: binlog,
- Offset: kmsg.Offset,
- }
- }
- }
- }
|