rpc.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644
  1. // Copyright (C) 2016 The GoHBase Authors. All rights reserved.
  2. // This file is part of GoHBase.
  3. // Use of this source code is governed by the Apache License 2.0
  4. // that can be found in the COPYING file.
  5. package gohbase
  6. import (
  7. "bytes"
  8. "context"
  9. "errors"
  10. "fmt"
  11. "io"
  12. "strconv"
  13. "time"
  14. "github.com/golang/protobuf/proto"
  15. log "github.com/sirupsen/logrus"
  16. "github.com/tsuna/gohbase/hrpc"
  17. "github.com/tsuna/gohbase/region"
  18. "github.com/tsuna/gohbase/zk"
  19. )
// Package-level variables: meta-table identifiers and sentinel errors.
var (
	// Name of the meta region.
	metaTableName = []byte("hbase:meta")

	// infoFamily selects every column of the "info" family, which is the
	// family hbase:meta stores region locations in.
	infoFamily = map[string][]string{
		"info": nil,
	}

	// ErrRegionUnavailable is returned when sending rpc to a region that is unavailable
	ErrRegionUnavailable = errors.New("region unavailable")

	// TableNotFound is returned when attempting to access a table that
	// doesn't exist on this cluster.
	TableNotFound = errors.New("table not found")

	// ErrConnotFindRegion is returned when it took too many tries to find a
	// region for the request. It's likely that hbase:meta has overlaps or some other
	// inconsistency.
	// NOTE(review): the identifier misspells "Cannot", but it is exported
	// API — renaming it would break external callers.
	ErrConnotFindRegion = errors.New("cannot find region for the rpc")

	// ErrClientClosed is returned when the gohbase client has been closed
	ErrClientClosed = errors.New("client is closed")

	// errMetaLookupThrottled is returned when a lookup for the rpc's region
	// has been throttled.
	errMetaLookupThrottled = errors.New("lookup to hbase:meta has been throttled")
)
const (
	// maxSendRPCTries is the maximum number of times to try to send an RPC
	maxSendRPCTries = 10

	// backoffStart is the initial retry delay handed out by
	// sleepAndIncreaseBackoff; it doubles per retry up to 5s, then grows
	// linearly by 5s.
	backoffStart = 16 * time.Millisecond
)
// SendRPC locates the region responsible for rpc's table/key and sends the
// rpc to the regionserver hosting it, retrying up to maxSendRPCTries times
// while the region is unavailable. Returns the response message, or an
// error when the region can't be found (ErrConnotFindRegion), the rpc's
// context expires, or the client is closed (ErrClientClosed).
func (c *client) SendRPC(rpc hrpc.Call) (proto.Message, error) {
	var err error
	for i := 0; i < maxSendRPCTries; i++ {
		// Check the cache for a region that can handle this request
		reg := c.getRegionFromCache(rpc.Table(), rpc.Key())
		if reg == nil {
			// Cache miss: look the region up (meta table or ZooKeeper).
			reg, err = c.findRegion(rpc.Context(), rpc.Table(), rpc.Key())
			if err == ErrRegionUnavailable {
				// Another goroutine raced us into the cache; retry from the cache.
				continue
			} else if err == errMetaLookupThrottled {
				// lookup for region has been throttled, check the cache
				// again but don't count this as SendRPC try as there
				// might be just too many request going on at a time.
				i--
				continue
			} else if err != nil {
				return nil, err
			}
		}
		msg, err := c.sendRPCToRegion(rpc, reg)
		switch err {
		case ErrRegionUnavailable:
			if ch := reg.AvailabilityChan(); ch != nil {
				// The region is unavailable. Wait for it to become available,
				// a new region or for the deadline to be exceeded.
				select {
				case <-rpc.Context().Done():
					return nil, rpc.Context().Err()
				case <-c.done:
					return nil, ErrClientClosed
				case <-ch:
					// Region became available (or was replaced); loop and retry.
				}
			}
		default:
			// Success, or an error we don't retry on.
			return msg, err
		}
	}
	return nil, ErrConnotFindRegion
}
  86. func sendBlocking(rc hrpc.RegionClient, rpc hrpc.Call) (hrpc.RPCResult, error) {
  87. rc.QueueRPC(rpc)
  88. var res hrpc.RPCResult
  89. // Wait for the response
  90. select {
  91. case res = <-rpc.ResultChan():
  92. return res, nil
  93. case <-rpc.Context().Done():
  94. return res, rpc.Context().Err()
  95. }
  96. }
