async_producer.go 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932
  1. package sarama
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "github.com/eapache/go-resiliency/breaker"
  8. "github.com/eapache/queue"
  9. )
// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
// and parses responses for errors. You must read from the Errors() channel or the
// producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid
// leaks: it will not be garbage-collected automatically when it passes out of
// scope.
type AsyncProducer interface {

	// AsyncClose triggers a shutdown of the producer. The shutdown has completed
	// when both the Errors and Successes channels have been closed. When calling
	// AsyncClose, you *must* continue to read from those channels in order to
	// drain the results of any messages in flight.
	AsyncClose()

	// Close shuts down the producer and waits for any buffered messages to be
	// flushed. You must call this function before a producer object passes out of
	// scope, as it may otherwise leak memory. You must call this before calling
	// Close on the underlying client.
	Close() error

	// Input is the input channel for the user to write messages to that they
	// wish to send.
	Input() chan<- *ProducerMessage

	// Successes is the success output channel back to the user when Return.Successes is
	// enabled. If Return.Successes is true, you MUST read from this channel or the
	// Producer will deadlock. It is suggested that you send and read messages
	// together in a single select statement.
	Successes() <-chan *ProducerMessage

	// Errors is the error output channel back to the user. You MUST read from this
	// channel or the Producer will deadlock when the channel is full. Alternatively,
	// you can set Producer.Return.Errors in your config to false, which prevents
	// errors from being returned.
	Errors() <-chan *ProducerError
}
// asyncProducer is the concrete implementation of AsyncProducer. Messages
// flow through a tree of goroutines: a singleton dispatcher fans out to one
// topicProducer per topic, which fans out to one partitionProducer per
// partition, which attaches to one brokerProducer per broker connection.
type asyncProducer struct {
	client    Client
	conf      *Config
	ownClient bool // true when we created the client and must close it during shutdown

	errors                    chan *ProducerError
	input, successes, retries chan *ProducerMessage
	inFlight                  sync.WaitGroup // counts unresolved messages plus internal syn/fin/shutdown markers

	// brokers maps each broker to the input channel of its brokerProducer;
	// brokerRefs reference-counts those channels so the last detaching
	// partitionProducer closes them. Both maps are guarded by brokerLock.
	brokers    map[*Broker]chan<- *ProducerMessage
	brokerRefs map[chan<- *ProducerMessage]int
	brokerLock sync.Mutex
}
  52. // NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
  53. func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
  54. client, err := NewClient(addrs, conf)
  55. if err != nil {
  56. return nil, err
  57. }
  58. p, err := NewAsyncProducerFromClient(client)
  59. if err != nil {
  60. return nil, err
  61. }
  62. p.(*asyncProducer).ownClient = true
  63. return p, nil
  64. }
  65. // NewAsyncProducerFromClient creates a new Producer using the given client. It is still
  66. // necessary to call Close() on the underlying client when shutting down this producer.
  67. func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
  68. // Check that we are not dealing with a closed Client before processing any other arguments
  69. if client.Closed() {
  70. return nil, ErrClosedClient
  71. }
  72. p := &asyncProducer{
  73. client: client,
  74. conf: client.Config(),
  75. errors: make(chan *ProducerError),
  76. input: make(chan *ProducerMessage),
  77. successes: make(chan *ProducerMessage),
  78. retries: make(chan *ProducerMessage),
  79. brokers: make(map[*Broker]chan<- *ProducerMessage),
  80. brokerRefs: make(map[chan<- *ProducerMessage]int),
  81. }
  82. // launch our singleton dispatchers
  83. go withRecover(p.dispatcher)
  84. go withRecover(p.retryHandler)
  85. return p, nil
  86. }
// flagSet carries internal control-flow markers on a ProducerMessage; these
// messages never contain user data and are consumed inside the pipeline.
type flagSet int8

