123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566 |
- package region
- import (
- "encoding/binary"
- "errors"
- "fmt"
- "io"
- "net"
- "sync"
- "sync/atomic"
- "time"
- log "github.com/sirupsen/logrus"
- "github.com/golang/protobuf/proto"
- "github.com/tsuna/gohbase/hrpc"
- "github.com/tsuna/gohbase/pb"
- )
- type ClientType string
- type canDeserializeCellBlocks interface {
-
-
- DeserializeCellBlocks(proto.Message, []byte) (uint32, error)
- }
- var (
-
-
- ErrMissingCallID = errors.New("got a response with a nonsensical call ID")
-
-
- ErrClientDead = UnrecoverableError{errors.New("client is dead")}
-
-
-
-
- javaRetryableExceptions = map[string]struct{}{
- "org.apache.hadoop.hbase.CallQueueTooBigException": struct{}{},
- "org.apache.hadoop.hbase.NotServingRegionException": struct{}{},
- "org.apache.hadoop.hbase.exceptions.RegionMovedException": struct{}{},
- "org.apache.hadoop.hbase.exceptions.RegionOpeningException": struct{}{},
- "org.apache.hadoop.hbase.ipc.ServerNotRunningYetException": struct{}{},
- "org.apache.hadoop.hbase.quotas.RpcThrottlingException": struct{}{},
- "org.apache.hadoop.hbase.RetryImmediatelyException": struct{}{},
- }
-
-
-
-
- javaUnrecoverableExceptions = map[string]struct{}{
- "org.apache.hadoop.hbase.regionserver.RegionServerAbortedException": struct{}{},
- "org.apache.hadoop.hbase.regionserver.RegionServerStoppedException": struct{}{},
- }
- )
- const (
-
- DefaultLookupTimeout = 30 * time.Second
-
- DefaultReadTimeout = 30 * time.Second
-
- RegionClient = ClientType("ClientService")
-
-
- MasterClient = ClientType("MasterService")
- )
- var bufferPool = sync.Pool{
- New: func() interface{} {
- var b []byte
- return b
- },
- }
- func newBuffer(size int) []byte {
- b := bufferPool.Get().([]byte)
- if cap(b) < size {
- doublecap := 2 * cap(b)
- if doublecap > size {
- return make([]byte, size, doublecap)
- }
- return make([]byte, size)
- }
- return b[:size]
- }
- func freeBuffer(b []byte) {
- bufferPool.Put(b[:0])
- }
- type UnrecoverableError struct {
- error
- }
- func (e UnrecoverableError) Error() string {
- return e.error.Error()
- }
- type RetryableError struct {
- error
- }
- func (e RetryableError) Error() string {
- return e.error.Error()
- }
- type client struct {
- conn net.Conn
-
- addr string
-
- once sync.Once
- rpcs chan hrpc.Call
- done chan struct{}
-
-
- sentM sync.Mutex
- sent map[uint32]hrpc.Call
-
- inFlightM sync.Mutex
- inFlight uint32
- id uint32
- rpcQueueSize int
- flushInterval time.Duration
- effectiveUser string
-
- readTimeout time.Duration
- }
- func (c *client) QueueRPC(rpc hrpc.Call) {
- if b, ok := rpc.(hrpc.Batchable); ok && c.rpcQueueSize > 1 && !b.SkipBatch() {
-
- select {
- case <-rpc.Context().Done():
-
- case <-c.done:
- returnResult(rpc, nil, ErrClientDead)
- case c.rpcs <- rpc:
- }
- } else {
- if err := c.trySend(rpc); err != nil {
- returnResult(rpc, nil, err)
- }
- }
- }
- func (c *client) Close() {
- c.fail(ErrClientDead)
- }
- func (c *client) Addr() string {
- return c.addr
- }
- func (c *client) String() string {
- return fmt.Sprintf("RegionClient{Addr: %s}", c.addr)
- }
- func (c *client) inFlightUp() {
- c.inFlightM.Lock()
- c.inFlight++
-
- c.conn.SetReadDeadline(time.Now().Add(c.readTimeout))
- c.inFlightM.Unlock()
- }
- func (c *client) inFlightDown() {
- c.inFlightM.Lock()
- c.inFlight--
-
-
- if c.inFlight == 0 {
- c.conn.SetReadDeadline(time.Time{})
- }
- c.inFlightM.Unlock()
- }
- func (c *client) fail(err error) {
- c.once.Do(func() {
- log.WithFields(log.Fields{
- "client": c,
- "err": err,
- }).Error("error occured, closing region client")
-
-
-
-
- close(c.done)
-
-
-
- c.conn.Close()
- c.failSentRPCs()
- })
- }
- func (c *client) failSentRPCs() {
-
- c.sentM.Lock()
- sent := c.sent
- c.sent = make(map[uint32]hrpc.Call)
- c.sentM.Unlock()
- log.WithFields(log.Fields{
- "client": c,
- "count": len(sent),
- }).Debug("failing awaiting RPCs")
-
- for _, rpc := range sent {
- returnResult(rpc, nil, ErrClientDead)
- }
- }
- func (c *client) registerRPC(rpc hrpc.Call) uint32 {
- currID := atomic.AddUint32(&c.id, 1)
- c.sentM.Lock()
- c.sent[currID] = rpc
- c.sentM.Unlock()
- return currID
- }
- func (c *client) unregisterRPC(id uint32) hrpc.Call {
- c.sentM.Lock()
- rpc := c.sent[id]
- delete(c.sent, id)
- c.sentM.Unlock()
- return rpc
- }
- func (c *client) processRPCs() {
-
-
- m := newMulti(c.rpcQueueSize)
- defer func() {
- m.returnResults(nil, ErrClientDead)
- }()
- flush := func() {
- if log.GetLevel() == log.DebugLevel {
- log.WithFields(log.Fields{
- "len": m.len(),
- "addr": c.Addr(),
- }).Debug("flushing MultiRequest")
- }
- if err := c.trySend(m); err != nil {
- m.returnResults(nil, err)
- }
- m = newMulti(c.rpcQueueSize)
- }
- for {
-
-
-
- for {
- select {
- case <-c.done:
- return
- case rpc := <-c.rpcs:
-
- if !m.add(rpc) {
-
- continue
- }
- default:
-
- }
- break
- }
- if l := m.len(); l == 0 {
-
- select {
- case <-c.done:
- return
- case rpc := <-c.rpcs:
- m.add(rpc)
- }
- continue
- } else if l == c.rpcQueueSize || c.flushInterval == 0 {
-
- flush()
- continue
- }
-
-
-
- timer := time.NewTimer(c.flushInterval)
- for {
- select {
- case <-c.done:
- return
- case <-timer.C:
-
- case rpc := <-c.rpcs:
- if !m.add(rpc) {
-
- continue
- }
-
- if !timer.Stop() {
- <-timer.C
- }
- }
- break
- }
- flush()
- }
- }
- func returnResult(c hrpc.Call, msg proto.Message, err error) {
- if m, ok := c.(*multi); ok {
- m.returnResults(msg, err)
- } else {
- c.ResultChan() <- hrpc.RPCResult{Msg: msg, Error: err}
- }
- }
- func (c *client) trySend(rpc hrpc.Call) error {
- select {
- case <-c.done:
-
-
-
- return ErrClientDead
- case <-rpc.Context().Done():
-
-
-
- return nil
- default:
- if id, err := c.send(rpc); err != nil {
- if _, ok := err.(UnrecoverableError); ok {
- c.fail(err)
- }
- if r := c.unregisterRPC(id); r != nil {
-
-
- return err
- }
- }
- return nil
- }
- }
- func (c *client) receiveRPCs() {
- for {
- select {
- case <-c.done:
- return
- default:
- if err := c.receive(); err != nil {
- switch err.(type) {
- case UnrecoverableError:
- c.fail(err)
- return
- case RetryableError:
- continue
- }
- }
- }
- }
- }
- func (c *client) receive() (err error) {
- var (
- sz [4]byte
- header pb.ResponseHeader
- response proto.Message
- )
- err = c.readFully(sz[:])
- if err != nil {
- return UnrecoverableError{err}
- }
- size := binary.BigEndian.Uint32(sz[:])
- b := make([]byte, size)
- err = c.readFully(b)
- if err != nil {
- return UnrecoverableError{err}
- }
- buf := proto.NewBuffer(b)
- if err = buf.DecodeMessage(&header); err != nil {
- return fmt.Errorf("failed to decode the response header: %s", err)
- }
- if header.CallId == nil {
- return ErrMissingCallID
- }
- callID := *header.CallId
- rpc := c.unregisterRPC(callID)
- if rpc == nil {
- return fmt.Errorf("got a response with an unexpected call ID: %d", callID)
- }
- c.inFlightDown()
- select {
- case <-rpc.Context().Done():
-
- return
- default:
- }
-
-
-
- defer func() { returnResult(rpc, response, err) }()
- if header.Exception == nil {
- response = rpc.NewResponse()
- if err = buf.DecodeMessage(response); err != nil {
- err = fmt.Errorf("failed to decode the response: %s", err)
- return
- }
- var cellsLen uint32
- if header.CellBlockMeta != nil {
- cellsLen = header.CellBlockMeta.GetLength()
- }
- if d, ok := rpc.(canDeserializeCellBlocks); cellsLen > 0 && ok {
- b := buf.Bytes()[size-cellsLen:]
- var nread uint32
- nread, err = d.DeserializeCellBlocks(response, b)
- if err != nil {
- err = fmt.Errorf("failed to decode the response: %s", err)
- return
- }
- if int(nread) < len(b) {
- err = fmt.Errorf("short read: buffer len %d, read %d", len(b), nread)
- return
- }
- }
- } else {
- err = exceptionToError(*header.Exception.ExceptionClassName, *header.Exception.StackTrace)
- }
- return
- }
- func exceptionToError(class, stack string) error {
- err := fmt.Errorf("HBase Java exception %s:\n%s", class, stack)
- if _, ok := javaRetryableExceptions[class]; ok {
- return RetryableError{err}
- } else if _, ok := javaUnrecoverableExceptions[class]; ok {
- return UnrecoverableError{err}
- }
- return err
- }
- func (c *client) write(buf []byte) error {
- _, err := c.conn.Write(buf)
- return err
- }
- func (c *client) readFully(buf []byte) error {
- _, err := io.ReadFull(c.conn, buf)
- return err
- }
- func (c *client) sendHello(ctype ClientType) error {
- connHeader := &pb.ConnectionHeader{
- UserInfo: &pb.UserInformation{
- EffectiveUser: proto.String(c.effectiveUser),
- },
- ServiceName: proto.String(string(ctype)),
- CellBlockCodecClass: proto.String("org.apache.hadoop.hbase.codec.KeyValueCodec"),
- }
- data, err := proto.Marshal(connHeader)
- if err != nil {
- return fmt.Errorf("failed to marshal connection header: %s", err)
- }
- const header = "HBas\x00\x50"
- buf := make([]byte, 0, len(header)+4+len(data))
- buf = append(buf, header...)
- buf = buf[:len(header)+4]
- binary.BigEndian.PutUint32(buf[6:], uint32(len(data)))
- buf = append(buf, data...)
- return c.write(buf)
- }
- func (c *client) send(rpc hrpc.Call) (uint32, error) {
- b := newBuffer(4)
- defer func() { freeBuffer(b) }()
- buf := proto.NewBuffer(b[4:])
- buf.Reset()
- request := rpc.ToProto()
-
-
-
-
-
- id := c.registerRPC(rpc)
- header := &pb.RequestHeader{
- CallId: &id,
- MethodName: proto.String(rpc.Name()),
- RequestParam: proto.Bool(true),
- }
- if err := buf.EncodeMessage(header); err != nil {
- return id, fmt.Errorf("failed to marshal request header: %s", err)
- }
- if err := buf.EncodeMessage(request); err != nil {
- return id, fmt.Errorf("failed to marshal request: %s", err)
- }
- payload := buf.Bytes()
- binary.BigEndian.PutUint32(b, uint32(len(payload)))
- b = append(b[:4], payload...)
- if err := c.write(b); err != nil {
- return id, UnrecoverableError{err}
- }
- c.inFlightUp()
- return id, nil
- }
|