// sendRPCToRegion queues rpc on reg's client and waits for the response.
// It returns ErrRegionUnavailable when the region (or the client serving
// it) can't take the rpc right now — after kicking off reestablishment as
// needed — so the caller can wait on reg.AvailabilityChan() and retry.
func (c *client) sendRPCToRegion(rpc hrpc.Call, reg hrpc.RegionInfo) (proto.Message, error) {
	if reg.IsUnavailable() {
		return nil, ErrRegionUnavailable
	}
	rpc.SetRegion(reg)

	// Queue the RPC to be sent to the region
	client := reg.Client()
	if client == nil {
		// There was an error queueing the RPC.
		// Mark the region as unavailable.
		if reg.MarkUnavailable() {
			// If this was the first goroutine to mark the region as
			// unavailable, start a goroutine to reestablish a connection
			go c.reestablishRegion(reg)
		}
		return nil, ErrRegionUnavailable
	}
	res, err := sendBlocking(client, rpc)
	if err != nil {
		// rpc's context expired while waiting for the result.
		return nil, err
	}

	// Check for errors
	switch res.Error.(type) {
	case region.RetryableError:
		// There's an error specific to this region, but
		// our region client is fine. Mark this region as
		// unavailable (as opposed to all regions sharing
		// the client), and start a goroutine to reestablish
		// it.
		if reg.MarkUnavailable() {
			go c.reestablishRegion(reg)
		}
		return nil, ErrRegionUnavailable
	case region.UnrecoverableError:
		// If it was an unrecoverable error, the region client is
		// considered dead.
		if reg == c.adminRegionInfo {
			// If this is the admin client, mark the region
			// as unavailable and start up a goroutine to
			// reconnect if it wasn't already marked as such.
			if reg.MarkUnavailable() {
				go c.reestablishRegion(reg)
			}
		} else {
			// Take down every region sharing this client.
			c.clientDown(client)
		}
		// Fall through to the case of the region being unavailable,
		// which will result in blocking until it's available again.
		return nil, ErrRegionUnavailable
	default:
		// RPC was successfully sent, or an unknown type of error
		// occurred. In either case, return the results.
		return res.Msg, res.Error
	}
}
  152. // clientDown removes client from cache and marks
  153. // all the regions sharing this region's
  154. // client as unavailable, and start a goroutine
  155. // to reconnect for each of them.
  156. func (c *client) clientDown(client hrpc.RegionClient) {
  157. downregions := c.clients.clientDown(client)
  158. for downreg := range downregions {
  159. if downreg.MarkUnavailable() {
  160. downreg.SetClient(nil)
  161. go c.reestablishRegion(downreg)
  162. }
  163. }
  164. }
  165. func (c *client) lookupRegion(ctx context.Context,
  166. table, key []byte) (hrpc.RegionInfo, string, error) {
  167. var reg hrpc.RegionInfo
  168. var addr string
  169. var err error
  170. backoff := backoffStart
  171. for {
  172. // If it takes longer than regionLookupTimeout, fail so that we can sleep
  173. lookupCtx, cancel := context.WithTimeout(ctx, c.regionLookupTimeout)
  174. if c.clientType == adminClient {
  175. log.WithField("resource", zk.Master).Debug("looking up master")
  176. addr, err = c.zkLookup(lookupCtx, zk.Master)
  177. cancel()
  178. reg = c.adminRegionInfo
  179. } else if bytes.Compare(table, metaTableName) == 0 {
  180. log.WithField("resource", zk.Meta).Debug("looking up region server of hbase:meta")
  181. addr, err = c.zkLookup(lookupCtx, zk.Meta)
  182. cancel()
  183. reg = c.metaRegionInfo
  184. } else {
  185. log.WithFields(log.Fields{
  186. "table": strconv.Quote(string(table)),
  187. "key": strconv.Quote(string(key)),
  188. }).Debug("looking up region")
  189. reg, addr, err = c.metaLookup(lookupCtx, table, key)
  190. cancel()
  191. if err == TableNotFound {
  192. log.WithFields(log.Fields{
  193. "table": strconv.Quote(string(table)),
  194. "key": strconv.Quote(string(key)),
  195. "err": err,
  196. }).Debug("hbase:meta does not know about this table/key")
  197. return nil, "", err
  198. } else if err == errMetaLookupThrottled {
  199. return nil, "", err
  200. } else if err == ErrClientClosed {
  201. return nil, "", err
  202. }
  203. }
  204. if err == nil {
  205. log.WithFields(log.Fields{
  206. "table": strconv.Quote(string(table)),
  207. "key": strconv.Quote(string(key)),
  208. "region": reg,
  209. "addr": addr,
  210. }).Debug("looked up a region")
  211. return reg, addr, nil
  212. }
  213. log.WithFields(log.Fields{
  214. "table": strconv.Quote(string(table)),
  215. "key": strconv.Quote(string(key)),
  216. "backoff": backoff,
  217. "err": err,
  218. }).Error("failed looking up region")
  219. // This will be hit if there was an error locating the region
  220. backoff, err = sleepAndIncreaseBackoff(ctx, backoff)
  221. if err != nil {
  222. return nil, "", err
  223. }
  224. }
  225. }
