|
- // Copyright (C) 2015 The GoHBase Authors. All rights reserved.
- // This file is part of GoHBase.
- // Use of this source code is governed by the Apache License 2.0
- // that can be found in the COPYING file.
- package gohbase
- import (
- "encoding/binary"
- "fmt"
- "sync"
- "time"
- "github.com/cznic/b"
- "github.com/golang/protobuf/proto"
- log "github.com/sirupsen/logrus"
- "github.com/tsuna/gohbase/hrpc"
- "github.com/tsuna/gohbase/pb"
- "github.com/tsuna/gohbase/region"
- "github.com/tsuna/gohbase/zk"
- "golang.org/x/time/rate"
- )
- const (
- standardClient = iota
- adminClient
- defaultRPCQueueSize = 100
- defaultFlushInterval = 20 * time.Millisecond
- defaultZkRoot = "/hbase"
- defaultZkTimeout = 30 * time.Second
- defaultEffectiveUser = "root"
- // metaBurst is maximum number of request allowed at once.
- metaBurst = 10
- // metaLimit is rate at which to throttle requests to hbase:meta table.
- metaLimit = rate.Limit(100)
- )
- // Client a regular HBase client
- type Client interface {
- Scan(s *hrpc.Scan) hrpc.Scanner
- Get(g *hrpc.Get) (*hrpc.Result, error)
- Put(p *hrpc.Mutate) (*hrpc.Result, error)
- Delete(d *hrpc.Mutate) (*hrpc.Result, error)
- Append(a *hrpc.Mutate) (*hrpc.Result, error)
- Increment(i *hrpc.Mutate) (int64, error)
- CheckAndPut(p *hrpc.Mutate, family string, qualifier string,
- expectedValue []byte) (bool, error)
- Close()
- }
- // RPCClient is core client of gohbase. It's exposed for testing.
- type RPCClient interface {
- SendRPC(rpc hrpc.Call) (proto.Message, error)
- }
- // Option is a function used to configure optional config items for a Client.
- type Option func(*client)
- // A Client provides access to an HBase cluster.
- type client struct {
- clientType int
- regions keyRegionCache
- // Maps a hrpc.RegionInfo to the *region.Client that we think currently
- // serves it.
- clients clientRegionCache
- metaRegionInfo hrpc.RegionInfo
- adminRegionInfo hrpc.RegionInfo
- // The maximum size of the RPC queue in the region client
- rpcQueueSize int
- // zkClient is zookeeper for retrieving meta and admin information
- zkClient zk.Client
- // The root zookeeper path for Hbase. By default, this is usually "/hbase".
- zkRoot string
- // The zookeeper session timeout
- zkTimeout time.Duration
- // The timeout before flushing the RPC queue in the region client
- flushInterval time.Duration
- // The user used when accessing regions.
- effectiveUser string
- // metaLookupLimiter is used to throttle lookups to hbase:meta table
- metaLookupLimiter *rate.Limiter
- // How long to wait for a region lookup (either meta lookup or finding
- // meta in ZooKeeper). Should be greater than or equal to the ZooKeeper
- // session timeout.
- regionLookupTimeout time.Duration
- // regionReadTimeout is the maximum amount of time to wait for regionserver reply
- regionReadTimeout time.Duration
- done chan struct{}
- closeOnce sync.Once
- }
- // NewClient creates a new HBase client.
- func NewClient(zkquorum string, options ...Option) Client {
- return newClient(zkquorum, options...)
- }
- func newClient(zkquorum string, options ...Option) *client {
- log.WithFields(log.Fields{
- "Host": zkquorum,
- }).Debug("Creating new client.")
- c := &client{
- clientType: standardClient,
- regions: keyRegionCache{regions: b.TreeNew(region.CompareGeneric)},
- clients: clientRegionCache{
- regions: make(map[hrpc.RegionClient]map[hrpc.RegionInfo]struct{}),
- },
- rpcQueueSize: defaultRPCQueueSize,
- flushInterval: defaultFlushInterval,
- metaRegionInfo: region.NewInfo(
- 0,
- []byte("hbase"),
- []byte("meta"),
- []byte("hbase:meta,,1"),
- nil,
- nil),
- zkRoot: defaultZkRoot,
- zkTimeout: defaultZkTimeout,
- effectiveUser: defaultEffectiveUser,
- metaLookupLimiter: rate.NewLimiter(metaLimit, metaBurst),
- regionLookupTimeout: region.DefaultLookupTimeout,
- regionReadTimeout: region.DefaultReadTimeout,
- done: make(chan struct{}),
- }
- for _, option := range options {
- option(c)
- }
- //Have to create the zkClient after the Options have been set
- //since the zkTimeout could be changed as an option
- c.zkClient = zk.NewClient(zkquorum, c.zkTimeout)
- return c
- }
- // RpcQueueSize will return an option that will set the size of the RPC queues
- // used in a given client
- func RpcQueueSize(size int) Option {
- return func(c *client) {
- c.rpcQueueSize = size
- }
- }
- // ZookeeperRoot will return an option that will set the zookeeper root path used in a given client.
- func ZookeeperRoot(root string) Option {
- return func(c *client) {
- c.zkRoot = root
- }
- }
- // ZookeeperTimeout will return an option that will set the zookeeper session timeout.
- func ZookeeperTimeout(to time.Duration) Option {
- return func(c *client) {
- c.zkTimeout = to
- }
- }
- // RegionLookupTimeout will return an option that sets the region lookup timeout
- func RegionLookupTimeout(to time.Duration) Option {
- return func(c *client) {
- c.regionLookupTimeout = to
- }
- }
- // RegionReadTimeout will return an option that sets the region read timeout
- func RegionReadTimeout(to time.Duration) Option {
- return func(c *client) {
- c.regionReadTimeout = to
- }
- }
- // EffectiveUser will return an option that will set the user used when accessing regions.
- func EffectiveUser(user string) Option {
- return func(c *client) {
- c.effectiveUser = user
- }
- }
- // FlushInterval will return an option that will set the timeout for flushing
- // the RPC queues used in a given client
- func FlushInterval(interval time.Duration) Option {
- return func(c *client) {
- c.flushInterval = interval
- }
- }
- // Close closes connections to hbase master and regionservers
- func (c *client) Close() {
- c.closeOnce.Do(func() {
- close(c.done)
- if c.clientType == adminClient {
- if ac := c.adminRegionInfo.Client(); ac != nil {
- ac.Close()
- }
- }
- c.clients.closeAll()
- })
- }
- func (c *client) Scan(s *hrpc.Scan) hrpc.Scanner {
- return newScanner(c, s)
- }
- func (c *client) Get(g *hrpc.Get) (*hrpc.Result, error) {
- pbmsg, err := c.SendRPC(g)
- if err != nil {
- return nil, err
- }
- r, ok := pbmsg.(*pb.GetResponse)
- if !ok {
- return nil, fmt.Errorf("sendRPC returned not a GetResponse")
- }
- return hrpc.ToLocalResult(r.Result), nil
- }
- func (c *client) Put(p *hrpc.Mutate) (*hrpc.Result, error) {
- return c.mutate(p)
- }
- func (c *client) Delete(d *hrpc.Mutate) (*hrpc.Result, error) {
- return c.mutate(d)
- }
- func (c *client) Append(a *hrpc.Mutate) (*hrpc.Result, error) {
- return c.mutate(a)
- }
- func (c *client) Increment(i *hrpc.Mutate) (int64, error) {
- r, err := c.mutate(i)
- if err != nil {
- return 0, err
- }
- if len(r.Cells) != 1 {
- return 0, fmt.Errorf("increment returned %d cells, but we expected exactly one",
- len(r.Cells))
- }
- val := binary.BigEndian.Uint64(r.Cells[0].Value)
- return int64(val), nil
- }
- func (c *client) mutate(m *hrpc.Mutate) (*hrpc.Result, error) {
- pbmsg, err := c.SendRPC(m)
- if err != nil {
- return nil, err
- }
- r, ok := pbmsg.(*pb.MutateResponse)
- if !ok {
- return nil, fmt.Errorf("sendRPC returned not a MutateResponse")
- }
- return hrpc.ToLocalResult(r.Result), nil
- }
- func (c *client) CheckAndPut(p *hrpc.Mutate, family string,
- qualifier string, expectedValue []byte) (bool, error) {
- cas, err := hrpc.NewCheckAndPut(p, family, qualifier, expectedValue)
- if err != nil {
- return false, err
- }
- pbmsg, err := c.SendRPC(cas)
- if err != nil {
- return false, err
- }
- r, ok := pbmsg.(*pb.MutateResponse)
- if !ok {
- return false, fmt.Errorf("sendRPC returned a %T instead of MutateResponse", pbmsg)
- }
- if r.Processed == nil {
- return false, fmt.Errorf("protobuf in the response didn't contain the field "+
- "indicating whether the CheckAndPut was successful or not: %s", r)
- }
- return r.GetProcessed(), nil
- }
|