broker.go 22 KB


  1. package sarama
  2. import (
  3. "crypto/tls"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "net"
  8. "strconv"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "github.com/rcrowley/go-metrics"
  13. )
  14. // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
  15. type Broker struct {
  16. id int32
  17. addr string
  18. rack *string
  19. conf *Config
  20. correlationID int32
  21. conn net.Conn
  22. connErr error
  23. lock sync.Mutex
  24. opened int32
  25. responses chan responsePromise
  26. done chan bool
  27. incomingByteRate metrics.Meter
  28. requestRate metrics.Meter
  29. requestSize metrics.Histogram
  30. requestLatency metrics.Histogram
  31. outgoingByteRate metrics.Meter
  32. responseRate metrics.Meter
  33. responseSize metrics.Histogram
  34. brokerIncomingByteRate metrics.Meter
  35. brokerRequestRate metrics.Meter
  36. brokerRequestSize metrics.Histogram
  37. brokerRequestLatency metrics.Histogram
  38. brokerOutgoingByteRate metrics.Meter
  39. brokerResponseRate metrics.Meter
  40. brokerResponseSize metrics.Histogram
  41. }
  42. type responsePromise struct {
  43. requestTime time.Time
  44. correlationID int32
  45. packets chan []byte
  46. errors chan error
  47. }
  48. // NewBroker creates and returns a Broker targeting the given host:port address.
  49. // This does not attempt to actually connect, you have to call Open() for that.
  50. func NewBroker(addr string) *Broker {
  51. return &Broker{id: -1, addr: addr}
  52. }
  53. // Open tries to connect to the Broker if it is not already connected or connecting, but does not block
  54. // waiting for the connection to complete. This means that any subsequent operations on the broker will
  55. // block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
  56. // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
  57. // AlreadyConnected. If conf is nil, the result of NewConfig() is used.
  58. func (b *Broker) Open(conf *Config) error {
  59. if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
  60. return ErrAlreadyConnected
  61. }
  62. if conf == nil {
  63. conf = NewConfig()
  64. }
  65. err := conf.Validate()
  66. if err != nil {
  67. return err
  68. }
  69. b.lock.Lock()
  70. go withRecover(func() {
  71. defer b.lock.Unlock()
  72. dialer := net.Dialer{
  73. Timeout: conf.Net.DialTimeout,
  74. KeepAlive: conf.Net.KeepAlive,
  75. LocalAddr: conf.Net.LocalAddr,
  76. }
  77. if conf.Net.TLS.Enable {
  78. b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
  79. } else {
  80. b.conn, b.connErr = dialer.Dial("tcp", b.addr)
  81. }
  82. if b.connErr != nil {
  83. Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
  84. b.conn = nil
  85. atomic.StoreInt32(&b.opened, 0)
  86. return
  87. }
  88. b.conn = newBufConn(b.conn)
  89. b.conf = conf
  90. // Create or reuse the global metrics shared between brokers
  91. b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
  92. b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
  93. b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
  94. b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
  95. b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
  96. b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
  97. b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
  98. // Do not gather metrics for seeded broker (only used during bootstrap) because they share
  99. // the same id (-1) and are already exposed through the global metrics above
  100. if b.id >= 0 {
  101. b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
  102. b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
  103. b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
  104. b.brokerRequestLatency = getOrRegisterBrokerHistogram("request-latency-in-ms", b, conf.MetricRegistry)
  105. b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
  106. b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
  107. b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
  108. }
  109. if conf.Net.SASL.Enable {
  110. b.connErr = b.sendAndReceiveSASLPlainAuth()
  111. if b.connErr != nil {
  112. err = b.conn.Close()
  113. if err == nil {
  114. Logger.Printf("Closed connection to broker %s\n", b.addr)
  115. } else {
  116. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  117. }
  118. b.conn = nil
  119. atomic.StoreInt32(&b.opened, 0)
  120. return
  121. }
  122. }
  123. b.done = make(chan bool)
  124. b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
  125. if b.id >= 0 {
  126. Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
  127. } else {
  128. Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
  129. }
  130. go withRecover(b.responseReceiver)
  131. })
  132. return nil
  133. }
  134. // Connected returns true if the broker is connected and false otherwise. If the broker is not
  135. // connected but it had tried to connect, the error from that connection attempt is also returned.
  136. func (b *Broker) Connected() (bool, error) {
  137. b.lock.Lock()
  138. defer b.lock.Unlock()
  139. return b.conn != nil, b.connErr
  140. }
  141. func (b *Broker) Close() error {
  142. b.lock.Lock()
  143. defer b.lock.Unlock()
  144. if b.conn == nil {
  145. return ErrNotConnected
  146. }
  147. close(b.responses)
  148. <-b.done
  149. err := b.conn.Close()
  150. b.conn = nil
  151. b.connErr = nil
  152. b.done = nil
  153. b.responses = nil
  154. if b.id >= 0 {
  155. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("incoming-byte-rate", b))
  156. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("request-rate", b))
  157. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("outgoing-byte-rate", b))
  158. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("response-rate", b))
  159. }
  160. if err == nil {
  161. Logger.Printf("Closed connection to broker %s\n", b.addr)
  162. } else {
  163. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  164. }
  165. atomic.StoreInt32(&b.opened, 0)
  166. return err
  167. }
  168. // ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
  169. func (b *Broker) ID() int32 {
  170. return b.id
  171. }
  172. // Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
  173. func (b *Broker) Addr() string {
  174. return b.addr
  175. }
  176. func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
  177. response := new(MetadataResponse)
  178. err := b.sendAndReceive(request, response)
  179. if err != nil {
  180. return nil, err
  181. }
  182. return response, nil
  183. }
  184. func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
  185. response := new(ConsumerMetadataResponse)
  186. err := b.sendAndReceive(request, response)
  187. if err != nil {
  188. return nil, err
  189. }
  190. return response, nil
  191. }
  192. func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
  193. response := new(FindCoordinatorResponse)
  194. err := b.sendAndReceive(request, response)
  195. if err != nil {
  196. return nil, err
  197. }
  198. return response, nil
  199. }
  200. func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
  201. response := new(OffsetResponse)
  202. err := b.sendAndReceive(request, response)
  203. if err != nil {
  204. return nil, err
  205. }
  206. return response, nil
  207. }
  208. func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
  209. var response *ProduceResponse
  210. var err error
  211. if request.RequiredAcks == NoResponse {
  212. err = b.sendAndReceive(request, nil)
  213. } else {
  214. response = new(ProduceResponse)
  215. err = b.sendAndReceive(request, response)
  216. }
  217. if err != nil {
  218. return nil, err
  219. }
  220. return response, nil
  221. }
  222. func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
  223. response := new(FetchResponse)
  224. err := b.sendAndReceive(request, response)
  225. if err != nil {
  226. return nil, err
  227. }
  228. return response, nil
  229. }
  230. func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
  231. response := new(OffsetCommitResponse)
  232. err := b.sendAndReceive(request, response)
  233. if err != nil {
  234. return nil, err
  235. }
  236. return response, nil
  237. }
  238. func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
  239. response := new(OffsetFetchResponse)
  240. err := b.sendAndReceive(request, response)
  241. if err != nil {
  242. return nil, err
  243. }
  244. return response, nil
  245. }
  246. func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
  247. response := new(JoinGroupResponse)
  248. err := b.sendAndReceive(request, response)
  249. if err != nil {
  250. return nil, err
  251. }
  252. return response, nil
  253. }
  254. func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
  255. response := new(SyncGroupResponse)
  256. err := b.sendAndReceive(request, response)
  257. if err != nil {
  258. return nil, err
  259. }
  260. return response, nil
  261. }
  262. func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
  263. response := new(LeaveGroupResponse)
  264. err := b.sendAndReceive(request, response)
  265. if err != nil {
  266. return nil, err
  267. }
  268. return response, nil
  269. }
  270. func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
  271. response := new(HeartbeatResponse)
  272. err := b.sendAndReceive(request, response)
  273. if err != nil {
  274. return nil, err
  275. }
  276. return response, nil
  277. }
  278. func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
  279. response := new(ListGroupsResponse)
  280. err := b.sendAndReceive(request, response)
  281. if err != nil {
  282. return nil, err
  283. }
  284. return response, nil
  285. }
  286. func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
  287. response := new(DescribeGroupsResponse)
  288. err := b.sendAndReceive(request, response)
  289. if err != nil {
  290. return nil, err
  291. }
  292. return response, nil
  293. }
  294. func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
  295. response := new(ApiVersionsResponse)
  296. err := b.sendAndReceive(request, response)
  297. if err != nil {
  298. return nil, err
  299. }
  300. return response, nil
  301. }
  302. func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
  303. response := new(CreateTopicsResponse)
  304. err := b.sendAndReceive(request, response)
  305. if err != nil {
  306. return nil, err
  307. }
  308. return response, nil
  309. }
  310. func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
  311. response := new(DeleteTopicsResponse)
  312. err := b.sendAndReceive(request, response)
  313. if err != nil {
  314. return nil, err
  315. }
  316. return response, nil
  317. }
  318. func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
  319. response := new(CreatePartitionsResponse)
  320. err := b.sendAndReceive(request, response)
  321. if err != nil {
  322. return nil, err
  323. }
  324. return response, nil
  325. }
  326. func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
  327. response := new(DeleteRecordsResponse)
  328. err := b.sendAndReceive(request, response)
  329. if err != nil {
  330. return nil, err
  331. }
  332. return response, nil
  333. }
  334. func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
  335. response := new(DescribeAclsResponse)
  336. err := b.sendAndReceive(request, response)
  337. if err != nil {
  338. return nil, err
  339. }
  340. return response, nil
  341. }
  342. func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
  343. response := new(CreateAclsResponse)
  344. err := b.sendAndReceive(request, response)
  345. if err != nil {
  346. return nil, err
  347. }
  348. return response, nil
  349. }
  350. func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
  351. response := new(DeleteAclsResponse)
  352. err := b.sendAndReceive(request, response)
  353. if err != nil {
  354. return nil, err
  355. }
  356. return response, nil
  357. }
  358. func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
  359. response := new(InitProducerIDResponse)
  360. err := b.sendAndReceive(request, response)
  361. if err != nil {
  362. return nil, err
  363. }
  364. return response, nil
  365. }
  366. func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
  367. response := new(AddPartitionsToTxnResponse)
  368. err := b.sendAndReceive(request, response)
  369. if err != nil {
  370. return nil, err
  371. }
  372. return response, nil
  373. }
  374. func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
  375. response := new(AddOffsetsToTxnResponse)
  376. err := b.sendAndReceive(request, response)
  377. if err != nil {
  378. return nil, err
  379. }
  380. return response, nil
  381. }
  382. func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
  383. response := new(EndTxnResponse)
  384. err := b.sendAndReceive(request, response)
  385. if err != nil {
  386. return nil, err
  387. }
  388. return response, nil
  389. }
  390. func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
  391. response := new(TxnOffsetCommitResponse)
  392. err := b.sendAndReceive(request, response)
  393. if err != nil {
  394. return nil, err
  395. }
  396. return response, nil
  397. }
  398. func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
  399. response := new(DescribeConfigsResponse)
  400. err := b.sendAndReceive(request, response)
  401. if err != nil {
  402. return nil, err
  403. }
  404. return response, nil
  405. }
  406. func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
  407. response := new(AlterConfigsResponse)
  408. err := b.sendAndReceive(request, response)
  409. if err != nil {
  410. return nil, err
  411. }
  412. return response, nil
  413. }
  414. func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
  415. response := new(DeleteGroupsResponse)
  416. if err := b.sendAndReceive(request, response); err != nil {
  417. return nil, err
  418. }
  419. return response, nil
  420. }
  421. func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
  422. b.lock.Lock()
  423. defer b.lock.Unlock()
  424. if b.conn == nil {
  425. if b.connErr != nil {
  426. return nil, b.connErr
  427. }
  428. return nil, ErrNotConnected
  429. }
  430. if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
  431. return nil, ErrUnsupportedVersion
  432. }
  433. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  434. buf, err := encode(req, b.conf.MetricRegistry)
  435. if err != nil {
  436. return nil, err
  437. }
  438. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  439. if err != nil {
  440. return nil, err
  441. }
  442. requestTime := time.Now()
  443. bytes, err := b.conn.Write(buf)
  444. b.updateOutgoingCommunicationMetrics(bytes)
  445. if err != nil {
  446. return nil, err
  447. }
  448. b.correlationID++
  449. if !promiseResponse {
  450. // Record request latency without the response
  451. b.updateRequestLatencyMetrics(time.Since(requestTime))
  452. return nil, nil
  453. }
  454. promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
  455. b.responses <- promise
  456. return &promise, nil
  457. }
  458. func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
  459. promise, err := b.send(req, res != nil)
  460. if err != nil {
  461. return err
  462. }
  463. if promise == nil {
  464. return nil
  465. }
  466. select {
  467. case buf := <-promise.packets:
  468. return versionedDecode(buf, res, req.version())
  469. case err = <-promise.errors:
  470. return err
  471. }
  472. }
  473. func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
  474. b.id, err = pd.getInt32()
  475. if err != nil {
  476. return err
  477. }
  478. host, err := pd.getString()
  479. if err != nil {
  480. return err
  481. }
  482. port, err := pd.getInt32()
  483. if err != nil {
  484. return err
  485. }
  486. if version >= 1 {
  487. b.rack, err = pd.getNullableString()
  488. if err != nil {
  489. return err
  490. }
  491. }
  492. b.addr = net.JoinHostPort(host, fmt.Sprint(port))
  493. if _, _, err := net.SplitHostPort(b.addr); err != nil {
  494. return err
  495. }
  496. return nil
  497. }
  498. func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
  499. host, portstr, err := net.SplitHostPort(b.addr)
  500. if err != nil {
  501. return err
  502. }
  503. port, err := strconv.Atoi(portstr)
  504. if err != nil {
  505. return err
  506. }
  507. pe.putInt32(b.id)
  508. err = pe.putString(host)
  509. if err != nil {
  510. return err
  511. }
  512. pe.putInt32(int32(port))
  513. if version >= 1 {
  514. err = pe.putNullableString(b.rack)
  515. if err != nil {
  516. return err
  517. }
  518. }
  519. return nil
  520. }
  521. func (b *Broker) responseReceiver() {
  522. var dead error
  523. header := make([]byte, 8)
  524. for response := range b.responses {
  525. if dead != nil {
  526. response.errors <- dead
  527. continue
  528. }
  529. err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
  530. if err != nil {
  531. dead = err
  532. response.errors <- err
  533. continue
  534. }
  535. bytesReadHeader, err := io.ReadFull(b.conn, header)
  536. requestLatency := time.Since(response.requestTime)
  537. if err != nil {
  538. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  539. dead = err
  540. response.errors <- err
  541. continue
  542. }
  543. decodedHeader := responseHeader{}
  544. err = decode(header, &decodedHeader)
  545. if err != nil {
  546. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  547. dead = err
  548. response.errors <- err
  549. continue
  550. }
  551. if decodedHeader.correlationID != response.correlationID {
  552. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  553. // TODO if decoded ID < cur ID, discard until we catch up
  554. // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
  555. dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
  556. response.errors <- dead
  557. continue
  558. }
  559. buf := make([]byte, decodedHeader.length-4)
  560. bytesReadBody, err := io.ReadFull(b.conn, buf)
  561. b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
  562. if err != nil {
  563. dead = err
  564. response.errors <- err
  565. continue
  566. }
  567. response.packets <- buf
  568. }
  569. close(b.done)
  570. }
  571. func (b *Broker) sendAndReceiveSASLPlainHandshake() error {
  572. rb := &SaslHandshakeRequest{"PLAIN"}
  573. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  574. buf, err := encode(req, b.conf.MetricRegistry)
  575. if err != nil {
  576. return err
  577. }
  578. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  579. if err != nil {
  580. return err
  581. }
  582. requestTime := time.Now()
  583. bytes, err := b.conn.Write(buf)
  584. b.updateOutgoingCommunicationMetrics(bytes)
  585. if err != nil {
  586. Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
  587. return err
  588. }
  589. b.correlationID++
  590. //wait for the response
  591. header := make([]byte, 8) // response header
  592. _, err = io.ReadFull(b.conn, header)
  593. if err != nil {
  594. Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
  595. return err
  596. }
  597. length := binary.BigEndian.Uint32(header[:4])
  598. payload := make([]byte, length-4)
  599. n, err := io.ReadFull(b.conn, payload)
  600. if err != nil {
  601. Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
  602. return err
  603. }
  604. b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
  605. res := &SaslHandshakeResponse{}
  606. err = versionedDecode(payload, res, 0)
  607. if err != nil {
  608. Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
  609. return err
  610. }
  611. if res.Err != ErrNoError {
  612. Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
  613. return res.Err
  614. }
  615. Logger.Print("Successful SASL handshake")
  616. return nil
  617. }
  618. // Kafka 0.10.0 plans to support SASL Plain and Kerberos as per PR #812 (KIP-43)/(JIRA KAFKA-3149)
  619. // Some hosted kafka services such as IBM Message Hub already offer SASL/PLAIN auth with Kafka 0.9
  620. //
  621. // In SASL Plain, Kafka expects the auth header to be in the following format
  622. // Message format (from https://tools.ietf.org/html/rfc4616):
  623. //
  624. // message = [authzid] UTF8NUL authcid UTF8NUL passwd
  625. // authcid = 1*SAFE ; MUST accept up to 255 octets
  626. // authzid = 1*SAFE ; MUST accept up to 255 octets
  627. // passwd = 1*SAFE ; MUST accept up to 255 octets
  628. // UTF8NUL = %x00 ; UTF-8 encoded NUL character
  629. //
  630. // SAFE = UTF1 / UTF2 / UTF3 / UTF4
  631. // ;; any UTF-8 encoded Unicode character except NUL
  632. //
  633. // When credentials are valid, Kafka returns a 4 byte array of null characters.
  634. // When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
  635. // of responding to bad credentials but thats how its being done today.
  636. func (b *Broker) sendAndReceiveSASLPlainAuth() error {
  637. if b.conf.Net.SASL.Handshake {
  638. handshakeErr := b.sendAndReceiveSASLPlainHandshake()
  639. if handshakeErr != nil {
  640. Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
  641. return handshakeErr
  642. }
  643. }
  644. length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
  645. authBytes := make([]byte, length+4) //4 byte length header + auth data
  646. binary.BigEndian.PutUint32(authBytes, uint32(length))
  647. copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
  648. err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  649. if err != nil {
  650. Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
  651. return err
  652. }
  653. requestTime := time.Now()
  654. bytesWritten, err := b.conn.Write(authBytes)
  655. b.updateOutgoingCommunicationMetrics(bytesWritten)
  656. if err != nil {
  657. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  658. return err
  659. }
  660. header := make([]byte, 4)
  661. n, err := io.ReadFull(b.conn, header)
  662. b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
  663. // If the credentials are valid, we would get a 4 byte response filled with null characters.
  664. // Otherwise, the broker closes the connection and we get an EOF
  665. if err != nil {
  666. Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  667. return err
  668. }
  669. Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
  670. return nil
  671. }
  672. func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
  673. b.updateRequestLatencyMetrics(requestLatency)
  674. b.responseRate.Mark(1)
  675. if b.brokerResponseRate != nil {
  676. b.brokerResponseRate.Mark(1)
  677. }
  678. responseSize := int64(bytes)
  679. b.incomingByteRate.Mark(responseSize)
  680. if b.brokerIncomingByteRate != nil {
  681. b.brokerIncomingByteRate.Mark(responseSize)
  682. }
  683. b.responseSize.Update(responseSize)
  684. if b.brokerResponseSize != nil {
  685. b.brokerResponseSize.Update(responseSize)
  686. }
  687. }
  688. func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
  689. requestLatencyInMs := int64(requestLatency / time.Millisecond)
  690. b.requestLatency.Update(requestLatencyInMs)
  691. if b.brokerRequestLatency != nil {
  692. b.brokerRequestLatency.Update(requestLatencyInMs)
  693. }
  694. }
  695. func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
  696. b.requestRate.Mark(1)
  697. if b.brokerRequestRate != nil {
  698. b.brokerRequestRate.Mark(1)
  699. }
  700. requestSize := int64(bytes)
  701. b.outgoingByteRate.Mark(requestSize)
  702. if b.brokerOutgoingByteRate != nil {
  703. b.brokerOutgoingByteRate.Mark(requestSize)
  704. }
  705. b.requestSize.Update(requestSize)
  706. if b.brokerRequestSize != nil {
  707. b.brokerRequestSize.Update(requestSize)
  708. }
  709. }