// Copyright (C) 2016 The GoHBase Authors. All rights reserved. // This file is part of GoHBase. // Use of this source code is governed by the Apache License 2.0 // that can be found in the COPYING file. package gohbase import ( "bytes" "io" "sync" "github.com/cznic/b" log "github.com/sirupsen/logrus" "github.com/tsuna/gohbase/hrpc" ) // clientRegionCache is client -> region cache. Used to quickly // look up all the regioninfos that map to a specific client type clientRegionCache struct { m sync.RWMutex regions map[hrpc.RegionClient]map[hrpc.RegionInfo]struct{} } // put caches client and associates a region with it. Returns a client that is in cache. // TODO: obvious place for optimization (use map with address as key to lookup exisiting clients) func (rcc *clientRegionCache) put(c hrpc.RegionClient, r hrpc.RegionInfo) hrpc.RegionClient { rcc.m.Lock() for existingClient, regions := range rcc.regions { // check if client already exists, checking by host and port // because concurrent callers might try to put the same client if c.Addr() == existingClient.Addr() { // check client already knows about the region, checking // by pointer is enough because we make sure that there are // no regions with the same name around if _, ok := regions[r]; !ok { regions[r] = struct{}{} } rcc.m.Unlock() log.WithFields(log.Fields{ "existingClient": existingClient, "client": c, }).Debug("region client is already in client's cache") return existingClient } } // no such client yet rcc.regions[c] = map[hrpc.RegionInfo]struct{}{r: struct{}{}} rcc.m.Unlock() log.WithField("client", c).Info("added new region client") return c } func (rcc *clientRegionCache) del(r hrpc.RegionInfo) { rcc.m.Lock() c := r.Client() if c != nil { r.SetClient(nil) regions := rcc.regions[c] delete(regions, r) } rcc.m.Unlock() } func (rcc *clientRegionCache) closeAll() { rcc.m.Lock() for client, regions := range rcc.regions { for region := range regions { region.MarkUnavailable() region.SetClient(nil) } client.Close() } rcc.m.Unlock() } func (rcc *clientRegionCache) clientDown(c hrpc.RegionClient) map[hrpc.RegionInfo]struct{} { rcc.m.Lock() downregions, ok := rcc.regions[c] delete(rcc.regions, c) rcc.m.Unlock() if ok { log.WithField("client", c).Info("removed region client") } return downregions } // TODO: obvious place for optimization (use map with address as key to lookup exisiting clients) func (rcc *clientRegionCache) checkForClient(addr string) hrpc.RegionClient { rcc.m.RLock() for client := range rcc.regions { if client.Addr() == addr { rcc.m.RUnlock() return client } } rcc.m.RUnlock() return nil } // key -> region cache. type keyRegionCache struct { m sync.RWMutex // Maps a []byte of a region start key to a hrpc.RegionInfo regions *b.Tree } func (krc *keyRegionCache) get(key []byte) ([]byte, hrpc.RegionInfo) { krc.m.RLock() enum, ok := krc.regions.Seek(key) if ok { krc.m.RUnlock() log.Fatalf("WTF: got exact match for region search key %q", key) return nil, nil } k, v, err := enum.Prev() enum.Close() krc.m.RUnlock() if err == io.EOF { // we are the beginning of the tree return nil, nil } return k.([]byte), v.(hrpc.RegionInfo) } func isRegionOverlap(regA, regB hrpc.RegionInfo) bool { // if region's stop key is empty, it's assumed to be the greatest key return bytes.Equal(regA.Namespace(), regB.Namespace()) && bytes.Equal(regA.Table(), regB.Table()) && (len(regB.StopKey()) == 0 || bytes.Compare(regA.StartKey(), regB.StopKey()) < 0) && (len(regA.StopKey()) == 0 || bytes.Compare(regA.StopKey(), regB.StartKey()) > 0) } func (krc *keyRegionCache) getOverlaps(reg hrpc.RegionInfo) []hrpc.RegionInfo { var overlaps []hrpc.RegionInfo var v interface{} var err error // deal with empty tree in the beginning so that we don't have to check // EOF errors for enum later if krc.regions.Len() == 0 { return overlaps } // check if key created from new region falls into any cached regions key := createRegionSearchKey(fullyQualifiedTable(reg), reg.StartKey()) enum, ok := krc.regions.Seek(key) if ok { log.Fatalf("WTF: found a region with exact name as the search key %q", key) } // case 1: landed before the first region in cache // enum.Prev() returns io.EOF // enum.Next() returns io.EOF // SeekFirst() + enum.Next() returns the first region, which has larger start key // case 2: landed before the second region in cache // enum.Prev() returns the first region X and moves pointer to -infinity // enum.Next() returns io.EOF // SeekFirst() + enum.Next() returns first region X, which has smaller start key // case 3: landed anywhere after the second region // enum.Prev() returns the region X before it landed, moves pointer to the region X - 1 // enum.Next() returns X - 1 and move pointer to X, which has smaller start key enum.Prev() _, _, err = enum.Next() if err == io.EOF { // we are in the beginning of tree, get new enum starting // from first region enum.Close() enum, err = krc.regions.SeekFirst() if err != nil { log.Fatalf( "error seeking first region when getting overlaps for region %v: %v", reg, err) } } _, v, err = enum.Next() if isRegionOverlap(v.(hrpc.RegionInfo), reg) { overlaps = append(overlaps, v.(hrpc.RegionInfo)) } _, v, err = enum.Next() // now append all regions that overlap until the end of the tree // or until they don't overlap for err != io.EOF && isRegionOverlap(v.(hrpc.RegionInfo), reg) { overlaps = append(overlaps, v.(hrpc.RegionInfo)) _, v, err = enum.Next() } enum.Close() return overlaps } // put looks up if there's already region with this name in regions cache // and if there's, returns it in overlaps and doesn't modify the cache. // Otherwise, it puts the region and removes all overlaps in case all of // them are older. Returns a slice of overlapping regions and whether // passed region was put in the cache. func (krc *keyRegionCache) put(reg hrpc.RegionInfo) (overlaps []hrpc.RegionInfo, replaced bool) { krc.m.Lock() krc.regions.Put(reg.Name(), func(v interface{}, exists bool) (interface{}, bool) { if exists { // region is already in cache, // note: regions with the same name have the same age overlaps = []hrpc.RegionInfo{v.(hrpc.RegionInfo)} return nil, false } // find all entries that are overlapping with the range of the new region. overlaps = krc.getOverlaps(reg) for _, o := range overlaps { if o.ID() > reg.ID() { // overlapping region is younger, // don't replace any regions // TODO: figure out if there can a case where we might // have both older and younger overlapping regions, for // now we only replace if all overlaps are older return nil, false } } // all overlaps are older, put the new region replaced = true return reg, true }) if !replaced { krc.m.Unlock() log.WithFields(log.Fields{ "region": reg, "overlaps": overlaps, "replaced": replaced, }).Debug("region is already in cache") return } // delete overlapping regions // TODO: in case overlaps are always either younger or older, // we can just greedily remove them in Put function for _, o := range overlaps { krc.regions.Delete(o.Name()) // let region establishers know that they can give up o.MarkDead() } krc.m.Unlock() log.WithFields(log.Fields{ "region": reg, "overlaps": overlaps, "replaced": replaced, }).Info("added new region") return } func (krc *keyRegionCache) del(reg hrpc.RegionInfo) bool { krc.m.Lock() success := krc.regions.Delete(reg.Name()) krc.m.Unlock() // let region establishers know that they can give up reg.MarkDead() log.WithFields(log.Fields{ "region": reg, }).Debug("removed region") return success }