new.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. // Copyright (C) 2016 The GoHBase Authors. All rights reserved.
  2. // This file is part of GoHBase.
  3. // Use of this source code is governed by the Apache License 2.0
  4. // that can be found in the COPYING file.
  5. // +build !testing
  6. package region
  7. import (
  8. "context"
  9. "fmt"
  10. "net"
  11. "time"
  12. "github.com/tsuna/gohbase/hrpc"
  13. )
  14. // NewClient creates a new RegionClient.
  15. func NewClient(ctx context.Context, addr string, ctype ClientType,
  16. queueSize int, flushInterval time.Duration, effectiveUser string,
  17. readTimeout time.Duration) (hrpc.RegionClient, error) {
  18. var d net.Dialer
  19. conn, err := d.DialContext(ctx, "tcp", addr)
  20. if err != nil {
  21. return nil, fmt.Errorf("failed to connect to the RegionServer at %s: %s", addr, err)
  22. }
  23. c := &client{
  24. addr: addr,
  25. conn: conn,
  26. rpcs: make(chan hrpc.Call),
  27. done: make(chan struct{}),
  28. sent: make(map[uint32]hrpc.Call),
  29. rpcQueueSize: queueSize,
  30. flushInterval: flushInterval,
  31. effectiveUser: effectiveUser,
  32. readTimeout: readTimeout,
  33. }
  34. // time out send hello if it take long
  35. // TODO: do we even need to bother, we are going to retry anyway?
  36. if deadline, ok := ctx.Deadline(); ok {
  37. conn.SetWriteDeadline(deadline)
  38. }
  39. if err := c.sendHello(ctype); err != nil {
  40. conn.Close()
  41. return nil, fmt.Errorf("failed to send hello to the RegionServer at %s: %s", addr, err)
  42. }
  43. // reset write deadline
  44. conn.SetWriteDeadline(time.Time{})
  45. if ctype == RegionClient {
  46. go c.processRPCs() // Batching goroutine
  47. }
  48. go c.receiveRPCs() // Reader goroutine
  49. return c, nil
  50. }