123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285 |
- package hbase
- import (
- "context"
- "io"
- "strings"
- "time"
- "github.com/tsuna/gohbase"
- "github.com/tsuna/gohbase/hrpc"
- "go-common/library/log"
- "go-common/library/net/trace"
- "go-common/library/stat"
- )
- var stats = stat.DB
- const (
- _family = "hbase_client_v2"
- )
- // Client hbase client.
- type Client struct {
- hc gohbase.Client
- addr string
- config *Config
- }
- func (c *Client) setTrace(ctx context.Context, call hrpc.Call, perr *error) func() {
- now := time.Now()
- if t, ok := trace.FromContext(ctx); ok {
- t = t.Fork(_family, call.Name())
- t.SetTag(trace.String(trace.TagAddress, c.addr), trace.String(trace.TagComment, string(call.Table())+"."+string(call.Key())))
- return func() {
- t.Finish(perr)
- stats.Timing("hbase:"+call.Name(), int64(time.Since(now)/time.Millisecond))
- }
- }
- return func() {
- stats.Timing("hbase:"+call.Name(), int64(time.Since(now)/time.Millisecond))
- }
- }
- // NewClient new a hbase client.
- func NewClient(config *Config, options ...gohbase.Option) *Client {
- zkquorum := strings.Join(config.Zookeeper.Addrs, ",")
- if config.Zookeeper.Root != "" {
- options = append(options, gohbase.ZookeeperRoot(config.Zookeeper.Root))
- }
- if config.Zookeeper.Timeout != 0 {
- options = append(options, gohbase.ZookeeperTimeout(time.Duration(config.Zookeeper.Timeout)))
- }
- if config.RPCQueueSize != 0 {
- log.Warn("RPCQueueSize configuration be ignored")
- }
- // force RpcQueueSize = 1, don't change it !!! it has reason (゜-゜)つロ
- options = append(options, gohbase.RpcQueueSize(1))
- if config.FlushInterval != 0 {
- options = append(options, gohbase.FlushInterval(time.Duration(config.FlushInterval)))
- }
- if config.EffectiveUser != "" {
- options = append(options, gohbase.EffectiveUser(config.EffectiveUser))
- }
- if config.RegionLookupTimeout != 0 {
- options = append(options, gohbase.RegionLookupTimeout(time.Duration(config.RegionLookupTimeout)))
- }
- if config.RegionReadTimeout != 0 {
- options = append(options, gohbase.RegionReadTimeout(time.Duration(config.RegionReadTimeout)))
- }
- hc := gohbase.NewClient(zkquorum, options...)
- return &Client{
- hc: hc,
- addr: zkquorum,
- config: config,
- }
- }
- // ScanAll do scan command and return all result
- // NOTE: if err != nil the results is safe for range operate even not result found
- func (c *Client) ScanAll(ctx context.Context, table []byte, options ...func(hrpc.Call) error) (results []*hrpc.Result, err error) {
- cursor, err := c.Scan(ctx, table, options...)
- if err != nil {
- return nil, err
- }
- for {
- result, err := cursor.Next()
- if err != nil {
- if err == io.EOF {
- break
- }
- return nil, err
- }
- results = append(results, result)
- }
- return results, nil
- }
- type scanTrace struct {
- hrpc.Scanner
- err error
- cancelTrace func()
- }
- func (s *scanTrace) Next() (*hrpc.Result, error) {
- var result *hrpc.Result
- result, s.err = s.Scanner.Next()
- if s.err != nil {
- if s.err == io.EOF {
- // reset error for trace
- s.err = nil
- return result, io.EOF
- }
- s.cancelTrace()
- return result, s.err
- }
- return result, s.err
- }
- func (s *scanTrace) Close() error {
- defer s.cancelTrace()
- s.err = s.Scanner.Close()
- return s.err
- }
- // Scan do a scan command.
- func (c *Client) Scan(ctx context.Context, table []byte, options ...func(hrpc.Call) error) (scanner hrpc.Scanner, err error) {
- var scan *hrpc.Scan
- scan, err = hrpc.NewScan(ctx, table, options...)
- if err != nil {
- return nil, err
- }
- scanner = c.hc.Scan(scan)
- st := &scanTrace{
- Scanner: scanner,
- }
- st.cancelTrace = c.setTrace(ctx, scan, &st.err)
- return st, nil
- }
- // ScanStr scan string
- func (c *Client) ScanStr(ctx context.Context, table string, options ...func(hrpc.Call) error) (hrpc.Scanner, error) {
- return c.Scan(ctx, []byte(table), options...)
- }
- // ScanStrAll scan string
- // NOTE: if err != nil the results is safe for range operate even not result found
- func (c *Client) ScanStrAll(ctx context.Context, table string, options ...func(hrpc.Call) error) ([]*hrpc.Result, error) {
- return c.ScanAll(ctx, []byte(table), options...)
- }
- // ScanRange get a scanner for the given table and key range.
- // The range is half-open, i.e. [startRow; stopRow[ -- stopRow is not
- // included in the range.
- func (c *Client) ScanRange(ctx context.Context, table, startRow, stopRow []byte, options ...func(hrpc.Call) error) (scanner hrpc.Scanner, err error) {
- var scan *hrpc.Scan
- scan, err = hrpc.NewScanRange(ctx, table, startRow, stopRow, options...)
- if err != nil {
- return nil, err
- }
- scanner = c.hc.Scan(scan)
- st := &scanTrace{
- Scanner: scanner,
- }
- st.cancelTrace = c.setTrace(ctx, scan, &st.err)
- return st, nil
- }
- // ScanRangeStr get a scanner for the given table and key range.
- // The range is half-open, i.e. [startRow; stopRow[ -- stopRow is not
- // included in the range.
- func (c *Client) ScanRangeStr(ctx context.Context, table, startRow, stopRow string, options ...func(hrpc.Call) error) (hrpc.Scanner, error) {
- return c.ScanRange(ctx, []byte(table), []byte(startRow), []byte(stopRow), options...)
- }
- // Get get result for the given table and row key.
- // NOTE: if err != nil then result != nil, if result not exists result.Cells length is 0
- func (c *Client) Get(ctx context.Context, table, key []byte, options ...func(hrpc.Call) error) (result *hrpc.Result, err error) {
- var get *hrpc.Get
- get, err = hrpc.NewGet(ctx, table, key, options...)
- if err != nil {
- return nil, err
- }
- defer c.setTrace(ctx, get, &err)()
- return c.hc.Get(get)
- }
- // GetStr do a get command.
- // NOTE: if err != nil then result != nil, if result not exists result.Cells length is 0
- func (c *Client) GetStr(ctx context.Context, table, key string, options ...func(hrpc.Call) error) (result *hrpc.Result, err error) {
- return c.Get(ctx, []byte(table), []byte(key), options...)
- }
- // PutStr insert the given family-column-values in the given row key of the given table.
- func (c *Client) PutStr(ctx context.Context, table string, key string, values map[string]map[string][]byte, options ...func(hrpc.Call) error) (result *hrpc.Result, err error) {
- var put *hrpc.Mutate
- put, err = hrpc.NewPutStr(ctx, table, key, values, options...)
- if err != nil {
- return nil, err
- }
- defer c.setTrace(ctx, put, &err)()
- return c.hc.Put(put)
- }
- // Delete is used to perform Delete operations on a single row.
- // To delete entire row, values should be nil.
- //
- // To delete specific families, qualifiers map should be nil:
- // map[string]map[string][]byte{
- // "cf1": nil,
- // "cf2": nil,
- // }
- //
- // To delete specific qualifiers:
- // map[string]map[string][]byte{
- // "cf": map[string][]byte{
- // "q1": nil,
- // "q2": nil,
- // },
- // }
- //
- // To delete all versions before and at a timestamp, pass hrpc.Timestamp() option.
- // By default all versions will be removed.
- //
- // To delete only a specific version at a timestamp, pass hrpc.DeleteOneVersion() option
- // along with a timestamp. For delete specific qualifiers request, if timestamp is not
- // passed, only the latest version will be removed. For delete specific families request,
- // the timestamp should be passed or it will have no effect as it's an expensive
- // operation to perform.
- func (c *Client) Delete(ctx context.Context, table string, key string, values map[string]map[string][]byte, options ...func(hrpc.Call) error) (result *hrpc.Result, err error) {
- var delete *hrpc.Mutate
- delete, err = hrpc.NewDelStr(ctx, table, key, values, options...)
- defer c.setTrace(ctx, delete, &err)()
- return c.hc.Delete(delete)
- }
- // Append do a append command.
- func (c *Client) Append(ctx context.Context, table string, key string, values map[string]map[string][]byte, options ...func(hrpc.Call) error) (result *hrpc.Result, err error) {
- var append *hrpc.Mutate
- append, err = hrpc.NewAppStr(ctx, table, key, values, options...)
- defer c.setTrace(ctx, append, &err)()
- return c.hc.Append(append)
- }
- // Increment the given values in HBase under the given table and key.
- func (c *Client) Increment(ctx context.Context, table string, key string, values map[string]map[string][]byte, options ...func(hrpc.Call) error) (result int64, err error) {
- var increment *hrpc.Mutate
- increment, err = hrpc.NewIncStr(ctx, table, key, values, options...)
- if err != nil {
- return 0, err
- }
- defer c.setTrace(ctx, increment, &err)()
- return c.hc.Increment(increment)
- }
- // IncrementSingle increment the given value by amount in HBase under the given table, key, family and qualifier.
- func (c *Client) IncrementSingle(ctx context.Context, table string, key string, family string, qualifier string, amount int64, options ...func(hrpc.Call) error) (result int64, err error) {
- var increment *hrpc.Mutate
- increment, err = hrpc.NewIncStrSingle(ctx, table, key, family, qualifier, amount, options...)
- if err != nil {
- return 0, err
- }
- defer c.setTrace(ctx, increment, &err)()
- return c.hc.Increment(increment)
- }
- // Ping ping.
- func (c *Client) Ping(ctx context.Context) (err error) {
- testRowKey := "test"
- if c.config.TestRowKey != "" {
- testRowKey = c.config.TestRowKey
- }
- values := map[string]map[string][]byte{"test": map[string][]byte{"test": []byte("test")}}
- _, err = c.PutStr(ctx, "test", testRowKey, values)
- return
- }
- // Close close client.
- func (c *Client) Close() error {
- c.hc.Close()
- return nil
- }
|