face_history.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "math"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "go-common/app/admin/main/member/model"
  11. "go-common/library/log"
  12. "go-common/library/sync/errgroup"
  13. xtime "go-common/library/time"
  14. "github.com/tsuna/gohbase/hrpc"
  15. )
  16. var (
  17. tableFaceByMidHistory = "account:user_face"
  18. tableFaceByOPHistory = "account:user_face_admin"
  19. )
  20. // reverse returns its argument string reversed rune-wise left to right.
  21. func reverse(s string) string {
  22. r := []rune(s)
  23. for i, j := 0, len(r)-1; i < len(r)/2; i, j = i+1, j-1 {
  24. r[i], r[j] = r[j], r[i]
  25. }
  26. return string(r)
  27. }
  28. func rpad(s string, c string, l int) string {
  29. dt := l - len(s)
  30. if dt <= 0 {
  31. return s
  32. }
  33. return s + strings.Repeat(c, dt)
  34. }
  35. // func lpad(s string, c string, l int) string {
  36. // dt := l - len(s)
  37. // if dt <= 0 {
  38. // return s
  39. // }
  40. // return strings.Repeat(c, dt) + s
  41. // }
  42. func midKey(mid int64, time xtime.Time) string {
  43. suf := rpad(strconv.FormatInt(math.MaxInt64-int64(time), 10), "0", 10)
  44. return fmt.Sprintf("%s%s", rpad(reverse(strconv.FormatInt(mid, 10)), "0", 10), string(suf[len(suf)-10:]))
  45. }
  46. func operatorKey(operator string, time xtime.Time) string {
  47. suf := rpad(strconv.FormatInt(math.MaxInt64-int64(time), 10), "0", 10)
  48. return fmt.Sprintf("%s%s", rpad(operator, "-", 20), string(suf[len(suf)-10:]))
  49. }
  50. func scanTimes(tDuration xtime.Time) int {
  51. times := int(tDuration)/(60*60) + 5
  52. if times > 50 {
  53. times = 50
  54. }
  55. return times
  56. }
  57. // FaceHistoryByMid is.
  58. func (d *Dao) FaceHistoryByMid(ctx context.Context, arg *model.ArgFaceHistory) (model.FaceRecordList, error) {
  59. size := arg.ETime - arg.STime
  60. stimes := scanTimes(size)
  61. chunk := xtime.Time(size / xtime.Time(stimes))
  62. if chunk == 0 {
  63. chunk = size
  64. }
  65. rendKey, rstartKey := midKey(arg.Mid, arg.STime), midKey(arg.Mid, arg.ETime)
  66. eg := errgroup.Group{}
  67. lock := sync.RWMutex{}
  68. records := make(model.FaceRecordList, 0)
  69. for i := 0; i < stimes; i++ {
  70. times := i + 1
  71. stime := arg.STime + (chunk * xtime.Time(i))
  72. etime := arg.STime + (chunk * xtime.Time(i+1))
  73. eg.Go(func() error {
  74. endKey, startKey := midKey(arg.Mid, stime), midKey(arg.Mid, etime)
  75. log.Info("FaceHistoryByMid: range: start: %s end: %s with times: %d scaning key: start: %s end: %s",
  76. rstartKey, rendKey, times, startKey, endKey)
  77. scanner, err := d.fhbymidhbase.ScanRangeStr(ctx, tableFaceByMidHistory, startKey, endKey)
  78. if err != nil {
  79. log.Error("hbase.ScanRangeStr(%s,%+v) error(%v)", tableFaceByMidHistory, arg, err)
  80. return err
  81. }
  82. for {
  83. r, err := scanner.Next()
  84. if err != nil {
  85. if err != io.EOF {
  86. return err
  87. }
  88. break
  89. }
  90. lock.Lock()
  91. records = append(records, toMidFaceRecord(r))
  92. lock.Unlock()
  93. }
  94. return nil
  95. })
  96. if etime >= arg.ETime {
  97. break
  98. }
  99. }
  100. if err := eg.Wait(); err != nil {
  101. return nil, err
  102. }
  103. return records, nil
  104. }
  105. // FaceHistoryByOP is.
  106. func (d *Dao) FaceHistoryByOP(ctx context.Context, arg *model.ArgFaceHistory) (model.FaceRecordList, error) {
  107. size := arg.ETime - arg.STime
  108. stimes := scanTimes(size)
  109. chunk := xtime.Time(size / xtime.Time(stimes))
  110. if chunk == 0 {
  111. chunk = size
  112. }
  113. rendKey, rstartKey := operatorKey(arg.Operator, arg.STime), operatorKey(arg.Operator, arg.ETime)
  114. eg := errgroup.Group{}
  115. lock := sync.RWMutex{}
  116. records := make(model.FaceRecordList, 0)
  117. for i := 0; i < stimes; i++ {
  118. times := i + 1
  119. stime := arg.STime + (chunk * xtime.Time(i))
  120. etime := arg.STime + (chunk * xtime.Time(i+1))
  121. eg.Go(func() error {
  122. endKey, startKey := operatorKey(arg.Operator, stime), operatorKey(arg.Operator, etime)
  123. log.Info("FaceHistoryByOP: range: start: %s end: %s with times: %d scaning key: start: %s end: %s",
  124. rstartKey, rendKey, times, startKey, endKey)
  125. scanner, err := d.fhbyophbase.ScanRangeStr(ctx, tableFaceByOPHistory, startKey, endKey)
  126. if err != nil {
  127. log.Error("hbase.ScanRangeStr(%s,%+v) error(%v)", tableFaceByOPHistory, arg, err)
  128. return err
  129. }
  130. for {
  131. r, err := scanner.Next()
  132. if err != nil {
  133. if err != io.EOF {
  134. return err
  135. }
  136. break
  137. }
  138. lock.Lock()
  139. records = append(records, toOPFaceRecord(r))
  140. lock.Unlock()
  141. }
  142. return nil
  143. })
  144. if etime >= arg.ETime {
  145. break
  146. }
  147. }
  148. if err := eg.Wait(); err != nil {
  149. return nil, err
  150. }
  151. return records, nil
  152. }
  153. func toOPFaceRecord(res *hrpc.Result) *model.FaceRecord {
  154. l := &model.FaceRecord{}
  155. for _, c := range res.Cells {
  156. key := string(c.Row)
  157. qf := string(c.Qualifier)
  158. v := string(c.Value)
  159. log.Info("Retrieving op face record history: key(%s) qualifier(%s) value(%s)", key, qf, v)
  160. // fill fields
  161. switch qf {
  162. case "id":
  163. l.ID, _ = strconv.ParseInt(v, 10, 64)
  164. case "mid":
  165. l.Mid, _ = strconv.ParseInt(v, 10, 64)
  166. case "sta":
  167. l.Status = model.ParseStatus(v)
  168. case "op":
  169. l.Operator = v
  170. case "old":
  171. l.OldFace = v
  172. case "new":
  173. l.NewFace = v
  174. case "apply":
  175. l.ApplyTime = model.ParseApplyTime(v)
  176. case "mtm":
  177. l.ModifyTime, _ = model.ParseLogTime(v)
  178. }
  179. }
  180. return l
  181. }
  182. func toMidFaceRecord(res *hrpc.Result) *model.FaceRecord {
  183. l := &model.FaceRecord{}
  184. for _, c := range res.Cells {
  185. key := string(c.Row)
  186. qf := string(c.Qualifier)
  187. v := string(c.Value)
  188. log.Info("Retrieving mid face record history: key(%s) qualifier(%s) value(%s)", key, qf, v)
  189. // fill fields
  190. switch qf {
  191. // case "id":
  192. // l.ID, _ = strconv.ParseInt(v, 10, 64)
  193. case "mid":
  194. l.Mid, _ = strconv.ParseInt(v, 10, 64)
  195. case "s":
  196. l.Status = model.ParseStatus(v)
  197. case "op":
  198. l.Operator = v
  199. case "of":
  200. l.OldFace = v
  201. case "nf":
  202. l.NewFace = v
  203. case "at":
  204. l.ApplyTime = model.ParseApplyTime(v)
  205. case "mt":
  206. l.ModifyTime, _ = model.ParseLogTime(v)
  207. }
  208. }
  209. return l
  210. }