dataplatform.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. package dao
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/md5"
  6. "encoding/hex"
  7. "fmt"
  8. "net/http"
  9. "net/url"
  10. "sort"
  11. "strings"
  12. "time"
  13. "go-common/app/job/main/push/model"
  14. "go-common/library/ecode"
  15. "go-common/library/log"
  16. )
  // _dpSubmitQueryURL is the endpoint used to submit a query.
  const _dpSubmitQueryURL = "http://berserker.bilibili.co/avenger/api/74/query"

  // dpSignParams lists the parameter names whose key/value pairs are fed into
  // the request signature; every other parameter is left out of the signed text.
  var dpSignParams = []string{"appKey", "timestamp", "version"}
  24. // DpSubmitQuery .
  25. func (d *Dao) DpSubmitQuery(ctx context.Context, query string) (statusRUL string, err error) {
  26. params := d.params()
  27. params.Set("query", query)
  28. var res struct {
  29. Code int `json:"code"`
  30. Msg string `json:"msg"`
  31. StatusURL string `json:"jobStatusUrl"`
  32. }
  33. if err = d.newRequest(ctx, _dpSubmitQueryURL, params, &res); err != nil {
  34. log.Error("d.DpSubmitQuery newRequest url(%s) error(%v)", _dpSubmitQueryURL+"?"+params.Encode(), err)
  35. return
  36. }
  37. if res.Code != http.StatusOK {
  38. log.Error("d.DpSubmitQuery newRequest error code:%d ; url(%s) ", res.Code, _dpSubmitQueryURL+"?"+params.Encode())
  39. err = ecode.Int(res.Code)
  40. err = fmt.Errorf("code(%d) msg(%s)", res.Code, res.Msg)
  41. return
  42. }
  43. statusRUL = res.StatusURL
  44. return
  45. }
  46. // DpCheckJob .
  47. func (d *Dao) DpCheckJob(ctx context.Context, url string) (res *model.DpCheckJobResult, err error) {
  48. params := d.params()
  49. if err = d.newRequest(ctx, url, params, &res); err != nil {
  50. log.Error("d.DpCheckJob newRequest error(%v)", err)
  51. return
  52. }
  53. if res.Code != http.StatusOK {
  54. log.Error("d.DpCheckJob newRequest error code:%d ; url(%s) ", res.Code, url+"?"+params.Encode())
  55. err = fmt.Errorf("code(%d) msg(%s)", res.Code, res.Msg)
  56. }
  57. return
  58. }
  59. // DpDownloadFile .
  60. func (d *Dao) DpDownloadFile(ctx context.Context, url string) (content []byte, err error) {
  61. req, _ := http.NewRequest(http.MethodGet, url, nil)
  62. if content, err = d.dpClient.Raw(ctx, req); err != nil {
  63. log.Error("d.dpClient.Raw(%s) error(%v)", url, err)
  64. }
  65. return
  66. }
  67. func (d *Dao) params() url.Values {
  68. params := url.Values{}
  69. params.Set("appKey", d.c.DpClient.Key)
  70. params.Set("timestamp", time.Now().Format("2006-01-02 15:04:05"))
  71. params.Set("version", "1.0")
  72. params.Set("signMethod", "md5")
  73. return params
  74. }
  75. // newRequest new http request with method, url, ip, values and headers.
  76. func (d *Dao) newRequest(c context.Context, url string, params url.Values, res interface{}) (err error) {
  77. enc, err := d.dpSign(params)
  78. if err != nil {
  79. log.Error("url:%s,params:%v", url, params)
  80. return
  81. }
  82. if enc != "" {
  83. url = url + "?" + enc
  84. }
  85. req, err := http.NewRequest(http.MethodGet, url, nil)
  86. if err != nil {
  87. log.Error("method:%s,url:%s", http.MethodGet, url)
  88. return
  89. }
  90. return d.httpClient.Do(c, req, res)
  91. }
  92. // dpSign calc appkey and appsecret sign.
  93. func (d *Dao) dpSign(params url.Values) (query string, err error) {
  94. tmp := params.Encode()
  95. signTmp := d.encode(params)
  96. if strings.IndexByte(tmp, '+') > -1 {
  97. tmp = strings.Replace(tmp, "+", "%20", -1)
  98. }
  99. var b bytes.Buffer
  100. b.WriteString(d.c.DpClient.Secret)
  101. b.WriteString(signTmp)
  102. b.WriteString(d.c.DpClient.Secret)
  103. mh := md5.Sum(b.Bytes())
  104. var qb bytes.Buffer
  105. qb.WriteString(tmp)
  106. qb.WriteString("&sign=")
  107. qb.WriteString(strings.ToUpper(hex.EncodeToString(mh[:])))
  108. query = qb.String()
  109. return
  110. }
  111. // Encode encodes the values into ``URL encoded'' form
  112. // ("bar=baz&foo=quux") sorted by key.
  113. func (d *Dao) encode(v url.Values) string {
  114. if v == nil {
  115. return ""
  116. }
  117. var buf bytes.Buffer
  118. keys := make([]string, 0, len(v))
  119. for k := range v {
  120. keys = append(keys, k)
  121. }
  122. sort.Strings(keys)
  123. for _, k := range keys {
  124. found := false
  125. for _, p := range dpSignParams {
  126. if p == k {
  127. found = true
  128. break
  129. }
  130. }
  131. if !found {
  132. continue
  133. }
  134. vs := v[k]
  135. prefix := k
  136. for _, v := range vs {
  137. buf.WriteString(prefix)
  138. buf.WriteString(v)
  139. }
  140. }
  141. return buf.String()
  142. }