hbase_face.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. package dao
  2. import (
  3. "bytes"
  4. "context"
  5. "io"
  6. "strconv"
  7. "time"
  8. "go-common/app/service/main/passport/model"
  9. "go-common/library/log"
  10. "github.com/tsuna/gohbase/filter"
  11. "github.com/tsuna/gohbase/hrpc"
  12. )
  13. const (
  14. _tFace = "account:user_face"
  15. _fFaceApply = "c"
  16. _cMid = "mid"
  17. _cApplyTime = "at"
  18. _cModifyTime = "mt"
  19. _cNewFace = "nf"
  20. _cOldFace = "of"
  21. _cOperator = "op"
  22. _cStatus = "s"
  23. _maxIDLen = 10
  24. )
  25. var (
  26. _tFaceB = []byte(_tFace)
  27. _cMidB = []byte(_cMid)
  28. _fFaceApplyB = []byte(_fFaceApply)
  29. _cApplyTimeB = []byte(_cApplyTime)
  30. _cModifyTimeB = []byte(_cModifyTime)
  31. _cNewFaceB = []byte(_cNewFace)
  32. _cOldFaceB = []byte(_cOldFace)
  33. _cOperatorB = []byte(_cOperator)
  34. _cStatusB = []byte(_cStatus)
  35. )
  36. // FaceApplies get face applies from hbase.
  37. func (d *Dao) FaceApplies(c context.Context, mid, from, to int64, status, operator string) (res []*model.FaceApply, err error) {
  38. midStr := strconv.FormatInt(mid, 10)
  39. if !checkIDLen(midStr) {
  40. log.Error("midInt64: %d, midStr: %s, len(midStr): %d, exceed max length %d", mid, midStr, len(midStr), _maxIDLen)
  41. return
  42. }
  43. ctx, cancel := context.WithTimeout(c, time.Duration(d.c.HBase.FaceApply.ReadTimeout))
  44. defer cancel()
  45. st := rowKeyFaceApplyMts(midStr, to, _uint32Max)
  46. if len(st) == 0 {
  47. return
  48. }
  49. ed := rowKeyFaceApplyMts(midStr, from, 0)
  50. if len(st) == 0 {
  51. return
  52. }
  53. fts := filter.NewList(filter.MustPassAll)
  54. if status != "" {
  55. statusFt := filter.NewSingleColumnValueFilter(_fFaceApplyB, _cStatusB, filter.Equal, filter.NewBinaryComparator(filter.NewByteArrayComparable([]byte(status))), true, true)
  56. fts.AddFilters(statusFt)
  57. }
  58. if operator != "" {
  59. operatorFt := filter.NewSingleColumnValueFilter(_fFaceApplyB, _cOperatorB, filter.Equal, filter.NewBinaryComparator(filter.NewByteArrayComparable([]byte(operator))), true, true)
  60. fts.AddFilters(operatorFt)
  61. }
  62. var options []func(hrpc.Call) error
  63. if len(fts.Filters) > 0 {
  64. options = append(options, hrpc.Filters(fts))
  65. }
  66. var scaner hrpc.Scanner
  67. scaner, err = d.hbase.ScanRange(ctx, _tFaceB, st, ed, options...)
  68. if err != nil {
  69. log.Error("hbase.ScanRange(%s, %s, %s) error(%v)", _tFaceB, st, ed, err)
  70. return
  71. }
  72. var rs []*hrpc.Result
  73. for {
  74. var r *hrpc.Result
  75. r, err = scaner.Next()
  76. if err != nil {
  77. if err == io.EOF {
  78. // set err nil
  79. err = nil
  80. break
  81. }
  82. return
  83. }
  84. rs = append(rs, r)
  85. }
  86. if len(rs) == 0 {
  87. return
  88. }
  89. res = make([]*model.FaceApply, 0, len(rs))
  90. for _, r := range rs {
  91. var u *model.FaceApply
  92. if u, err = scanFaceRecord(r.Cells); err != nil {
  93. return
  94. }
  95. if u != nil {
  96. res = append(res, u)
  97. }
  98. }
  99. return
  100. }
  101. // rowKeyFaceApplyMts get row key for face apply using this schema:
  102. // mid string reverse with right fill 0 +
  103. // (int64_max - mts) string cut last 10 digit +
  104. // (unsigned_int32_max - id) string with left fill 0.
  105. func rowKeyFaceApplyMts(midStr string, mts, id int64) []byte {
  106. rMid := reverseID(midStr, 10)
  107. if len(rMid) == 0 {
  108. return nil
  109. }
  110. b := bytes.Buffer{}
  111. b.WriteString(rMid)
  112. rMTS := diffTs(mts)
  113. b.WriteString(rMTS)
  114. rID := diffID(id)
  115. b.WriteString(rID)
  116. return b.Bytes()
  117. }
  118. func scanFaceRecord(cells []*hrpc.Cell) (res *model.FaceApply, err error) {
  119. if len(cells) == 0 {
  120. return
  121. }
  122. res = new(model.FaceApply)
  123. for _, cell := range cells {
  124. if bytes.Equal(cell.Family, _fFaceApplyB) {
  125. switch {
  126. case bytes.Equal(cell.Qualifier, _cMidB):
  127. if res.Mid, err = strconv.ParseInt(string(cell.Value), 10, 64); err != nil {
  128. log.Error("failed to parse mid from cell, strconv.ParseInt(%s, 10, 64) error(%v)", cell.Value, err)
  129. return
  130. }
  131. case bytes.Equal(cell.Qualifier, _cOldFaceB):
  132. res.OldFace = string(cell.Value)
  133. case bytes.Equal(cell.Qualifier, _cNewFaceB):
  134. res.NewFace = string(cell.Value)
  135. case bytes.Equal(cell.Qualifier, _cApplyTimeB):
  136. if res.ApplyTime, err = strconv.ParseInt(string(cell.Value), 10, 64); err != nil {
  137. log.Error("failed to parse apply_time from cell, strconv.ParseInt(%s, 10, 64) error(%v)", cell.Value, err)
  138. return
  139. }
  140. case bytes.Equal(cell.Qualifier, _cStatusB):
  141. res.Status = string(cell.Value)
  142. case bytes.Equal(cell.Qualifier, _cOperatorB):
  143. res.Operator = string(cell.Value)
  144. case bytes.Equal(cell.Qualifier, _cModifyTimeB):
  145. res.ModifyTime = string(cell.Value)
  146. }
  147. }
  148. }
  149. return
  150. }