client.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
// Copyright (C) 2015 The GoHBase Authors. All rights reserved.
// This file is part of GoHBase.
// Use of this source code is governed by the Apache License 2.0
// that can be found in the COPYING file.
  5. package gohbase
  6. import (
  7. "encoding/binary"
  8. "fmt"
  9. "sync"
  10. "time"
  11. "github.com/cznic/b"
  12. "github.com/golang/protobuf/proto"
  13. log "github.com/sirupsen/logrus"
  14. "github.com/tsuna/gohbase/hrpc"
  15. "github.com/tsuna/gohbase/pb"
  16. "github.com/tsuna/gohbase/region"
  17. "github.com/tsuna/gohbase/zk"
  18. "golang.org/x/time/rate"
  19. )
  20. const (
  21. standardClient = iota
  22. adminClient
  23. defaultRPCQueueSize = 100
  24. defaultFlushInterval = 20 * time.Millisecond
  25. defaultZkRoot = "/hbase"
  26. defaultZkTimeout = 30 * time.Second
  27. defaultEffectiveUser = "root"
  28. // metaBurst is maximum number of request allowed at once.
  29. metaBurst = 10
  30. // metaLimit is rate at which to throttle requests to hbase:meta table.
  31. metaLimit = rate.Limit(100)
  32. )
  33. // Client a regular HBase client
  34. type Client interface {
  35. Scan(s *hrpc.Scan) hrpc.Scanner
  36. Get(g *hrpc.Get) (*hrpc.Result, error)
  37. Put(p *hrpc.Mutate) (*hrpc.Result, error)
  38. Delete(d *hrpc.Mutate) (*hrpc.Result, error)
  39. Append(a *hrpc.Mutate) (*hrpc.Result, error)
  40. Increment(i *hrpc.Mutate) (int64, error)
  41. CheckAndPut(p *hrpc.Mutate, family string, qualifier string,
  42. expectedValue []byte) (bool, error)
  43. Close()
  44. }
  45. // RPCClient is core client of gohbase. It's exposed for testing.
  46. type RPCClient interface {
  47. SendRPC(rpc hrpc.Call) (proto.Message, error)
  48. }
  49. // Option is a function used to configure optional config items for a Client.
  50. type Option func(*client)
  51. // A Client provides access to an HBase cluster.
  52. type client struct {
  53. clientType int
  54. regions keyRegionCache
  55. // Maps a hrpc.RegionInfo to the *region.Client that we think currently
  56. // serves it.
  57. clients clientRegionCache
  58. metaRegionInfo hrpc.RegionInfo
  59. adminRegionInfo hrpc.RegionInfo
  60. // The maximum size of the RPC queue in the region client
  61. rpcQueueSize int
  62. // zkClient is zookeeper for retrieving meta and admin information
  63. zkClient zk.Client
  64. // The root zookeeper path for Hbase. By default, this is usually "/hbase".
  65. zkRoot string
  66. // The zookeeper session timeout
  67. zkTimeout time.Duration
  68. // The timeout before flushing the RPC queue in the region client
  69. flushInterval time.Duration
  70. // The user used when accessing regions.
  71. effectiveUser string
  72. // metaLookupLimiter is used to throttle lookups to hbase:meta table
  73. metaLookupLimiter *rate.Limiter
  74. // How long to wait for a region lookup (either meta lookup or finding
  75. // meta in ZooKeeper). Should be greater than or equal to the ZooKeeper
  76. // session timeout.
  77. regionLookupTimeout time.Duration
  78. // regionReadTimeout is the maximum amount of time to wait for regionserver reply
  79. regionReadTimeout time.Duration
  80. done chan struct{}
  81. closeOnce sync.Once
  82. }
  83. // NewClient creates a new HBase client.
  84. func NewClient(zkquorum string, options ...Option) Client {
  85. return newClient(zkquorum, options...)
  86. }
  87. func newClient(zkquorum string, options ...Option) *client {
  88. log.WithFields(log.Fields{
  89. "Host": zkquorum,
  90. }).Debug("Creating new client.")
  91. c := &client{
  92. clientType: standardClient,
  93. regions: keyRegionCache{regions: b.TreeNew(region.CompareGeneric)},
  94. clients: clientRegionCache{
  95. regions: make(map[hrpc.RegionClient]map[hrpc.RegionInfo]struct{}),
  96. },
  97. rpcQueueSize: defaultRPCQueueSize,
  98. flushInterval: defaultFlushInterval,
  99. metaRegionInfo: region.NewInfo(
  100. 0,
  101. []byte("hbase"),
  102. []byte("meta"),
  103. []byte("hbase:meta,,1"),
  104. nil,
  105. nil),
  106. zkRoot: defaultZkRoot,
  107. zkTimeout: defaultZkTimeout,
  108. effectiveUser: defaultEffectiveUser,
  109. metaLookupLimiter: rate.NewLimiter(metaLimit, metaBurst),
  110. regionLookupTimeout: region.DefaultLookupTimeout,
  111. regionReadTimeout: region.DefaultReadTimeout,
  112. done: make(chan struct{}),
  113. }
  114. for _, option := range options {
  115. option(c)
  116. }
  117. //Have to create the zkClient after the Options have been set
  118. //since the zkTimeout could be changed as an option
  119. c.zkClient = zk.NewClient(zkquorum, c.zkTimeout)
  120. return c
  121. }
  122. // RpcQueueSize will return an option that will set the size of the RPC queues
  123. // used in a given client
  124. func RpcQueueSize(size int) Option {
  125. return func(c *client) {
  126. c.rpcQueueSize = size
  127. }
  128. }
  129. // ZookeeperRoot will return an option that will set the zookeeper root path used in a given client.
  130. func ZookeeperRoot(root string) Option {
  131. return func(c *client) {
  132. c.zkRoot = root
  133. }
  134. }
  135. // ZookeeperTimeout will return an option that will set the zookeeper session timeout.
  136. func ZookeeperTimeout(to time.Duration) Option {
  137. return func(c *client) {
  138. c.zkTimeout = to
  139. }
  140. }
  141. // RegionLookupTimeout will return an option that sets the region lookup timeout
  142. func RegionLookupTimeout(to time.Duration) Option {
  143. return func(c *client) {
  144. c.regionLookupTimeout = to
  145. }
  146. }
  147. // RegionReadTimeout will return an option that sets the region read timeout
  148. func RegionReadTimeout(to time.Duration) Option {
  149. return func(c *client) {
  150. c.regionReadTimeout = to
  151. }
  152. }
  153. // EffectiveUser will return an option that will set the user used when accessing regions.
  154. func EffectiveUser(user string) Option {
  155. return func(c *client) {
  156. c.effectiveUser = user
  157. }
  158. }
  159. // FlushInterval will return an option that will set the timeout for flushing
  160. // the RPC queues used in a given client
  161. func FlushInterval(interval time.Duration) Option {
  162. return func(c *client) {
  163. c.flushInterval = interval
  164. }
  165. }
  166. // Close closes connections to hbase master and regionservers
  167. func (c *client) Close() {
  168. c.closeOnce.Do(func() {
  169. close(c.done)
  170. if c.clientType == adminClient {
  171. if ac := c.adminRegionInfo.Client(); ac != nil {
  172. ac.Close()
  173. }
  174. }
  175. c.clients.closeAll()
  176. })
  177. }
  178. func (c *client) Scan(s *hrpc.Scan) hrpc.Scanner {
  179. return newScanner(c, s)
  180. }
  181. func (c *client) Get(g *hrpc.Get) (*hrpc.Result, error) {
  182. pbmsg, err := c.SendRPC(g)
  183. if err != nil {
  184. return nil, err
  185. }
  186. r, ok := pbmsg.(*pb.GetResponse)
  187. if !ok {
  188. return nil, fmt.Errorf("sendRPC returned not a GetResponse")
  189. }
  190. return hrpc.ToLocalResult(r.Result), nil
  191. }
  192. func (c *client) Put(p *hrpc.Mutate) (*hrpc.Result, error) {
  193. return c.mutate(p)
  194. }
  195. func (c *client) Delete(d *hrpc.Mutate) (*hrpc.Result, error) {
  196. return c.mutate(d)
  197. }
  198. func (c *client) Append(a *hrpc.Mutate) (*hrpc.Result, error) {
  199. return c.mutate(a)
  200. }
  201. func (c *client) Increment(i *hrpc.Mutate) (int64, error) {
  202. r, err := c.mutate(i)
  203. if err != nil {
  204. return 0, err
  205. }
  206. if len(r.Cells) != 1 {
  207. return 0, fmt.Errorf("increment returned %d cells, but we expected exactly one",
  208. len(r.Cells))
  209. }
  210. val := binary.BigEndian.Uint64(r.Cells[0].Value)
  211. return int64(val), nil
  212. }
  213. func (c *client) mutate(m *hrpc.Mutate) (*hrpc.Result, error) {
  214. pbmsg, err := c.SendRPC(m)
  215. if err != nil {
  216. return nil, err
  217. }
  218. r, ok := pbmsg.(*pb.MutateResponse)
  219. if !ok {
  220. return nil, fmt.Errorf("sendRPC returned not a MutateResponse")
  221. }
  222. return hrpc.ToLocalResult(r.Result), nil
  223. }
  224. func (c *client) CheckAndPut(p *hrpc.Mutate, family string,
  225. qualifier string, expectedValue []byte) (bool, error) {
  226. cas, err := hrpc.NewCheckAndPut(p, family, qualifier, expectedValue)
  227. if err != nil {
  228. return false, err
  229. }
  230. pbmsg, err := c.SendRPC(cas)
  231. if err != nil {
  232. return false, err
  233. }
  234. r, ok := pbmsg.(*pb.MutateResponse)
  235. if !ok {
  236. return false, fmt.Errorf("sendRPC returned a %T instead of MutateResponse", pbmsg)
  237. }
  238. if r.Processed == nil {
  239. return false, fmt.Errorf("protobuf in the response didn't contain the field "+
  240. "indicating whether the CheckAndPut was successful or not: %s", r)
  241. }
  242. return r.GetProcessed(), nil
  243. }