caches.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. // Copyright (C) 2016 The GoHBase Authors. All rights reserved.
  2. // This file is part of GoHBase.
  3. // Use of this source code is governed by the Apache License 2.0
  4. // that can be found in the COPYING file.
  5. package gohbase
  6. import (
  7. "bytes"
  8. "io"
  9. "sync"
  10. "github.com/cznic/b"
  11. log "github.com/sirupsen/logrus"
  12. "github.com/tsuna/gohbase/hrpc"
  13. )
  14. // clientRegionCache is client -> region cache. Used to quickly
  15. // look up all the regioninfos that map to a specific client
  16. type clientRegionCache struct {
  17. m sync.RWMutex
  18. regions map[hrpc.RegionClient]map[hrpc.RegionInfo]struct{}
  19. }
  20. // put caches client and associates a region with it. Returns a client that is in cache.
  21. // TODO: obvious place for optimization (use map with address as key to lookup exisiting clients)
  22. func (rcc *clientRegionCache) put(c hrpc.RegionClient, r hrpc.RegionInfo) hrpc.RegionClient {
  23. rcc.m.Lock()
  24. for existingClient, regions := range rcc.regions {
  25. // check if client already exists, checking by host and port
  26. // because concurrent callers might try to put the same client
  27. if c.Addr() == existingClient.Addr() {
  28. // check client already knows about the region, checking
  29. // by pointer is enough because we make sure that there are
  30. // no regions with the same name around
  31. if _, ok := regions[r]; !ok {
  32. regions[r] = struct{}{}
  33. }
  34. rcc.m.Unlock()
  35. log.WithFields(log.Fields{
  36. "existingClient": existingClient,
  37. "client": c,
  38. }).Debug("region client is already in client's cache")
  39. return existingClient
  40. }
  41. }
  42. // no such client yet
  43. rcc.regions[c] = map[hrpc.RegionInfo]struct{}{r: struct{}{}}
  44. rcc.m.Unlock()
  45. log.WithField("client", c).Info("added new region client")
  46. return c
  47. }
  48. func (rcc *clientRegionCache) del(r hrpc.RegionInfo) {
  49. rcc.m.Lock()
  50. c := r.Client()
  51. if c != nil {
  52. r.SetClient(nil)
  53. regions := rcc.regions[c]
  54. delete(regions, r)
  55. }
  56. rcc.m.Unlock()
  57. }
  58. func (rcc *clientRegionCache) closeAll() {
  59. rcc.m.Lock()
  60. for client, regions := range rcc.regions {
  61. for region := range regions {
  62. region.MarkUnavailable()
  63. region.SetClient(nil)
  64. }
  65. client.Close()
  66. }
  67. rcc.m.Unlock()
  68. }
  69. func (rcc *clientRegionCache) clientDown(c hrpc.RegionClient) map[hrpc.RegionInfo]struct{} {
  70. rcc.m.Lock()
  71. downregions, ok := rcc.regions[c]
  72. delete(rcc.regions, c)
  73. rcc.m.Unlock()
  74. if ok {
  75. log.WithField("client", c).Info("removed region client")
  76. }
  77. return downregions
  78. }
  79. // TODO: obvious place for optimization (use map with address as key to lookup exisiting clients)
  80. func (rcc *clientRegionCache) checkForClient(addr string) hrpc.RegionClient {
  81. rcc.m.RLock()
  82. for client := range rcc.regions {
  83. if client.Addr() == addr {
  84. rcc.m.RUnlock()
  85. return client
  86. }
  87. }
  88. rcc.m.RUnlock()
  89. return nil
  90. }
  91. // key -> region cache.
  92. type keyRegionCache struct {
  93. m sync.RWMutex
  94. // Maps a []byte of a region start key to a hrpc.RegionInfo
  95. regions *b.Tree
  96. }
  97. func (krc *keyRegionCache) get(key []byte) ([]byte, hrpc.RegionInfo) {
  98. krc.m.RLock()
  99. enum, ok := krc.regions.Seek(key)
  100. if ok {
  101. krc.m.RUnlock()
  102. log.Fatalf("WTF: got exact match for region search key %q", key)
  103. return nil, nil
  104. }
  105. k, v, err := enum.Prev()
  106. enum.Close()
  107. krc.m.RUnlock()
  108. if err == io.EOF {
  109. // we are the beginning of the tree
  110. return nil, nil
  111. }
  112. return k.([]byte), v.(hrpc.RegionInfo)
  113. }
  114. func isRegionOverlap(regA, regB hrpc.RegionInfo) bool {
  115. // if region's stop key is empty, it's assumed to be the greatest key
  116. return bytes.Equal(regA.Namespace(), regB.Namespace()) &&
  117. bytes.Equal(regA.Table(), regB.Table()) &&
  118. (len(regB.StopKey()) == 0 || bytes.Compare(regA.StartKey(), regB.StopKey()) < 0) &&
  119. (len(regA.StopKey()) == 0 || bytes.Compare(regA.StopKey(), regB.StartKey()) > 0)
  120. }
  121. func (krc *keyRegionCache) getOverlaps(reg hrpc.RegionInfo) []hrpc.RegionInfo {
  122. var overlaps []hrpc.RegionInfo
  123. var v interface{}
  124. var err error
  125. // deal with empty tree in the beginning so that we don't have to check
  126. // EOF errors for enum later
  127. if krc.regions.Len() == 0 {
  128. return overlaps
  129. }
  130. // check if key created from new region falls into any cached regions
  131. key := createRegionSearchKey(fullyQualifiedTable(reg), reg.StartKey())
  132. enum, ok := krc.regions.Seek(key)
  133. if ok {
  134. log.Fatalf("WTF: found a region with exact name as the search key %q", key)
  135. }
  136. // case 1: landed before the first region in cache
  137. // enum.Prev() returns io.EOF
  138. // enum.Next() returns io.EOF
  139. // SeekFirst() + enum.Next() returns the first region, which has larger start key
  140. // case 2: landed before the second region in cache
  141. // enum.Prev() returns the first region X and moves pointer to -infinity
  142. // enum.Next() returns io.EOF
  143. // SeekFirst() + enum.Next() returns first region X, which has smaller start key
  144. // case 3: landed anywhere after the second region
  145. // enum.Prev() returns the region X before it landed, moves pointer to the region X - 1
  146. // enum.Next() returns X - 1 and move pointer to X, which has smaller start key
  147. enum.Prev()
  148. _, _, err = enum.Next()
  149. if err == io.EOF {
  150. // we are in the beginning of tree, get new enum starting
  151. // from first region
  152. enum.Close()
  153. enum, err = krc.regions.SeekFirst()
  154. if err != nil {
  155. log.Fatalf(
  156. "error seeking first region when getting overlaps for region %v: %v", reg, err)
  157. }
  158. }
  159. _, v, err = enum.Next()
  160. if isRegionOverlap(v.(hrpc.RegionInfo), reg) {
  161. overlaps = append(overlaps, v.(hrpc.RegionInfo))
  162. }
  163. _, v, err = enum.Next()
  164. // now append all regions that overlap until the end of the tree
  165. // or until they don't overlap
  166. for err != io.EOF && isRegionOverlap(v.(hrpc.RegionInfo), reg) {
  167. overlaps = append(overlaps, v.(hrpc.RegionInfo))
  168. _, v, err = enum.Next()
  169. }
  170. enum.Close()
  171. return overlaps
  172. }
  173. // put looks up if there's already region with this name in regions cache
  174. // and if there's, returns it in overlaps and doesn't modify the cache.
  175. // Otherwise, it puts the region and removes all overlaps in case all of
  176. // them are older. Returns a slice of overlapping regions and whether
  177. // passed region was put in the cache.
  178. func (krc *keyRegionCache) put(reg hrpc.RegionInfo) (overlaps []hrpc.RegionInfo, replaced bool) {
  179. krc.m.Lock()
  180. krc.regions.Put(reg.Name(), func(v interface{}, exists bool) (interface{}, bool) {
  181. if exists {
  182. // region is already in cache,
  183. // note: regions with the same name have the same age
  184. overlaps = []hrpc.RegionInfo{v.(hrpc.RegionInfo)}
  185. return nil, false
  186. }
  187. // find all entries that are overlapping with the range of the new region.
  188. overlaps = krc.getOverlaps(reg)
  189. for _, o := range overlaps {
  190. if o.ID() > reg.ID() {
  191. // overlapping region is younger,
  192. // don't replace any regions
  193. // TODO: figure out if there can a case where we might
  194. // have both older and younger overlapping regions, for
  195. // now we only replace if all overlaps are older
  196. return nil, false
  197. }
  198. }
  199. // all overlaps are older, put the new region
  200. replaced = true
  201. return reg, true
  202. })
  203. if !replaced {
  204. krc.m.Unlock()
  205. log.WithFields(log.Fields{
  206. "region": reg,
  207. "overlaps": overlaps,
  208. "replaced": replaced,
  209. }).Debug("region is already in cache")
  210. return
  211. }
  212. // delete overlapping regions
  213. // TODO: in case overlaps are always either younger or older,
  214. // we can just greedily remove them in Put function
  215. for _, o := range overlaps {
  216. krc.regions.Delete(o.Name())
  217. // let region establishers know that they can give up
  218. o.MarkDead()
  219. }
  220. krc.m.Unlock()
  221. log.WithFields(log.Fields{
  222. "region": reg,
  223. "overlaps": overlaps,
  224. "replaced": replaced,
  225. }).Info("added new region")
  226. return
  227. }
  228. func (krc *keyRegionCache) del(reg hrpc.RegionInfo) bool {
  229. krc.m.Lock()
  230. success := krc.regions.Delete(reg.Name())
  231. krc.m.Unlock()
  232. // let region establishers know that they can give up
  233. reg.MarkDead()
  234. log.WithFields(log.Fields{
  235. "region": reg,
  236. }).Debug("removed region")
  237. return success
  238. }