task.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package dao
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/md5"
  6. "encoding/hex"
  7. "fmt"
  8. "net/http"
  9. "net/url"
  10. "sort"
  11. "strings"
  12. "time"
  13. "go-common/library/ecode"
  14. "go-common/library/log"
  15. )
  16. const (
  17. _selectQuery = "SELECT index.dmid,index.oid,index.mid,index.state,content.msg,b_long2ip(content.ip),content.ctime FROM ods.ods_dm_index AS index, ods.ods_dm_content AS content WHERE index.dmid=content.dmid AND index.state in (0,2,6) %s limit 1000000"
  18. )
  19. var (
  20. signParams = []string{"appKey", "timestamp", "version"}
  21. )
  22. // SendTask send task to BI
  23. func (d *Dao) SendTask(c context.Context, taskSQL []string) (statusURL string, err error) {
  24. var (
  25. sql string
  26. res struct {
  27. Code int64 `json:"code"`
  28. StatusURL string `json:"jobStatusUrl"`
  29. Message string `json:"msg"`
  30. }
  31. params = url.Values{}
  32. )
  33. if len(taskSQL) > 0 {
  34. sql = fmt.Sprintf(" AND %s", strings.Join(taskSQL, " AND "))
  35. } else {
  36. err = ecode.RequestErr
  37. return
  38. }
  39. log.Warn("send task sql(%s)", fmt.Sprintf(_selectQuery, sql))
  40. params.Set("appKey", "672bc22888af701529e8b3052fd2c4a7")
  41. params.Set("query", fmt.Sprintf(_selectQuery, sql))
  42. params.Set("timestamp", time.Now().Format("2006-01-02 15:04:05"))
  43. params.Set("version", "1.0")
  44. params.Set("signMethod", "md5")
  45. uri := d.berserkerURI + "?" + sign(params)
  46. log.Warn("send task uri(%s)", uri)
  47. req, err := http.NewRequest(http.MethodGet, uri, nil)
  48. if err != nil {
  49. log.Error("http.NewRequest(%s) error(%v)", uri, err)
  50. return
  51. }
  52. for i := 0; i < 3; i++ {
  53. if err = d.httpCli.Do(c, req, &res); err != nil {
  54. log.Error("d.httpCli.Do error:%v", err)
  55. time.Sleep(100 * time.Millisecond)
  56. continue
  57. }
  58. if res.Code != 200 {
  59. err = fmt.Errorf("uri:%s,code:%d", uri, res.Code)
  60. log.Error("d.res.Code error:%v", err)
  61. time.Sleep(100 * time.Millisecond)
  62. continue
  63. }
  64. break
  65. }
  66. if err != nil {
  67. log.Error("d.SendTask(uri:%s) error(%v)", uri, err)
  68. }
  69. return res.StatusURL, err
  70. }
  71. // Sign calculate appkey and appsecret sign.
  72. func sign(params url.Values) (query string) {
  73. tmp := params.Encode()
  74. signTmp := encode(params)
  75. if strings.IndexByte(tmp, '+') > -1 {
  76. tmp = strings.Replace(tmp, "+", "%20", -1)
  77. }
  78. var b bytes.Buffer
  79. b.WriteString("bee5e4b744a22a59abbaecc7ade5de9c")
  80. b.WriteString(signTmp)
  81. b.WriteString("bee5e4b744a22a59abbaecc7ade5de9c")
  82. mh := md5.Sum(b.Bytes())
  83. // fmt.Println(b.String())
  84. // query
  85. var qb bytes.Buffer
  86. qb.WriteString(tmp)
  87. qb.WriteString("&sign=")
  88. qb.WriteString(strings.ToUpper(hex.EncodeToString(mh[:])))
  89. query = qb.String()
  90. return
  91. }
  92. // Encode encodes the values into ``sign encoded'' form
  93. // ("barbazfooquux") sorted by key.
  94. func encode(v url.Values) string {
  95. if v == nil {
  96. return ""
  97. }
  98. var buf bytes.Buffer
  99. keys := make([]string, 0, len(v))
  100. for k := range v {
  101. keys = append(keys, k)
  102. }
  103. sort.Strings(keys)
  104. for _, k := range keys {
  105. found := false
  106. for _, p := range signParams {
  107. if p == k {
  108. found = true
  109. break
  110. }
  111. }
  112. if !found {
  113. continue
  114. }
  115. vs := v[k]
  116. prefix := k
  117. for _, v := range vs {
  118. buf.WriteString(prefix)
  119. buf.WriteString(v)
  120. }
  121. }
  122. return buf.String()
  123. }