const (
	syn      flagSet = 1 << iota // first message from partitionProducer to brokerProducer
	fin                          // final message from partitionProducer to brokerProducer and back
	shutdown                     // start the shutdown process
)
// ProducerMessage is the collection of elements passed to the Producer in order to send a message.
type ProducerMessage struct {
	Topic string // The Kafka topic for this message.
	// The partitioning key for this message. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Key Encoder
	// The actual message to store in Kafka. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Value Encoder

	// The headers are key-value pairs that are transparently passed
	// by Kafka between producers and consumers.
	Headers []RecordHeader

	// This field is used to hold arbitrary data you wish to include so it
	// will be available when receiving on the Successes and Errors channels.
	// Sarama completely ignores this field and is only to be used for
	// pass-through data.
	Metadata interface{}

	// Below this point are filled in by the producer as the message is processed

	// Offset is the offset of the message stored on the broker. This is only
	// guaranteed to be defined if the message was successfully delivered and
	// RequiredAcks is not NoResponse.
	Offset int64
	// Partition is the partition that the message was sent to. This is only
	// guaranteed to be defined if the message was successfully delivered.
	Partition int32
	// Timestamp is the timestamp assigned to the message by the broker. This
	// is only guaranteed to be defined if the message was successfully
	// delivered, RequiredAcks is not NoResponse, and the Kafka broker is at
	// least version 0.10.0.
	Timestamp time.Time

	retries int     // number of times this message has been re-queued for retry
	flags   flagSet // internal control markers (syn/fin/shutdown)
	// NOTE(review): expectation is not read or written anywhere in this file;
	// presumably it is used by a synchronous wrapper elsewhere — confirm.
	expectation chan *ProducerError
}
  127. const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
  128. func (m *ProducerMessage) byteSize(version int) int {
  129. var size int
  130. if version >= 2 {
  131. size = maximumRecordOverhead
  132. for _, h := range m.Headers {
  133. size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32
  134. }
  135. } else {
  136. size = producerMessageOverhead
  137. }
  138. if m.Key != nil {
  139. size += m.Key.Length()
  140. }
  141. if m.Value != nil {
  142. size += m.Value.Length()
  143. }
  144. return size
  145. }
  146. func (m *ProducerMessage) clear() {
  147. m.flags = 0
  148. m.retries = 0
  149. }
// ProducerError is the type of error generated when the producer fails to deliver a message.
// It contains the original ProducerMessage as well as the actual error value.
type ProducerError struct {
	Msg *ProducerMessage
	Err error
}

// Error implements the error interface, identifying the topic of the failed
// message along with the underlying cause.
func (pe ProducerError) Error() string {
	return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err)
}
// ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface.
// It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
// when closing a producer.
type ProducerErrors []*ProducerError

