info.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. // Copyright (C) 2015 The GoHBase Authors. All rights reserved.
  2. // This file is part of GoHBase.
  3. // Use of this source code is governed by the Apache License 2.0
  4. // that can be found in the COPYING file.
  5. // Package region contains data structures to represent HBase regions.
  6. package region
  7. import (
  8. "bytes"
  9. "context"
  10. "encoding/binary"
  11. "fmt"
  12. "sync"
  13. "github.com/golang/protobuf/proto"
  14. "github.com/tsuna/gohbase/hrpc"
  15. "github.com/tsuna/gohbase/pb"
  16. )
  var (
	// defaultNamespace is the namespace name that infoFromCell treats as
	// "no namespace": regions whose table lives in it are reported with a
	// nil namespace.
	defaultNamespace = []byte("default")
)
  18. // OfflineRegionError is returned if region is offline
  19. type OfflineRegionError struct {
  20. n string
  21. }
  22. func (e OfflineRegionError) Error() string {
  23. return fmt.Sprintf("region %s is offline", e.n)
  24. }
  25. // info describes a region.
  26. type info struct {
  27. id uint64 // A timestamp when the region is created
  28. namespace []byte
  29. table []byte
  30. name []byte
  31. startKey []byte
  32. stopKey []byte
  33. ctx context.Context
  34. cancel context.CancelFunc
  35. // The attributes before this mutex are supposed to be immutable.
  36. // The attributes defined below can be changed and accesses must
  37. // be protected with this mutex.
  38. m sync.RWMutex
  39. client hrpc.RegionClient
  40. // Once a region becomes unreachable, this channel is created, and any
  41. // functions that wish to be notified when the region becomes available
  42. // again can read from this channel, which will be closed when the region
  43. // is available again
  44. available chan struct{}
  45. }
  // NewInfo builds a region info from its id, table (namespace + name),
// region name and key range. The region gets its own cancellable context,
// exposed through Context and cancelled by MarkDead.
func NewInfo(id uint64, namespace, table, name, startKey, stopKey []byte) hrpc.RegionInfo {
	reg := &info{
		id:        id,
		namespace: namespace,
		table:     table,
		name:      name,
		startKey:  startKey,
		stopKey:   stopKey,
	}
	reg.ctx, reg.cancel = context.WithCancel(context.Background())
	return reg
}
  // infoFromCell parses a KeyValue from the meta table and creates the
// corresponding RegionInfo.
//
// The cell value must start with the 4-byte "PBUF" magic, followed by a
// serialized pb.RegionInfo. It returns:
//   - an error if the value is empty, does not start with 'P', is too short
//     to hold the magic, has the wrong magic, fails to decode, or lacks a
//     table name;
//   - OfflineRegionError if the decoded region is flagged offline.
//
// A table in the "default" namespace is reported with a nil namespace.
func infoFromCell(cell *hrpc.Cell) (hrpc.RegionInfo, error) {
	value := cell.Value
	if len(value) == 0 {
		return nil, fmt.Errorf("empty value in %q", cell)
	} else if value[0] != 'P' {
		return nil, fmt.Errorf("unsupported region info version %d in %q", value[0], cell)
	}
	const pbufMagic = 1346524486 // 4 bytes: "PBUF"
	// Without this check value[:4] either panics or, when the backing array
	// has spare capacity, silently reads bytes past the end of the value.
	if len(value) < 4 {
		return nil, fmt.Errorf("value too short to contain the PBUF magic in %q", cell)
	}
	if binary.BigEndian.Uint32(value[:4]) != pbufMagic {
		return nil, fmt.Errorf("invalid magic number in %q", cell)
	}
	var regInfo pb.RegionInfo
	if err := proto.UnmarshalMerge(value[4:], &regInfo); err != nil {
		return nil, fmt.Errorf("failed to decode %q: %s", cell, err)
	}
	if regInfo.GetOffline() {
		return nil, OfflineRegionError{n: string(cell.Row)}
	}
	// Use the generated getters: TableName is a message pointer and may be
	// nil in a malformed entry, which previously caused a nil dereference.
	tableName := regInfo.GetTableName()
	if tableName == nil {
		return nil, fmt.Errorf("missing table name in %q", cell)
	}
	namespace := tableName.GetNamespace()
	if bytes.Equal(namespace, defaultNamespace) {
		// Default namespace: pretend there's no namespace.
		namespace = nil
	}
	return NewInfo(
		regInfo.GetRegionId(),
		namespace,
		tableName.GetQualifier(),
		cell.Row,
		regInfo.GetStartKey(),
		regInfo.GetEndKey(),
	), nil
}
  // ParseRegionInfo parses the contents of a row from the meta table.
