hbase.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. package hbase
  2. import (
  3. "context"
  4. "io"
  5. "strings"
  6. "time"
  7. "github.com/tsuna/gohbase"
  8. "github.com/tsuna/gohbase/hrpc"
  9. "go-common/library/log"
  10. "go-common/library/net/trace"
  11. "go-common/library/stat"
  12. )
  13. var stats = stat.DB
  14. const (
  15. _family = "hbase_client_v2"
  16. )
  17. // Client hbase client.
  18. type Client struct {
  19. hc gohbase.Client
  20. addr string
  21. config *Config
  22. }
  23. func (c *Client) setTrace(ctx context.Context, call hrpc.Call, perr *error) func() {
  24. now := time.Now()
  25. if t, ok := trace.FromContext(ctx); ok {
  26. t = t.Fork(_family, call.Name())
  27. t.SetTag(trace.String(trace.TagAddress, c.addr), trace.String(trace.TagComment, string(call.Table())+"."+string(call.Key())))
  28. return func() {
  29. t.Finish(perr)
  30. stats.Timing("hbase:"+call.Name(), int64(time.Since(now)/time.Millisecond))
  31. }
  32. }
  33. return func() {
  34. stats.Timing("hbase:"+call.Name(), int64(time.Since(now)/time.Millisecond))
  35. }
  36. }
  37. // NewClient new a hbase client.
  38. func NewClient(config *Config, options ...gohbase.Option) *Client {
  39. zkquorum := strings.Join(config.Zookeeper.Addrs, ",")
  40. if config.Zookeeper.Root != "" {
  41. options = append(options, gohbase.ZookeeperRoot(config.Zookeeper.Root))
  42. }
  43. if config.Zookeeper.Timeout != 0 {
  44. options = append(options, gohbase.ZookeeperTimeout(time.Duration(config.Zookeeper.Timeout)))
  45. }
  46. if config.RPCQueueSize != 0 {
  47. log.Warn("RPCQueueSize configuration be ignored")
  48. }
  49. // force RpcQueueSize = 1, don't change it !!! it has reason (゜-゜)つロ
  50. options = append(options, gohbase.RpcQueueSize(1))
  51. if config.FlushInterval != 0 {
  52. options = append(options, gohbase.FlushInterval(time.Duration(config.FlushInterval)))
  53. }
  54. if config.EffectiveUser != "" {
  55. options = append(options, gohbase.EffectiveUser(config.EffectiveUser))
  56. }
  57. if config.RegionLookupTimeout != 0 {
  58. options = append(options, gohbase.RegionLookupTimeout(time.Duration(config.RegionLookupTimeout)))
  59. }
  60. if config.RegionReadTimeout != 0 {
  61. options = append(options, gohbase.RegionReadTimeout(time.Duration(config.RegionReadTimeout)))
  62. }
  63. hc := gohbase.NewClient(zkquorum, options...)
  64. return &Client{
  65. hc: hc,
  66. addr: zkquorum,
  67. config: config,
  68. }
  69. }
  70. // ScanAll do scan command and return all result
  71. // NOTE: if err != nil the results is safe for range operate even not result found
  72. func (c *Client) ScanAll(ctx context.Context, table []byte, options ...func(hrpc.Call) error) (results []*hrpc.Result, err error) {
  73. cursor, err := c.Scan(ctx, table, options...)
  74. if err != nil {
  75. return nil, err
  76. }
  77. for {
  78. result, err := cursor.Next()
  79. if err != nil {
  80. if err == io.EOF {
  81. break
  82. }
  83. return nil, err
  84. }
  85. results = append(results, result)
  86. }
  87. return results, nil
  88. }
  89. type scanTrace struct {
  90. hrpc.Scanner
  91. err error
  92. cancelTrace func()
  93. }
  94. func (s *scanTrace) Next() (*hrpc.Result, error) {
  95. var result *hrpc.Result
  96. result, s.err = s.Scanner.Next()
  97. if s.err != nil {
  98. if s.err == io.EOF {
  99. // reset error for trace
  100. s.err = nil
  101. return result, io.EOF
  102. }
  103. s.cancelTrace()
  104. return result, s.err
  105. }
  106. return result, s.err
  107. }
  108. func (s *scanTrace) Close() error {
  109. defer s.cancelTrace()
  110. s.err = s.Scanner.Close()
  111. return s.err
  112. }
  113. // Scan do a scan command.
  114. func (c *Client) Scan(ctx context.Context, table []byte, options ...func(hrpc.Call) error) (scanner hrpc.Scanner, err error) {
  115. var scan *hrpc.Scan
  116. scan, err = hrpc.NewScan(ctx, table, options...)
  117. if err != nil {
  118. return nil, err
  119. }
  120. scanner = c.hc.Scan(scan)
  121. st := &scanTrace{
  122. Scanner: scanner,
  123. }
  124. st.cancelTrace = c.setTrace(ctx, scan, &st.err)
  125. return st, nil
  126. }
  127. // ScanStr scan string
  128. func (c *Client) ScanStr(ctx context.Context, table string, options ...func(hrpc.Call) error) (hrpc.Scanner, error) {
  129. return c.Scan(ctx, []byte(table), options...)
  130. }
  131. // ScanStrAll scan string
  132. // NOTE: if err != nil the results is safe for range operate even not result found
  133. func (c *Client) ScanStrAll(ctx context.Context, table string, options ...func(hrpc.Call) error) ([]*hrpc.Result, error) {
  134. return c.ScanAll(ctx, []byte(table), options...)
  135. }
  136. // ScanRange get a scanner for the given table and key range.
  137. // The range is half-open, i.e. [startRow; stopRow[ -- stopRow is not
  138. // included in the range.
  139. func (c *Client) ScanRange(ctx context.Context, table, startRow, stopRow []byte, options ...func(hrpc.Call) error) (scanner hrpc.Scanner, err error) {
  140. var scan *hrpc.Scan
  141. scan, err = hrpc.NewScanRange(ctx, table, startRow, stopRow, options...)
  142. if err != nil {
  143. return nil, err
  144. }
  145. scanner = c.hc.Scan(scan)
  146. st := &scanTrace{
  147. Scanner: scanner,
  148. }
  149. st.cancelTrace = c.setTrace(ctx, scan, &st.err)
  150. return st, nil
  151. }
  152. // ScanRangeStr get a scanner for the given table and key range.
  153. // The range is half-open, i.e. [startRow; stopRow[ -- stopRow is not
  154. // included in the range.
  155. func (c *Client) ScanRangeStr(ctx context.Context, table, startRow, stopRow string, options ...func(hrpc.Call) error) (hrpc.Scanner, error) {
  156. return c.ScanRange(ctx, []byte(table), []byte(startRow), []byte(stopRow), options...)
  157. }
  158. // Get get result for the given table and row key.
  159. // NOTE: if err != nil then result != nil, if result not exists result.Cells length is 0
  160. func (c *Client) Get(ctx context.Context, table, key []byte, options ...func(hrpc.Call) error) (result *hrpc.Result, err error) {
  161. var get *hrpc.Get
  162. get, err = hrpc.NewGet(ctx, table, key, options...)
  163. if err != nil {
  164. return nil, err
  165. }
  166. defer c.setTrace(ctx, get, &err)()
  167. return c.hc.Get(get)
  168. }
  169. // GetStr do a get command.
  170. // NOTE: if err != nil then result != nil, if result not exists result.Cells length is 0
  171. func (c *Client) GetStr(ctx context.Context, table, key string, options ...func(hrpc.Call) error) (result *hrpc.Result, err error) {
  172. return c.Get(ctx, []byte(table), []byte(key), options...)
  173. }
  174. // PutStr insert the given family-column-values in the given row key of the given table.
  175. func (c *Client) PutStr(ctx context.Context, table string, key string, values map[string]map[string][]byte, options ...func(hrpc.Call) error) (result *hrpc.Result, err error) {
  176. var put *hrpc.Mutate
  177. put, err = hrpc.NewPutStr(ctx, table, key, values, options...)
  178. if err != nil {
  179. return nil, err
  180. }
  181. defer c.setTrace(ctx, put, &err)()
  182. return c.hc.Put(put)
  183. }
  184. // Delete is used to perform Delete operations on a single row.
  185. // To delete entire row, values should be nil.
  186. //
  187. // To delete specific families, qualifiers map should be nil:
  188. // map[string]map[string][]byte{
  189. // "cf1": nil,
  190. // "cf2": nil,
  191. // }
  192. //
  193. // To delete specific qualifiers:
  194. // map[string]map[string][]byte{
  195. // "cf": map[string][]byte{
  196. // "q1": nil,
  197. // "q2": nil,
  198. // },
  199. // }
  200. //
  201. // To delete all versions before and at a timestamp, pass hrpc.Timestamp() option.
  202. // By default all versions will be removed.
  203. //
  204. // To delete only a specific version at a timestamp, pass hrpc.DeleteOneVersion() option
  205. // along with a timestamp. For delete specific qualifiers request, if timestamp is not
  206. // passed, only the latest version will be removed. For delete specific families request,
  207. // the timestamp should be passed or it will have no effect as it's an expensive
  208. // operation to perform.
  209. func (c *Client) Delete(ctx context.Context, table string, key string, values map[string]map[string][]byte, options ...func(hrpc.Call) error) (result *hrpc.Result, err error) {
  210. var delete *hrpc.Mutate
  211. delete, err = hrpc.NewDelStr(ctx, table, key, values, options...)
  212. defer c.setTrace(ctx, delete, &err)()
  213. return c.hc.Delete(delete)
  214. }
  215. // Append do a append command.
  216. func (c *Client) Append(ctx context.Context, table string, key string, values map[string]map[string][]byte, options ...func(hrpc.Call) error) (result *hrpc.Result, err error) {
  217. var append *hrpc.Mutate
  218. append, err = hrpc.NewAppStr(ctx, table, key, values, options...)
  219. defer c.setTrace(ctx, append, &err)()
  220. return c.hc.Append(append)
  221. }
  222. // Increment the given values in HBase under the given table and key.
  223. func (c *Client) Increment(ctx context.Context, table string, key string, values map[string]map[string][]byte, options ...func(hrpc.Call) error) (result int64, err error) {
  224. var increment *hrpc.Mutate
  225. increment, err = hrpc.NewIncStr(ctx, table, key, values, options...)
  226. if err != nil {
  227. return 0, err
  228. }
  229. defer c.setTrace(ctx, increment, &err)()
  230. return c.hc.Increment(increment)
  231. }
  232. // IncrementSingle increment the given value by amount in HBase under the given table, key, family and qualifier.
  233. func (c *Client) IncrementSingle(ctx context.Context, table string, key string, family string, qualifier string, amount int64, options ...func(hrpc.Call) error) (result int64, err error) {
  234. var increment *hrpc.Mutate
  235. increment, err = hrpc.NewIncStrSingle(ctx, table, key, family, qualifier, amount, options...)
  236. if err != nil {
  237. return 0, err
  238. }
  239. defer c.setTrace(ctx, increment, &err)()
  240. return c.hc.Increment(increment)
  241. }
  242. // Ping ping.
  243. func (c *Client) Ping(ctx context.Context) (err error) {
  244. testRowKey := "test"
  245. if c.config.TestRowKey != "" {
  246. testRowKey = c.config.TestRowKey
  247. }
  248. values := map[string]map[string][]byte{"test": map[string][]byte{"test": []byte("test")}}
  249. _, err = c.PutStr(ctx, "test", testRowKey, values)
  250. return
  251. }
  252. // Close close client.
  253. func (c *Client) Close() error {
  254. c.hc.Close()
  255. return nil
  256. }