scan.go 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. // Copyright (C) 2015 The GoHBase Authors. All rights reserved.
  2. // This file is part of GoHBase.
  3. // Use of this source code is governed by the Apache License 2.0
  4. // that can be found in the COPYING file.
  5. package hrpc
  6. import (
  7. "context"
  8. "errors"
  9. "fmt"
  10. "math"
  11. "github.com/golang/protobuf/proto"
  12. "github.com/tsuna/gohbase/pb"
  13. )
  14. const (
  15. // DefaultMaxVersions defualt value for maximum versions to return for scan queries
  16. DefaultMaxVersions uint32 = 1
  17. // MinTimestamp default value for minimum timestamp for scan queries
  18. MinTimestamp uint64 = 0
  19. // MaxTimestamp default value for maximum timestamp for scan queries
  20. MaxTimestamp = math.MaxUint64
  21. // DefaultMaxResultSize Maximum number of bytes fetched when calling a scanner's
  22. // next method. The default value is 2MB, which is good for 1ge networks.
  23. // With faster and/or high latency networks this value should be increased.
  24. DefaultMaxResultSize = 2097152
  25. // DefaultNumberOfRows is default maximum number of rows fetched by scanner
  26. DefaultNumberOfRows = math.MaxInt32
  27. // DefaultMaxResultsPerColumnFamily is the default max number of cells fetched
  28. // per column family for each row
  29. DefaultMaxResultsPerColumnFamily = math.MaxInt32
  30. )
  31. // Scanner is used to read data sequentially from HBase.
  32. // Scanner will be automatically closed if there's no more data to read,
  33. // otherwise Close method should be called.
  34. type Scanner interface {
  35. // Next returns a row at a time.
  36. // Once all rows are returned, subsequent calls will return io.EOF error.
  37. //
  38. // In case of an error, only the first call to Next() will return partial
  39. // result (could be not a complete row) and the actual error,
  40. // the subsequent calls will return io.EOF error.
  41. Next() (*Result, error)
  42. // Close should be called if it is desired to stop scanning before getting all of results.
  43. // If you call Next() after calling Close() you might still get buffered results.
  44. // Othwerwise, in case all results have been delivered or in case of an error, the Scanner
  45. // will be closed automatically.
  46. Close() error
  47. }
  48. // Scan represents a scanner on an HBase table.
  49. type Scan struct {
  50. base
  51. baseQuery
  52. startRow []byte
  53. stopRow []byte
  54. scannerID uint64
  55. maxResultSize uint64
  56. numberOfRows uint32
  57. reversed bool
  58. closeScanner bool
  59. allowPartialResults bool
  60. }
  61. // baseScan returns a Scan struct with default values set.
  62. func baseScan(ctx context.Context, table []byte,
  63. options ...func(Call) error) (*Scan, error) {
  64. s := &Scan{
  65. base: base{
  66. table: table,
  67. ctx: ctx,
  68. resultch: make(chan RPCResult, 1),
  69. },
  70. baseQuery: newBaseQuery(),
  71. scannerID: math.MaxUint64,
  72. maxResultSize: DefaultMaxResultSize,
  73. numberOfRows: DefaultNumberOfRows,
  74. reversed: false,
  75. }
  76. err := applyOptions(s, options...)
  77. if err != nil {
  78. return nil, err
  79. }
  80. return s, nil
  81. }
  82. func (s *Scan) String() string {
  83. return fmt.Sprintf("Scan{Table=%q StartRow=%q StopRow=%q TimeRange=(%d, %d) "+
  84. "MaxVersions=%d NumberOfRows=%d MaxResultSize=%d Familes=%v Filter=%v "+
  85. "StoreLimit=%d StoreOffset=%d ScannerID=%d Close=%v}",
  86. s.table, s.startRow, s.stopRow, s.fromTimestamp, s.toTimestamp,
  87. s.maxVersions, s.numberOfRows, s.maxResultSize, s.families, s.filter,
  88. s.storeLimit, s.storeOffset, s.scannerID, s.closeScanner)
  89. }
  90. // NewScan creates a scanner for the given table.
  91. func NewScan(ctx context.Context, table []byte, options ...func(Call) error) (*Scan, error) {
  92. return baseScan(ctx, table, options...)
  93. }
  94. // NewScanRange creates a scanner for the given table and key range.
  95. // The range is half-open, i.e. [startRow; stopRow[ -- stopRow is not
  96. // included in the range.
  97. func NewScanRange(ctx context.Context, table, startRow, stopRow []byte,
  98. options ...func(Call) error) (*Scan, error) {
  99. scan, err := baseScan(ctx, table, options...)
  100. if err != nil {
  101. return nil, err
  102. }
  103. scan.startRow = startRow
  104. scan.stopRow = stopRow
  105. scan.key = startRow
  106. return scan, nil
  107. }
  108. // NewScanStr creates a scanner for the given table.
  109. func NewScanStr(ctx context.Context, table string, options ...func(Call) error) (*Scan, error) {
  110. return NewScan(ctx, []byte(table), options...)
  111. }
  112. // NewScanRangeStr creates a scanner for the given table and key range.
  113. // The range is half-open, i.e. [startRow; stopRow[ -- stopRow is not
  114. // included in the range.
  115. func NewScanRangeStr(ctx context.Context, table, startRow, stopRow string,
  116. options ...func(Call) error) (*Scan, error) {
  117. return NewScanRange(ctx, []byte(table), []byte(startRow), []byte(stopRow), options...)
  118. }
  119. // Name returns the name of this RPC call.
  120. func (s *Scan) Name() string {
  121. return "Scan"
  122. }
  123. // StopRow returns the end key (exclusive) of this scanner.
  124. func (s *Scan) StopRow() []byte {
  125. return s.stopRow
  126. }
  127. // StartRow returns the start key (inclusive) of this scanner.
  128. func (s *Scan) StartRow() []byte {
  129. return s.startRow
  130. }
  131. // IsClosing returns wether this scan closes scanner prematurely
  132. func (s *Scan) IsClosing() bool {
  133. return s.closeScanner
  134. }
  135. // AllowPartialResults returns true if client handles partials.
  136. func (s *Scan) AllowPartialResults() bool {
  137. return s.allowPartialResults
  138. }
  139. // Reversed returns true if scanner scans in reverse.
  140. func (s *Scan) Reversed() bool {
  141. return s.reversed
  142. }
  143. // NumberOfRows returns how many rows this scan
  144. // fetches from regionserver in a single response.
  145. func (s *Scan) NumberOfRows() uint32 {
  146. return s.numberOfRows
  147. }
  148. // ToProto converts this Scan into a protobuf message
  149. func (s *Scan) ToProto() proto.Message {
  150. scan := &pb.ScanRequest{
  151. Region: s.regionSpecifier(),
  152. CloseScanner: &s.closeScanner,
  153. NumberOfRows: &s.numberOfRows,
  154. // tell server that we can process results that are only part of a row
  155. ClientHandlesPartials: proto.Bool(true),
  156. // tell server that we "handle" heartbeats by ignoring them
  157. // since we don't really time out our scans (unless context was cancelled)
  158. ClientHandlesHeartbeats: proto.Bool(true),
  159. }
  160. if s.scannerID != math.MaxUint64 {
  161. scan.ScannerId = &s.scannerID
  162. return scan
  163. }
  164. scan.Scan = &pb.Scan{
  165. Column: familiesToColumn(s.families),
  166. StartRow: s.startRow,
  167. StopRow: s.stopRow,
  168. TimeRange: &pb.TimeRange{},
  169. MaxResultSize: &s.maxResultSize,
  170. }
  171. if s.maxVersions != DefaultMaxVersions {
  172. scan.Scan.MaxVersions = &s.maxVersions
  173. }
  174. /* added support for limit number of cells per row */
  175. if s.storeLimit != DefaultMaxResultsPerColumnFamily {
  176. scan.Scan.StoreLimit = &s.storeLimit
  177. }
  178. if s.storeOffset != 0 {
  179. scan.Scan.StoreOffset = &s.storeOffset
  180. }
  181. if s.fromTimestamp != MinTimestamp {
  182. scan.Scan.TimeRange.From = &s.fromTimestamp
  183. }
  184. if s.toTimestamp != MaxTimestamp {
  185. scan.Scan.TimeRange.To = &s.toTimestamp
  186. }
  187. if s.reversed {
  188. scan.Scan.Reversed = &s.reversed
  189. }
  190. scan.Scan.Filter = s.filter
  191. return scan
  192. }
  193. // NewResponse creates an empty protobuf message to read the response
  194. // of this RPC.
  195. func (s *Scan) NewResponse() proto.Message {
  196. return &pb.ScanResponse{}
  197. }
  198. // DeserializeCellBlocks deserializes scan results from cell blocks
  199. func (s *Scan) DeserializeCellBlocks(m proto.Message, b []byte) (uint32, error) {
  200. scanResp := m.(*pb.ScanResponse)
  201. partials := scanResp.GetPartialFlagPerResult()
  202. scanResp.Results = make([]*pb.Result, len(partials))
  203. var readLen uint32
  204. for i, numCells := range scanResp.GetCellsPerResult() {
  205. cells, l, err := deserializeCellBlocks(b[readLen:], numCells)
  206. if err != nil {
  207. return 0, err
  208. }
  209. scanResp.Results[i] = &pb.Result{
  210. Cell: cells,
  211. Partial: proto.Bool(partials[i]),
  212. }
  213. readLen += l
  214. }
  215. return readLen, nil
  216. }
  217. // ScannerID is an option for scan requests.
  218. // This is an internal option to fetch the next set of results for an ongoing scan.
  219. func ScannerID(id uint64) func(Call) error {
  220. return func(s Call) error {
  221. scan, ok := s.(*Scan)
  222. if !ok {
  223. return errors.New("'ScannerID' option can only be used with Scan queries")
  224. }
  225. scan.scannerID = id
  226. return nil
  227. }
  228. }
  229. // CloseScanner is an option for scan requests.
  230. // Closes scanner after the first result is returned. This is an internal option
  231. // but could be useful if you know that your scan result fits into one response
  232. // in order to save an extra request.
  233. func CloseScanner() func(Call) error {
  234. return func(s Call) error {
  235. scan, ok := s.(*Scan)
  236. if !ok {
  237. return errors.New("'Close' option can only be used with Scan queries")
  238. }
  239. scan.closeScanner = true
  240. return nil
  241. }
  242. }
  243. // MaxResultSize is an option for scan requests.
  244. // Maximum number of bytes fetched when calling a scanner's next method.
  245. // MaxResultSize takes priority over NumberOfRows.
  246. func MaxResultSize(n uint64) func(Call) error {
  247. return func(g Call) error {
  248. scan, ok := g.(*Scan)
  249. if !ok {
  250. return errors.New("'MaxResultSize' option can only be used with Scan queries")
  251. }
  252. if n == 0 {
  253. return errors.New("'MaxResultSize' option must be greater than 0")
  254. }
  255. scan.maxResultSize = n
  256. return nil
  257. }
  258. }
  259. // NumberOfRows is an option for scan requests.
  260. // Specifies how many rows are fetched with each request to regionserver.
  261. // Should be > 0, avoid extremely low values such as 1 because a request
  262. // to regionserver will be made for every row.
  263. func NumberOfRows(n uint32) func(Call) error {
  264. return func(g Call) error {
  265. scan, ok := g.(*Scan)
  266. if !ok {
  267. return errors.New("'NumberOfRows' option can only be used with Scan queries")
  268. }
  269. scan.numberOfRows = n
  270. return nil
  271. }
  272. }
  273. // AllowPartialResults is an option for scan requests.
  274. // This option should be provided if the client has really big rows and
  275. // wants to avoid OOM errors on her side. With this option provided, Next()
  276. // will return partial rows.
  277. func AllowPartialResults() func(Call) error {
  278. return func(g Call) error {
  279. scan, ok := g.(*Scan)
  280. if !ok {
  281. return errors.New("'AllowPartialResults' option can only be used with Scan queries")
  282. }
  283. scan.allowPartialResults = true
  284. return nil
  285. }
  286. }
  287. // Reversed is a Scan-only option which allows you to scan in reverse key order
  288. // To use it the startKey would be greater than the end key
  289. func Reversed() func(Call) error {
  290. return func(g Call) error {
  291. scan, ok := g.(*Scan)
  292. if !ok {
  293. return errors.New("'Reversed' option can only be used with Scan queries")
  294. }
  295. scan.reversed = true
  296. return nil
  297. }
  298. }