admin_client.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. // Copyright (C) 2016 The GoHBase Authors. All rights reserved.
  2. // This file is part of GoHBase.
  3. // Use of this source code is governed by the Apache License 2.0
  4. // that can be found in the COPYING file.
  5. package gohbase
  6. import (
  7. "context"
  8. "errors"
  9. "fmt"
  10. log "github.com/sirupsen/logrus"
  11. "github.com/tsuna/gohbase/hrpc"
  12. "github.com/tsuna/gohbase/pb"
  13. "github.com/tsuna/gohbase/region"
  14. "github.com/tsuna/gohbase/zk"
  15. )
  16. // AdminClient to perform admistrative operations with HMaster
  17. type AdminClient interface {
  18. CreateTable(t *hrpc.CreateTable) error
  19. DeleteTable(t *hrpc.DeleteTable) error
  20. EnableTable(t *hrpc.EnableTable) error
  21. DisableTable(t *hrpc.DisableTable) error
  22. ClusterStatus() (*pb.ClusterStatus, error)
  23. }
  24. // NewAdminClient creates an admin HBase client.
  25. func NewAdminClient(zkquorum string, options ...Option) AdminClient {
  26. return newAdminClient(zkquorum, options...)
  27. }
  28. func newAdminClient(zkquorum string, options ...Option) AdminClient {
  29. log.WithFields(log.Fields{
  30. "Host": zkquorum,
  31. }).Debug("Creating new admin client.")
  32. c := &client{
  33. clientType: adminClient,
  34. rpcQueueSize: defaultRPCQueueSize,
  35. flushInterval: defaultFlushInterval,
  36. // empty region in order to be able to set client to it
  37. adminRegionInfo: region.NewInfo(0, nil, nil, nil, nil, nil),
  38. zkTimeout: defaultZkTimeout,
  39. zkRoot: defaultZkRoot,
  40. effectiveUser: defaultEffectiveUser,
  41. regionLookupTimeout: region.DefaultLookupTimeout,
  42. regionReadTimeout: region.DefaultReadTimeout,
  43. }
  44. for _, option := range options {
  45. option(c)
  46. }
  47. c.zkClient = zk.NewClient(zkquorum, c.zkTimeout)
  48. return c
  49. }
  50. //Get the status of the cluster
  51. func (c *client) ClusterStatus() (*pb.ClusterStatus, error) {
  52. pbmsg, err := c.SendRPC(hrpc.NewClusterStatus())
  53. if err != nil {
  54. return nil, err
  55. }
  56. r, ok := pbmsg.(*pb.GetClusterStatusResponse)
  57. if !ok {
  58. return nil, fmt.Errorf("sendRPC returned not a ClusterStatusResponse")
  59. }
  60. return r.GetClusterStatus(), nil
  61. }
  62. func (c *client) CreateTable(t *hrpc.CreateTable) error {
  63. pbmsg, err := c.SendRPC(t)
  64. if err != nil {
  65. return err
  66. }
  67. r, ok := pbmsg.(*pb.CreateTableResponse)
  68. if !ok {
  69. return fmt.Errorf("sendRPC returned not a CreateTableResponse")
  70. }
  71. return c.checkProcedureWithBackoff(t.Context(), r.GetProcId())
  72. }
  73. func (c *client) DeleteTable(t *hrpc.DeleteTable) error {
  74. pbmsg, err := c.SendRPC(t)
  75. if err != nil {
  76. return err
  77. }
  78. r, ok := pbmsg.(*pb.DeleteTableResponse)
  79. if !ok {
  80. return fmt.Errorf("sendRPC returned not a DeleteTableResponse")
  81. }
  82. return c.checkProcedureWithBackoff(t.Context(), r.GetProcId())
  83. }
  84. func (c *client) EnableTable(t *hrpc.EnableTable) error {
  85. pbmsg, err := c.SendRPC(t)
  86. if err != nil {
  87. return err
  88. }
  89. r, ok := pbmsg.(*pb.EnableTableResponse)
  90. if !ok {
  91. return fmt.Errorf("sendRPC returned not a EnableTableResponse")
  92. }
  93. return c.checkProcedureWithBackoff(t.Context(), r.GetProcId())
  94. }
  95. func (c *client) DisableTable(t *hrpc.DisableTable) error {
  96. pbmsg, err := c.SendRPC(t)
  97. if err != nil {
  98. return err
  99. }
  100. r, ok := pbmsg.(*pb.DisableTableResponse)
  101. if !ok {
  102. return fmt.Errorf("sendRPC returned not a DisableTableResponse")
  103. }
  104. return c.checkProcedureWithBackoff(t.Context(), r.GetProcId())
  105. }
  106. func (c *client) checkProcedureWithBackoff(ctx context.Context, procID uint64) error {
  107. backoff := backoffStart
  108. for {
  109. pbmsg, err := c.SendRPC(hrpc.NewGetProcedureState(ctx, procID))
  110. if err != nil {
  111. return err
  112. }
  113. res := pbmsg.(*pb.GetProcedureResultResponse)
  114. switch res.GetState() {
  115. case pb.GetProcedureResultResponse_NOT_FOUND:
  116. return fmt.Errorf("procedure not found")
  117. case pb.GetProcedureResultResponse_FINISHED:
  118. if fe := res.Exception; fe != nil {
  119. ge := fe.GenericException
  120. if ge == nil {
  121. return errors.New("got unexpected empty exception")
  122. }
  123. return fmt.Errorf("procedure exception: %s: %s", ge.GetClassName(), ge.GetMessage())
  124. }
  125. return nil
  126. default:
  127. backoff, err = sleepAndIncreaseBackoff(ctx, backoff)
  128. if err != nil {
  129. return err
  130. }
  131. }
  132. }
  133. }