client.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566
  1. // Copyright (C) 2015 The GoHBase Authors. All rights reserved.
  2. // This file is part of GoHBase.
  3. // Use of this source code is governed by the Apache License 2.0
  4. // that can be found in the COPYING file.
  5. package region
  6. import (
  7. "encoding/binary"
  8. "errors"
  9. "fmt"
  10. "io"
  11. "net"
  12. "sync"
  13. "sync/atomic"
  14. "time"
  15. log "github.com/sirupsen/logrus"
  16. "github.com/golang/protobuf/proto"
  17. "github.com/tsuna/gohbase/hrpc"
  18. "github.com/tsuna/gohbase/pb"
  19. )
  // ClientType identifies which HBase RPC service a client talks to.
// It is a distinct (defined) string type, not a type alias; its value is
// sent as the ServiceName of the connection header.
type ClientType string
  22. type canDeserializeCellBlocks interface {
  23. // DeserializeCellBlocks populates passed protobuf message with results
  24. // deserialized from the reader and returns number of bytes read or error.
  25. DeserializeCellBlocks(proto.Message, []byte) (uint32, error)
  26. }
  27. var (
  28. // ErrMissingCallID is used when HBase sends us a response message for a
  29. // request that we didn't send
  30. ErrMissingCallID = errors.New("got a response with a nonsensical call ID")
  31. // ErrClientDead is returned to rpcs when Close() is called or when client
  32. // died because of failed send or receive
  33. ErrClientDead = UnrecoverableError{errors.New("client is dead")}
  34. // javaRetryableExceptions is a map where all Java exceptions that signify
  35. // the RPC should be sent again are listed (as keys). If a Java exception
  36. // listed here is returned by HBase, the client should attempt to resend
  37. // the RPC message, potentially via a different region client.
  38. javaRetryableExceptions = map[string]struct{}{
  39. "org.apache.hadoop.hbase.CallQueueTooBigException": struct{}{},
  40. "org.apache.hadoop.hbase.NotServingRegionException": struct{}{},
  41. "org.apache.hadoop.hbase.exceptions.RegionMovedException": struct{}{},
  42. "org.apache.hadoop.hbase.exceptions.RegionOpeningException": struct{}{},
  43. "org.apache.hadoop.hbase.ipc.ServerNotRunningYetException": struct{}{},
  44. "org.apache.hadoop.hbase.quotas.RpcThrottlingException": struct{}{},
  45. "org.apache.hadoop.hbase.RetryImmediatelyException": struct{}{},
  46. }
  47. // javaUnrecoverableExceptions is a map where all Java exceptions that signify
  48. // the RPC should be sent again are listed (as keys). If a Java exception
  49. // listed here is returned by HBase, the RegionClient will be closed and a new
  50. // one should be established.
  51. javaUnrecoverableExceptions = map[string]struct{}{
  52. "org.apache.hadoop.hbase.regionserver.RegionServerAbortedException": struct{}{},
  53. "org.apache.hadoop.hbase.regionserver.RegionServerStoppedException": struct{}{},
  54. }
  55. )
  56. const (
  57. //DefaultLookupTimeout is the default region lookup timeout
  58. DefaultLookupTimeout = 30 * time.Second
  59. //DefaultReadTimeout is the default region read timeout
  60. DefaultReadTimeout = 30 * time.Second
  61. // RegionClient is a ClientType that means this will be a normal client
  62. RegionClient = ClientType("ClientService")
  63. // MasterClient is a ClientType that means this client will talk to the
  64. // master server
  65. MasterClient = ClientType("MasterService")
  66. )
  // bufferPool recycles the byte slices used to build outgoing request frames.
// An empty pool hands out a nil []byte.
var bufferPool = sync.Pool{
	New: func() interface{} {
		return []byte(nil)
	},
}