// findRegion looks up the region hosting key (not currently cached),
// inserts it into the region cache, and starts a goroutine to connect to
// its regionserver. The returned region is marked unavailable until that
// connection succeeds; callers wait on reg.AvailabilityChan().
func (c *client) findRegion(ctx context.Context, table, key []byte) (hrpc.RegionInfo, error) {
	// The region was not in the cache, it
	// must be looked up in the meta table
	reg, addr, err := c.lookupRegion(ctx, table, key)
	if err != nil {
		return nil, err
	}

	// We are the ones that looked up the region, so we need to
	// mark in unavailable and find a client for it.
	reg.MarkUnavailable()

	if reg != c.metaRegionInfo && reg != c.adminRegionInfo {
		// Check that the region wasn't added to
		// the cache while we were looking it up.
		overlaps, replaced := c.regions.put(reg)
		if !replaced {
			// the same or younger regions are already in cache, retry looking up in cache
			return nil, ErrRegionUnavailable
		}

		// otherwise, new region in cache, delete overlaps from client's cache
		for _, r := range overlaps {
			c.clients.del(r)
		}
	}

	// Start a goroutine to connect to the region
	go c.establishRegion(reg, addr)

	// Wait for the new region to become
	// available, and then send the RPC
	return reg, nil
}
  255. // Searches in the regions cache for the region hosting the given row.
  256. func (c *client) getRegionFromCache(table, key []byte) hrpc.RegionInfo {
  257. if c.clientType == adminClient {
  258. return c.adminRegionInfo
  259. } else if bytes.Equal(table, metaTableName) {
  260. return c.metaRegionInfo
  261. }
  262. regionName := createRegionSearchKey(table, key)
  263. _, region := c.regions.get(regionName)
  264. if region == nil {
  265. return nil
  266. }
  267. // make sure the returned region is for the same table
  268. if !bytes.Equal(fullyQualifiedTable(region), table) {
  269. // not the same table, can happen if we got the last region
  270. return nil
  271. }
  272. if len(region.StopKey()) != 0 &&
  273. // If the stop key is an empty byte array, it means this region is the
  274. // last region for this table and this key ought to be in that region.
  275. bytes.Compare(key, region.StopKey()) >= 0 {
  276. return nil
  277. }
  278. return region
  279. }
  280. // Creates the META key to search for in order to locate the given key.
  281. func createRegionSearchKey(table, key []byte) []byte {
  282. metaKey := make([]byte, 0, len(table)+len(key)+3)
  283. metaKey = append(metaKey, table...)
  284. metaKey = append(metaKey, ',')
  285. metaKey = append(metaKey, key...)
  286. metaKey = append(metaKey, ',')
  287. // ':' is the first byte greater than '9'. We always want to find the
  288. // entry with the greatest timestamp, so by looking right before ':'
  289. // we'll find it.
  290. metaKey = append(metaKey, ':')
  291. return metaKey
  292. }