// Error reports only the count of failed deliveries; inspect the individual
// elements for per-message detail.
func (pe ProducerErrors) Error() string {
	return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
}
// Errors implements AsyncProducer; see the interface documentation.
func (p *asyncProducer) Errors() <-chan *ProducerError {
	return p.errors
}
// Successes implements AsyncProducer; see the interface documentation.
func (p *asyncProducer) Successes() <-chan *ProducerMessage {
	return p.successes
}
// Input implements AsyncProducer; see the interface documentation.
func (p *asyncProducer) Input() chan<- *ProducerMessage {
	return p.input
}
// Close implements AsyncProducer: it starts an asynchronous shutdown, then
// blocks draining the output channels until both are closed. Any delivery
// failures observed while draining are collected and returned as a single
// ProducerErrors value.
func (p *asyncProducer) Close() error {
	p.AsyncClose()

	// When successes are enabled the user may stop reading them once Close is
	// called; drain the channel on their behalf so shutdown cannot deadlock.
	if p.conf.Producer.Return.Successes {
		go withRecover(func() {
			for range p.successes {
			}
		})
	}

	var errors ProducerErrors
	if p.conf.Producer.Return.Errors {
		for event := range p.errors {
			errors = append(errors, event)
		}
	} else {
		// Errors are not surfaced to the user, so just wait for the channel
		// to close — that is the shutdown-complete signal.
		<-p.errors
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}
// AsyncClose implements AsyncProducer: the shutdown runs in its own
// goroutine; completion is signalled by the Errors and Successes channels
// being closed.
func (p *asyncProducer) AsyncClose() {
	go withRecover(p.shutdown)
}
// singleton
// dispatcher fans user input out to one topicProducer per topic. It also
// performs the checks that must happen exactly once per message: nil
// filtering, in-flight accounting for fresh messages, the headers/version
// compatibility check, and the maximum-size check.
func (p *asyncProducer) dispatcher() {
	handlers := make(map[string]chan<- *ProducerMessage)
	shuttingDown := false

	for msg := range p.input {
		if msg == nil {
			Logger.Println("Something tried to send a nil message, it was ignored.")
			continue
		}

		if msg.flags&shutdown != 0 {
			// marker injected by shutdown(); from here on fresh messages are refused
			shuttingDown = true
			p.inFlight.Done()
			continue
		} else if msg.retries == 0 {
			if shuttingDown {
				// we can't just call returnError here because that decrements the wait group,
				// which hasn't been incremented yet for this message, and shouldn't be
				pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
				if p.conf.Producer.Return.Errors {
					p.errors <- pErr
				} else {
					Logger.Println(pErr)
				}
				continue
			}
			p.inFlight.Add(1)
		}

		version := 1
		if p.conf.Version.IsAtLeast(V0_11_0_0) {
			version = 2
		} else if msg.Headers != nil {
			p.returnError(msg, ConfigurationError("Producing headers requires Kafka at least v0.11"))
			continue
		}
		if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes {
			p.returnError(msg, ErrMessageSizeTooLarge)
			continue
		}

		handler := handlers[msg.Topic]
		if handler == nil {
			handler = p.newTopicProducer(msg.Topic)
			handlers[msg.Topic] = handler
		}

		handler <- msg
	}

	// input was closed: propagate the shutdown to every topicProducer
	for _, handler := range handlers {
		close(handler)
	}
}
// topicProducer is the per-topic stage of the pipeline: it partitions
// each fresh message, then dispatches it by partition.
type topicProducer struct {
	parent *asyncProducer
	topic  string
	input  <-chan *ProducerMessage

	breaker     *breaker.Breaker // backs off repeated metadata/partitioning failures
	handlers    map[int32]chan<- *ProducerMessage
	partitioner Partitioner
}
  259. func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
  260. input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
  261. tp := &topicProducer{
  262. parent: p,
  263. topic: topic,
  264. input: input,
  265. breaker: breaker.New(3, 1, 10*time.Second),
  266. handlers: make(map[int32]chan<- *ProducerMessage),
  267. partitioner: p.conf.Producer.Partitioner(topic),
  268. }
  269. go withRecover(tp.dispatch)
  270. return input
  271. }
  272. func (tp *topicProducer) dispatch() {
  273. for msg := range tp.input {
  274. if msg.retries == 0 {
  275. if err := tp.partitionMessage(msg); err != nil {
  276. tp.parent.returnError(msg, err)
  277. continue
  278. }
  279. }
  280. handler := tp.handlers[msg.Partition]
  281. if handler == nil {
  282. handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
  283. tp.handlers[msg.Partition] = handler
  284. }
  285. handler <- msg
  286. }
  287. for _, handler := range tp.handlers {
  288. close(handler)
  289. }
  290. }
  291. func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
  292. var partitions []int32
  293. err := tp.breaker.Run(func() (err error) {
  294. var requiresConsistency = false
  295. if ep, ok := tp.partitioner.(DynamicConsistencyPartitioner); ok {
  296. requiresConsistency = ep.MessageRequiresConsistency(msg)
  297. } else {
  298. requiresConsistency = tp.partitioner.RequiresConsistency()
  299. }
  300. if requiresConsistency {
  301. partitions, err = tp.parent.client.Partitions(msg.Topic)
  302. } else {
  303. partitions, err = tp.parent.client.WritablePartitions(msg.Topic)
  304. }
  305. return
  306. })
  307. if err != nil {
  308. return err
  309. }
  310. numPartitions := int32(len(partitions))
  311. if numPartitions == 0 {
  312. return ErrLeaderNotAvailable
  313. }
  314. choice, err := tp.partitioner.Partition(msg, numPartitions)
  315. if err != nil {
  316. return err
  317. } else if choice < 0 || choice >= numPartitions {
  318. return ErrInvalidPartition
  319. }
  320. msg.Partition = partitions[choice]
  321. return nil
  322. }