// newBuffer returns a byte slice of length size. A pooled slice is reused
// when it is big enough. Otherwise a fresh slice is allocated, with twice the
// pooled capacity when that exceeds size.
func newBuffer(size int) []byte {
	pooled := bufferPool.Get().([]byte)
	if cap(pooled) >= size {
		return pooled[:size]
	}
	if grown := 2 * cap(pooled); grown > size {
		return make([]byte, size, grown)
	}
	return make([]byte, size)
}

// freeBuffer returns b's backing storage to bufferPool for reuse.
func freeBuffer(b []byte) {
	empty := b[:0]
	bufferPool.Put(empty)
}
  // UnrecoverableError wraps an error this region.Client cannot recover from:
// the connection to the RegionServer has to be closed and all queued and
// outstanding RPCs will be failed / retried.
type UnrecoverableError struct {
	error
}

// Error reports the message of the wrapped error.
func (e UnrecoverableError) Error() string {
	inner := e.error
	return inner.Error()
}
  // RetryableError wraps a transient error (e.g. a region being momentarily
// unavailable); the RPC that hit it should be retried.
type RetryableError struct {
	error
}

// Error reports the message of the wrapped error.
func (e RetryableError) Error() string {
	inner := e.error
	return inner.Error()
}
  104. // client manages a connection to a RegionServer.
  105. type client struct {
  106. conn net.Conn
  107. // Address of the RegionServer.
  108. addr string
  109. // once used for concurrent calls to fail
  110. once sync.Once
  111. rpcs chan hrpc.Call
  112. done chan struct{}
  113. // sent contains the mapping of sent call IDs to RPC calls, so that when
  114. // a response is received it can be tied to the correct RPC
  115. sentM sync.Mutex // protects sent
  116. sent map[uint32]hrpc.Call
  117. // inFlight is number of rpcs sent to regionserver awaiting response
  118. inFlightM sync.Mutex // protects inFlight and SetReadDeadline
  119. inFlight uint32
  120. id uint32
  121. rpcQueueSize int
  122. flushInterval time.Duration
  123. effectiveUser string
  124. // readTimeout is the maximum amount of time to wait for regionserver reply
  125. readTimeout time.Duration
  126. }
  127. // QueueRPC will add an rpc call to the queue for processing by the writer goroutine
  128. func (c *client) QueueRPC(rpc hrpc.Call) {
  129. if b, ok := rpc.(hrpc.Batchable); ok && c.rpcQueueSize > 1 && !b.SkipBatch() {
  130. // queue up the rpc
  131. select {
  132. case <-rpc.Context().Done():
  133. // rpc timed out before being processed
  134. case <-c.done:
  135. returnResult(rpc, nil, ErrClientDead)
  136. case c.rpcs <- rpc:
  137. }
  138. } else {
  139. if err := c.trySend(rpc); err != nil {
  140. returnResult(rpc, nil, err)
  141. }
  142. }
  143. }
  144. // Close asks this region.Client to close its connection to the RegionServer.
  145. // All queued and outstanding RPCs, if any, will be failed as if a connection
  146. // error had happened.
  147. func (c *client) Close() {
  148. c.fail(ErrClientDead)
  149. }
  150. // Addr returns address of the region server the client is connected to
  151. func (c *client) Addr() string {
  152. return c.addr
  153. }
  154. // String returns a string represintation of the current region client
  155. func (c *client) String() string {
  156. return fmt.Sprintf("RegionClient{Addr: %s}", c.addr)
  157. }
  158. func (c *client) inFlightUp() {
  159. c.inFlightM.Lock()
  160. c.inFlight++
  161. // we expect that at least the last request can be completed within readTimeout
  162. c.conn.SetReadDeadline(time.Now().Add(c.readTimeout))
  163. c.inFlightM.Unlock()
  164. }
  165. func (c *client) inFlightDown() {
  166. c.inFlightM.Lock()
  167. c.inFlight--
  168. // reset read timeout if we are not waiting for any responses
  169. // in order to prevent from closing this client if there are no request
  170. if c.inFlight == 0 {
  171. c.conn.SetReadDeadline(time.Time{})
  172. }
  173. c.inFlightM.Unlock()
  174. }
  175. func (c *client) fail(err error) {
  176. c.once.Do(func() {
  177. log.WithFields(log.Fields{
  178. "client": c,
  179. "err": err,
  180. }).Error("error occured, closing region client")
  181. // we don't close c.rpcs channel to make it block in select of QueueRPC
  182. // and avoid dealing with synchronization of closing it while someone
  183. // might be sending to it. Go's GC will take care of it.
  184. // tell goroutines to stop
  185. close(c.done)
  186. // close connection to the regionserver
  187. // to let it know that we can't receive anymore
  188. // and fail all the rpcs being sent
  189. c.conn.Close()
  190. c.failSentRPCs()
  191. })
  192. }
  193. func (c *client) failSentRPCs() {
  194. // channel is closed, clean up awaiting rpcs
  195. c.sentM.Lock()
  196. sent := c.sent
  197. c.sent = make(map[uint32]hrpc.Call)
  198. c.sentM.Unlock()
  199. log.WithFields(log.Fields{
  200. "client": c,
  201. "count": len(sent),
  202. }).Debug("failing awaiting RPCs")
  203. // send error to awaiting rpcs
  204. for _, rpc := range sent {
  205. returnResult(rpc, nil, ErrClientDead)
  206. }
  207. }
  208. func (c *client) registerRPC(rpc hrpc.Call) uint32 {
  209. currID := atomic.AddUint32(&c.id, 1)
  210. c.sentM.Lock()
  211. c.sent[currID] = rpc
  212. c.sentM.Unlock()
  213. return currID
  214. }
  215. func (c *client) unregisterRPC(id uint32) hrpc.Call {
  216. c.sentM.Lock()
  217. rpc := c.sent[id]
  218. delete(c.sent, id)
  219. c.sentM.Unlock()
  220. return rpc
  221. }
  222. func (c *client) processRPCs() {
  223. // TODO: flush when the size is too large
  224. // TODO: if multi has only one call, send that call instead
  225. m := newMulti(c.rpcQueueSize)
  226. defer func() {
  227. m.returnResults(nil, ErrClientDead)
  228. }()
  229. flush := func() {
  230. if log.GetLevel() == log.DebugLevel {
  231. log.WithFields(log.Fields{
  232. "len": m.len(),
  233. "addr": c.Addr(),
  234. }).Debug("flushing MultiRequest")
  235. }
  236. if err := c.trySend(m); err != nil {
  237. m.returnResults(nil, err)
  238. }
  239. m = newMulti(c.rpcQueueSize)
  240. }
  241. for {
  242. // first loop is to accomodate request heavy workload
  243. // it will batch as long as conccurent writers are sending
  244. // new rpcs or until multi is filled up
  245. for {
  246. select {
  247. case <-c.done:
  248. return
  249. case rpc := <-c.rpcs:
  250. // have things queued up, batch them
  251. if !m.add(rpc) {
  252. // can still put more rpcs into batch
  253. continue
  254. }
  255. default:
  256. // no more rpcs queued up
  257. }
  258. break
  259. }
  260. if l := m.len(); l == 0 {
  261. // wait for the next batch
  262. select {
  263. case <-c.done:
  264. return
  265. case rpc := <-c.rpcs:
  266. m.add(rpc)
  267. }
  268. continue
  269. } else if l == c.rpcQueueSize || c.flushInterval == 0 {
  270. // batch is full, flush
  271. flush()
  272. continue
  273. }
  274. // second loop is to accomodate less frequent callers
  275. // that would like to maximize their batches at the expense
  276. // of waiting for flushInteval
  277. timer := time.NewTimer(c.flushInterval)
  278. for {
  279. select {
  280. case <-c.done:
  281. return
  282. case <-timer.C:
  283. // time to flush
  284. case rpc := <-c.rpcs:
  285. if !m.add(rpc) {
  286. // can still put more rpcs into batch
  287. continue
  288. }
  289. // batch is full
  290. if !timer.Stop() {
  291. <-timer.C
  292. }
  293. }
  294. break
  295. }
  296. flush()
  297. }
  298. }
  299. func returnResult(c hrpc.Call, msg proto.Message, err error) {
  300. if m, ok := c.(*multi); ok {
  301. m.returnResults(msg, err)
  302. } else {
  303. c.ResultChan() <- hrpc.RPCResult{Msg: msg, Error: err}
  304. }
  305. }
  306. func (c *client) trySend(rpc hrpc.Call) error {
  307. select {
  308. case <-c.done:
  309. // An unrecoverable error has occured,
  310. // region client has been stopped,
  311. // don't send rpcs
  312. return ErrClientDead
  313. case <-rpc.Context().Done():
  314. // If the deadline has been exceeded, don't bother sending the
  315. // request. The function that placed the RPC in our queue should
  316. // stop waiting for a result and return an error.
  317. return nil
  318. default:
  319. if id, err := c.send(rpc); err != nil {
  320. if _, ok := err.(UnrecoverableError); ok {
  321. c.fail(err)
  322. }
  323. if r := c.unregisterRPC(id); r != nil {
  324. // we are the ones to unregister the rpc,
  325. // return err to notify client of it
  326. return err
  327. }
  328. }
  329. return nil
  330. }
  331. }
  332. func (c *client) receiveRPCs() {
  333. for {
  334. select {
  335. case <-c.done:
  336. return
  337. default:
  338. if err := c.receive(); err != nil {
  339. switch err.(type) {
  340. case UnrecoverableError:
  341. c.fail(err)
  342. return
  343. case RetryableError:
  344. continue
  345. }
  346. }
  347. }
  348. }
  349. }
  350. func (c *client) receive() (err error) {
  351. var (
  352. sz [4]byte
  353. header pb.ResponseHeader
  354. response proto.Message
  355. )
  356. err = c.readFully(sz[:])
  357. if err != nil {
  358. return UnrecoverableError{err}
  359. }
  360. size := binary.BigEndian.Uint32(sz[:])
  361. b := make([]byte, size)
  362. err = c.readFully(b)
  363. if err != nil {
  364. return UnrecoverableError{err}
  365. }
  366. buf := proto.NewBuffer(b)
  367. if err = buf.DecodeMessage(&header); err != nil {
  368. return fmt.Errorf("failed to decode the response header: %s", err)
  369. }
  370. if header.CallId == nil {
  371. return ErrMissingCallID
  372. }
  373. callID := *header.CallId
  374. rpc := c.unregisterRPC(callID)
  375. if rpc == nil {
  376. return fmt.Errorf("got a response with an unexpected call ID: %d", callID)
  377. }
  378. c.inFlightDown()
  379. select {
  380. case <-rpc.Context().Done():
  381. // context has expired, don't bother deserializing
  382. return
  383. default:
  384. }
  385. // Here we know for sure that we got a response for rpc we asked.
  386. // It's our responsibility to deliver the response or error to the
  387. // caller as we unregistered the rpc.
  388. defer func() { returnResult(rpc, response, err) }()
  389. if header.Exception == nil {
  390. response = rpc.NewResponse()
  391. if err = buf.DecodeMessage(response); err != nil {
  392. err = fmt.Errorf("failed to decode the response: %s", err)
  393. return
  394. }
  395. var cellsLen uint32
  396. if header.CellBlockMeta != nil {
  397. cellsLen = header.CellBlockMeta.GetLength()
  398. }
  399. if d, ok := rpc.(canDeserializeCellBlocks); cellsLen > 0 && ok {
  400. b := buf.Bytes()[size-cellsLen:]
  401. var nread uint32
  402. nread, err = d.DeserializeCellBlocks(response, b)
  403. if err != nil {
  404. err = fmt.Errorf("failed to decode the response: %s", err)
  405. return
  406. }
  407. if int(nread) < len(b) {
  408. err = fmt.Errorf("short read: buffer len %d, read %d", len(b), nread)
  409. return
  410. }
  411. }
  412. } else {
  413. err = exceptionToError(*header.Exception.ExceptionClassName, *header.Exception.StackTrace)
  414. }
  415. return
  416. }
  417. func exceptionToError(class, stack string) error {
  418. err := fmt.Errorf("HBase Java exception %s:\n%s", class, stack)
  419. if _, ok := javaRetryableExceptions[class]; ok {
  420. return RetryableError{err}
  421. } else if _, ok := javaUnrecoverableExceptions[class]; ok {
  422. return UnrecoverableError{err}
  423. }
  424. return err
  425. }
  426. // write sends the given buffer to the RegionServer.
  427. func (c *client) write(buf []byte) error {
  428. _, err := c.conn.Write(buf)
  429. return err
  430. }
  431. // Tries to read enough data to fully fill up the given buffer.
  432. func (c *client) readFully(buf []byte) error {
  433. _, err := io.ReadFull(c.conn, buf)
  434. return err
  435. }
  436. // sendHello sends the "hello" message needed when opening a new connection.
  437. func (c *client) sendHello(ctype ClientType) error {
  438. connHeader := &pb.ConnectionHeader{
  439. UserInfo: &pb.UserInformation{
  440. EffectiveUser: proto.String(c.effectiveUser),
  441. },
  442. ServiceName: proto.String(string(ctype)),
  443. CellBlockCodecClass: proto.String("org.apache.hadoop.hbase.codec.KeyValueCodec"),
  444. }
  445. data, err := proto.Marshal(connHeader)
  446. if err != nil {
  447. return fmt.Errorf("failed to marshal connection header: %s", err)
  448. }
  449. const header = "HBas\x00\x50" // \x50 = Simple Auth.
  450. buf := make([]byte, 0, len(header)+4+len(data))
  451. buf = append(buf, header...)
  452. buf = buf[:len(header)+4]
  453. binary.BigEndian.PutUint32(buf[6:], uint32(len(data)))
  454. buf = append(buf, data...)
  455. return c.write(buf)
  456. }
  457. // send sends an RPC out to the wire.
  458. // Returns the response (for now, as the call is synchronous).
  459. func (c *client) send(rpc hrpc.Call) (uint32, error) {
  460. b := newBuffer(4)
  461. defer func() { freeBuffer(b) }()
  462. buf := proto.NewBuffer(b[4:])
  463. buf.Reset()
  464. request := rpc.ToProto()
  465. // we have to register rpc after we marhsal because
  466. // registered rpc can fail before it was even sent
  467. // in all the cases where c.fail() is called.
  468. // If that happens, client can retry sending the rpc
  469. // again potentially changing it's contents.
  470. id := c.registerRPC(rpc)
  471. header := &pb.RequestHeader{
  472. CallId: &id,
  473. MethodName: proto.String(rpc.Name()),
  474. RequestParam: proto.Bool(true),
  475. }
  476. if err := buf.EncodeMessage(header); err != nil {
  477. return id, fmt.Errorf("failed to marshal request header: %s", err)
  478. }
  479. if err := buf.EncodeMessage(request); err != nil {
  480. return id, fmt.Errorf("failed to marshal request: %s", err)
  481. }
  482. payload := buf.Bytes()
  483. binary.BigEndian.PutUint32(b, uint32(len(payload)))
  484. b = append(b[:4], payload...)
  485. if err := c.write(b); err != nil {
  486. return id, UnrecoverableError{err}
  487. }
  488. c.inFlightUp()
  489. return id, nil
  490. }