// metaLookupLimit throttles lookups to hbase:meta. It returns nil if we
// were lucky enough to reserve right away, errMetaLookupThrottled once the
// limiter's delay has elapsed, or the context's error if ctx expires while
// waiting.
func (c *client) metaLookupLimit(ctx context.Context) error {
	r := c.metaLookupLimiter.Reserve()
	if !r.OK() {
		// NOTE(review): assuming c.metaLookupLimiter is an
		// x/time/rate.Limiter, a single-token Reserve should always be
		// OK; this panic guards that invariant — confirm the limiter's
		// burst is >= 1.
		panic("wtf: cannot reserve a meta lookup")
	}
	delay := r.Delay()
	if delay <= 0 {
		// No throttling needed; caller may proceed immediately.
		return nil
	}

	// We've been rate limited: wait out the delay so the reservation is
	// honored, then tell the caller it was throttled so it can re-check
	// the cache instead of hammering hbase:meta.
	t := time.NewTimer(delay)
	defer t.Stop()
	select {
	case <-t.C:
		return errMetaLookupThrottled
	case <-ctx.Done():
		// Return the unused reservation to the limiter.
		r.Cancel()
		return ctx.Err()
	}
}
// metaLookup checks meta table for the region in which the given row key
// for the given table is. It issues a reversed, single-row scan of
// hbase:meta starting at createRegionSearchKey(table, key) and parses the
// result into the region info and the address of its regionserver.
// Returns TableNotFound when the scan yields nothing, or an error when the
// entry found is for a different table or doesn't contain key (both would
// indicate an hbase:meta inconsistency).
func (c *client) metaLookup(ctx context.Context,
	table, key []byte) (hrpc.RegionInfo, string, error) {
	metaKey := createRegionSearchKey(table, key)
	// Reversed scan of one row: the meta entry sorting immediately before
	// metaKey describes the region that would contain key.
	rpc, err := hrpc.NewScanRange(ctx, metaTableName, metaKey, table,
		hrpc.Families(infoFamily),
		hrpc.Reversed(),
		hrpc.CloseScanner(),
		hrpc.NumberOfRows(1))
	if err != nil {
		return nil, "", err
	}

	scanner := c.Scan(rpc)
	resp, err := scanner.Next()
	if err == io.EOF {
		// No entry at or before metaKey: the table doesn't exist.
		return nil, "", TableNotFound
	}
	if err != nil {
		return nil, "", err
	}

	reg, addr, err := region.ParseRegionInfo(resp)
	if err != nil {
		return nil, "", err
	}
	if !bytes.Equal(table, fullyQualifiedTable(reg)) {
		// This would indicate a bug in HBase.
		return nil, "", fmt.Errorf("wtf: meta returned an entry for the wrong table!"+
			" Looked up table=%q key=%q got region=%s", table, key, reg)
	} else if len(reg.StopKey()) != 0 &&
		bytes.Compare(key, reg.StopKey()) >= 0 {
		// This would indicate a hole in the meta table.
		return nil, "", fmt.Errorf("wtf: meta returned an entry for the wrong region!"+
			" Looked up table=%q key=%q got region=%s", table, key, reg)
	}
	return reg, addr, nil
}
  352. func fullyQualifiedTable(reg hrpc.RegionInfo) []byte {
  353. namespace := reg.Namespace()
  354. table := reg.Table()
  355. if namespace == nil {
  356. return table
  357. }
  358. // non-default namespace table
  359. fqTable := make([]byte, 0, len(namespace)+1+len(table))
  360. fqTable = append(fqTable, namespace...)
  361. fqTable = append(fqTable, byte(':'))
  362. fqTable = append(fqTable, table...)
  363. return fqTable
  364. }
  365. func (c *client) reestablishRegion(reg hrpc.RegionInfo) {
  366. select {
  367. case <-c.done:
  368. return
  369. default:
  370. }
  371. log.WithField("region", reg).Debug("reestablishing region")
  372. c.establishRegion(reg, "")
  373. }
  374. // probeKey returns a key in region that is unlikely to have data at it
  375. // in order to test if the region is online. This prevents the Get request
  376. // to actually fetch the data from the storage which consumes resources
  377. // of the region server
  378. func probeKey(reg hrpc.RegionInfo) []byte {
  379. // now we create a probe key: reg.StartKey() + 17 zeros
  380. probe := make([]byte, len(reg.StartKey())+17)
  381. copy(probe, reg.StartKey())
  382. return probe
  383. }