// partitionProducer is the per-partition stage of the pipeline: it
// dispatches messages to the appropriate broker, and is also responsible
// for maintaining message order during retries.
type partitionProducer struct {
	parent    *asyncProducer
	topic     string
	partition int32
	input     <-chan *ProducerMessage

	leader  *Broker
	breaker *breaker.Breaker
	output  chan<- *ProducerMessage // input channel of the leader's brokerProducer; nil when detached

	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
	// retryState[msg.retries].expectChaser simply tracks whether we've seen a fin message for a given level (and
	// therefore whether our buffer is complete and safe to flush)
	highWatermark int
	retryState    []partitionRetryState
}

// partitionRetryState holds the buffered messages and fin-expectation flag
// for a single retry level of a partitionProducer.
type partitionRetryState struct {
	buf          []*ProducerMessage
	expectChaser bool
}
  345. func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
  346. input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
  347. pp := &partitionProducer{
  348. parent: p,
  349. topic: topic,
  350. partition: partition,
  351. input: input,
  352. breaker: breaker.New(3, 1, 10*time.Second),
  353. retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
  354. }
  355. go withRecover(pp.dispatch)
  356. return input
  357. }
// dispatch is the partitionProducer goroutine: it forwards messages to the
// current leader's brokerProducer while enforcing the retry-level ordering
// protocol described on the struct.
func (pp *partitionProducer) dispatch() {
	// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
	// on the first message
	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
	if pp.leader != nil {
		pp.output = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
	}

	for msg := range pp.input {
		if msg.retries > pp.highWatermark {
			// a new, higher, retry level; handle it and then back off
			pp.newHighWatermark(msg.retries)
			time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
		} else if pp.highWatermark > 0 {
			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
			if msg.retries < pp.highWatermark {
				// in fact this message is not even the current retry level, so buffer it for now (unless it's a just a fin)
				if msg.flags&fin == fin {
					pp.retryState[msg.retries].expectChaser = false
					pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				} else {
					pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
				}
				continue
			} else if msg.flags&fin == fin {
				// this message is of the current retry level (msg.retries == highWatermark) and the fin flag is set,
				// meaning this retry level is done and we can go down (at least) one level and flush that
				pp.retryState[pp.highWatermark].expectChaser = false
				pp.flushRetryBuffers()
				pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				continue
			}
		}

		// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
		// without breaking any of our ordering guarantees
		if pp.output == nil {
			if err := pp.updateLeader(); err != nil {
				pp.parent.returnError(msg, err)
				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
				continue
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		pp.output <- msg
	}

	// input closed: release our reference to the brokerProducer
	if pp.output != nil {
		pp.parent.unrefBrokerProducer(pp.leader, pp.output)
	}
}
// newHighWatermark raises the current retry level to hwm, sends a fin marker
// down the old output so we can detect when all the "in between" messages
// have cycled back to us, and detaches from the now-stale broker selection.
func (pp *partitionProducer) newHighWatermark(hwm int) {
	Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
	pp.highWatermark = hwm

	// send off a fin so that we know when everything "in between" has made it
	// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
	pp.retryState[pp.highWatermark].expectChaser = true
	pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight
	pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}

	// a new HWM means that our current broker selection is out of date
	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
	pp.parent.unrefBrokerProducer(pp.leader, pp.output)
	pp.output = nil
}
// flushRetryBuffers walks back down the retry levels, re-sending each
// level's buffered messages, until it reaches either level 0 (normal
// operation) or a level that is still waiting for its fin marker.
func (pp *partitionProducer) flushRetryBuffers() {
	Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
	for {
		pp.highWatermark--

		if pp.output == nil {
			if err := pp.updateLeader(); err != nil {
				// no leader available: fail this level's backlog and fall through
				pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
				goto flushDone
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		for _, msg := range pp.retryState[pp.highWatermark].buf {
			pp.output <- msg
		}

	flushDone:
		pp.retryState[pp.highWatermark].buf = nil
		if pp.retryState[pp.highWatermark].expectChaser {
			// this level's fin hasn't come back yet, so stop flushing here
			Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
			break
		} else if pp.highWatermark == 0 {
			Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
			break
		}
	}
}
  446. func (pp *partitionProducer) updateLeader() error {
  447. return pp.breaker.Run(func() (err error) {
  448. if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil {
  449. return err
  450. }
  451. if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil {
  452. return err
  453. }
  454. pp.output = pp.parent.getBrokerProducer(pp.leader)
  455. pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
  456. pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
  457. return nil
  458. })
  459. }
// one per broker; also constructs an associated flusher.
// newBrokerProducer starts the brokerProducer goroutine for the given broker
// plus a minimal "bridge" goroutine that performs the blocking network
// Produce calls, so the brokerProducer's main loop can remain select-driven.
// Returns the input channel for partitionProducers to attach to.
func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
	var (
		input     = make(chan *ProducerMessage)
		bridge    = make(chan *produceSet)
		responses = make(chan *brokerProducerResponse)
	)

	bp := &brokerProducer{
		parent:         p,
		broker:         broker,
		input:          input,
		output:         bridge,
		responses:      responses,
		buffer:         newProduceSet(p),
		currentRetries: make(map[string]map[int32]error),
	}
	go withRecover(bp.run)

	// minimal bridge to make the network response `select`able
	go withRecover(func() {
		for set := range bridge {
			request := set.buildRequest()

			response, err := broker.Produce(request)

			responses <- &brokerProducerResponse{
				set: set,
				err: err,
				res: response,
			}
		}
		// bridge closed by brokerProducer.shutdown; signal that no more
		// responses will arrive
		close(responses)
	})

	return input
}
// brokerProducerResponse pairs a completed produce request with its result,
// so the brokerProducer can fold network responses into its main select loop.
type brokerProducerResponse struct {
	set *produceSet
	err error
	res *ProduceResponse
}
// brokerProducer groups messages together into appropriately-sized batches
// for sending to the broker, and handles state related to retries etc.
type brokerProducer struct {
	parent *asyncProducer
	broker *Broker

	input     <-chan *ProducerMessage
	output    chan<- *produceSet // feeds the network bridge goroutine
	responses <-chan *brokerProducerResponse

	buffer     *produceSet      // the batch currently being accumulated
	timer      <-chan time.Time // flush-frequency timer; nil when not armed
	timerFired bool

	closing error // non-nil once the connection is being abandoned; everything then goes to retry
	// currentRetries records, per topic/partition, a non-nil error while that
	// partition is mid retry-cycle (set on a retriable response, cleared by
	// the partition's fin/syn handshake).
	currentRetries map[string]map[int32]error
}
// run is the brokerProducer's main loop: it accumulates incoming messages
// into buffer and, once the buffer is ready (size threshold or flush timer),
// offers it to the bridge goroutine; network responses are folded back in
// between. The local `output` is nil whenever the buffer should not yet be
// flushed, which disables that select case.
func (bp *brokerProducer) run() {
	var output chan<- *produceSet
	Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())

	for {
		select {
		case msg := <-bp.input:
			if msg == nil {
				// input closed: the last partitionProducer has detached
				bp.shutdown()
				return
			}

			if msg.flags&syn == syn {
				// a partitionProducer announcing itself; reset any stale retry state
				Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n",
					bp.broker.ID(), msg.Topic, msg.Partition)
				if bp.currentRetries[msg.Topic] == nil {
					bp.currentRetries[msg.Topic] = make(map[int32]error)
				}
				bp.currentRetries[msg.Topic][msg.Partition] = nil
				bp.parent.inFlight.Done()
				continue
			}

			if reason := bp.needsRetry(msg); reason != nil {
				bp.parent.retryMessage(msg, reason)

				if bp.closing == nil && msg.flags&fin == fin {
					// we were retrying this partition but we can start processing again
					delete(bp.currentRetries[msg.Topic], msg.Partition)
					Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n",
						bp.broker.ID(), msg.Topic, msg.Partition)
				}

				continue
			}

			if bp.buffer.wouldOverflow(msg) {
				if err := bp.waitForSpace(msg); err != nil {
					bp.parent.retryMessage(msg, err)
					continue
				}
			}

			if err := bp.buffer.add(msg); err != nil {
				bp.parent.returnError(msg, err)
				continue
			}

			if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil {
				bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
			}
		case <-bp.timer:
			bp.timerFired = true
		case output <- bp.buffer:
			bp.rollOver()
		case response := <-bp.responses:
			bp.handleResponse(response)
		}

		// re-evaluate whether the buffer may be offered to the bridge
		if bp.timerFired || bp.buffer.readyToFlush() {
			output = bp.output
		} else {
			output = nil
		}
	}
}
// shutdown drains the brokerProducer: it flushes any remaining buffered
// messages to the bridge, closes the bridge, then processes the responses
// still in flight until the bridge closes the responses channel.
func (bp *brokerProducer) shutdown() {
	for !bp.buffer.empty() {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
		case bp.output <- bp.buffer:
			bp.rollOver()
		}
	}
	close(bp.output)
	for response := range bp.responses {
		bp.handleResponse(response)
	}

	Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
}
  583. func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
  584. if bp.closing != nil {
  585. return bp.closing
  586. }
  587. return bp.currentRetries[msg.Topic][msg.Partition]
  588. }
