// Copyright (C) 2015 The GoHBase Authors.  All rights reserved.
// This file is part of GoHBase.
// Use of this source code is governed by the Apache License 2.0
// that can be found in the COPYING file.

  package hrpc

  import (
  	"context"
  	"encoding/binary"
  	"errors"
  	"fmt"
  	"time"

  	"github.com/golang/protobuf/proto"
  	"github.com/tsuna/gohbase/pb"
  )

  var (
  	// ErrNotAStruct is returned by any of the *Ref functions when something
  	// other than a struct is passed in to their data argument.
  	ErrNotAStruct = errors.New("data must be a struct")

  	// ErrUnsupportedUints is returned when this message is serialized and uints
  	// are unsupported on your platform (this will probably never happen).
  	ErrUnsupportedUints = errors.New("uints are unsupported on your platform")

  	// ErrUnsupportedInts is returned when this message is serialized and ints
  	// are unsupported on your platform (this will probably never happen).
  	ErrUnsupportedInts = errors.New("ints are unsupported on your platform")

  	// attributeNameTTL is the name of the mutation attribute that carries the
  	// TTL value. It is a variable rather than a constant because the protobuf
  	// conversion stores a pointer to it.
  	attributeNameTTL = "_ttl"
  )

  // DurabilityType is used to set durability for Durability option.
  //
  // The numeric values are converted directly to the protobuf durability enum
  // when a mutation is serialized, so the order of the constants below must not
  // be changed.
  type DurabilityType int32

  const (
  	// UseDefault is USE_DEFAULT
  	UseDefault DurabilityType = iota
  	// SkipWal is SKIP_WAL
  	SkipWal
  	// AsyncWal is ASYNC_WAL
  	AsyncWal
  	// SyncWal is SYNC_WAL
  	SyncWal
  	// FsyncWal is FSYNC_WAL
  	FsyncWal
  )

  40. // Mutate represents a mutation on HBase.
  41. type Mutate struct {
  42. base
  43. mutationType pb.MutationProto_MutationType //*int32
  44. // values is a map of column families to a map of column qualifiers to bytes
  45. values map[string]map[string][]byte
  46. ttl []byte
  47. timestamp uint64
  48. durability DurabilityType
  49. deleteOneVersion bool
  50. skipbatch bool
  51. }
  52. // TTL sets a time-to-live for mutation queries.
  53. // The value will be in millisecond resolution.
  54. func TTL(t time.Duration) func(Call) error {
  55. return func(o Call) error {
  56. m, ok := o.(*Mutate)
  57. if !ok {
  58. return errors.New("'TTL' option can only be used with mutation queries")
  59. }
  60. buf := make([]byte, 8)
  61. binary.BigEndian.PutUint64(buf, uint64(t.Nanoseconds()/1e6))
  62. m.ttl = buf
  63. return nil
  64. }
  65. }
  66. // Timestamp sets timestamp for mutation queries.
  67. // The time object passed will be rounded to a millisecond resolution, as by default,
  68. // if no timestamp is provided, HBase sets it to current time in milliseconds.
  69. // In order to have custom time precision, use TimestampUint64 call option for
  70. // mutation requests and corresponding TimeRangeUint64 for retrieval requests.
  71. func Timestamp(ts time.Time) func(Call) error {
  72. return func(o Call) error {
  73. m, ok := o.(*Mutate)
  74. if !ok {
  75. return errors.New("'Timestamp' option can only be used with mutation queries")
  76. }
  77. m.timestamp = uint64(ts.UnixNano() / 1e6)
  78. return nil
  79. }
  80. }
  81. // TimestampUint64 sets timestamp for mutation queries.
  82. func TimestampUint64(ts uint64) func(Call) error {
  83. return func(o Call) error {
  84. m, ok := o.(*Mutate)
  85. if !ok {
  86. return errors.New("'TimestampUint64' option can only be used with mutation queries")
  87. }
  88. m.timestamp = ts
  89. return nil
  90. }
  91. }
  92. // Durability sets durability for mutation queries.
  93. func Durability(d DurabilityType) func(Call) error {
  94. return func(o Call) error {
  95. m, ok := o.(*Mutate)
  96. if !ok {
  97. return errors.New("'Durability' option can only be used with mutation queries")
  98. }
  99. if d < UseDefault || d > FsyncWal {
  100. return errors.New("invalid durability value")
  101. }
  102. m.durability = d
  103. return nil
  104. }
  105. }
  106. // DeleteOneVersion is a delete option that can be passed in order to delete only
  107. // one latest version of the specified qualifiers. Without timestamp specified,
  108. // it will have no effect for delete specific column families request.
  109. // If a Timestamp option is passed along, only the version at that timestamp will be removed
  110. // for delete specific column families and/or qualifier request.
  111. // This option cannot be used for delete entire row request.
  112. func DeleteOneVersion() func(Call) error {
  113. return func(o Call) error {
  114. m, ok := o.(*Mutate)
  115. if !ok {
  116. return errors.New("'DeleteOneVersion' option can only be used with mutation queries")
  117. }
  118. m.deleteOneVersion = true
  119. return nil
  120. }
  121. }
  122. // baseMutate returns a Mutate struct without the mutationType filled in.
  123. func baseMutate(ctx context.Context, table, key []byte, values map[string]map[string][]byte,
  124. options ...func(Call) error) (*Mutate, error) {
  125. m := &Mutate{
  126. base: base{
  127. table: table,
  128. key: key,
  129. ctx: ctx,
  130. resultch: make(chan RPCResult, 1),
  131. },
  132. values: values,
  133. timestamp: MaxTimestamp,
  134. }
  135. err := applyOptions(m, options...)
  136. if err != nil {
  137. return nil, err
  138. }
  139. return m, nil
  140. }
  141. // NewPut creates a new Mutation request to insert the given
  142. // family-column-values in the given row key of the given table.
  143. func NewPut(ctx context.Context, table, key []byte,
  144. values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
  145. m, err := baseMutate(ctx, table, key, values, options...)
  146. if err != nil {
  147. return nil, err
  148. }
  149. m.mutationType = pb.MutationProto_PUT
  150. return m, nil
  151. }
  152. // NewPutStr is just like NewPut but takes table and key as strings.
  153. func NewPutStr(ctx context.Context, table, key string,
  154. values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
  155. return NewPut(ctx, []byte(table), []byte(key), values, options...)
  156. }
  157. // NewDel is used to perform Delete operations on a single row.
  158. // To delete entire row, values should be nil.
  159. //
  160. // To delete specific families, qualifiers map should be nil:
  161. // map[string]map[string][]byte{
  162. // "cf1": nil,
  163. // "cf2": nil,
  164. // }
  165. //
  166. // To delete specific qualifiers:
  167. // map[string]map[string][]byte{
  168. // "cf": map[string][]byte{
  169. // "q1": nil,
  170. // "q2": nil,
  171. // },
  172. // }
  173. //
  174. // To delete all versions before and at a timestamp, pass hrpc.Timestamp() option.
  175. // By default all versions will be removed.
  176. //
  177. // To delete only a specific version at a timestamp, pass hrpc.DeleteOneVersion() option
  178. // along with a timestamp. For delete specific qualifiers request, if timestamp is not
  179. // passed, only the latest version will be removed. For delete specific families request,
  180. // the timestamp should be passed or it will have no effect as it's an expensive
  181. // operation to perform.
  182. func NewDel(ctx context.Context, table, key []byte,
  183. values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
  184. m, err := baseMutate(ctx, table, key, values, options...)
  185. if err != nil {
  186. return nil, err
  187. }
  188. if len(m.values) == 0 && m.deleteOneVersion {
  189. return nil, errors.New(
  190. "'DeleteOneVersion' option cannot be specified for delete entire row request")
  191. }
  192. m.mutationType = pb.MutationProto_DELETE
  193. return m, nil
  194. }
  195. // NewDelStr is just like NewDel but takes table and key as strings.
  196. func NewDelStr(ctx context.Context, table, key string,
  197. values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
  198. return NewDel(ctx, []byte(table), []byte(key), values, options...)
  199. }
  200. // NewApp creates a new Mutation request to append the given
  201. // family-column-values into the existing cells in HBase (or create them if
  202. // needed), in given row key of the given table.
  203. func NewApp(ctx context.Context, table, key []byte,
  204. values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
  205. m, err := baseMutate(ctx, table, key, values, options...)
  206. if err != nil {
  207. return nil, err
  208. }
  209. m.mutationType = pb.MutationProto_APPEND
  210. return m, nil
  211. }
  212. // NewAppStr is just like NewApp but takes table and key as strings.
  213. func NewAppStr(ctx context.Context, table, key string,
  214. values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
  215. return NewApp(ctx, []byte(table), []byte(key), values, options...)
  216. }
  217. // NewIncSingle creates a new Mutation request that will increment the given value
  218. // by amount in HBase under the given table, key, family and qualifier.
  219. func NewIncSingle(ctx context.Context, table, key []byte, family, qualifier string,
  220. amount int64, options ...func(Call) error) (*Mutate, error) {
  221. buf := make([]byte, 8)
  222. binary.BigEndian.PutUint64(buf, uint64(amount))
  223. value := map[string]map[string][]byte{family: map[string][]byte{qualifier: buf}}
  224. return NewInc(ctx, table, key, value, options...)
  225. }
  226. // NewIncStrSingle is just like NewIncSingle but takes table and key as strings.
  227. func NewIncStrSingle(ctx context.Context, table, key, family, qualifier string,
  228. amount int64, options ...func(Call) error) (*Mutate, error) {
  229. return NewIncSingle(ctx, []byte(table), []byte(key), family, qualifier, amount, options...)
  230. }
  231. // NewInc creates a new Mutation request that will increment the given values
  232. // in HBase under the given table and key.
  233. func NewInc(ctx context.Context, table, key []byte,
  234. values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
  235. m, err := baseMutate(ctx, table, key, values, options...)
  236. if err != nil {
  237. return nil, err
  238. }
  239. m.mutationType = pb.MutationProto_INCREMENT
  240. return m, nil
  241. }
  242. // NewIncStr is just like NewInc but takes table and key as strings.
  243. func NewIncStr(ctx context.Context, table, key string,
  244. values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
  245. return NewInc(ctx, []byte(table), []byte(key), values, options...)
  246. }
  247. // Name returns the name of this RPC call.
  248. func (m *Mutate) Name() string {
  249. return "Mutate"
  250. }
  251. // SkipBatch returns true if the Mutate request shouldn't be batched,
  252. // but should be sent to Region Server right away.
  253. func (m *Mutate) SkipBatch() bool {
  254. return m.skipbatch
  255. }
  256. func (m *Mutate) setSkipBatch(v bool) {
  257. m.skipbatch = v
  258. }
  259. func (m *Mutate) toProto() *pb.MutateRequest {
  260. var ts *uint64
  261. if m.timestamp != MaxTimestamp {
  262. ts = &m.timestamp
  263. }
  264. // We need to convert everything in the values field
  265. // to a protobuf ColumnValue
  266. cvs := make([]*pb.MutationProto_ColumnValue, len(m.values))
  267. i := 0
  268. for k, v := range m.values {
  269. // And likewise, each item in each column needs to be converted to a
  270. // protobuf QualifierValue
  271. // if it's a delete, figure out the type
  272. var dt *pb.MutationProto_DeleteType
  273. if m.mutationType == pb.MutationProto_DELETE {
  274. if len(v) == 0 {
  275. // delete the whole column family
  276. if m.deleteOneVersion {
  277. dt = pb.MutationProto_DELETE_FAMILY_VERSION.Enum()
  278. } else {
  279. dt = pb.MutationProto_DELETE_FAMILY.Enum()
  280. }
  281. // add empty qualifier
  282. if v == nil {
  283. v = make(map[string][]byte)
  284. }
  285. v[""] = nil
  286. } else {
  287. // delete specific qualifiers
  288. if m.deleteOneVersion {
  289. dt = pb.MutationProto_DELETE_ONE_VERSION.Enum()
  290. } else {
  291. dt = pb.MutationProto_DELETE_MULTIPLE_VERSIONS.Enum()
  292. }
  293. }
  294. }
  295. qvs := make([]*pb.MutationProto_ColumnValue_QualifierValue, len(v))
  296. j := 0
  297. for k1, v1 := range v {
  298. qvs[j] = &pb.MutationProto_ColumnValue_QualifierValue{
  299. Qualifier: []byte(k1),
  300. Value: v1,
  301. Timestamp: ts,
  302. DeleteType: dt,
  303. }
  304. j++
  305. }
  306. cvs[i] = &pb.MutationProto_ColumnValue{
  307. Family: []byte(k),
  308. QualifierValue: qvs,
  309. }
  310. i++
  311. }
  312. mProto := &pb.MutationProto{
  313. Row: m.key,
  314. MutateType: &m.mutationType,
  315. ColumnValue: cvs,
  316. Durability: pb.MutationProto_Durability(m.durability).Enum(),
  317. Timestamp: ts,
  318. }
  319. if len(m.ttl) > 0 {
  320. mProto.Attribute = append(mProto.Attribute, &pb.NameBytesPair{
  321. Name: &attributeNameTTL,
  322. Value: m.ttl,
  323. })
  324. }
  325. return &pb.MutateRequest{
  326. Region: m.regionSpecifier(),
  327. Mutation: mProto,
  328. }
  329. }
  330. // ToProto converts this mutate RPC into a protobuf message
  331. func (m *Mutate) ToProto() proto.Message {
  332. return m.toProto()
  333. }
  334. // NewResponse creates an empty protobuf message to read the response of this RPC.
  335. func (m *Mutate) NewResponse() proto.Message {
  336. return &pb.MutateResponse{}
  337. }
  338. // DeserializeCellBlocks deserializes mutate result from cell blocks
  339. func (m *Mutate) DeserializeCellBlocks(pm proto.Message, b []byte) (uint32, error) {
  340. resp := pm.(*pb.MutateResponse)
  341. if resp.Result == nil {
  342. // TODO: is this possible?
  343. return 0, nil
  344. }
  345. cells, read, err := deserializeCellBlocks(b, uint32(resp.Result.GetAssociatedCellCount()))
  346. if err != nil {
  347. return 0, err
  348. }
  349. resp.Result.Cell = append(resp.Result.Cell, cells...)
  350. return read, nil
  351. }