|
- // Copyright (C) 2015 The GoHBase Authors. All rights reserved.
- // This file is part of GoHBase.
- // Use of this source code is governed by the Apache License 2.0
- // that can be found in the COPYING file.
- package region
- import (
- "encoding/binary"
- "errors"
- "fmt"
- "io"
- "net"
- "sync"
- "sync/atomic"
- "time"
- log "github.com/sirupsen/logrus"
- "github.com/golang/protobuf/proto"
- "github.com/tsuna/gohbase/hrpc"
- "github.com/tsuna/gohbase/pb"
- )
- // ClientType is a type alias to represent the type of this region client
- type ClientType string
- type canDeserializeCellBlocks interface {
- // DeserializeCellBlocks populates passed protobuf message with results
- // deserialized from the reader and returns number of bytes read or error.
- DeserializeCellBlocks(proto.Message, []byte) (uint32, error)
- }
- var (
- // ErrMissingCallID is used when HBase sends us a response message for a
- // request that we didn't send
- ErrMissingCallID = errors.New("got a response with a nonsensical call ID")
- // ErrClientDead is returned to rpcs when Close() is called or when client
- // died because of failed send or receive
- ErrClientDead = UnrecoverableError{errors.New("client is dead")}
- // javaRetryableExceptions is a map where all Java exceptions that signify
- // the RPC should be sent again are listed (as keys). If a Java exception
- // listed here is returned by HBase, the client should attempt to resend
- // the RPC message, potentially via a different region client.
- javaRetryableExceptions = map[string]struct{}{
- "org.apache.hadoop.hbase.CallQueueTooBigException": struct{}{},
- "org.apache.hadoop.hbase.NotServingRegionException": struct{}{},
- "org.apache.hadoop.hbase.exceptions.RegionMovedException": struct{}{},
- "org.apache.hadoop.hbase.exceptions.RegionOpeningException": struct{}{},
- "org.apache.hadoop.hbase.ipc.ServerNotRunningYetException": struct{}{},
- "org.apache.hadoop.hbase.quotas.RpcThrottlingException": struct{}{},
- "org.apache.hadoop.hbase.RetryImmediatelyException": struct{}{},
- }
- // javaUnrecoverableExceptions is a map where all Java exceptions that signify
- // the RPC should be sent again are listed (as keys). If a Java exception
- // listed here is returned by HBase, the RegionClient will be closed and a new
- // one should be established.
- javaUnrecoverableExceptions = map[string]struct{}{
- "org.apache.hadoop.hbase.regionserver.RegionServerAbortedException": struct{}{},
- "org.apache.hadoop.hbase.regionserver.RegionServerStoppedException": struct{}{},
- }
- )
- const (
- //DefaultLookupTimeout is the default region lookup timeout
- DefaultLookupTimeout = 30 * time.Second
- //DefaultReadTimeout is the default region read timeout
- DefaultReadTimeout = 30 * time.Second
- // RegionClient is a ClientType that means this will be a normal client
- RegionClient = ClientType("ClientService")
- // MasterClient is a ClientType that means this client will talk to the
- // master server
- MasterClient = ClientType("MasterService")
- )
- var bufferPool = sync.Pool{
- New: func() interface{} {
- var b []byte
- return b
- },
- }
- func newBuffer(size int) []byte {
- b := bufferPool.Get().([]byte)
- if cap(b) < size {
- doublecap := 2 * cap(b)
- if doublecap > size {
- return make([]byte, size, doublecap)
- }
- return make([]byte, size)
- }
- return b[:size]
- }
- func freeBuffer(b []byte) {
- bufferPool.Put(b[:0])
- }
- // UnrecoverableError is an error that this region.Client can't recover from.
- // The connection to the RegionServer has to be closed and all queued and
- // outstanding RPCs will be failed / retried.
- type UnrecoverableError struct {
- error
- }
- func (e UnrecoverableError) Error() string {
- return e.error.Error()
- }
- // RetryableError is an error that indicates the RPC should be retried because
- // the error is transient (e.g. a region being momentarily unavailable).
- type RetryableError struct {
- error
- }
- func (e RetryableError) Error() string {
- return e.error.Error()
- }
- // client manages a connection to a RegionServer.
- type client struct {
- conn net.Conn
- // Address of the RegionServer.
- addr string
- // once used for concurrent calls to fail
- once sync.Once
- rpcs chan hrpc.Call
- done chan struct{}
- // sent contains the mapping of sent call IDs to RPC calls, so that when
- // a response is received it can be tied to the correct RPC
- sentM sync.Mutex // protects sent
- sent map[uint32]hrpc.Call
- // inFlight is number of rpcs sent to regionserver awaiting response
- inFlightM sync.Mutex // protects inFlight and SetReadDeadline
- inFlight uint32
- id uint32
- rpcQueueSize int
- flushInterval time.Duration
- effectiveUser string
- // readTimeout is the maximum amount of time to wait for regionserver reply
- readTimeout time.Duration
- }
- // QueueRPC will add an rpc call to the queue for processing by the writer goroutine
- func (c *client) QueueRPC(rpc hrpc.Call) {
- if b, ok := rpc.(hrpc.Batchable); ok && c.rpcQueueSize > 1 && !b.SkipBatch() {
- // queue up the rpc
- select {
- case <-rpc.Context().Done():
- // rpc timed out before being processed
- case <-c.done:
- returnResult(rpc, nil, ErrClientDead)
- case c.rpcs <- rpc:
- }
- } else {
- if err := c.trySend(rpc); err != nil {
- returnResult(rpc, nil, err)
- }
- }
- }
- // Close asks this region.Client to close its connection to the RegionServer.
- // All queued and outstanding RPCs, if any, will be failed as if a connection
- // error had happened.
- func (c *client) Close() {
- c.fail(ErrClientDead)
- }
- // Addr returns address of the region server the client is connected to
- func (c *client) Addr() string {
- return c.addr
- }
- // String returns a string represintation of the current region client
- func (c *client) String() string {
- return fmt.Sprintf("RegionClient{Addr: %s}", c.addr)
- }
- func (c *client) inFlightUp() {
- c.inFlightM.Lock()
- c.inFlight++
- // we expect that at least the last request can be completed within readTimeout
- c.conn.SetReadDeadline(time.Now().Add(c.readTimeout))
- c.inFlightM.Unlock()
- }
- func (c *client) inFlightDown() {
- c.inFlightM.Lock()
- c.inFlight--
- // reset read timeout if we are not waiting for any responses
- // in order to prevent from closing this client if there are no request
- if c.inFlight == 0 {
- c.conn.SetReadDeadline(time.Time{})
- }
- c.inFlightM.Unlock()
- }
- func (c *client) fail(err error) {
- c.once.Do(func() {
- log.WithFields(log.Fields{
- "client": c,
- "err": err,
- }).Error("error occured, closing region client")
- // we don't close c.rpcs channel to make it block in select of QueueRPC
- // and avoid dealing with synchronization of closing it while someone
- // might be sending to it. Go's GC will take care of it.
- // tell goroutines to stop
- close(c.done)
- // close connection to the regionserver
- // to let it know that we can't receive anymore
- // and fail all the rpcs being sent
- c.conn.Close()
- c.failSentRPCs()
- })
- }
- func (c *client) failSentRPCs() {
- // channel is closed, clean up awaiting rpcs
- c.sentM.Lock()
- sent := c.sent
- c.sent = make(map[uint32]hrpc.Call)
- c.sentM.Unlock()
- log.WithFields(log.Fields{
- "client": c,
- "count": len(sent),
- }).Debug("failing awaiting RPCs")
- // send error to awaiting rpcs
- for _, rpc := range sent {
- returnResult(rpc, nil, ErrClientDead)
- }
- }
- func (c *client) registerRPC(rpc hrpc.Call) uint32 {
- currID := atomic.AddUint32(&c.id, 1)
- c.sentM.Lock()
- c.sent[currID] = rpc
- c.sentM.Unlock()
- return currID
- }
- func (c *client) unregisterRPC(id uint32) hrpc.Call {
- c.sentM.Lock()
- rpc := c.sent[id]
- delete(c.sent, id)
- c.sentM.Unlock()
- return rpc
- }
- func (c *client) processRPCs() {
- // TODO: flush when the size is too large
- // TODO: if multi has only one call, send that call instead
- m := newMulti(c.rpcQueueSize)
- defer func() {
- m.returnResults(nil, ErrClientDead)
- }()
- flush := func() {
- if log.GetLevel() == log.DebugLevel {
- log.WithFields(log.Fields{
- "len": m.len(),
- "addr": c.Addr(),
- }).Debug("flushing MultiRequest")
- }
- if err := c.trySend(m); err != nil {
- m.returnResults(nil, err)
- }
- m = newMulti(c.rpcQueueSize)
- }
- for {
- // first loop is to accomodate request heavy workload
- // it will batch as long as conccurent writers are sending
- // new rpcs or until multi is filled up
- for {
- select {
- case <-c.done:
- return
- case rpc := <-c.rpcs:
- // have things queued up, batch them
- if !m.add(rpc) {
- // can still put more rpcs into batch
- continue
- }
- default:
- // no more rpcs queued up
- }
- break
- }
- if l := m.len(); l == 0 {
- // wait for the next batch
- select {
- case <-c.done:
- return
- case rpc := <-c.rpcs:
- m.add(rpc)
- }
- continue
- } else if l == c.rpcQueueSize || c.flushInterval == 0 {
- // batch is full, flush
- flush()
- continue
- }
- // second loop is to accomodate less frequent callers
- // that would like to maximize their batches at the expense
- // of waiting for flushInteval
- timer := time.NewTimer(c.flushInterval)
- for {
- select {
- case <-c.done:
- return
- case <-timer.C:
- // time to flush
- case rpc := <-c.rpcs:
- if !m.add(rpc) {
- // can still put more rpcs into batch
- continue
- }
- // batch is full
- if !timer.Stop() {
- <-timer.C
- }
- }
- break
- }
- flush()
- }
- }
- func returnResult(c hrpc.Call, msg proto.Message, err error) {
- if m, ok := c.(*multi); ok {
- m.returnResults(msg, err)
- } else {
- c.ResultChan() <- hrpc.RPCResult{Msg: msg, Error: err}
- }
- }
- func (c *client) trySend(rpc hrpc.Call) error {
- select {
- case <-c.done:
- // An unrecoverable error has occured,
- // region client has been stopped,
- // don't send rpcs
- return ErrClientDead
- case <-rpc.Context().Done():
- // If the deadline has been exceeded, don't bother sending the
- // request. The function that placed the RPC in our queue should
- // stop waiting for a result and return an error.
- return nil
- default:
- if id, err := c.send(rpc); err != nil {
- if _, ok := err.(UnrecoverableError); ok {
- c.fail(err)
- }
- if r := c.unregisterRPC(id); r != nil {
- // we are the ones to unregister the rpc,
- // return err to notify client of it
- return err
- }
- }
- return nil
- }
- }
- func (c *client) receiveRPCs() {
- for {
- select {
- case <-c.done:
- return
- default:
- if err := c.receive(); err != nil {
- switch err.(type) {
- case UnrecoverableError:
- c.fail(err)
- return
- case RetryableError:
- continue
- }
- }
- }
- }
- }
- func (c *client) receive() (err error) {
- var (
- sz [4]byte
- header pb.ResponseHeader
- response proto.Message
- )
- err = c.readFully(sz[:])
- if err != nil {
- return UnrecoverableError{err}
- }
- size := binary.BigEndian.Uint32(sz[:])
- b := make([]byte, size)
- err = c.readFully(b)
- if err != nil {
- return UnrecoverableError{err}
- }
- buf := proto.NewBuffer(b)
- if err = buf.DecodeMessage(&header); err != nil {
- return fmt.Errorf("failed to decode the response header: %s", err)
- }
- if header.CallId == nil {
- return ErrMissingCallID
- }
- callID := *header.CallId
- rpc := c.unregisterRPC(callID)
- if rpc == nil {
- return fmt.Errorf("got a response with an unexpected call ID: %d", callID)
- }
- c.inFlightDown()
- select {
- case <-rpc.Context().Done():
- // context has expired, don't bother deserializing
- return
- default:
- }
- // Here we know for sure that we got a response for rpc we asked.
- // It's our responsibility to deliver the response or error to the
- // caller as we unregistered the rpc.
- defer func() { returnResult(rpc, response, err) }()
- if header.Exception == nil {
- response = rpc.NewResponse()
- if err = buf.DecodeMessage(response); err != nil {
- err = fmt.Errorf("failed to decode the response: %s", err)
- return
- }
- var cellsLen uint32
- if header.CellBlockMeta != nil {
- cellsLen = header.CellBlockMeta.GetLength()
- }
- if d, ok := rpc.(canDeserializeCellBlocks); cellsLen > 0 && ok {
- b := buf.Bytes()[size-cellsLen:]
- var nread uint32
- nread, err = d.DeserializeCellBlocks(response, b)
- if err != nil {
- err = fmt.Errorf("failed to decode the response: %s", err)
- return
- }
- if int(nread) < len(b) {
- err = fmt.Errorf("short read: buffer len %d, read %d", len(b), nread)
- return
- }
- }
- } else {
- err = exceptionToError(*header.Exception.ExceptionClassName, *header.Exception.StackTrace)
- }
- return
- }
- func exceptionToError(class, stack string) error {
- err := fmt.Errorf("HBase Java exception %s:\n%s", class, stack)
- if _, ok := javaRetryableExceptions[class]; ok {
- return RetryableError{err}
- } else if _, ok := javaUnrecoverableExceptions[class]; ok {
- return UnrecoverableError{err}
- }
- return err
- }
- // write sends the given buffer to the RegionServer.
- func (c *client) write(buf []byte) error {
- _, err := c.conn.Write(buf)
- return err
- }
- // Tries to read enough data to fully fill up the given buffer.
- func (c *client) readFully(buf []byte) error {
- _, err := io.ReadFull(c.conn, buf)
- return err
- }
- // sendHello sends the "hello" message needed when opening a new connection.
- func (c *client) sendHello(ctype ClientType) error {
- connHeader := &pb.ConnectionHeader{
- UserInfo: &pb.UserInformation{
- EffectiveUser: proto.String(c.effectiveUser),
- },
- ServiceName: proto.String(string(ctype)),
- CellBlockCodecClass: proto.String("org.apache.hadoop.hbase.codec.KeyValueCodec"),
- }
- data, err := proto.Marshal(connHeader)
- if err != nil {
- return fmt.Errorf("failed to marshal connection header: %s", err)
- }
- const header = "HBas\x00\x50" // \x50 = Simple Auth.
- buf := make([]byte, 0, len(header)+4+len(data))
- buf = append(buf, header...)
- buf = buf[:len(header)+4]
- binary.BigEndian.PutUint32(buf[6:], uint32(len(data)))
- buf = append(buf, data...)
- return c.write(buf)
- }
- // send sends an RPC out to the wire.
- // Returns the response (for now, as the call is synchronous).
- func (c *client) send(rpc hrpc.Call) (uint32, error) {
- b := newBuffer(4)
- defer func() { freeBuffer(b) }()
- buf := proto.NewBuffer(b[4:])
- buf.Reset()
- request := rpc.ToProto()
- // we have to register rpc after we marhsal because
- // registered rpc can fail before it was even sent
- // in all the cases where c.fail() is called.
- // If that happens, client can retry sending the rpc
- // again potentially changing it's contents.
- id := c.registerRPC(rpc)
- header := &pb.RequestHeader{
- CallId: &id,
- MethodName: proto.String(rpc.Name()),
- RequestParam: proto.Bool(true),
- }
- if err := buf.EncodeMessage(header); err != nil {
- return id, fmt.Errorf("failed to marshal request header: %s", err)
- }
- if err := buf.EncodeMessage(request); err != nil {
- return id, fmt.Errorf("failed to marshal request: %s", err)
- }
- payload := buf.Bytes()
- binary.BigEndian.PutUint32(b, uint32(len(payload)))
- b = append(b[:4], payload...)
- if err := c.write(b); err != nil {
- return id, UnrecoverableError{err}
- }
- c.inFlightUp()
- return id, nil
- }
|