// On success it returns both a region info and the "host:port" of the
// server hosting it; otherwise it returns a non-nil error and no region.
func ParseRegionInfo(metaRow *hrpc.Result) (hrpc.RegionInfo, string, error) {
	var (
		region hrpc.RegionInfo
		server string
	)
	for _, c := range metaRow.Cells {
		qualifier := string(c.Qualifier)
		if qualifier == "regioninfo" {
			parsed, err := infoFromCell(c)
			if err != nil {
				return nil, "", err
			}
			region = parsed
		} else if qualifier == "server" && len(c.Value) > 0 {
			// The server cell is empty during NSRE; only a non-empty value
			// counts as a location.
			server = string(c.Value)
		}
		// Every other qualifier is ignored.
		// TODO: If this is the parent of a split region, there are two other
		// KVs that could be useful: `info:splitA' and `info:splitB'.
		// Need to investigate whether we can use those as a hint to update our
		// regions_cache with the daughter regions of the split.
	}
	switch {
	case region == nil:
		// No region in the meta row at all: this is really not expected.
		return nil, "", fmt.Errorf("meta seems to be broken, there was no region in %v", metaRow)
	case server == "":
		return nil, "", fmt.Errorf("meta doesn't have a server location in %v", metaRow)
	}
	return region, server, nil
}
  132. // IsUnavailable returns true if this region has been marked as unavailable.
  133. func (i *info) IsUnavailable() bool {
  134. i.m.RLock()
  135. res := i.available != nil
  136. i.m.RUnlock()
  137. return res
  138. }
  139. // AvailabilityChan returns a channel that can be used to wait on for
  140. // notification that a connection to this region has been reestablished.
  141. // If this region is not marked as unavailable, nil will be returned.
  142. func (i *info) AvailabilityChan() <-chan struct{} {
  143. i.m.RLock()
  144. ch := i.available
  145. i.m.RUnlock()
  146. return ch
  147. }
  148. // MarkUnavailable will mark this region as unavailable, by creating the struct
  149. // returned by AvailabilityChan. If this region was marked as available
  150. // before this, true will be returned.
  151. func (i *info) MarkUnavailable() bool {
  152. created := false
  153. i.m.Lock()
  154. if i.available == nil {
  155. i.available = make(chan struct{})
  156. created = true
  157. }
  158. i.m.Unlock()
  159. return created
  160. }
  161. // MarkAvailable will mark this region as available again, by closing the struct
  162. // returned by AvailabilityChan
  163. func (i *info) MarkAvailable() {
  164. i.m.Lock()
  165. ch := i.available
  166. i.available = nil
  167. close(ch)
  168. i.m.Unlock()
  169. }
  170. // MarkDead will mark this region as not useful anymore to notify everyone
  171. // who's trying to use it that there's no point
  172. func (i *info) MarkDead() {
  173. i.cancel()
  174. }
  // Context returns the region's context; it is cancelled once MarkDead has
// been called, which is how callers check whether the region is dead.
func (i *info) Context() context.Context { return i.ctx }
  179. func (i *info) String() string {
  180. return fmt.Sprintf(
  181. "RegionInfo{Name: %q, ID: %d, Namespace: %q, Table: %q, StartKey: %q, StopKey: %q}",
  182. i.name, i.id, i.namespace, i.table, i.startKey, i.stopKey)
  183. }
  184. // ID returns region's age
  185. func (i *info) ID() uint64 {
  186. return i.id
  187. }
  188. // Name returns region name
  189. func (i *info) Name() []byte {
  190. return i.name
  191. }
  192. // StopKey return region stop key
  193. func (i *info) StopKey() []byte {
  194. return i.stopKey
  195. }
  196. // StartKey return region start key
  197. func (i *info) StartKey() []byte {
  198. return i.startKey
  199. }
  200. // Namespace returns region table
  201. func (i *info) Namespace() []byte {
  202. return i.namespace
  203. }
  204. // Table returns region table
  205. func (i *info) Table() []byte {
  206. return i.table
  207. }
  208. // Client returns region client
  209. func (i *info) Client() hrpc.RegionClient {
  210. i.m.RLock()
  211. c := i.client
  212. i.m.RUnlock()
  213. return c
  214. }
  215. // SetClient sets region client
  216. func (i *info) SetClient(c hrpc.RegionClient) {
  217. i.m.Lock()
  218. i.client = c
  219. i.m.Unlock()
  220. }
  221. // CompareGeneric is the same thing as Compare but for interface{}.
  222. func CompareGeneric(a, b interface{}) int {
  223. return Compare(a.([]byte), b.([]byte))
  224. }
  // Compare compares two region names.