// isRegionEstablished checks whether regionserver accepts rpcs for the region.
// Returns the cause if not established. It sends a cheap existence-only
// probe (see probeKey) and treats a Retryable or Unrecoverable error in
// the response as "not established".
func isRegionEstablished(rc hrpc.RegionClient, reg hrpc.RegionInfo) error {
	probe, err := hrpc.NewGet(context.Background(), fullyQualifiedTable(reg), probeKey(reg),
		hrpc.SkipBatch())
	if err != nil {
		// Constructing a Get from a valid region should never fail.
		panic(fmt.Sprintf("should not happen: %s", err))
	}

	// ExistsOnly avoids fetching the (nonexistent) cell's data.
	probe.ExistsOnly()

	probe.SetRegion(reg)
	res, err := sendBlocking(rc, probe)
	if err != nil {
		// sendBlocking only errors when the rpc's context is done; the
		// probe uses context.Background, which never expires.
		panic(fmt.Sprintf("should not happen: %s", err))
	}

	switch res.Error.(type) {
	case region.RetryableError, region.UnrecoverableError:
		return res.Error
	default:
		// nil or an unknown error type: region is considered established.
		return nil
	}
}
// establishRegion connects to the regionserver at addr for reg and marks
// the region available once the server accepts rpcs for it. When addr is
// empty (reestablishment), the region is first re-resolved, which may
// yield a *different* region (after a split/merge); in that case the new
// region replaces the original in the cache. Retries with backoff until
// success, the region's context is cancelled, or the region is found to
// no longer exist. Runs in its own goroutine; reg must already be marked
// unavailable by the caller.
func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) {
	var backoff time.Duration
	var err error
	for {
		backoff, err = sleepAndIncreaseBackoff(reg.Context(), backoff)
		if err != nil {
			// region is dead
			reg.MarkAvailable()
			return
		}
		if addr == "" {
			// need to look up region and address of the regionserver
			originalReg := reg
			// lookup region forever until we get it or we learn that it doesn't exist
			reg, addr, err = c.lookupRegion(originalReg.Context(),
				fullyQualifiedTable(originalReg), originalReg.StartKey())

			if err == TableNotFound {
				// region doesn't exist, delete it from caches
				c.regions.del(originalReg)
				c.clients.del(originalReg)
				originalReg.MarkAvailable()

				log.WithFields(log.Fields{
					"region":  originalReg.String(),
					"err":     err,
					"backoff": backoff,
				}).Info("region does not exist anymore")

				return
			} else if originalReg.Context().Err() != nil {
				// region is dead
				originalReg.MarkAvailable()

				log.WithFields(log.Fields{
					"region":  originalReg.String(),
					"err":     err,
					"backoff": backoff,
				}).Info("region became dead while establishing client for it")

				return
			} else if err == errMetaLookupThrottled {
				// We've been throttled, backoff and retry the lookup
				// TODO: backoff might be unnecessary
				reg = originalReg
				continue
			} else if err == ErrClientClosed {
				// client has been closed
				return
			} else if err != nil {
				// lookupRegion can only fail with the errors handled
				// above; anything else is a programming error.
				log.WithFields(log.Fields{
					"region":  originalReg.String(),
					"err":     err,
					"backoff": backoff,
				}).Fatal("unknown error occured when looking up region")
			}

			if !bytes.Equal(reg.Name(), originalReg.Name()) {
				// put new region and remove overlapping ones.
				// Should remove the original region as well.
				reg.MarkUnavailable()
				overlaps, replaced := c.regions.put(reg)
				if !replaced {
					// a region that is the same or younger is already in cache
					reg.MarkAvailable()
					originalReg.MarkAvailable()
					return
				}
				// otherwise delete the overlapped regions in cache
				for _, r := range overlaps {
					c.clients.del(r)
				}
				// let rpcs know that they can retry and either get the newly
				// added region from cache or lookup the one they need
				originalReg.MarkAvailable()
			} else {
				// same region, discard the looked up one
				reg = originalReg
			}
		}

		// connect to the region's regionserver
		client, err := c.establishRegionClient(reg, addr)
		if err == nil {
			if reg == c.adminRegionInfo {
				// Admin (master) region: no probe needed.
				reg.SetClient(client)
				reg.MarkAvailable()
				return
			}

			if existing := c.clients.put(client, reg); existing != client {
				// a client for this regionserver is already in cache, discard this one.
				client.Close()
				client = existing
			}

			if err = isRegionEstablished(client, reg); err == nil {
				// set region client so that as soon as we mark it available,
				// concurrent readers are able to find the client
				reg.SetClient(client)
				reg.MarkAvailable()
				return
			} else if _, ok := err.(region.UnrecoverableError); ok {
				// the client we got died
				c.clientDown(client)
			}
		} else if err == context.Canceled {
			// region is dead
			reg.MarkAvailable()
			return
		}

		log.WithFields(log.Fields{
			"region":  reg,
			"backoff": backoff,
			"err":     err,
		}).Debug("region was not established, retrying")
		// reset address because we weren't able to connect to it
		// or regionserver says it's still offline, should look up again
		addr = ""
	}
}
  517. func sleepAndIncreaseBackoff(ctx context.Context, backoff time.Duration) (time.Duration, error) {
  518. if backoff == 0 {
  519. return backoffStart, nil
  520. }
  521. select {
  522. case <-time.After(backoff):
  523. case <-ctx.Done():
  524. return 0, ctx.Err()
  525. }
  526. // TODO: Revisit how we back off here.
  527. if backoff < 5000*time.Millisecond {
  528. return backoff * 2, nil
  529. }
  530. return backoff + 5000*time.Millisecond, nil
  531. }
  532. func (c *client) establishRegionClient(reg hrpc.RegionInfo,
  533. addr string) (hrpc.RegionClient, error) {
  534. if c.clientType != adminClient {
  535. // if rpc is not for hbasemaster, check if client for regionserver
  536. // already exists
  537. if client := c.clients.checkForClient(addr); client != nil {
  538. // There's already a client
  539. return client, nil
  540. }
  541. }
  542. var clientType region.ClientType
  543. if c.clientType == standardClient {
  544. clientType = region.RegionClient
  545. } else {
  546. clientType = region.MasterClient
  547. }
  548. clientCtx, cancel := context.WithTimeout(reg.Context(), c.regionLookupTimeout)
  549. defer cancel()
  550. return region.NewClient(clientCtx, addr, clientType,
  551. c.rpcQueueSize, c.flushInterval, c.effectiveUser,
  552. c.regionReadTimeout)
  553. }
// zkResult contains the result of a ZooKeeper lookup (when we're looking for
// the meta region or the HMaster).
type zkResult struct {
	addr string // "host:port" of the located resource
	err  error  // lookup error, if any
}
  560. // zkLookup asynchronously looks up the meta region or HMaster in ZooKeeper.
  561. func (c *client) zkLookup(ctx context.Context, resource zk.ResourceName) (string, error) {
  562. // We make this a buffered channel so that if we stop waiting due to a
  563. // timeout, we won't block the zkLookupSync() that we start in a
  564. // separate goroutine.
  565. reschan := make(chan zkResult, 1)
  566. go func() {
  567. addr, err := c.zkClient.LocateResource(resource.Prepend(c.zkRoot))
  568. // This is guaranteed to never block as the channel is always buffered.
  569. reschan <- zkResult{addr, err}
  570. }()
  571. select {
  572. case res := <-reschan:
  573. return res.addr, res.err
  574. case <-ctx.Done():
  575. return "", ctx.Err()
  576. }
  577. }