// waitForSpace blocks until msg would fit in the buffer, either because a
// flush to the bridge freed it or because a handled response changed our
// state. Returns a non-nil error if, meanwhile, the message turned out to
// need retrying instead.
func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error {
	Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())

	for {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
			// handling a response can change our state, so re-check some things
			if reason := bp.needsRetry(msg); reason != nil {
				return reason
			} else if !bp.buffer.wouldOverflow(msg) {
				return nil
			}
		case bp.output <- bp.buffer:
			bp.rollOver()
			return nil
		}
	}
}
  607. func (bp *brokerProducer) rollOver() {
  608. bp.timer = nil
  609. bp.timerFired = false
  610. bp.buffer = newProduceSet(bp.parent)
  611. }
  612. func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) {
  613. if response.err != nil {
  614. bp.handleError(response.set, response.err)
  615. } else {
  616. bp.handleSuccess(response.set, response.res)
  617. }
  618. if bp.buffer.empty() {
  619. bp.rollOver() // this can happen if the response invalidated our buffer
  620. }
  621. }
  622. func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) {
  623. // we iterate through the blocks in the request set, not the response, so that we notice
  624. // if the response is missing a block completely
  625. sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
  626. if response == nil {
  627. // this only happens when RequiredAcks is NoResponse, so we have to assume success
  628. bp.parent.returnSuccesses(msgs)
  629. return
  630. }
  631. block := response.GetBlock(topic, partition)
  632. if block == nil {
  633. bp.parent.returnErrors(msgs, ErrIncompleteResponse)
  634. return
  635. }
  636. switch block.Err {
  637. // Success
  638. case ErrNoError:
  639. if bp.parent.conf.Version.IsAtLeast(V0_10_0_0) && !block.Timestamp.IsZero() {
  640. for _, msg := range msgs {
  641. msg.Timestamp = block.Timestamp
  642. }
  643. }
  644. for i, msg := range msgs {
  645. msg.Offset = block.Offset + int64(i)
  646. }
  647. bp.parent.returnSuccesses(msgs)
  648. // Retriable errors
  649. case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
  650. ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
  651. Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
  652. bp.broker.ID(), topic, partition, block.Err)
  653. bp.currentRetries[topic][partition] = block.Err
  654. bp.parent.retryMessages(msgs, block.Err)
  655. bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
  656. // Other non-retriable errors
  657. default:
  658. bp.parent.returnErrors(msgs, block.Err)
  659. }
  660. })
  661. }
