scanner.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. // Copyright (C) 2017 The GoHBase Authors. All rights reserved.
  2. // This file is part of GoHBase.
  3. // Use of this source code is governed by the Apache License 2.0
  4. // that can be found in the COPYING file.
  5. package gohbase
  6. import (
  7. "bytes"
  8. "context"
  9. "errors"
  10. "fmt"
  11. "io"
  12. "math"
  13. "github.com/golang/protobuf/proto"
  14. "github.com/tsuna/gohbase/hrpc"
  15. "github.com/tsuna/gohbase/pb"
  16. )
  // noScannerID is the sentinel stored in scanner.scannerID when no
// server-side scanner is open, i.e. the next request must open a new
// scanner instead of continuing an existing one.
const noScannerID = math.MaxUint64

// rowPadding holds eight 0xff bytes. When a reversed scan moves to the
// previous region, the region's start key has its last byte decremented and
// rowPadding appended, approximating the closest row that sorts before that
// start key.
var rowPadding = bytes.Repeat([]byte{0xff}, 8)
  20. type scanner struct {
  21. RPCClient
  22. // rpc is original scan query
  23. rpc *hrpc.Scan
  24. // scannerID is the id of scanner on current region
  25. scannerID uint64
  26. // startRow is the start row in the current region
  27. startRow []byte
  28. results []*pb.Result
  29. closed bool
  30. }
  31. func (s *scanner) Close() error {
  32. if s.closed {
  33. return errors.New("scanner has already been closed")
  34. }
  35. s.closed = true
  36. if s.scannerID != noScannerID {
  37. go func() {
  38. // if we are closing in the middle of scanning a region,
  39. // send a close scanner request
  40. // TODO: add a deadline
  41. rpc, err := hrpc.NewScanRange(context.Background(),
  42. s.rpc.Table(), s.startRow, nil,
  43. hrpc.ScannerID(s.scannerID),
  44. hrpc.CloseScanner(),
  45. hrpc.NumberOfRows(0))
  46. if err != nil {
  47. panic(fmt.Sprintf("should not happen: %s", err))
  48. }
  49. // If the request fails, the scanner lease will be expired
  50. // and it will be closed automatically by hbase.
  51. // No need to bother clients about that.
  52. s.SendRPC(rpc)
  53. }()
  54. }
  55. return nil
  56. }
  57. func (s *scanner) fetch() ([]*pb.Result, error) {
  58. // keep looping until we have error, some non-empty result or until close
  59. for {
  60. resp, region, err := s.request()
  61. if err != nil {
  62. s.Close()
  63. return nil, err
  64. }
  65. s.update(resp, region)
  66. if s.shouldClose(resp, region) {
  67. s.Close()
  68. }
  69. if rs := resp.Results; len(rs) > 0 {
  70. return rs, nil
  71. } else if s.closed {
  72. return nil, io.EOF
  73. }
  74. }
  75. }
  76. func (s *scanner) peek() (*pb.Result, error) {
  77. if len(s.results) == 0 {
  78. if s.closed {
  79. // done scanning
  80. return nil, io.EOF
  81. }
  82. rs, err := s.fetch()
  83. if err != nil {
  84. return nil, err
  85. }
  86. // fetch cannot return zero results
  87. s.results = rs
  88. }
  89. return s.results[0], nil
  90. }
  91. func (s *scanner) shift() {
  92. if len(s.results) == 0 {
  93. return
  94. }
  95. // set to nil so that GC isn't blocked to clean up the result
  96. s.results[0] = nil
  97. s.results = s.results[1:]
  98. }
  99. // coalesce combines result with partial if they belong to the same row
  100. // and returns the coalesced result and whether coalescing happened
  101. func (s *scanner) coalesce(result, partial *pb.Result) (*pb.Result, bool) {
  102. if result == nil {
  103. return partial, true
  104. }
  105. if !result.GetPartial() {
  106. // results is not partial, shouldn't coalesce
  107. return result, false
  108. }
  109. if len(partial.Cell) > 0 && !bytes.Equal(result.Cell[0].Row, partial.Cell[0].Row) {
  110. // new row
  111. result.Partial = proto.Bool(false)
  112. return result, false
  113. }
  114. // same row, add the partial
  115. result.Cell = append(result.Cell, partial.Cell...)
  116. if partial.GetStale() {
  117. result.Stale = proto.Bool(partial.GetStale())
  118. }
  119. return result, true
  120. }
  121. func newScanner(c RPCClient, rpc *hrpc.Scan) *scanner {
  122. return &scanner{
  123. RPCClient: c,
  124. rpc: rpc,
  125. startRow: rpc.StartRow(),
  126. scannerID: noScannerID,
  127. }
  128. }
  129. func toLocalResult(r *pb.Result) *hrpc.Result {
  130. if r == nil {
  131. return nil
  132. }
  133. return hrpc.ToLocalResult(r)
  134. }
  135. func (s *scanner) Next() (*hrpc.Result, error) {
  136. var (
  137. result, partial *pb.Result
  138. err error
  139. )
  140. select {
  141. case <-s.rpc.Context().Done():
  142. s.Close()
  143. return nil, s.rpc.Context().Err()
  144. default:
  145. }
  146. if s.rpc.AllowPartialResults() {
  147. // if client handles partials, just return it
  148. result, err := s.peek()
  149. if err != nil {
  150. return nil, err
  151. }
  152. s.shift()
  153. return toLocalResult(result), nil
  154. }
  155. for {
  156. partial, err = s.peek()
  157. if err == io.EOF && result != nil {
  158. // no more results, return what we have. Next call to the Next() will get EOF
  159. result.Partial = proto.Bool(false)
  160. return toLocalResult(result), nil
  161. }
  162. if err != nil {
  163. // return whatever we have so far and the error
  164. return toLocalResult(result), err
  165. }
  166. var done bool
  167. result, done = s.coalesce(result, partial)
  168. if done {
  169. s.shift()
  170. }
  171. if !result.GetPartial() {
  172. // if not partial anymore, return it
  173. return toLocalResult(result), nil
  174. }
  175. }
  176. }
  177. func (s *scanner) request() (*pb.ScanResponse, hrpc.RegionInfo, error) {
  178. var (
  179. rpc *hrpc.Scan
  180. err error
  181. )
  182. if s.scannerID == noScannerID {
  183. // starting to scan on a new region
  184. rpc, err = hrpc.NewScanRange(
  185. s.rpc.Context(),
  186. s.rpc.Table(),
  187. s.startRow,
  188. s.rpc.StopRow(),
  189. s.rpc.Options()...)
  190. } else {
  191. // continuing to scan current region
  192. rpc, err = hrpc.NewScanRange(s.rpc.Context(),
  193. s.rpc.Table(),
  194. s.startRow,
  195. nil,
  196. hrpc.ScannerID(s.scannerID),
  197. hrpc.NumberOfRows(s.rpc.NumberOfRows()))
  198. }
  199. if err != nil {
  200. return nil, nil, err
  201. }
  202. res, err := s.SendRPC(rpc)
  203. if err != nil {
  204. return nil, nil, err
  205. }
  206. scanres, ok := res.(*pb.ScanResponse)
  207. if !ok {
  208. return nil, nil, errors.New("got non-ScanResponse for scan request")
  209. }
  210. return scanres, rpc.Region(), nil
  211. }
  212. // update updates the scanner for the next scan request
  213. func (s *scanner) update(resp *pb.ScanResponse, region hrpc.RegionInfo) {
  214. if resp.GetMoreResultsInRegion() {
  215. if resp.ScannerId != nil {
  216. s.scannerID = resp.GetScannerId()
  217. }
  218. } else {
  219. // we are done with this region, prepare scan for next region
  220. s.scannerID = noScannerID
  221. // Normal Scan
  222. if !s.rpc.Reversed() {
  223. s.startRow = region.StopKey()
  224. return
  225. }
  226. // Reversed Scan
  227. // return if we are at the end
  228. if len(region.StartKey()) == 0 {
  229. s.startRow = region.StartKey()
  230. return
  231. }
  232. // create the nearest value lower than the current region startKey
  233. rsk := region.StartKey()
  234. // if last element is 0x0, just shorten the slice
  235. if rsk[len(rsk)-1] == 0x0 {
  236. s.startRow = rsk[:len(rsk)-1]
  237. return
  238. }
  239. // otherwise lower the last element byte value by 1 and pad with 0xffs
  240. tmp := make([]byte, len(rsk), len(rsk)+len(rowPadding))
  241. copy(tmp, rsk)
  242. tmp[len(tmp)-1] = tmp[len(tmp)-1] - 1
  243. s.startRow = append(tmp, rowPadding...)
  244. }
  245. }
  246. // shouldClose check if this scanner should be closed and should stop fetching new results
  247. func (s *scanner) shouldClose(resp *pb.ScanResponse, region hrpc.RegionInfo) bool {
  248. if resp.MoreResults != nil && !*resp.MoreResults {
  249. // the filter for the whole scan has been exhausted, close the scanner
  250. return true
  251. }
  252. if s.scannerID != noScannerID {
  253. // not done with this region yet
  254. return false
  255. }
  256. // Check to see if this region is the last we should scan because:
  257. // (1) it's the last region
  258. if len(region.StopKey()) == 0 && !s.rpc.Reversed() {
  259. return true
  260. }
  261. if s.rpc.Reversed() && len(region.StartKey()) == 0 {
  262. return true
  263. }
  264. // (3) because its stop_key is greater than or equal to the stop_key of this scanner,
  265. // provided that (2) we're not trying to scan until the end of the table.
  266. if !s.rpc.Reversed() {
  267. return len(s.rpc.StopRow()) != 0 && // (2)
  268. bytes.Compare(s.rpc.StopRow(), region.StopKey()) <= 0 // (3)
  269. }
  270. // Reversed Scanner
  271. return len(s.rpc.StopRow()) != 0 && // (2)
  272. bytes.Compare(s.rpc.StopRow(), region.StartKey()) >= 0 // (3)
  273. }