test_new.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. // Copyright (C) 2016 The GoHBase Authors. All rights reserved.
  2. // This file is part of GoHBase.
  3. // Use of this source code is governed by the Apache License 2.0
  4. // that can be found in the COPYING file.
  5. // +build testing
  6. package region
  7. import (
  8. "bytes"
  9. "context"
  10. "fmt"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. "github.com/golang/protobuf/proto"
  15. "github.com/tsuna/gohbase/hrpc"
  16. "github.com/tsuna/gohbase/pb"
  17. )
  18. type testClient struct {
  19. addr string
  20. numNSRE int32
  21. }
  22. var nsreRegion = &pb.Result{Cell: []*pb.Cell{
  23. &pb.Cell{
  24. Row: []byte("nsre,,1434573235908.56f833d5569a27c7a43fbf547b4924a4."),
  25. Family: []byte("info"),
  26. Qualifier: []byte("regioninfo"),
  27. Value: []byte("PBUF\b\xc4\xcd\xe9\x99\xe0)\x12\x0f\n\adefault\x12\x04nsre" +
  28. "\x1a\x00\"\x00(\x000\x008\x00"),
  29. },
  30. &pb.Cell{
  31. Row: []byte("nsre,,1434573235908.56f833d5569a27c7a43fbf547b4924a4."),
  32. Family: []byte("info"),
  33. Qualifier: []byte("seqnumDuringOpen"),
  34. Value: []byte("\x00\x00\x00\x00\x00\x00\x00\x02"),
  35. },
  36. &pb.Cell{
  37. Row: []byte("nsre,,1434573235908.56f833d5569a27c7a43fbf547b4924a4."),
  38. Family: []byte("info"),
  39. Qualifier: []byte("server"),
  40. Value: []byte("regionserver:1"),
  41. },
  42. &pb.Cell{
  43. Row: []byte("nsre,,1434573235908.56f833d5569a27c7a43fbf547b4924a4."),
  44. Family: []byte("info"),
  45. Qualifier: []byte("serverstartcode"),
  46. Value: []byte("\x00\x00\x01N\x02\x92R\xb1"),
  47. },
  48. }}
  49. // makeRegionResult returns a region that spans the whole table
  50. // and uses name of the table as the hostname of the regionserver
  51. func makeRegionResult(key []byte) *pb.ScanResponse {
  52. s := bytes.SplitN(key, []byte(","), 2)
  53. fqtable := s[0]
  54. row := append(fqtable, []byte(",,1434573235908.56f833d5569a27c7a43fbf547b4924a4.")...)
  55. t := bytes.SplitN(fqtable, []byte{':'}, 2)
  56. var namespace, table []byte
  57. if len(t) == 2 {
  58. namespace = t[0]
  59. table = t[1]
  60. } else {
  61. namespace = []byte("default")
  62. table = fqtable
  63. }
  64. regionInfo := &pb.RegionInfo{
  65. RegionId: proto.Uint64(1434573235908),
  66. TableName: &pb.TableName{
  67. Namespace: namespace,
  68. Qualifier: table,
  69. },
  70. Offline: proto.Bool(false),
  71. }
  72. regionInfoValue, err := proto.Marshal(regionInfo)
  73. if err != nil {
  74. panic(err)
  75. }
  76. regionInfoValue = append([]byte("PBUF"), regionInfoValue...)
  77. return &pb.ScanResponse{Results: []*pb.Result{
  78. &pb.Result{Cell: []*pb.Cell{
  79. &pb.Cell{
  80. Row: row,
  81. Family: []byte("info"),
  82. Qualifier: []byte("regioninfo"),
  83. Value: regionInfoValue,
  84. },
  85. &pb.Cell{
  86. Row: row,
  87. Family: []byte("info"),
  88. Qualifier: []byte("seqnumDuringOpen"),
  89. Value: []byte("\x00\x00\x00\x00\x00\x00\x00\x02"),
  90. },
  91. &pb.Cell{
  92. Row: row,
  93. Family: []byte("info"),
  94. Qualifier: []byte("server"),
  95. Value: fqtable,
  96. },
  97. &pb.Cell{
  98. Row: row,
  99. Family: []byte("info"),
  100. Qualifier: []byte("serverstartcode"),
  101. Value: []byte("\x00\x00\x01N\x02\x92R\xb1"),
  102. },
  103. }}}}
  104. }
  105. var metaRow = &pb.Result{Cell: []*pb.Cell{
  106. &pb.Cell{
  107. Row: []byte("test,,1434573235908.56f833d5569a27c7a43fbf547b4924a4."),
  108. Family: []byte("info"),
  109. Qualifier: []byte("regioninfo"),
  110. Value: []byte("PBUF\b\xc4\xcd\xe9\x99\xe0)\x12\x0f\n\adefault\x12\x04test" +
  111. "\x1a\x00\"\x00(\x000\x008\x00"),
  112. },
  113. &pb.Cell{
  114. Row: []byte("test,,1434573235908.56f833d5569a27c7a43fbf547b4924a4."),
  115. Family: []byte("info"),
  116. Qualifier: []byte("seqnumDuringOpen"),
  117. Value: []byte("\x00\x00\x00\x00\x00\x00\x00\x02"),
  118. },
  119. &pb.Cell{
  120. Row: []byte("test,,1434573235908.56f833d5569a27c7a43fbf547b4924a4."),
  121. Family: []byte("info"),
  122. Qualifier: []byte("server"),
  123. Value: []byte("regionserver:2"),
  124. },
  125. &pb.Cell{
  126. Row: []byte("test,,1434573235908.56f833d5569a27c7a43fbf547b4924a4."),
  127. Family: []byte("info"),
  128. Qualifier: []byte("serverstartcode"),
  129. Value: []byte("\x00\x00\x01N\x02\x92R\xb1"),
  130. },
  131. }}
  132. var test1SplitA = &pb.Result{Cell: []*pb.Cell{
  133. &pb.Cell{
  134. Row: []byte("test1,,1480547738107.825c5c7e480c76b73d6d2bad5d3f7bb8."),
  135. Family: []byte("info"),
  136. Qualifier: []byte("regioninfo"),
  137. Value: []byte("PBUF\b\xfbÖ\xbc\x8b+\x12\x10\n\adefault\x12\x05" +
  138. "test1\x1a\x00\"\x03baz(\x000\x008\x00"),
  139. },
  140. &pb.Cell{
  141. Row: []byte("test1,,1480547738107.825c5c7e480c76b73d6d2bad5d3f7bb8."),
  142. Family: []byte("info"),
  143. Qualifier: []byte("seqnumDuringOpen"),
  144. Value: []byte("\x00\x00\x00\x00\x00\x00\x00\v"),
  145. },
  146. &pb.Cell{
  147. Row: []byte("test1,,1480547738107.825c5c7e480c76b73d6d2bad5d3f7bb8."),
  148. Family: []byte("info"),
  149. Qualifier: []byte("server"),
  150. Value: []byte("regionserver:1"),
  151. },
  152. &pb.Cell{
  153. Row: []byte("test1,,1480547738107.825c5c7e480c76b73d6d2bad5d3f7bb8."),
  154. Family: []byte("info"),
  155. Qualifier: []byte("serverstartcode"),
  156. Value: []byte("\x00\x00\x01X\xb6\x83^3"),
  157. },
  158. }}
  159. var test1SplitB = &pb.Result{Cell: []*pb.Cell{
  160. &pb.Cell{
  161. Row: []byte("test1,baz,1480547738107.3f2483f5618e1b791f58f83a8ebba6a9."),
  162. Family: []byte("info"),
  163. Qualifier: []byte("regioninfo"),
  164. Value: []byte("PBUF\b\xfbÖ\xbc\x8b+\x12\x10\n\adefault\x12\x05" +
  165. "test1\x1a\x03baz\"\x00(\x000\x008\x00"),
  166. },
  167. &pb.Cell{
  168. Row: []byte("test1,baz,1480547738107.3f2483f5618e1b791f58f83a8ebba6a9."),
  169. Family: []byte("info"),
  170. Qualifier: []byte("seqnumDuringOpen"),
  171. Value: []byte("\x00\x00\x00\x00\x00\x00\x00\f"),
  172. },
  173. &pb.Cell{
  174. Row: []byte("test1,baz,1480547738107.3f2483f5618e1b791f58f83a8ebba6a9."),
  175. Family: []byte("info"),
  176. Qualifier: []byte("server"),
  177. Value: []byte("regionserver:3"),
  178. },
  179. &pb.Cell{
  180. Row: []byte("test1,baz,1480547738107.3f2483f5618e1b791f58f83a8ebba6a9."),
  181. Family: []byte("info"),
  182. Qualifier: []byte("serverstartcode"),
  183. Value: []byte("\x00\x00\x01X\xb6\x83^3"),
  184. },
  185. }}
  186. var m sync.RWMutex
  187. var clients map[string]uint32
  188. func init() {
  189. clients = make(map[string]uint32)
  190. }
  191. // NewClient creates a new test region client.
  192. func NewClient(ctx context.Context, addr string, ctype ClientType,
  193. queueSize int, flushInterval time.Duration, effectiveUser string,
  194. readTimeout time.Duration) (hrpc.RegionClient, error) {
  195. m.Lock()
  196. clients[addr]++
  197. m.Unlock()
  198. return &testClient{addr: addr}, nil
  199. }
  200. func (c *testClient) Addr() string {
  201. return c.addr
  202. }
  203. func (c *testClient) String() string {
  204. return fmt.Sprintf("RegionClient{Addr: %s}", c.addr)
  205. }
  206. func (c *testClient) QueueRPC(call hrpc.Call) {
  207. // ignore timed out rpcs to mock the region client
  208. select {
  209. case <-call.Context().Done():
  210. return
  211. default:
  212. }
  213. if !bytes.Equal(call.Table(), []byte("hbase:meta")) {
  214. _, ok := call.(*hrpc.Get)
  215. if !ok || !bytes.HasSuffix(call.Key(), bytes.Repeat([]byte{0}, 17)) {
  216. // not a get and not a region probe
  217. // just return as the mock call should just populate the ResultChan in test
  218. return
  219. }
  220. // region probe, fail for the nsre region 3 times to force retry
  221. if bytes.Equal(call.Table(), []byte("nsre")) {
  222. i := atomic.AddInt32(&c.numNSRE, 1)
  223. if i <= 3 {
  224. call.ResultChan() <- hrpc.RPCResult{Error: RetryableError{}}
  225. return
  226. }
  227. }
  228. m.RLock()
  229. i := clients[c.addr]
  230. m.RUnlock()
  231. // if we are connected to this client the first time,
  232. // pretend it's down to fail the probe and start a reconnect
  233. if bytes.Equal(call.Table(), []byte("down")) {
  234. if i <= 1 {
  235. call.ResultChan() <- hrpc.RPCResult{Error: UnrecoverableError{}}
  236. } else {
  237. // otherwise, the region is fine
  238. call.ResultChan() <- hrpc.RPCResult{}
  239. }
  240. return
  241. }
  242. }
  243. if bytes.HasSuffix(call.Key(), bytes.Repeat([]byte{0}, 17)) {
  244. // meta region probe, return empty to signify that region is online
  245. call.ResultChan() <- hrpc.RPCResult{}
  246. } else if bytes.HasPrefix(call.Key(), []byte("test,")) {
  247. call.ResultChan() <- hrpc.RPCResult{Msg: &pb.ScanResponse{
  248. Results: []*pb.Result{metaRow}}}
  249. } else if bytes.HasPrefix(call.Key(), []byte("test1,,")) {
  250. call.ResultChan() <- hrpc.RPCResult{Msg: &pb.ScanResponse{
  251. Results: []*pb.Result{test1SplitA}}}
  252. } else if bytes.HasPrefix(call.Key(), []byte("nsre,,")) {
  253. call.ResultChan() <- hrpc.RPCResult{Msg: &pb.ScanResponse{
  254. Results: []*pb.Result{nsreRegion}}}
  255. } else if bytes.HasPrefix(call.Key(), []byte("tablenotfound,")) {
  256. call.ResultChan() <- hrpc.RPCResult{Msg: &pb.ScanResponse{
  257. Results: []*pb.Result{},
  258. MoreResults: proto.Bool(false),
  259. }}
  260. } else {
  261. call.ResultChan() <- hrpc.RPCResult{Msg: makeRegionResult(call.Key())}
  262. }
  263. }
  264. func (c *testClient) Close() {}