offset.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. // Copyright 2018 PingCAP, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. package reader
  14. import (
  15. "go-common/library/log"
  16. "github.com/Shopify/sarama"
  17. pb "github.com/pingcap/tidb-tools/tidb_binlog/slave_binlog_proto/go-binlog"
  18. pkgerr "github.com/pkg/errors"
  19. )
// KafkaSeeker seeks offset in kafka topics by given condition
type KafkaSeeker struct {
	consumer sarama.Consumer // partition consumer factory, built from client
	client   sarama.Client   // used for GetOffset (oldest/newest) queries
}
  25. // NewKafkaSeeker creates an instance of KafkaSeeker
  26. func NewKafkaSeeker(addr []string, config *sarama.Config) (*KafkaSeeker, error) {
  27. client, err := sarama.NewClient(addr, config)
  28. if err != nil {
  29. return nil, pkgerr.WithStack(err)
  30. }
  31. consumer, err := sarama.NewConsumerFromClient(client)
  32. if err != nil {
  33. return nil, pkgerr.WithStack(err)
  34. }
  35. s := &KafkaSeeker{
  36. client: client,
  37. consumer: consumer,
  38. }
  39. return s, nil
  40. }
  41. // Close releases resources of KafkaSeeker
  42. func (ks *KafkaSeeker) Close() {
  43. ks.consumer.Close()
  44. ks.client.Close()
  45. }
  46. // Seek seeks the first offset which binlog CommitTs bigger than ts
  47. func (ks *KafkaSeeker) Seek(topic string, ts int64, partitions []int32) (offsets []int64, err error) {
  48. if len(partitions) == 0 {
  49. partitions, err = ks.consumer.Partitions(topic)
  50. if err != nil {
  51. log.Error("tidb get partitions from topic %s error %v", topic, err)
  52. return nil, pkgerr.WithStack(err)
  53. }
  54. }
  55. offsets, err = ks.seekOffsets(topic, partitions, ts)
  56. if err != nil {
  57. err = pkgerr.WithStack(err)
  58. log.Error("tidb seek offsets error %v", err)
  59. }
  60. return
  61. }
  62. func (ks *KafkaSeeker) getTSFromMSG(msg *sarama.ConsumerMessage) (ts int64, err error) {
  63. binlog := new(pb.Binlog)
  64. err = binlog.Unmarshal(msg.Value)
  65. if err != nil {
  66. err = pkgerr.WithStack(err)
  67. return
  68. }
  69. return binlog.CommitTs, nil
  70. }
  71. // seekOffsets returns all valid offsets in partitions
  72. func (ks *KafkaSeeker) seekOffsets(topic string, partitions []int32, pos int64) ([]int64, error) {
  73. offsets := make([]int64, len(partitions))
  74. for _, partition := range partitions {
  75. start, err := ks.client.GetOffset(topic, partition, sarama.OffsetOldest)
  76. if err != nil {
  77. err = pkgerr.WithStack(err)
  78. return nil, err
  79. }
  80. end, err := ks.client.GetOffset(topic, partition, sarama.OffsetNewest)
  81. if err != nil {
  82. err = pkgerr.WithStack(err)
  83. return nil, err
  84. }
  85. offset, err := ks.seekOffset(topic, partition, start, end-1, pos)
  86. if err != nil {
  87. err = pkgerr.WithStack(err)
  88. return nil, err
  89. }
  90. offsets[partition] = offset
  91. }
  92. return offsets, nil
  93. }
// seekOffset binary-searches the offset range [start, end] of a partition
// for the first offset whose binlog CommitTs is strictly greater than ts.
// It assumes CommitTs is non-decreasing along the partition's offsets.
// When every message in the range has CommitTs <= ts, it returns
// sarama.OffsetNewest so the caller starts from the upcoming message.
func (ks *KafkaSeeker) seekOffset(topic string, partition int32, start int64, end int64, ts int64) (offset int64, err error) {
	// CommitTs of the oldest retained message in the partition.
	startTS, err := ks.getTSAtOffset(topic, partition, start)
	if err != nil {
		err = pkgerr.WithStack(err)
		return
	}
	if ts < startTS {
		// Messages with CommitTs in (ts, startTS) may already have been
		// purged by retention, so consuming from start can lose binlogs.
		log.Warn("given ts %v is smaller than oldest message's ts %v, some binlogs may lose", ts, startTS)
		offset = start
		return
	} else if ts == startTS {
		// Exact hit on the oldest message: the first ts-greater message is
		// the one right after it (CommitTs values are assumed distinct).
		offset = start + 1
		return
	}
	// Invariant: CommitTs at offsets < start is <= ts; answer lies in
	// [start, end]. Loop narrows until start == end.
	for start < end {
		mid := (end-start)/2 + start
		var midTS int64
		midTS, err = ks.getTSAtOffset(topic, partition, mid)
		if err != nil {
			err = pkgerr.WithStack(err)
			return
		}
		if midTS <= ts {
			// mid is still too old; the answer is strictly after mid.
			start = mid + 1
		} else {
			// mid qualifies; keep it as a candidate upper bound.
			end = mid
		}
	}
	// start == end here; verify the converged offset actually qualifies.
	var endTS int64
	endTS, err = ks.getTSAtOffset(topic, partition, end)
	if err != nil {
		err = pkgerr.WithStack(err)
		return
	}
	if endTS <= ts {
		// Even the last message is <= ts: resume from future messages.
		return sarama.OffsetNewest, nil
	}
	return end, nil
}
  133. func (ks *KafkaSeeker) getTSAtOffset(topic string, partition int32, offset int64) (ts int64, err error) {
  134. pc, err := ks.consumer.ConsumePartition(topic, partition, offset)
  135. if err != nil {
  136. err = pkgerr.WithStack(err)
  137. return
  138. }
  139. defer pc.Close()
  140. for msg := range pc.Messages() {
  141. ts, err = ks.getTSFromMSG(msg)
  142. err = pkgerr.WithStack(err)
  143. return
  144. }
  145. panic("unreachable")
  146. }