123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284 |
- // Copyright (C) 2016 The GoHBase Authors. All rights reserved.
- // This file is part of GoHBase.
- // Use of this source code is governed by the Apache License 2.0
- // that can be found in the COPYING file.
- // +build testing
- package region
- import (
- "bytes"
- "context"
- "fmt"
- "sync"
- "sync/atomic"
- "time"
- "github.com/golang/protobuf/proto"
- "github.com/tsuna/gohbase/hrpc"
- "github.com/tsuna/gohbase/pb"
- )
- type testClient struct {
- addr string
- numNSRE int32
- }
- var nsreRegion = &pb.Result{Cell: []*pb.Cell{
- &pb.Cell{
- Row: []byte("nsre,,1434573235908.56f833d5569a27c7a43fbf547b4924a4."),
- Family: []byte("info"),
- Qualifier: []byte("regioninfo"),
- Value: []byte("PBUF\b\xc4\xcd\xe9\x99\xe0)\x12\x0f\n\adefault\x12\x04nsre" +
- "\x1a\x00\"\x00(\x000\x008\x00"),
- },
- &pb.Cell{
- Row: []byte("nsre,,1434573235908.56f833d5569a27c7a43fbf547b4924a4."),
- Family: []byte("info"),
- Qualifier: []byte("seqnumDuringOpen"),
- Value: []byte("\x00\x00\x00\x00\x00\x00\x00\x02"),
- },
- &pb.Cell{
- Row: []byte("nsre,,1434573235908.56f833d5569a27c7a43fbf547b4924a4."),
- Family: []byte("info"),
- Qualifier: []byte("server"),
- Value: []byte("regionserver:1"),
- },
- &pb.Cell{
- Row: []byte("nsre,,1434573235908.56f833d5569a27c7a43fbf547b4924a4."),
- Family: []byte("info"),
- Qualifier: []byte("serverstartcode"),
- Value: []byte("\x00\x00\x01N\x02\x92R\xb1"),
- },
- }}
- // makeRegionResult returns a region that spans the whole table
- // and uses name of the table as the hostname of the regionserver
- func makeRegionResult(key []byte) *pb.ScanResponse {
- s := bytes.SplitN(key, []byte(","), 2)
- fqtable := s[0]
- row := append(fqtable, []byte(",,1434573235908.56f833d5569a27c7a43fbf547b4924a4.")...)
- t := bytes.SplitN(fqtable, []byte{':'}, 2)
- var namespace, table []byte
- if len(t) == 2 {
- namespace = t[0]
- table = t[1]
- } else {
- namespace = []byte("default")
- table = fqtable
- }
- regionInfo := &pb.RegionInfo{
- RegionId: proto.Uint64(1434573235908),
- TableName: &pb.TableName{
- Namespace: namespace,
- Qualifier: table,
- },
- Offline: proto.Bool(false),
- }
- regionInfoValue, err := proto.Marshal(regionInfo)
- if err != nil {
- panic(err)
- }
- regionInfoValue = append([]byte("PBUF"), regionInfoValue...)
- return &pb.ScanResponse{Results: []*pb.Result{
- &pb.Result{Cell: []*pb.Cell{
- &pb.Cell{
- Row: row,
- Family: []byte("info"),
- Qualifier: []byte("regioninfo"),
- Value: regionInfoValue,
- },
- &pb.Cell{
- Row: row,
- Family: []byte("info"),
- Qualifier: []byte("seqnumDuringOpen"),
- Value: []byte("\x00\x00\x00\x00\x00\x00\x00\x02"),
- },
- &pb.Cell{
- Row: row,
- Family: []byte("info"),
- Qualifier: []byte("server"),
- Value: fqtable,
- },
- &pb.Cell{
- Row: row,
- Family: []byte("info"),
- Qualifier: []byte("serverstartcode"),
- Value: []byte("\x00\x00\x01N\x02\x92R\xb1"),
- },
- }}}}
- }
- var metaRow = &pb.Result{Cell: []*pb.Cell{
- &pb.Cell{
- Row: []byte("test,,1434573235908.56f833d5569a27c7a43fbf547b4924a4."),
- Family: []byte("info"),
- Qualifier: []byte("regioninfo"),
- Value: []byte("PBUF\b\xc4\xcd\xe9\x99\xe0)\x12\x0f\n\adefault\x12\x04test" +
- "\x1a\x00\"\x00(\x000\x008\x00"),
- },
- &pb.Cell{
- Row: []byte("test,,1434573235908.56f833d5569a27c7a43fbf547b4924a4."),
- Family: []byte("info"),
- Qualifier: []byte("seqnumDuringOpen"),
- Value: []byte("\x00\x00\x00\x00\x00\x00\x00\x02"),
- },
- &pb.Cell{
- Row: []byte("test,,1434573235908.56f833d5569a27c7a43fbf547b4924a4."),
- Family: []byte("info"),
- Qualifier: []byte("server"),
- Value: []byte("regionserver:2"),
- },
- &pb.Cell{
- Row: []byte("test,,1434573235908.56f833d5569a27c7a43fbf547b4924a4."),
- Family: []byte("info"),
- Qualifier: []byte("serverstartcode"),
- Value: []byte("\x00\x00\x01N\x02\x92R\xb1"),
- },
- }}
- var test1SplitA = &pb.Result{Cell: []*pb.Cell{
- &pb.Cell{
- Row: []byte("test1,,1480547738107.825c5c7e480c76b73d6d2bad5d3f7bb8."),
- Family: []byte("info"),
- Qualifier: []byte("regioninfo"),
- Value: []byte("PBUF\b\xfbÖ\xbc\x8b+\x12\x10\n\adefault\x12\x05" +
- "test1\x1a\x00\"\x03baz(\x000\x008\x00"),
- },
- &pb.Cell{
- Row: []byte("test1,,1480547738107.825c5c7e480c76b73d6d2bad5d3f7bb8."),
- Family: []byte("info"),
- Qualifier: []byte("seqnumDuringOpen"),
- Value: []byte("\x00\x00\x00\x00\x00\x00\x00\v"),
- },
- &pb.Cell{
- Row: []byte("test1,,1480547738107.825c5c7e480c76b73d6d2bad5d3f7bb8."),
- Family: []byte("info"),
- Qualifier: []byte("server"),
- Value: []byte("regionserver:1"),
- },
- &pb.Cell{
- Row: []byte("test1,,1480547738107.825c5c7e480c76b73d6d2bad5d3f7bb8."),
- Family: []byte("info"),
- Qualifier: []byte("serverstartcode"),
- Value: []byte("\x00\x00\x01X\xb6\x83^3"),
- },
- }}
- var test1SplitB = &pb.Result{Cell: []*pb.Cell{
- &pb.Cell{
- Row: []byte("test1,baz,1480547738107.3f2483f5618e1b791f58f83a8ebba6a9."),
- Family: []byte("info"),
- Qualifier: []byte("regioninfo"),
- Value: []byte("PBUF\b\xfbÖ\xbc\x8b+\x12\x10\n\adefault\x12\x05" +
- "test1\x1a\x03baz\"\x00(\x000\x008\x00"),
- },
- &pb.Cell{
- Row: []byte("test1,baz,1480547738107.3f2483f5618e1b791f58f83a8ebba6a9."),
- Family: []byte("info"),
- Qualifier: []byte("seqnumDuringOpen"),
- Value: []byte("\x00\x00\x00\x00\x00\x00\x00\f"),
- },
- &pb.Cell{
- Row: []byte("test1,baz,1480547738107.3f2483f5618e1b791f58f83a8ebba6a9."),
- Family: []byte("info"),
- Qualifier: []byte("server"),
- Value: []byte("regionserver:3"),
- },
- &pb.Cell{
- Row: []byte("test1,baz,1480547738107.3f2483f5618e1b791f58f83a8ebba6a9."),
- Family: []byte("info"),
- Qualifier: []byte("serverstartcode"),
- Value: []byte("\x00\x00\x01X\xb6\x83^3"),
- },
- }}
- var m sync.RWMutex
- var clients map[string]uint32
- func init() {
- clients = make(map[string]uint32)
- }
- // NewClient creates a new test region client.
- func NewClient(ctx context.Context, addr string, ctype ClientType,
- queueSize int, flushInterval time.Duration, effectiveUser string,
- readTimeout time.Duration) (hrpc.RegionClient, error) {
- m.Lock()
- clients[addr]++
- m.Unlock()
- return &testClient{addr: addr}, nil
- }
- func (c *testClient) Addr() string {
- return c.addr
- }
- func (c *testClient) String() string {
- return fmt.Sprintf("RegionClient{Addr: %s}", c.addr)
- }
- func (c *testClient) QueueRPC(call hrpc.Call) {
- // ignore timed out rpcs to mock the region client
- select {
- case <-call.Context().Done():
- return
- default:
- }
- if !bytes.Equal(call.Table(), []byte("hbase:meta")) {
- _, ok := call.(*hrpc.Get)
- if !ok || !bytes.HasSuffix(call.Key(), bytes.Repeat([]byte{0}, 17)) {
- // not a get and not a region probe
- // just return as the mock call should just populate the ResultChan in test
- return
- }
- // region probe, fail for the nsre region 3 times to force retry
- if bytes.Equal(call.Table(), []byte("nsre")) {
- i := atomic.AddInt32(&c.numNSRE, 1)
- if i <= 3 {
- call.ResultChan() <- hrpc.RPCResult{Error: RetryableError{}}
- return
- }
- }
- m.RLock()
- i := clients[c.addr]
- m.RUnlock()
- // if we are connected to this client the first time,
- // pretend it's down to fail the probe and start a reconnect
- if bytes.Equal(call.Table(), []byte("down")) {
- if i <= 1 {
- call.ResultChan() <- hrpc.RPCResult{Error: UnrecoverableError{}}
- } else {
- // otherwise, the region is fine
- call.ResultChan() <- hrpc.RPCResult{}
- }
- return
- }
- }
- if bytes.HasSuffix(call.Key(), bytes.Repeat([]byte{0}, 17)) {
- // meta region probe, return empty to signify that region is online
- call.ResultChan() <- hrpc.RPCResult{}
- } else if bytes.HasPrefix(call.Key(), []byte("test,")) {
- call.ResultChan() <- hrpc.RPCResult{Msg: &pb.ScanResponse{
- Results: []*pb.Result{metaRow}}}
- } else if bytes.HasPrefix(call.Key(), []byte("test1,,")) {
- call.ResultChan() <- hrpc.RPCResult{Msg: &pb.ScanResponse{
- Results: []*pb.Result{test1SplitA}}}
- } else if bytes.HasPrefix(call.Key(), []byte("nsre,,")) {
- call.ResultChan() <- hrpc.RPCResult{Msg: &pb.ScanResponse{
- Results: []*pb.Result{nsreRegion}}}
- } else if bytes.HasPrefix(call.Key(), []byte("tablenotfound,")) {
- call.ResultChan() <- hrpc.RPCResult{Msg: &pb.ScanResponse{
- Results: []*pb.Result{},
- MoreResults: proto.Bool(false),
- }}
- } else {
- call.ResultChan() <- hrpc.RPCResult{Msg: makeRegionResult(call.Key())}
- }
- }
- func (c *testClient) Close() {}
|