|
- // Copyright (C) 2017 The GoHBase Authors. All rights reserved.
- // This file is part of GoHBase.
- // Use of this source code is governed by the Apache License 2.0
- // that can be found in the COPYING file.
- package gohbase
- import (
- "bytes"
- "context"
- "errors"
- "fmt"
- "io"
- "math"
- "github.com/golang/protobuf/proto"
- "github.com/tsuna/gohbase/hrpc"
- "github.com/tsuna/gohbase/pb"
- )
- const noScannerID = math.MaxUint64
- // rowPadding used to pad the row key when constructing a row before
- var rowPadding = []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
- type scanner struct {
- RPCClient
- // rpc is original scan query
- rpc *hrpc.Scan
- // scannerID is the id of scanner on current region
- scannerID uint64
- // startRow is the start row in the current region
- startRow []byte
- results []*pb.Result
- closed bool
- }
- func (s *scanner) Close() error {
- if s.closed {
- return errors.New("scanner has already been closed")
- }
- s.closed = true
- if s.scannerID != noScannerID {
- go func() {
- // if we are closing in the middle of scanning a region,
- // send a close scanner request
- // TODO: add a deadline
- rpc, err := hrpc.NewScanRange(context.Background(),
- s.rpc.Table(), s.startRow, nil,
- hrpc.ScannerID(s.scannerID),
- hrpc.CloseScanner(),
- hrpc.NumberOfRows(0))
- if err != nil {
- panic(fmt.Sprintf("should not happen: %s", err))
- }
- // If the request fails, the scanner lease will be expired
- // and it will be closed automatically by hbase.
- // No need to bother clients about that.
- s.SendRPC(rpc)
- }()
- }
- return nil
- }
- func (s *scanner) fetch() ([]*pb.Result, error) {
- // keep looping until we have error, some non-empty result or until close
- for {
- resp, region, err := s.request()
- if err != nil {
- s.Close()
- return nil, err
- }
- s.update(resp, region)
- if s.shouldClose(resp, region) {
- s.Close()
- }
- if rs := resp.Results; len(rs) > 0 {
- return rs, nil
- } else if s.closed {
- return nil, io.EOF
- }
- }
- }
- func (s *scanner) peek() (*pb.Result, error) {
- if len(s.results) == 0 {
- if s.closed {
- // done scanning
- return nil, io.EOF
- }
- rs, err := s.fetch()
- if err != nil {
- return nil, err
- }
- // fetch cannot return zero results
- s.results = rs
- }
- return s.results[0], nil
- }
- func (s *scanner) shift() {
- if len(s.results) == 0 {
- return
- }
- // set to nil so that GC isn't blocked to clean up the result
- s.results[0] = nil
- s.results = s.results[1:]
- }
- // coalesce combines result with partial if they belong to the same row
- // and returns the coalesced result and whether coalescing happened
- func (s *scanner) coalesce(result, partial *pb.Result) (*pb.Result, bool) {
- if result == nil {
- return partial, true
- }
- if !result.GetPartial() {
- // results is not partial, shouldn't coalesce
- return result, false
- }
- if len(partial.Cell) > 0 && !bytes.Equal(result.Cell[0].Row, partial.Cell[0].Row) {
- // new row
- result.Partial = proto.Bool(false)
- return result, false
- }
- // same row, add the partial
- result.Cell = append(result.Cell, partial.Cell...)
- if partial.GetStale() {
- result.Stale = proto.Bool(partial.GetStale())
- }
- return result, true
- }
- func newScanner(c RPCClient, rpc *hrpc.Scan) *scanner {
- return &scanner{
- RPCClient: c,
- rpc: rpc,
- startRow: rpc.StartRow(),
- scannerID: noScannerID,
- }
- }
- func toLocalResult(r *pb.Result) *hrpc.Result {
- if r == nil {
- return nil
- }
- return hrpc.ToLocalResult(r)
- }
- func (s *scanner) Next() (*hrpc.Result, error) {
- var (
- result, partial *pb.Result
- err error
- )
- select {
- case <-s.rpc.Context().Done():
- s.Close()
- return nil, s.rpc.Context().Err()
- default:
- }
- if s.rpc.AllowPartialResults() {
- // if client handles partials, just return it
- result, err := s.peek()
- if err != nil {
- return nil, err
- }
- s.shift()
- return toLocalResult(result), nil
- }
- for {
- partial, err = s.peek()
- if err == io.EOF && result != nil {
- // no more results, return what we have. Next call to the Next() will get EOF
- result.Partial = proto.Bool(false)
- return toLocalResult(result), nil
- }
- if err != nil {
- // return whatever we have so far and the error
- return toLocalResult(result), err
- }
- var done bool
- result, done = s.coalesce(result, partial)
- if done {
- s.shift()
- }
- if !result.GetPartial() {
- // if not partial anymore, return it
- return toLocalResult(result), nil
- }
- }
- }
- func (s *scanner) request() (*pb.ScanResponse, hrpc.RegionInfo, error) {
- var (
- rpc *hrpc.Scan
- err error
- )
- if s.scannerID == noScannerID {
- // starting to scan on a new region
- rpc, err = hrpc.NewScanRange(
- s.rpc.Context(),
- s.rpc.Table(),
- s.startRow,
- s.rpc.StopRow(),
- s.rpc.Options()...)
- } else {
- // continuing to scan current region
- rpc, err = hrpc.NewScanRange(s.rpc.Context(),
- s.rpc.Table(),
- s.startRow,
- nil,
- hrpc.ScannerID(s.scannerID),
- hrpc.NumberOfRows(s.rpc.NumberOfRows()))
- }
- if err != nil {
- return nil, nil, err
- }
- res, err := s.SendRPC(rpc)
- if err != nil {
- return nil, nil, err
- }
- scanres, ok := res.(*pb.ScanResponse)
- if !ok {
- return nil, nil, errors.New("got non-ScanResponse for scan request")
- }
- return scanres, rpc.Region(), nil
- }
- // update updates the scanner for the next scan request
- func (s *scanner) update(resp *pb.ScanResponse, region hrpc.RegionInfo) {
- if resp.GetMoreResultsInRegion() {
- if resp.ScannerId != nil {
- s.scannerID = resp.GetScannerId()
- }
- } else {
- // we are done with this region, prepare scan for next region
- s.scannerID = noScannerID
- // Normal Scan
- if !s.rpc.Reversed() {
- s.startRow = region.StopKey()
- return
- }
- // Reversed Scan
- // return if we are at the end
- if len(region.StartKey()) == 0 {
- s.startRow = region.StartKey()
- return
- }
- // create the nearest value lower than the current region startKey
- rsk := region.StartKey()
- // if last element is 0x0, just shorten the slice
- if rsk[len(rsk)-1] == 0x0 {
- s.startRow = rsk[:len(rsk)-1]
- return
- }
- // otherwise lower the last element byte value by 1 and pad with 0xffs
- tmp := make([]byte, len(rsk), len(rsk)+len(rowPadding))
- copy(tmp, rsk)
- tmp[len(tmp)-1] = tmp[len(tmp)-1] - 1
- s.startRow = append(tmp, rowPadding...)
- }
- }
- // shouldClose check if this scanner should be closed and should stop fetching new results
- func (s *scanner) shouldClose(resp *pb.ScanResponse, region hrpc.RegionInfo) bool {
- if resp.MoreResults != nil && !*resp.MoreResults {
- // the filter for the whole scan has been exhausted, close the scanner
- return true
- }
- if s.scannerID != noScannerID {
- // not done with this region yet
- return false
- }
- // Check to see if this region is the last we should scan because:
- // (1) it's the last region
- if len(region.StopKey()) == 0 && !s.rpc.Reversed() {
- return true
- }
- if s.rpc.Reversed() && len(region.StartKey()) == 0 {
- return true
- }
- // (3) because its stop_key is greater than or equal to the stop_key of this scanner,
- // provided that (2) we're not trying to scan until the end of the table.
- if !s.rpc.Reversed() {
- return len(s.rpc.StopRow()) != 0 && // (2)
- bytes.Compare(s.rpc.StopRow(), region.StopKey()) <= 0 // (3)
- }
- // Reversed Scanner
- return len(s.rpc.StopRow()) != 0 && // (2)
- bytes.Compare(s.rpc.StopRow(), region.StartKey()) >= 0 // (3)
- }
|