multi.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. // Copyright (C) 2017 The GoHBase Authors. All rights reserved.
  2. // This file is part of GoHBase.
  3. // Use of this source code is governed by the Apache License 2.0
  4. // that can be found in the COPYING file.
  5. package region
  6. import (
  7. "context"
  8. "errors"
  9. "fmt"
  10. "sync"
  11. "github.com/golang/protobuf/proto"
  12. "github.com/tsuna/gohbase/hrpc"
  13. "github.com/tsuna/gohbase/pb"
  14. )
  15. var multiPool = sync.Pool{
  16. New: func() interface{} {
  17. return &multi{}
  18. },
  19. }
  20. func freeMulti(m *multi) {
  21. m.calls = m.calls[:0]
  22. m.regions = m.regions[:0]
  23. m.size = 0
  24. multiPool.Put(m)
  25. }
  26. type multi struct {
  27. size int
  28. calls []hrpc.Call
  29. // regions preserves the order of regions to match against RegionActionResults
  30. regions []hrpc.RegionInfo
  31. }
  32. func newMulti(queueSize int) *multi {
  33. m := multiPool.Get().(*multi)
  34. m.size = queueSize
  35. return m
  36. }
  37. // Name returns the name of this RPC call.
  38. func (m *multi) Name() string {
  39. return "Multi"
  40. }
  41. // ToProto converts all request in multi batch to a protobuf message.
  42. func (m *multi) ToProto() proto.Message {
  43. // aggregate calls per region
  44. actionsPerReg := map[hrpc.RegionInfo][]*pb.Action{}
  45. for i, c := range m.calls {
  46. select {
  47. case <-c.Context().Done():
  48. // context has expired, don't bother sending it
  49. m.calls[i] = nil
  50. continue
  51. default:
  52. }
  53. msg := c.ToProto()
  54. a := &pb.Action{
  55. Index: proto.Uint32(uint32(i) + 1), // +1 because 0 index means there's no index
  56. }
  57. switch r := msg.(type) {
  58. case *pb.GetRequest:
  59. a.Get = r.Get
  60. case *pb.MutateRequest:
  61. a.Mutation = r.Mutation
  62. default:
  63. panic(fmt.Sprintf("unsupported call type for Multi: %T", c))
  64. }
  65. actionsPerReg[c.Region()] = append(actionsPerReg[c.Region()], a)
  66. }
  67. // construct the multi proto
  68. ra := make([]*pb.RegionAction, len(actionsPerReg))
  69. m.regions = make([]hrpc.RegionInfo, len(actionsPerReg))
  70. i := 0
  71. for r, as := range actionsPerReg {
  72. ra[i] = &pb.RegionAction{
  73. Region: &pb.RegionSpecifier{
  74. Type: pb.RegionSpecifier_REGION_NAME.Enum(),
  75. Value: r.Name(),
  76. },
  77. Action: as,
  78. }
  79. // Track the order of RegionActions,
  80. // so that we can handle whole region exceptions.
  81. m.regions[i] = r
  82. i++
  83. }
  84. return &pb.MultiRequest{RegionAction: ra}
  85. }
  86. // NewResponse creates an empty protobuf message to read the response of this RPC.
  87. func (m *multi) NewResponse() proto.Message {
  88. return &pb.MultiResponse{}
  89. }
  90. // DeserializeCellBlocks deserializes action results from cell blocks.
  91. func (m *multi) DeserializeCellBlocks(msg proto.Message, b []byte) (uint32, error) {
  92. mr := msg.(*pb.MultiResponse)
  93. var nread uint32
  94. for _, rar := range mr.GetRegionActionResult() {
  95. if e := rar.GetException(); e != nil {
  96. if l := len(rar.GetResultOrException()); l != 0 {
  97. return 0, fmt.Errorf(
  98. "got exception for region, but still have %d result(s) returned from it", l)
  99. }
  100. continue
  101. }
  102. for _, roe := range rar.GetResultOrException() {
  103. e := roe.GetException()
  104. r := roe.GetResult()
  105. i := roe.GetIndex()
  106. if i == 0 {
  107. return 0, errors.New("no index for result in multi response")
  108. } else if r == nil && e == nil {
  109. return 0, errors.New("no result or exception for action in multi response")
  110. } else if r != nil && e != nil {
  111. return 0, errors.New("got result and exception for action in multi response")
  112. } else if e != nil {
  113. continue
  114. }
  115. c := m.get(i) // TODO: maybe return error if it's out-of-bounds
  116. d := c.(canDeserializeCellBlocks) // let it panic, because then it's our bug
  117. response := c.NewResponse()
  118. switch rsp := response.(type) {
  119. case *pb.GetResponse:
  120. rsp.Result = r
  121. case *pb.MutateResponse:
  122. rsp.Result = r
  123. default:
  124. panic(fmt.Sprintf("unsupported response type for Multi: %T", response))
  125. }
  126. // TODO: don't bother deserializing if the call's context has already expired
  127. n, err := d.DeserializeCellBlocks(response, b[nread:])
  128. if err != nil {
  129. return 0, fmt.Errorf(
  130. "error deserializing cellblocks for %q call as part of MultiResponse: %v",
  131. c.Name(), err)
  132. }
  133. nread += n
  134. }
  135. }
  136. return nread, nil
  137. }
  138. func (m *multi) returnResults(msg proto.Message, err error) {
  139. defer freeMulti(m)
  140. if err != nil {
  141. for _, c := range m.calls {
  142. if c == nil {
  143. continue
  144. }
  145. c.ResultChan() <- hrpc.RPCResult{Error: err}
  146. }
  147. return
  148. }
  149. mr := msg.(*pb.MultiResponse)
  150. // Here we can assume that everything has been deserialized correctly.
  151. // Dispatch results to appropriate calls.
  152. for i, rar := range mr.GetRegionActionResult() {
  153. if e := rar.GetException(); e != nil {
  154. // Got an exception for the whole region,
  155. // fail all the calls for that region.
  156. reg := m.regions[i]
  157. err := exceptionToError(*e.Name, string(e.Value))
  158. for _, c := range m.calls {
  159. if c == nil {
  160. continue
  161. }
  162. if c.Region() == reg {
  163. c.ResultChan() <- hrpc.RPCResult{Error: err}
  164. }
  165. }
  166. continue
  167. }
  168. for _, roe := range rar.GetResultOrException() {
  169. i := roe.GetIndex()
  170. e := roe.GetException()
  171. r := roe.GetResult()
  172. c := m.get(i)
  173. // TODO: don't bother if the call's context has already expired
  174. if e != nil {
  175. c.ResultChan() <- hrpc.RPCResult{
  176. Error: exceptionToError(*e.Name, string(e.Value)),
  177. }
  178. continue
  179. }
  180. response := c.NewResponse()
  181. switch rsp := response.(type) {
  182. case *pb.GetResponse:
  183. rsp.Result = r
  184. case *pb.MutateResponse:
  185. rsp.Result = r
  186. default:
  187. panic(fmt.Sprintf("unsupported response type for Multi: %T", response))
  188. }
  189. c.ResultChan() <- hrpc.RPCResult{Msg: response}
  190. }
  191. }
  192. }
  193. // add adds the call and returns wether the batch is full.
  194. func (m *multi) add(call hrpc.Call) bool {
  195. m.calls = append(m.calls, call)
  196. return len(m.calls) == m.size
  197. }
  198. // len returns number of batched calls.
  199. func (m *multi) len() int {
  200. return len(m.calls)
  201. }
  202. // get retruns an rpc at index. Indicies start from 1 since 0 means that
  203. // region server didn't set an index for the action result.
  204. func (m *multi) get(i uint32) hrpc.Call {
  205. if i == 0 {
  206. panic("index cannot be 0")
  207. }
  208. return m.calls[i-1]
  209. }
  210. // Table is not supported for Multi.
  211. func (m *multi) Table() []byte {
  212. panic("'Table' is not supported for 'Multi'")
  213. }
  214. // Reqion is not supported for Multi.
  215. func (m *multi) Region() hrpc.RegionInfo {
  216. panic("'Region' is not supported for 'Multi'")
  217. }
  218. // SetRegion is not supported for Multi.
  219. func (m *multi) SetRegion(r hrpc.RegionInfo) {
  220. panic("'SetRegion' is not supported for 'Multi'")
  221. }
  222. // ResultChan is not supported for Multi.
  223. func (m *multi) ResultChan() chan hrpc.RPCResult {
  224. panic("'ResultChan' is not supported for 'Multi'")
  225. }
  226. // Context is not supported for Multi.
  227. func (m *multi) Context() context.Context {
  228. // TODO: maybe pick the one with the longest deadline and use a context that has that deadline?
  229. return context.Background()
  230. }
  231. // Key is not supported for Multi RPC.
  232. func (m *multi) Key() []byte {
  233. panic("'Key' is not supported for 'Multi'")
  234. }