// We can't just use bytes.Compare() because it doesn't play nicely
// with the way META keys are built as the first region has an empty start
// key. Let's assume we know about those 2 regions in our cache:
//   .META.,,1
//   tableA,,1273018455182
// We're given an RPC to execute on "tableA", row "\x00" (1 byte row key
// containing a 0). If we used bytes.Compare() to sort the entries in the
// cache, when we search for the entry right before "tableA,\000,:"
// we'll erroneously find ".META.,,1" instead of the entry for first
// region of "tableA".
//
// Since this scheme breaks natural ordering, we need this comparator to
// implement a special version of comparison to handle this scenario.
//
// The sign of the result orders a relative to b (negative: a < b, zero:
// equal, positive: a > b). The magnitude hints at where they differed:
//   ±1001  one table name is a proper prefix of the other;
//   ±1002  same table, one start key is a proper prefix of the other
//          (an empty start key therefore sorts first);
//   other  a byte difference, or finally the length difference.
//
// Both arguments are assumed to be well-formed region names. If the table
// names match but either name has no comma after the table name,
// findCommaFromEnd panics.
func Compare(a, b []byte) int {
	var length int
	if la, lb := len(a), len(b); la < lb {
		length = la
	} else {
		length = lb
	}
	// Reminder: region names are of the form:
	//   table_name,start_key,timestamp[.MD5.]
	// First compare the table names.
	var i int
	for i = 0; i < length; i++ {
		ai := a[i] // Load each byte once per iteration.
		bi := b[i] // Load each byte once per iteration.
		if ai != bi { // The name of the tables differ.
			if ai == ',' {
				return -1001 // `a' has a smaller table name (prefix of b's). a < b
			} else if bi == ',' {
				return 1001 // `b' has a smaller table name (prefix of a's). a > b
			}
			return int(ai) - int(bi)
		}
		if ai == ',' { // Remember: at this point ai == bi.
			break // We're done comparing the table names. They're equal.
		}
	}
	// Now find the last comma in both `a' and `b'. We need to start the
	// search from the end as the row key could have an arbitrary number of
	// commas and we don't know its length. The last comma separates the
	// start key from the timestamp.
	aComma := findCommaFromEnd(a, i)
	bComma := findCommaFromEnd(b, i)
	// Step past the comma that ends the table name. If either `a' or `b'
	// has an empty start key, its last comma is at this new i, so it is
	// the smaller "firstComma" below and sorts first via the -1002/1002
	// branch (the first region of a table).
	i++ // No need to check against `length', there MUST be more bytes.
	// Compare keys over their common length.
	var firstComma int
	if aComma < bComma {
		firstComma = aComma
	} else {
		firstComma = bComma
	}
	for ; i < firstComma; i++ {
		ai := a[i]
		bi := b[i]
		if ai != bi { // The keys differ.
			return int(ai) - int(bi)
		}
	}
	// The common part of the keys is equal; the shorter key sorts first.
	if aComma < bComma {
		return -1002 // `a' has a shorter key. a < b
	} else if bComma < aComma {
		return 1002 // `b' has a shorter key. a > b
	}
	// Keys have the same length and have compared identical. Compare the
	// rest, which essentially means: use start code as a tie breaker.
	for ; /*nothing*/ i < length; i++ {
		ai := a[i]
		bi := b[i]
		if ai != bi { // The start codes differ.
			return int(ai) - int(bi)
		}
	}
	// One name is a prefix of the other (or they are equal).
	return len(a) - len(b)
}
  // findCommaFromEnd returns the index of the last ',' in b located strictly
// after offset. A negative offset is treated as "search the whole slice".
// It panics if there is no such comma; Compare relies on this to reject
// malformed region names.
func findCommaFromEnd(b []byte, offset int) int {
	start := offset + 1
	if start < 0 {
		start = 0
	}
	if start < len(b) {
		if j := bytes.LastIndexByte(b[start:], ','); j >= 0 {
			return start + j
		}
	}
	panic(fmt.Errorf("no comma found in %q after offset %d", b, offset))
}