hbase.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package dao
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/md5"
  6. "encoding/binary"
  7. "fmt"
  8. "io"
  9. "strconv"
  10. "time"
  11. "go-common/app/service/main/secure/model"
  12. "go-common/library/log"
  13. xtime "go-common/library/time"
  14. "github.com/tsuna/gohbase/hrpc"
  15. )
  16. var (
  17. tableLog = "ugc:login_log"
  18. familyTS = "ts"
  19. familyTSB = []byte(familyTS)
  20. tableSecure = "ugc:secure"
  21. familyEcpt = "ecpt"
  22. familyEcptB = []byte(familyEcpt)
  23. columnid = "id"
  24. columnloc = "loc"
  25. columntp = "tp"
  26. columnip = "ip"
  27. columnts = "ts"
  28. // familyFB = "feedback"
  29. // familyFBB = []byte(familyFB)
  30. )
  31. // rowKey return key string.
  32. func rowKey(mid int64) string {
  33. sum := md5.Sum([]byte(strconv.FormatInt(mid, 10)))
  34. return fmt.Sprintf("%x", sum)
  35. }
  36. func exKey(mid, ts int64, ip uint32) string {
  37. return fmt.Sprintf("%d%d_%d_%d", mid%10, mid, ts, ip)
  38. }
  39. func exStartkey(mid int64) string {
  40. return fmt.Sprintf("%d%d", mid%10, mid)
  41. }
  42. func exStopKey(mid int64) string {
  43. return fmt.Sprintf("%d%d", mid%10, mid+1)
  44. }
  45. // AddLocs add login log.
  46. func (d *Dao) AddLocs(c context.Context, mid, locid, ts int64) (err error) {
  47. var (
  48. locB = make([]byte, 8)
  49. key = rowKey(mid)
  50. column = strconv.FormatInt(ts, 10)
  51. ctx, cancel = context.WithTimeout(c, time.Duration(d.c.HBase.WriteTimeout))
  52. )
  53. defer cancel()
  54. binary.BigEndian.PutUint64(locB, uint64(locid))
  55. values := map[string]map[string][]byte{familyTS: {column: locB}}
  56. if _, err = d.hbase.PutStr(ctx, tableLog, key, values); err != nil {
  57. log.Error("hbase.Put error(%v)", err)
  58. }
  59. return
  60. }
  61. // Locs get all login location.
  62. func (d *Dao) Locs(c context.Context, mid int64) (locs map[int64]int64, err error) {
  63. var (
  64. result *hrpc.Result
  65. key = rowKey(mid)
  66. ctx, cancel = context.WithTimeout(c, time.Duration(d.c.HBase.ReadTimeout))
  67. )
  68. defer cancel()
  69. if result, err = d.hbase.GetStr(ctx, tableLog, key); err != nil {
  70. log.Error("d.hbase.Get error(%v)", err)
  71. return
  72. }
  73. if result == nil {
  74. return
  75. }
  76. locs = make(map[int64]int64)
  77. for _, c := range result.Cells {
  78. if c != nil && len(c.Value) == 8 && bytes.Equal(c.Family, familyTSB) {
  79. locid := int64(binary.BigEndian.Uint64(c.Value))
  80. locs[locid]++
  81. }
  82. }
  83. return
  84. }
  85. // AddException add feedback.
  86. func (d *Dao) AddException(c context.Context, l *model.Log) (err error) {
  87. var (
  88. idB = make([]byte, 8)
  89. ipB = make([]byte, 8)
  90. tsB = make([]byte, 8)
  91. key = exKey(l.Mid, int64(l.Time), l.IP)
  92. ctx, cancel = context.WithTimeout(c, time.Duration(d.c.HBase.WriteTimeout))
  93. )
  94. defer cancel()
  95. binary.BigEndian.PutUint64(idB, uint64(l.LocationID))
  96. binary.BigEndian.PutUint64(ipB, uint64(l.IP))
  97. binary.BigEndian.PutUint64(tsB, uint64(l.Time))
  98. values := map[string]map[string][]byte{familyEcpt: {
  99. columnid: idB,
  100. columnloc: []byte(l.Location),
  101. columnts: tsB,
  102. columnip: ipB,
  103. }}
  104. if _, err = d.hbase.PutStr(ctx, tableSecure, key, values); err != nil {
  105. log.Error("hbase.Put error(%v)", err)
  106. }
  107. return
  108. }
  109. // AddFeedBack add feedback
  110. func (d *Dao) AddFeedBack(c context.Context, l *model.Log) (err error) {
  111. var (
  112. tpB = make([]byte, 8)
  113. key = exKey(l.Mid, int64(l.Time), l.IP)
  114. ctx, cancel = context.WithTimeout(c, time.Duration(d.c.HBase.WriteTimeout))
  115. )
  116. defer cancel()
  117. binary.BigEndian.PutUint64(tpB, uint64(l.Type))
  118. values := map[string]map[string][]byte{familyEcpt: {columntp: tpB}}
  119. if _, err = d.hbase.PutStr(ctx, tableSecure, key, values); err != nil {
  120. log.Error("hbase.Put error(%v)", err)
  121. }
  122. return
  123. }
  124. // ExceptionLoc get exception loc.
  125. func (d *Dao) ExceptionLoc(c context.Context, mid int64) (eps []*model.Expection, err error) {
  126. var (
  127. scanner hrpc.Scanner
  128. result *hrpc.Result
  129. ctx, cancel = context.WithTimeout(c, time.Duration(d.c.HBase.ReadTimeout))
  130. )
  131. defer cancel()
  132. scanner, err = d.hbase.ScanRangeStr(ctx, tableSecure, exStartkey(mid), exStopKey(mid))
  133. if err != nil {
  134. log.Error("d.hbase.ScanRangeStr error(%v)", err)
  135. return
  136. }
  137. for {
  138. result, err = scanner.Next()
  139. if err != nil {
  140. if err == io.EOF {
  141. err = nil
  142. }
  143. return
  144. }
  145. ep := new(model.Expection)
  146. for _, c := range result.Cells {
  147. if c != nil && bytes.Equal(c.Family, familyEcptB) {
  148. switch string(c.Qualifier) {
  149. case columnts:
  150. ep.Time = xtime.Time(binary.BigEndian.Uint64(c.Value))
  151. case columntp:
  152. ep.FeedBack = int8(binary.BigEndian.Uint64(c.Value))
  153. case columnip:
  154. ep.IP = binary.BigEndian.Uint64(c.Value)
  155. }
  156. }
  157. }
  158. eps = append(eps, ep)
  159. }
  160. }