123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278 |
- // Copyright (C) 2017 The GoHBase Authors. All rights reserved.
- // This file is part of GoHBase.
- // Use of this source code is governed by the Apache License 2.0
- // that can be found in the COPYING file.
- package region
- import (
- "context"
- "errors"
- "fmt"
- "sync"
- "github.com/golang/protobuf/proto"
- "github.com/tsuna/gohbase/hrpc"
- "github.com/tsuna/gohbase/pb"
- )
- var multiPool = sync.Pool{
- New: func() interface{} {
- return &multi{}
- },
- }
- func freeMulti(m *multi) {
- m.calls = m.calls[:0]
- m.regions = m.regions[:0]
- m.size = 0
- multiPool.Put(m)
- }
- type multi struct {
- size int
- calls []hrpc.Call
- // regions preserves the order of regions to match against RegionActionResults
- regions []hrpc.RegionInfo
- }
- func newMulti(queueSize int) *multi {
- m := multiPool.Get().(*multi)
- m.size = queueSize
- return m
- }
- // Name returns the name of this RPC call.
- func (m *multi) Name() string {
- return "Multi"
- }
- // ToProto converts all request in multi batch to a protobuf message.
- func (m *multi) ToProto() proto.Message {
- // aggregate calls per region
- actionsPerReg := map[hrpc.RegionInfo][]*pb.Action{}
- for i, c := range m.calls {
- select {
- case <-c.Context().Done():
- // context has expired, don't bother sending it
- m.calls[i] = nil
- continue
- default:
- }
- msg := c.ToProto()
- a := &pb.Action{
- Index: proto.Uint32(uint32(i) + 1), // +1 because 0 index means there's no index
- }
- switch r := msg.(type) {
- case *pb.GetRequest:
- a.Get = r.Get
- case *pb.MutateRequest:
- a.Mutation = r.Mutation
- default:
- panic(fmt.Sprintf("unsupported call type for Multi: %T", c))
- }
- actionsPerReg[c.Region()] = append(actionsPerReg[c.Region()], a)
- }
- // construct the multi proto
- ra := make([]*pb.RegionAction, len(actionsPerReg))
- m.regions = make([]hrpc.RegionInfo, len(actionsPerReg))
- i := 0
- for r, as := range actionsPerReg {
- ra[i] = &pb.RegionAction{
- Region: &pb.RegionSpecifier{
- Type: pb.RegionSpecifier_REGION_NAME.Enum(),
- Value: r.Name(),
- },
- Action: as,
- }
- // Track the order of RegionActions,
- // so that we can handle whole region exceptions.
- m.regions[i] = r
- i++
- }
- return &pb.MultiRequest{RegionAction: ra}
- }
- // NewResponse creates an empty protobuf message to read the response of this RPC.
- func (m *multi) NewResponse() proto.Message {
- return &pb.MultiResponse{}
- }
- // DeserializeCellBlocks deserializes action results from cell blocks.
- func (m *multi) DeserializeCellBlocks(msg proto.Message, b []byte) (uint32, error) {
- mr := msg.(*pb.MultiResponse)
- var nread uint32
- for _, rar := range mr.GetRegionActionResult() {
- if e := rar.GetException(); e != nil {
- if l := len(rar.GetResultOrException()); l != 0 {
- return 0, fmt.Errorf(
- "got exception for region, but still have %d result(s) returned from it", l)
- }
- continue
- }
- for _, roe := range rar.GetResultOrException() {
- e := roe.GetException()
- r := roe.GetResult()
- i := roe.GetIndex()
- if i == 0 {
- return 0, errors.New("no index for result in multi response")
- } else if r == nil && e == nil {
- return 0, errors.New("no result or exception for action in multi response")
- } else if r != nil && e != nil {
- return 0, errors.New("got result and exception for action in multi response")
- } else if e != nil {
- continue
- }
- c := m.get(i) // TODO: maybe return error if it's out-of-bounds
- d := c.(canDeserializeCellBlocks) // let it panic, because then it's our bug
- response := c.NewResponse()
- switch rsp := response.(type) {
- case *pb.GetResponse:
- rsp.Result = r
- case *pb.MutateResponse:
- rsp.Result = r
- default:
- panic(fmt.Sprintf("unsupported response type for Multi: %T", response))
- }
- // TODO: don't bother deserializing if the call's context has already expired
- n, err := d.DeserializeCellBlocks(response, b[nread:])
- if err != nil {
- return 0, fmt.Errorf(
- "error deserializing cellblocks for %q call as part of MultiResponse: %v",
- c.Name(), err)
- }
- nread += n
- }
- }
- return nread, nil
- }
- func (m *multi) returnResults(msg proto.Message, err error) {
- defer freeMulti(m)
- if err != nil {
- for _, c := range m.calls {
- if c == nil {
- continue
- }
- c.ResultChan() <- hrpc.RPCResult{Error: err}
- }
- return
- }
- mr := msg.(*pb.MultiResponse)
- // Here we can assume that everything has been deserialized correctly.
- // Dispatch results to appropriate calls.
- for i, rar := range mr.GetRegionActionResult() {
- if e := rar.GetException(); e != nil {
- // Got an exception for the whole region,
- // fail all the calls for that region.
- reg := m.regions[i]
- err := exceptionToError(*e.Name, string(e.Value))
- for _, c := range m.calls {
- if c == nil {
- continue
- }
- if c.Region() == reg {
- c.ResultChan() <- hrpc.RPCResult{Error: err}
- }
- }
- continue
- }
- for _, roe := range rar.GetResultOrException() {
- i := roe.GetIndex()
- e := roe.GetException()
- r := roe.GetResult()
- c := m.get(i)
- // TODO: don't bother if the call's context has already expired
- if e != nil {
- c.ResultChan() <- hrpc.RPCResult{
- Error: exceptionToError(*e.Name, string(e.Value)),
- }
- continue
- }
- response := c.NewResponse()
- switch rsp := response.(type) {
- case *pb.GetResponse:
- rsp.Result = r
- case *pb.MutateResponse:
- rsp.Result = r
- default:
- panic(fmt.Sprintf("unsupported response type for Multi: %T", response))
- }
- c.ResultChan() <- hrpc.RPCResult{Msg: response}
- }
- }
- }
- // add adds the call and returns wether the batch is full.
- func (m *multi) add(call hrpc.Call) bool {
- m.calls = append(m.calls, call)
- return len(m.calls) == m.size
- }
- // len returns number of batched calls.
- func (m *multi) len() int {
- return len(m.calls)
- }
- // get retruns an rpc at index. Indicies start from 1 since 0 means that
- // region server didn't set an index for the action result.
- func (m *multi) get(i uint32) hrpc.Call {
- if i == 0 {
- panic("index cannot be 0")
- }
- return m.calls[i-1]
- }
- // Table is not supported for Multi.
- func (m *multi) Table() []byte {
- panic("'Table' is not supported for 'Multi'")
- }
- // Reqion is not supported for Multi.
- func (m *multi) Region() hrpc.RegionInfo {
- panic("'Region' is not supported for 'Multi'")
- }
- // SetRegion is not supported for Multi.
- func (m *multi) SetRegion(r hrpc.RegionInfo) {
- panic("'SetRegion' is not supported for 'Multi'")
- }
- // ResultChan is not supported for Multi.
- func (m *multi) ResultChan() chan hrpc.RPCResult {
- panic("'ResultChan' is not supported for 'Multi'")
- }
- // Context is not supported for Multi.
- func (m *multi) Context() context.Context {
- // TODO: maybe pick the one with the longest deadline and use a context that has that deadline?
- return context.Background()
- }
- // Key is not supported for Multi RPC.
- func (m *multi) Key() []byte {
- panic("'Key' is not supported for 'Multi'")
- }
|