// handleError reacts to a failed produce round-trip. Encoding errors are
// permanent and returned to the user; any other error is treated as a broken
// connection: the broker is abandoned and both the sent set and the current
// buffer are queued for retry via a fresh connection.
func (bp *brokerProducer) handleError(sent *produceSet, err error) {
	switch err.(type) {
	case PacketEncodingError:
		sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
			bp.parent.returnErrors(msgs, err)
		})
	default:
		Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err)
		bp.parent.abandonBrokerConnection(bp.broker)
		_ = bp.broker.Close()
		// once closing is set, needsRetry redirects every subsequent message
		bp.closing = err
		sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
			bp.parent.retryMessages(msgs, err)
		})
		bp.buffer.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
			bp.parent.retryMessages(msgs, err)
		})
		bp.rollOver()
	}
}
// singleton
// retryHandler feeds retried messages back into the dispatcher's input,
// buffering without bound in an in-memory queue so that a full input channel
// can never deadlock the flushers. Effectively a "bridge" between the
// flushers and the dispatcher,
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
func (p *asyncProducer) retryHandler() {
	var msg *ProducerMessage
	buf := queue.New()

	for {
		if buf.Length() == 0 {
			msg = <-p.retries
		} else {
			select {
			case msg = <-p.retries:
			case p.input <- buf.Peek().(*ProducerMessage):
				buf.Remove()
				continue
			}
		}

		if msg == nil {
			// p.retries was closed: shutdown is complete
			return
		}

		buf.Add(msg)
	}
}
// utility functions

