client.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. // Copyright (C) 2015 The GoHBase Authors. All rights reserved.
  2. // This file is part of GoHBase.
  3. // Use of this source code is governed by the Apache License 2.0
  4. // that can be found in the COPYING file.
  5. // Package zk encapsulates our interactions with ZooKeeper.
  6. package zk
  7. import (
  8. "encoding/binary"
  9. "fmt"
  10. "net"
  11. "path"
  12. "strings"
  13. "time"
  14. log "github.com/sirupsen/logrus"
  15. "github.com/golang/protobuf/proto"
  16. "github.com/samuel/go-zookeeper/zk"
  17. "github.com/tsuna/gohbase/pb"
  18. )
  19. type logger struct{}
  20. func (l *logger) Printf(format string, args ...interface{}) {
  21. log.Debugf(format, args...)
  22. }
  23. func init() {
  24. zk.DefaultLogger = &logger{}
  25. }
  26. // ResourceName is a type alias that is used to represent different resources
  27. // in ZooKeeper
  28. type ResourceName string
  29. // Prepend creates a new ResourceName with prefix prepended to the former ResourceName.
  30. func (r ResourceName) Prepend(prefix string) ResourceName {
  31. return ResourceName(path.Join(prefix, string(r)))
  32. }
  33. const (
  34. // Meta is a ResourceName that indicates that the location of the Meta
  35. // table is what will be fetched
  36. Meta = ResourceName("/meta-region-server")
  37. // Master is a ResourceName that indicates that the location of the Master
  38. // server is what will be fetched
  39. Master = ResourceName("/master")
  40. )
  41. // Client is an interface of client that retrieves meta infomation from zookeeper
  42. type Client interface {
  43. LocateResource(ResourceName) (string, error)
  44. }
  45. type client struct {
  46. zks []string
  47. sessionTimeout time.Duration
  48. }
  49. // NewClient establishes connection to zookeeper and returns the client
  50. func NewClient(zkquorum string, st time.Duration) Client {
  51. return &client{
  52. zks: strings.Split(zkquorum, ","),
  53. sessionTimeout: st,
  54. }
  55. }
  56. // LocateResource returns address of the server for the specified resource.
  57. func (c *client) LocateResource(resource ResourceName) (string, error) {
  58. conn, _, err := zk.Connect(c.zks, c.sessionTimeout)
  59. if err != nil {
  60. return "", fmt.Errorf("error connecting to ZooKeeper at %v: %s", c.zks, err)
  61. }
  62. defer conn.Close()
  63. buf, _, err := conn.Get(string(resource))
  64. if err != nil {
  65. return "", fmt.Errorf("failed to read the %s znode: %s", resource, err)
  66. }
  67. if len(buf) == 0 {
  68. log.Fatalf("%s was empty!", resource)
  69. } else if buf[0] != 0xFF {
  70. return "", fmt.Errorf("the first byte of %s was 0x%x, not 0xFF", resource, buf[0])
  71. }
  72. metadataLen := binary.BigEndian.Uint32(buf[1:])
  73. if metadataLen < 1 || metadataLen > 65000 {
  74. return "", fmt.Errorf("invalid metadata length for %s: %d", resource, metadataLen)
  75. }
  76. buf = buf[1+4+metadataLen:]
  77. magic := binary.BigEndian.Uint32(buf)
  78. const pbufMagic = 1346524486 // 4 bytes: "PBUF"
  79. if magic != pbufMagic {
  80. return "", fmt.Errorf("invalid magic number for %s: %d", resource, magic)
  81. }
  82. buf = buf[4:]
  83. var server *pb.ServerName
  84. if resource == Meta {
  85. meta := &pb.MetaRegionServer{}
  86. err = proto.UnmarshalMerge(buf, meta)
  87. if err != nil {
  88. return "",
  89. fmt.Errorf("failed to deserialize the MetaRegionServer entry from ZK: %s", err)
  90. }
  91. server = meta.Server
  92. } else {
  93. master := &pb.Master{}
  94. err = proto.UnmarshalMerge(buf, master)
  95. if err != nil {
  96. return "",
  97. fmt.Errorf("failed to deserialize the Master entry from ZK: %s", err)
  98. }
  99. server = master.Master
  100. }
  101. return net.JoinHostPort(*server.HostName, fmt.Sprint(*server.Port)), nil
  102. }