123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644 |
- // Copyright (C) 2016 The GoHBase Authors. All rights reserved.
- // This file is part of GoHBase.
- // Use of this source code is governed by the Apache License 2.0
- // that can be found in the COPYING file.
- package gohbase
- import (
- "bytes"
- "context"
- "errors"
- "fmt"
- "io"
- "strconv"
- "time"
- "github.com/golang/protobuf/proto"
- log "github.com/sirupsen/logrus"
- "github.com/tsuna/gohbase/hrpc"
- "github.com/tsuna/gohbase/region"
- "github.com/tsuna/gohbase/zk"
- )
// Package-level sentinel values shared across the client.
var (
	// metaTableName is the fully-qualified name of the meta table.
	metaTableName = []byte("hbase:meta")

	// infoFamily selects all qualifiers of the "info" column family,
	// which is where hbase:meta stores region locations.
	infoFamily = map[string][]string{
		"info": nil,
	}

	// ErrRegionUnavailable is returned when sending rpc to a region that is unavailable
	ErrRegionUnavailable = errors.New("region unavailable")

	// TableNotFound is returned when attempting to access a table that
	// doesn't exist on this cluster.
	TableNotFound = errors.New("table not found")

	// ErrConnotFindRegion is returned when it took too many tries to find a
	// region for the request. It's likely that hbase:meta has overlaps or some other
	// inconsistency.
	ErrConnotFindRegion = errors.New("cannot find region for the rpc")

	// ErrClientClosed is returned when the gohbase client has been closed
	ErrClientClosed = errors.New("client is closed")

	// errMetaLookupThrottled is returned when a lookup for the rpc's region
	// has been throttled.
	errMetaLookupThrottled = errors.New("lookup to hbase:meta has been throttled")
)
const (
	// maxSendRPCTries is the maximum number of times to try to send an RPC
	maxSendRPCTries = 10

	// backoffStart is the initial retry backoff; it grows on each failed
	// attempt (see sleepAndIncreaseBackoff).
	backoffStart = 16 * time.Millisecond
)
// SendRPC locates the region that should serve rpc and sends it there,
// retrying up to maxSendRPCTries times while regions move or are
// unavailable. It returns the response message, or an error:
// ErrConnotFindRegion once retries are exhausted, ErrClientClosed if the
// client is shut down, or the rpc context's error if it is done.
func (c *client) SendRPC(rpc hrpc.Call) (proto.Message, error) {
	var err error
	for i := 0; i < maxSendRPCTries; i++ {
		// Check the cache for a region that can handle this request
		reg := c.getRegionFromCache(rpc.Table(), rpc.Key())
		if reg == nil {
			reg, err = c.findRegion(rpc.Context(), rpc.Table(), rpc.Key())
			if err == ErrRegionUnavailable {
				// A concurrent lookup won the cache race; retry so we
				// pick the freshly cached region up.
				continue
			} else if err == errMetaLookupThrottled {
				// lookup for region has been throttled, check the cache
				// again but don't count this as SendRPC try as there
				// might be just too many request going on at a time.
				i--
				continue
			} else if err != nil {
				return nil, err
			}
		}

		msg, err := c.sendRPCToRegion(rpc, reg)
		switch err {
		case ErrRegionUnavailable:
			if ch := reg.AvailabilityChan(); ch != nil {
				// The region is unavailable. Wait for it to become available,
				// a new region or for the deadline to be exceeded.
				select {
				case <-rpc.Context().Done():
					return nil, rpc.Context().Err()
				case <-c.done:
					return nil, ErrClientClosed
				case <-ch:
				}
			}
		default:
			// Success, or an error we don't handle here: hand it back.
			return msg, err
		}
	}
	return nil, ErrConnotFindRegion
}
- func sendBlocking(rc hrpc.RegionClient, rpc hrpc.Call) (hrpc.RPCResult, error) {
- rc.QueueRPC(rpc)
- var res hrpc.RPCResult
- // Wait for the response
- select {
- case res = <-rpc.ResultChan():
- return res, nil
- case <-rpc.Context().Done():
- return res, rpc.Context().Err()
- }
- }
// sendRPCToRegion queues rpc on reg's region client and waits for the
// result. It returns ErrRegionUnavailable if the region is (or becomes)
// unavailable — in that case the caller should wait on reg's availability
// channel and retry — and otherwise the response message and/or error.
func (c *client) sendRPCToRegion(rpc hrpc.Call, reg hrpc.RegionInfo) (proto.Message, error) {
	if reg.IsUnavailable() {
		return nil, ErrRegionUnavailable
	}
	rpc.SetRegion(reg)

	// Queue the RPC to be sent to the region
	client := reg.Client()
	if client == nil {
		// There was an error queueing the RPC.
		// Mark the region as unavailable.
		if reg.MarkUnavailable() {
			// If this was the first goroutine to mark the region as
			// unavailable, start a goroutine to reestablish a connection
			go c.reestablishRegion(reg)
		}
		return nil, ErrRegionUnavailable
	}
	res, err := sendBlocking(client, rpc)
	if err != nil {
		// Only happens when rpc's context is done.
		return nil, err
	}

	// Check for errors
	switch res.Error.(type) {
	case region.RetryableError:
		// There's an error specific to this region, but
		// our region client is fine. Mark this region as
		// unavailable (as opposed to all regions sharing
		// the client), and start a goroutine to reestablish
		// it.
		if reg.MarkUnavailable() {
			go c.reestablishRegion(reg)
		}
		return nil, ErrRegionUnavailable
	case region.UnrecoverableError:
		// If it was an unrecoverable error, the region client is
		// considered dead.
		if reg == c.adminRegionInfo {
			// If this is the admin client, mark the region
			// as unavailable and start up a goroutine to
			// reconnect if it wasn't already marked as such.
			if reg.MarkUnavailable() {
				go c.reestablishRegion(reg)
			}
		} else {
			// Take down every region sharing this client.
			c.clientDown(client)
		}
		// Fall through to the case of the region being unavailable,
		// which will result in blocking until it's available again.
		return nil, ErrRegionUnavailable
	default:
		// RPC was successfully sent, or an unknown type of error
		// occurred. In either case, return the results.
		return res.Msg, res.Error
	}
}
- // clientDown removes client from cache and marks
- // all the regions sharing this region's
- // client as unavailable, and start a goroutine
- // to reconnect for each of them.
- func (c *client) clientDown(client hrpc.RegionClient) {
- downregions := c.clients.clientDown(client)
- for downreg := range downregions {
- if downreg.MarkUnavailable() {
- downreg.SetClient(nil)
- go c.reestablishRegion(downreg)
- }
- }
- }
- func (c *client) lookupRegion(ctx context.Context,
- table, key []byte) (hrpc.RegionInfo, string, error) {
- var reg hrpc.RegionInfo
- var addr string
- var err error
- backoff := backoffStart
- for {
- // If it takes longer than regionLookupTimeout, fail so that we can sleep
- lookupCtx, cancel := context.WithTimeout(ctx, c.regionLookupTimeout)
- if c.clientType == adminClient {
- log.WithField("resource", zk.Master).Debug("looking up master")
- addr, err = c.zkLookup(lookupCtx, zk.Master)
- cancel()
- reg = c.adminRegionInfo
- } else if bytes.Compare(table, metaTableName) == 0 {
- log.WithField("resource", zk.Meta).Debug("looking up region server of hbase:meta")
- addr, err = c.zkLookup(lookupCtx, zk.Meta)
- cancel()
- reg = c.metaRegionInfo
- } else {
- log.WithFields(log.Fields{
- "table": strconv.Quote(string(table)),
- "key": strconv.Quote(string(key)),
- }).Debug("looking up region")
- reg, addr, err = c.metaLookup(lookupCtx, table, key)
- cancel()
- if err == TableNotFound {
- log.WithFields(log.Fields{
- "table": strconv.Quote(string(table)),
- "key": strconv.Quote(string(key)),
- "err": err,
- }).Debug("hbase:meta does not know about this table/key")
- return nil, "", err
- } else if err == errMetaLookupThrottled {
- return nil, "", err
- } else if err == ErrClientClosed {
- return nil, "", err
- }
- }
- if err == nil {
- log.WithFields(log.Fields{
- "table": strconv.Quote(string(table)),
- "key": strconv.Quote(string(key)),
- "region": reg,
- "addr": addr,
- }).Debug("looked up a region")
- return reg, addr, nil
- }
- log.WithFields(log.Fields{
- "table": strconv.Quote(string(table)),
- "key": strconv.Quote(string(key)),
- "backoff": backoff,
- "err": err,
- }).Error("failed looking up region")
- // This will be hit if there was an error locating the region
- backoff, err = sleepAndIncreaseBackoff(ctx, backoff)
- if err != nil {
- return nil, "", err
- }
- }
- }
// findRegion looks up the region serving the given table/key (after a cache
// miss), inserts it into the region cache and starts a goroutine to connect
// to its regionserver. Returns ErrRegionUnavailable when a concurrent
// lookup already cached an equivalent region, in which case the caller
// should retry the cache.
func (c *client) findRegion(ctx context.Context, table, key []byte) (hrpc.RegionInfo, error) {
	// The region was not in the cache, it
	// must be looked up in the meta table
	reg, addr, err := c.lookupRegion(ctx, table, key)
	if err != nil {
		return nil, err
	}

	// We are the ones that looked up the region, so we need to
	// mark it unavailable and find a client for it.
	reg.MarkUnavailable()
	if reg != c.metaRegionInfo && reg != c.adminRegionInfo {
		// Check that the region wasn't added to
		// the cache while we were looking it up.
		overlaps, replaced := c.regions.put(reg)
		if !replaced {
			// the same or younger regions are already in cache, retry looking up in cache
			return nil, ErrRegionUnavailable
		}
		// otherwise, new region in cache, delete overlaps from client's cache
		for _, r := range overlaps {
			c.clients.del(r)
		}
	}

	// Start a goroutine to connect to the region
	go c.establishRegion(reg, addr)

	// Wait for the new region to become
	// available, and then send the RPC
	return reg, nil
}
- // Searches in the regions cache for the region hosting the given row.
- func (c *client) getRegionFromCache(table, key []byte) hrpc.RegionInfo {
- if c.clientType == adminClient {
- return c.adminRegionInfo
- } else if bytes.Equal(table, metaTableName) {
- return c.metaRegionInfo
- }
- regionName := createRegionSearchKey(table, key)
- _, region := c.regions.get(regionName)
- if region == nil {
- return nil
- }
- // make sure the returned region is for the same table
- if !bytes.Equal(fullyQualifiedTable(region), table) {
- // not the same table, can happen if we got the last region
- return nil
- }
- if len(region.StopKey()) != 0 &&
- // If the stop key is an empty byte array, it means this region is the
- // last region for this table and this key ought to be in that region.
- bytes.Compare(key, region.StopKey()) >= 0 {
- return nil
- }
- return region
- }
// createRegionSearchKey builds the hbase:meta row key used to locate the
// region containing the given key: "<table>,<key>,:". The trailing ':' is
// the first byte greater than '9', so looking right before it finds the
// meta entry with the greatest timestamp.
func createRegionSearchKey(table, key []byte) []byte {
	out := make([]byte, 0, len(table)+len(key)+3)
	out = append(out, table...)
	out = append(out, ',')
	out = append(out, key...)
	return append(out, ',', ':')
}
// metaLookupLimit throttles lookups to hbase:meta via c.metaLookupLimiter.
// It returns nil if we were lucky enough to reserve right away, and
// errMetaLookupThrottled or the context's error otherwise.
func (c *client) metaLookupLimit(ctx context.Context) error {
	r := c.metaLookupLimiter.Reserve()
	if !r.OK() {
		// A single-token reservation should always be possible;
		// anything else is a programming error.
		panic("wtf: cannot reserve a meta lookup")
	}
	delay := r.Delay()
	if delay <= 0 {
		// Not throttled: the reservation can be used immediately.
		return nil
	}

	// We've been rate limited: wait out the delay so the reservation is
	// honored, but still report throttling to the caller.
	t := time.NewTimer(delay)
	defer t.Stop()
	select {
	case <-t.C:
		return errMetaLookupThrottled
	case <-ctx.Done():
		// Return the unused reservation to the limiter.
		r.Cancel()
		return ctx.Err()
	}
}
// metaLookup checks the meta table for the region in which the given row
// key of the given table is. On success it returns the region info and the
// address of its regionserver; it returns TableNotFound when meta has no
// entry, or an error describing a meta inconsistency.
func (c *client) metaLookup(ctx context.Context,
	table, key []byte) (hrpc.RegionInfo, string, error) {
	metaKey := createRegionSearchKey(table, key)
	// Reversed single-row scan starting at metaKey and bounded by the
	// table name: yields the meta entry whose start key is the closest
	// one at or before our key.
	rpc, err := hrpc.NewScanRange(ctx, metaTableName, metaKey, table,
		hrpc.Families(infoFamily),
		hrpc.Reversed(),
		hrpc.CloseScanner(),
		hrpc.NumberOfRows(1))
	if err != nil {
		return nil, "", err
	}

	scanner := c.Scan(rpc)
	resp, err := scanner.Next()
	if err == io.EOF {
		// No meta entry at all for this table.
		return nil, "", TableNotFound
	}
	if err != nil {
		return nil, "", err
	}

	reg, addr, err := region.ParseRegionInfo(resp)
	if err != nil {
		return nil, "", err
	}
	if !bytes.Equal(table, fullyQualifiedTable(reg)) {
		// This would indicate a bug in HBase.
		return nil, "", fmt.Errorf("wtf: meta returned an entry for the wrong table!"+
			" Looked up table=%q key=%q got region=%s", table, key, reg)
	} else if len(reg.StopKey()) != 0 &&
		bytes.Compare(key, reg.StopKey()) >= 0 {
		// This would indicate a hole in the meta table.
		return nil, "", fmt.Errorf("wtf: meta returned an entry for the wrong region!"+
			" Looked up table=%q key=%q got region=%s", table, key, reg)
	}
	return reg, addr, nil
}
- func fullyQualifiedTable(reg hrpc.RegionInfo) []byte {
- namespace := reg.Namespace()
- table := reg.Table()
- if namespace == nil {
- return table
- }
- // non-default namespace table
- fqTable := make([]byte, 0, len(namespace)+1+len(table))
- fqTable = append(fqTable, namespace...)
- fqTable = append(fqTable, byte(':'))
- fqTable = append(fqTable, table...)
- return fqTable
- }
- func (c *client) reestablishRegion(reg hrpc.RegionInfo) {
- select {
- case <-c.done:
- return
- default:
- }
- log.WithField("region", reg).Debug("reestablishing region")
- c.establishRegion(reg, "")
- }
- // probeKey returns a key in region that is unlikely to have data at it
- // in order to test if the region is online. This prevents the Get request
- // to actually fetch the data from the storage which consumes resources
- // of the region server
- func probeKey(reg hrpc.RegionInfo) []byte {
- // now we create a probe key: reg.StartKey() + 17 zeros
- probe := make([]byte, len(reg.StartKey())+17)
- copy(probe, reg.StartKey())
- return probe
- }
// isRegionEstablished checks whether the regionserver accepts rpcs for the
// region by sending a cheap exists-only Get on a key unlikely to hold data.
// Returns nil if the region answered, or the retryable/unrecoverable cause
// if it didn't.
func isRegionEstablished(rc hrpc.RegionClient, reg hrpc.RegionInfo) error {
	probe, err := hrpc.NewGet(context.Background(), fullyQualifiedTable(reg), probeKey(reg),
		hrpc.SkipBatch())
	if err != nil {
		// Building a Get from valid inputs cannot fail; treat as a bug.
		panic(fmt.Sprintf("should not happen: %s", err))
	}

	// We only care whether the region responds, not about any data.
	probe.ExistsOnly()

	probe.SetRegion(reg)
	res, err := sendBlocking(rc, probe)
	if err != nil {
		// sendBlocking only errors when the rpc's context is done, and
		// our probe uses context.Background(), which never is.
		panic(fmt.Sprintf("should not happen: %s", err))
	}
	switch res.Error.(type) {
	case region.RetryableError, region.UnrecoverableError:
		return res.Error
	default:
		return nil
	}
}
// establishRegion connects reg to a region client and marks it available
// once the regionserver accepts rpcs for it. If addr is empty the region is
// first looked up again (via hbase:meta or ZooKeeper). It retries with
// exponential backoff until success, until the region's context is done, or
// until the client is closed; on every exit path a region is marked
// available (or the region found to be gone is removed from the caches) so
// that goroutines blocked on its availability channel are released.
func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) {
	var backoff time.Duration
	var err error
	for {
		backoff, err = sleepAndIncreaseBackoff(reg.Context(), backoff)
		if err != nil {
			// region is dead
			reg.MarkAvailable()
			return
		}
		if addr == "" {
			// need to look up region and address of the regionserver
			originalReg := reg
			// lookup region forever until we get it or we learn that it doesn't exist
			reg, addr, err = c.lookupRegion(originalReg.Context(),
				fullyQualifiedTable(originalReg), originalReg.StartKey())

			if err == TableNotFound {
				// region doesn't exist, delete it from caches
				c.regions.del(originalReg)
				c.clients.del(originalReg)
				originalReg.MarkAvailable()

				log.WithFields(log.Fields{
					"region":  originalReg.String(),
					"err":     err,
					"backoff": backoff,
				}).Info("region does not exist anymore")

				return
			} else if originalReg.Context().Err() != nil {
				// region is dead
				originalReg.MarkAvailable()

				log.WithFields(log.Fields{
					"region":  originalReg.String(),
					"err":     err,
					"backoff": backoff,
				}).Info("region became dead while establishing client for it")

				return
			} else if err == errMetaLookupThrottled {
				// We've been throttled, backoff and retry the lookup
				// TODO: backoff might be unnecessary
				reg = originalReg
				continue
			} else if err == ErrClientClosed {
				// client has been closed
				return
			} else if err != nil {
				// lookupRegion only returns the above errors or nil,
				// so anything else is unexpected and fatal.
				log.WithFields(log.Fields{
					"region":  originalReg.String(),
					"err":     err,
					"backoff": backoff,
				}).Fatal("unknown error occured when looking up region")
			}

			if !bytes.Equal(reg.Name(), originalReg.Name()) {
				// put new region and remove overlapping ones.
				// Should remove the original region as well.
				reg.MarkUnavailable()
				overlaps, replaced := c.regions.put(reg)
				if !replaced {
					// a region that is the same or younger is already in cache
					reg.MarkAvailable()
					originalReg.MarkAvailable()
					return
				}
				// otherwise delete the overlapped regions in cache
				for _, r := range overlaps {
					c.clients.del(r)
				}
				// let rpcs know that they can retry and either get the newly
				// added region from cache or lookup the one they need
				originalReg.MarkAvailable()
			} else {
				// same region, discard the looked up one
				reg = originalReg
			}
		}

		// connect to the region's regionserver
		client, err := c.establishRegionClient(reg, addr)
		if err == nil {
			if reg == c.adminRegionInfo {
				// The admin region is never put in the regions cache
				// (see findRegion), so there's nothing else to do.
				reg.SetClient(client)
				reg.MarkAvailable()
				return
			}

			if existing := c.clients.put(client, reg); existing != client {
				// a client for this regionserver is already in cache, discard this one.
				client.Close()
				client = existing
			}

			if err = isRegionEstablished(client, reg); err == nil {
				// set region client so that as soon as we mark it available,
				// concurrent readers are able to find the client
				reg.SetClient(client)
				reg.MarkAvailable()
				return
			} else if _, ok := err.(region.UnrecoverableError); ok {
				// the client we got died
				c.clientDown(client)
			}
		} else if err == context.Canceled {
			// region is dead
			reg.MarkAvailable()
			return
		}

		log.WithFields(log.Fields{
			"region":  reg,
			"backoff": backoff,
			"err":     err,
		}).Debug("region was not established, retrying")
		// reset address because we weren't able to connect to it
		// or regionserver says it's still offline, should look up again
		addr = ""
	}
}
- func sleepAndIncreaseBackoff(ctx context.Context, backoff time.Duration) (time.Duration, error) {
- if backoff == 0 {
- return backoffStart, nil
- }
- select {
- case <-time.After(backoff):
- case <-ctx.Done():
- return 0, ctx.Err()
- }
- // TODO: Revisit how we back off here.
- if backoff < 5000*time.Millisecond {
- return backoff * 2, nil
- }
- return backoff + 5000*time.Millisecond, nil
- }
- func (c *client) establishRegionClient(reg hrpc.RegionInfo,
- addr string) (hrpc.RegionClient, error) {
- if c.clientType != adminClient {
- // if rpc is not for hbasemaster, check if client for regionserver
- // already exists
- if client := c.clients.checkForClient(addr); client != nil {
- // There's already a client
- return client, nil
- }
- }
- var clientType region.ClientType
- if c.clientType == standardClient {
- clientType = region.RegionClient
- } else {
- clientType = region.MasterClient
- }
- clientCtx, cancel := context.WithTimeout(reg.Context(), c.regionLookupTimeout)
- defer cancel()
- return region.NewClient(clientCtx, addr, clientType,
- c.rpcQueueSize, c.flushInterval, c.effectiveUser,
- c.regionReadTimeout)
- }
// zkResult contains the result of a ZooKeeper lookup (when we're looking for
// the meta region or the HMaster).
type zkResult struct {
	// addr is the address of the located resource, valid when err is nil.
	addr string
	// err is the lookup error, if any.
	err error
}
- // zkLookup asynchronously looks up the meta region or HMaster in ZooKeeper.
- func (c *client) zkLookup(ctx context.Context, resource zk.ResourceName) (string, error) {
- // We make this a buffered channel so that if we stop waiting due to a
- // timeout, we won't block the zkLookupSync() that we start in a
- // separate goroutine.
- reschan := make(chan zkResult, 1)
- go func() {
- addr, err := c.zkClient.LocateResource(resource.Prepend(c.zkRoot))
- // This is guaranteed to never block as the channel is always buffered.
- reschan <- zkResult{addr, err}
- }()
- select {
- case res := <-reschan:
- return res.addr, res.err
- case <-ctx.Done():
- return "", ctx.Err()
- }
- }
|