hbase.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "time"
  7. "go-common/app/tool/saga/conf"
  8. "go-common/library/log"
  9. "github.com/pkg/errors"
  10. "github.com/tsuna/gohbase/hrpc"
  11. )
  // Names used to address saga path-authorisation records in HBase.
  const (
  	// _sagaTable is the table name handed to every PutStr/GetStr/Delete call in this file.
  	_sagaTable = "ep:saga"
  	// _ColFamily is the outer key of the values map sent to HBase
  	// (the column family in gohbase's family -> qualifier -> value layout).
  	_ColFamily = "saga_auth"
  	// _cSagaPathOwner is the qualifier whose cell stores the JSON-encoded owners list.
  	_cSagaPathOwner = "path_owner"
  	// _cSagaPathReviewer is the qualifier whose cell stores the JSON-encoded reviewers list.
  	_cSagaPathReviewer = "path_reviewer"
  )
  // sagaAuthKey builds the HBase row key for one (project, branch, path) triple.
  //
  // The key has the form "saga_auth_<projID>_<branch>_<path>". branch and path
  // are inserted verbatim with no escaping of "_", so distinct inputs can map to
  // the same key (branch "a_b" + path "c" equals branch "a" + path "b_c").
  // The format is kept as-is because changing it would orphan rows already
  // written under the existing keys.
  func sagaAuthKey(projID int, branch string, path string) string {
  	return fmt.Sprintf("saga_auth_%d_%s_%s", projID, branch, path)
  }
  22. // SetPathAuthH ...
  23. func (d *Dao) SetPathAuthH(c context.Context, projID int, branch string, path string, owners []string, reviewers []string) (err error) {
  24. var (
  25. key = sagaAuthKey(projID, branch, path)
  26. auth = make(map[string][]byte)
  27. bOwner []byte
  28. bReviewer []byte
  29. )
  30. if bOwner, err = json.Marshal(owners); err != nil {
  31. return errors.WithStack(err)
  32. }
  33. if bReviewer, err = json.Marshal(reviewers); err != nil {
  34. return errors.WithStack(err)
  35. }
  36. auth[_cSagaPathOwner] = bOwner
  37. auth[_cSagaPathReviewer] = bReviewer
  38. values := map[string]map[string][]byte{_ColFamily: auth}
  39. ctx, cancel := context.WithTimeout(c, time.Duration(conf.Conf.HBase.WriteTimeout))
  40. defer cancel()
  41. if _, err = d.hbase.PutStr(ctx, _sagaTable, key, values); err != nil {
  42. return errors.Wrapf(err, "hbase PutStr error (key: %s values: %v)", key, values)
  43. }
  44. return
  45. }
  46. // PathAuthH ...
  47. func (d *Dao) PathAuthH(ctx context.Context, projID int, branch string, path string) (owners []string, reviewers []string, err error) {
  48. var (
  49. key = sagaAuthKey(projID, branch, path)
  50. result *hrpc.Result
  51. )
  52. ctx, cancel := context.WithTimeout(ctx, time.Duration(conf.Conf.HBase.ReadTimeout))
  53. defer cancel()
  54. if result, err = d.hbase.GetStr(ctx, _sagaTable, key); err != nil {
  55. err = errors.Wrapf(err, "hbase GetStr error (key: %s)", key)
  56. return
  57. }
  58. for _, c := range result.Cells {
  59. switch string(c.Qualifier) {
  60. case _cSagaPathOwner:
  61. if err = json.Unmarshal(c.Value, &owners); err != nil {
  62. err = errors.WithStack(err)
  63. return
  64. }
  65. log.Info("Get key: (%s), owners Info: (%+v)", key, owners)
  66. case _cSagaPathReviewer:
  67. if err = json.Unmarshal(c.Value, &reviewers); err != nil {
  68. err = errors.WithStack(err)
  69. return
  70. }
  71. log.Info("Get key: (%s), reviewers Info: (%+v)", key, reviewers)
  72. }
  73. }
  74. return
  75. }
  76. // DeletePathAuthH ...
  77. func (d *Dao) DeletePathAuthH(c context.Context, projID int, branch string, path string) (err error) {
  78. key := sagaAuthKey(projID, branch, path)
  79. ctx, cancel := context.WithTimeout(c, time.Duration(conf.Conf.HBase.WriteTimeout))
  80. defer cancel()
  81. auth := make(map[string][]byte)
  82. auth[_cSagaPathOwner] = nil
  83. auth[_cSagaPathReviewer] = nil
  84. values := map[string]map[string][]byte{_ColFamily: auth}
  85. if _, err = d.hbase.Delete(ctx, _sagaTable, key, values); err != nil {
  86. err = errors.Wrapf(err, "hbase delete error (key: %s)", key)
  87. }
  88. return
  89. }