client.go 933 B

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. package cluster
  2. import (
  3. "errors"
  4. "sync/atomic"
  5. "github.com/Shopify/sarama"
  6. )
  7. var errClientInUse = errors.New("cluster: client is already used by another consumer")
  8. // Client is a group client
  9. type Client struct {
  10. sarama.Client
  11. config Config
  12. inUse uint32
  13. }
  14. // NewClient creates a new client instance
  15. func NewClient(addrs []string, config *Config) (*Client, error) {
  16. if config == nil {
  17. config = NewConfig()
  18. }
  19. if err := config.Validate(); err != nil {
  20. return nil, err
  21. }
  22. client, err := sarama.NewClient(addrs, &config.Config)
  23. if err != nil {
  24. return nil, err
  25. }
  26. return &Client{Client: client, config: *config}, nil
  27. }
  28. // ClusterConfig returns the cluster configuration.
  29. func (c *Client) ClusterConfig() *Config {
  30. cfg := c.config
  31. return &cfg
  32. }
  33. func (c *Client) claim() bool {
  34. return atomic.CompareAndSwapUint32(&c.inUse, 0, 1)
  35. }
  36. func (c *Client) release() {
  37. atomic.CompareAndSwapUint32(&c.inUse, 1, 0)
  38. }