123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333 |
- package hrpc
- import (
- "context"
- "errors"
- "fmt"
- "math"
- "github.com/golang/protobuf/proto"
- "github.com/tsuna/gohbase/pb"
- )
- const (
-
- DefaultMaxVersions uint32 = 1
-
- MinTimestamp uint64 = 0
-
- MaxTimestamp = math.MaxUint64
-
-
-
- DefaultMaxResultSize = 2097152
-
- DefaultNumberOfRows = math.MaxInt32
-
-
- DefaultMaxResultsPerColumnFamily = math.MaxInt32
- )
- type Scanner interface {
-
-
-
-
-
-
- Next() (*Result, error)
-
-
-
-
- Close() error
- }
- type Scan struct {
- base
- baseQuery
- startRow []byte
- stopRow []byte
- scannerID uint64
- maxResultSize uint64
- numberOfRows uint32
- reversed bool
- closeScanner bool
- allowPartialResults bool
- }
- func baseScan(ctx context.Context, table []byte,
- options ...func(Call) error) (*Scan, error) {
- s := &Scan{
- base: base{
- table: table,
- ctx: ctx,
- resultch: make(chan RPCResult, 1),
- },
- baseQuery: newBaseQuery(),
- scannerID: math.MaxUint64,
- maxResultSize: DefaultMaxResultSize,
- numberOfRows: DefaultNumberOfRows,
- reversed: false,
- }
- err := applyOptions(s, options...)
- if err != nil {
- return nil, err
- }
- return s, nil
- }
- func (s *Scan) String() string {
- return fmt.Sprintf("Scan{Table=%q StartRow=%q StopRow=%q TimeRange=(%d, %d) "+
- "MaxVersions=%d NumberOfRows=%d MaxResultSize=%d Familes=%v Filter=%v "+
- "StoreLimit=%d StoreOffset=%d ScannerID=%d Close=%v}",
- s.table, s.startRow, s.stopRow, s.fromTimestamp, s.toTimestamp,
- s.maxVersions, s.numberOfRows, s.maxResultSize, s.families, s.filter,
- s.storeLimit, s.storeOffset, s.scannerID, s.closeScanner)
- }
- func NewScan(ctx context.Context, table []byte, options ...func(Call) error) (*Scan, error) {
- return baseScan(ctx, table, options...)
- }
- func NewScanRange(ctx context.Context, table, startRow, stopRow []byte,
- options ...func(Call) error) (*Scan, error) {
- scan, err := baseScan(ctx, table, options...)
- if err != nil {
- return nil, err
- }
- scan.startRow = startRow
- scan.stopRow = stopRow
- scan.key = startRow
- return scan, nil
- }
- func NewScanStr(ctx context.Context, table string, options ...func(Call) error) (*Scan, error) {
- return NewScan(ctx, []byte(table), options...)
- }
- func NewScanRangeStr(ctx context.Context, table, startRow, stopRow string,
- options ...func(Call) error) (*Scan, error) {
- return NewScanRange(ctx, []byte(table), []byte(startRow), []byte(stopRow), options...)
- }
- func (s *Scan) Name() string {
- return "Scan"
- }
- func (s *Scan) StopRow() []byte {
- return s.stopRow
- }
- func (s *Scan) StartRow() []byte {
- return s.startRow
- }
- func (s *Scan) IsClosing() bool {
- return s.closeScanner
- }
- func (s *Scan) AllowPartialResults() bool {
- return s.allowPartialResults
- }
- func (s *Scan) Reversed() bool {
- return s.reversed
- }
- func (s *Scan) NumberOfRows() uint32 {
- return s.numberOfRows
- }
- func (s *Scan) ToProto() proto.Message {
- scan := &pb.ScanRequest{
- Region: s.regionSpecifier(),
- CloseScanner: &s.closeScanner,
- NumberOfRows: &s.numberOfRows,
-
- ClientHandlesPartials: proto.Bool(true),
-
-
- ClientHandlesHeartbeats: proto.Bool(true),
- }
- if s.scannerID != math.MaxUint64 {
- scan.ScannerId = &s.scannerID
- return scan
- }
- scan.Scan = &pb.Scan{
- Column: familiesToColumn(s.families),
- StartRow: s.startRow,
- StopRow: s.stopRow,
- TimeRange: &pb.TimeRange{},
- MaxResultSize: &s.maxResultSize,
- }
- if s.maxVersions != DefaultMaxVersions {
- scan.Scan.MaxVersions = &s.maxVersions
- }
-
- if s.storeLimit != DefaultMaxResultsPerColumnFamily {
- scan.Scan.StoreLimit = &s.storeLimit
- }
- if s.storeOffset != 0 {
- scan.Scan.StoreOffset = &s.storeOffset
- }
- if s.fromTimestamp != MinTimestamp {
- scan.Scan.TimeRange.From = &s.fromTimestamp
- }
- if s.toTimestamp != MaxTimestamp {
- scan.Scan.TimeRange.To = &s.toTimestamp
- }
- if s.reversed {
- scan.Scan.Reversed = &s.reversed
- }
- scan.Scan.Filter = s.filter
- return scan
- }
- func (s *Scan) NewResponse() proto.Message {
- return &pb.ScanResponse{}
- }
- func (s *Scan) DeserializeCellBlocks(m proto.Message, b []byte) (uint32, error) {
- scanResp := m.(*pb.ScanResponse)
- partials := scanResp.GetPartialFlagPerResult()
- scanResp.Results = make([]*pb.Result, len(partials))
- var readLen uint32
- for i, numCells := range scanResp.GetCellsPerResult() {
- cells, l, err := deserializeCellBlocks(b[readLen:], numCells)
- if err != nil {
- return 0, err
- }
- scanResp.Results[i] = &pb.Result{
- Cell: cells,
- Partial: proto.Bool(partials[i]),
- }
- readLen += l
- }
- return readLen, nil
- }
- func ScannerID(id uint64) func(Call) error {
- return func(s Call) error {
- scan, ok := s.(*Scan)
- if !ok {
- return errors.New("'ScannerID' option can only be used with Scan queries")
- }
- scan.scannerID = id
- return nil
- }
- }
- func CloseScanner() func(Call) error {
- return func(s Call) error {
- scan, ok := s.(*Scan)
- if !ok {
- return errors.New("'Close' option can only be used with Scan queries")
- }
- scan.closeScanner = true
- return nil
- }
- }
- func MaxResultSize(n uint64) func(Call) error {
- return func(g Call) error {
- scan, ok := g.(*Scan)
- if !ok {
- return errors.New("'MaxResultSize' option can only be used with Scan queries")
- }
- if n == 0 {
- return errors.New("'MaxResultSize' option must be greater than 0")
- }
- scan.maxResultSize = n
- return nil
- }
- }
- func NumberOfRows(n uint32) func(Call) error {
- return func(g Call) error {
- scan, ok := g.(*Scan)
- if !ok {
- return errors.New("'NumberOfRows' option can only be used with Scan queries")
- }
- scan.numberOfRows = n
- return nil
- }
- }
- func AllowPartialResults() func(Call) error {
- return func(g Call) error {
- scan, ok := g.(*Scan)
- if !ok {
- return errors.New("'AllowPartialResults' option can only be used with Scan queries")
- }
- scan.allowPartialResults = true
- return nil
- }
- }
- func Reversed() func(Call) error {
- return func(g Call) error {
- scan, ok := g.(*Scan)
- if !ok {
- return errors.New("'Reversed' option can only be used with Scan queries")
- }
- scan.reversed = true
- return nil
- }
- }
|