query.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package oplog
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/md5"
  6. "encoding/hex"
  7. "fmt"
  8. "math"
  9. "net/url"
  10. "sort"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "go-common/app/admin/main/dm/model/oplog"
  15. "go-common/library/conf/env"
  16. "go-common/library/log"
  17. xhttp "net/http"
  18. pkgerr "github.com/pkg/errors"
  19. )
  // _userAgent is the name of the HTTP request header set on every outgoing
  // data-platform request.
  const _userAgent = "User-Agent"

  // _singleQueryDMLogHbase is the fmt template for the JSON value of the "query"
  // request parameter. Its three %s verbs are, in order, the start row key, the
  // stop row key and the column family.
  const _singleQueryDMLogHbase = `{"startRow": "%s","stopRow": "%s","columns": {"family":"%s"}}`
  // signParams lists the request parameters whose key and value contribute to
  // the request signature; all other parameters are left out of the digest.
  var signParams = []string{
  	"appKey",
  	"timestamp",
  	"version",
  }
  27. // QueryOpLogs 查找弹幕操作日志,前方高能,这是一段极其恶心的代码(1. 数据平台的key和secret是根据个人用户生成目前是我的账号(madou) 2.sign算法是根据appkey,timestamp,version生成并大小写敏感)
  28. func (d *Dao) QueryOpLogs(c context.Context, dmid int64) (infos []*oplog.InfocResult, err error) {
  29. v := make(url.Values, 8)
  30. v.Set("appKey", d.key)
  31. v.Set("signMethod", "md5")
  32. v.Set("timestamp", time.Now().Format("2006-01-02 15:04:05"))
  33. v.Set("version", "1.0")
  34. //默认只查询三个月
  35. startRow, stopRow := d.makeRowKeyScope(dmid, -3)
  36. query := fmt.Sprintf(_singleQueryDMLogHbase, startRow, stopRow, "i")
  37. v.Set("query", query)
  38. var res struct {
  39. Code int `json:"code"`
  40. Result []*oplog.InfocResult `json:"result"`
  41. }
  42. if err = d.doHTTPRequest(c, d.infocQueryURL, "", v, &res); err != nil {
  43. log.Error("berserker url(%v), err(%v)", d.infocQueryURL+"?"+v.Encode(), err)
  44. return
  45. }
  46. if res.Code == 200 && len(res.Result) > 0 {
  47. infos = res.Result
  48. }
  49. return
  50. }
  51. // doHttpRequest make a http request for data platform api
  52. func (d *Dao) doHTTPRequest(c context.Context, uri, ip string, params url.Values, res interface{}) (err error) {
  53. enc, err := d.sign(params)
  54. if err != nil {
  55. err = pkgerr.Wrapf(err, "uri:%s,params:%v", uri, params)
  56. return
  57. }
  58. if enc != "" {
  59. uri = uri + "?" + enc
  60. }
  61. req, err := xhttp.NewRequest(xhttp.MethodGet, uri, nil)
  62. if err != nil {
  63. err = pkgerr.Wrapf(err, "method:%s,uri:%s", xhttp.MethodGet, uri)
  64. return
  65. }
  66. req.Header.Set(_userAgent, "haoguanwei@bilibili.com "+env.AppID)
  67. if err != nil {
  68. return
  69. }
  70. return d.httpCli.Do(c, req, res)
  71. }
  72. // Sign calc appkey and appsecret sign.
  73. func (d *Dao) sign(params url.Values) (query string, err error) {
  74. tmp := params.Encode()
  75. signTmp := d.encode(params)
  76. if strings.IndexByte(tmp, '+') > -1 {
  77. tmp = strings.Replace(tmp, "+", "%20", -1)
  78. }
  79. var b bytes.Buffer
  80. b.WriteString(d.secret)
  81. b.WriteString(signTmp)
  82. b.WriteString(d.secret)
  83. mh := md5.Sum(b.Bytes())
  84. // query
  85. var qb bytes.Buffer
  86. qb.WriteString(tmp)
  87. qb.WriteString("&sign=")
  88. qb.WriteString(strings.ToUpper(hex.EncodeToString(mh[:])))
  89. query = qb.String()
  90. return
  91. }
  92. // Encode encodes the values into ``URL encoded'' form
  93. // ("bar=baz&foo=quux") sorted by key.
  94. func (d *Dao) encode(v url.Values) string {
  95. if v == nil {
  96. return ""
  97. }
  98. var buf bytes.Buffer
  99. keys := make([]string, 0, len(v))
  100. for k := range v {
  101. keys = append(keys, k)
  102. }
  103. sort.Strings(keys)
  104. for _, k := range keys {
  105. found := false
  106. for _, p := range signParams {
  107. if p == k {
  108. found = true
  109. break
  110. }
  111. }
  112. if !found {
  113. continue
  114. }
  115. vs := v[k]
  116. prefix := k
  117. for _, v := range vs {
  118. buf.WriteString(prefix)
  119. buf.WriteString(v)
  120. }
  121. }
  122. return buf.String()
  123. }
  124. // rowkey存储方式: [dmid倒序补零20位][(Long.Max_Value - timestamp) 的结果后10位]
  125. func (d *Dao) makeRowKeyScope(dmid int64, months int) (startRow, endRow string) {
  126. endTime := time.Now()
  127. startTime := endTime.AddDate(0, months, 0)
  128. startTmp := strconv.FormatInt(math.MaxInt64-startTime.Unix(), 10)
  129. endTmp := strconv.FormatInt(math.MaxInt64-endTime.Unix(), 10)
  130. endRow = d.reverse(fmt.Sprintf("%020d", dmid)) + startTmp[len(startTmp)-10:]
  131. startRow = d.reverse(fmt.Sprintf("%020d", dmid)) + endTmp[len(endTmp)-10:]
  132. return
  133. }
  134. func (d *Dao) reverse(s string) string {
  135. n := len(s)
  136. runes := make([]rune, n)
  137. for _, rune := range s {
  138. n--
  139. runes[n] = rune
  140. }
  141. return string(runes[n:])
  142. }