// shutdown performs the full producer shutdown: it injects a shutdown
// marker through the dispatcher (after which fresh sends are refused), waits
// for every in-flight message and internal marker to resolve, closes the
// client if we own it, and finally closes all channels to release readers.
func (p *asyncProducer) shutdown() {
	Logger.Println("Producer shutting down.")
	// the marker itself is tracked so the Wait below covers it too
	p.inFlight.Add(1)
	p.input <- &ProducerMessage{flags: shutdown}

	p.inFlight.Wait()

	if p.ownClient {
		err := p.client.Close()
		if err != nil {
			Logger.Println("producer/shutdown failed to close the embedded client:", err)
		}
	}

	close(p.input)
	close(p.retries)
	close(p.errors)
	close(p.successes)
}
  722. func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
  723. msg.clear()
  724. pErr := &ProducerError{Msg: msg, Err: err}
  725. if p.conf.Producer.Return.Errors {
  726. p.errors <- pErr
  727. } else {
  728. Logger.Println(pErr)
  729. }
  730. p.inFlight.Done()
  731. }
  732. func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
  733. for _, msg := range batch {
  734. p.returnError(msg, err)
  735. }
  736. }
  737. func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
  738. for _, msg := range batch {
  739. if p.conf.Producer.Return.Successes {
  740. msg.clear()
  741. p.successes <- msg
  742. }
  743. p.inFlight.Done()
  744. }
  745. }
  746. func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) {
  747. if msg.retries >= p.conf.Producer.Retry.Max {
  748. p.returnError(msg, err)
  749. } else {
  750. msg.retries++
  751. p.retries <- msg
  752. }
  753. }
  754. func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
  755. for _, msg := range batch {
  756. p.retryMessage(msg, err)
  757. }
  758. }
  759. func (p *asyncProducer) getBrokerProducer(broker *Broker) chan<- *ProducerMessage {
  760. p.brokerLock.Lock()
  761. defer p.brokerLock.Unlock()
  762. bp := p.brokers[broker]
  763. if bp == nil {
  764. bp = p.newBrokerProducer(broker)
  765. p.brokers[broker] = bp
  766. p.brokerRefs[bp] = 0
  767. }
  768. p.brokerRefs[bp]++
  769. return bp
  770. }
  771. func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan<- *ProducerMessage) {
  772. p.brokerLock.Lock()
  773. defer p.brokerLock.Unlock()
  774. p.brokerRefs[bp]--
  775. if p.brokerRefs[bp] == 0 {
  776. close(bp)
  777. delete(p.brokerRefs, bp)
  778. if p.brokers[broker] == bp {
  779. delete(p.brokers, broker)
  780. }
  781. }
  782. }
// abandonBrokerConnection removes the broker from the producer's broker map
// so the next partitionProducer that needs it creates a fresh
// brokerProducer. The existing channel stays open; its remaining references
// are still released through unrefBrokerProducer.
func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	delete(p.brokers, broker)
}