hdfs.go

package dao

import (
	"context"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
	"time"

	"github.com/json-iterator/go"

	"go-common/app/job/bbq/recall/internal/conf"
	"go-common/app/job/bbq/recall/internal/model"
)

// queryHDFS builds a signed berserker request for the given file suffix and
// returns the raw response body, retrying up to three times.
func (d *Dao) queryHDFS(c context.Context, api string, key *conf.BerserkerKey, suffix string) (result *[]byte, err error) {
	dt := time.Now().Format("2006-01-02 15:04:05")
	sign := d.berserkerSign(key.AppKey, key.Secret, dt, "1.0")
	params := &url.Values{}
	params.Set("appKey", key.AppKey)
	params.Set("timestamp", dt)
	params.Set("version", "1.0")
	params.Set("signMethod", "md5")
	params.Set("sign", sign)
	fileSuffix := struct {
		FileSuffix string `json:"fileSuffix"`
	}{
		FileSuffix: suffix,
	}
	j, err := jsoniter.Marshal(fileSuffix)
	if err != nil {
		return
	}
	params.Set("query", string(j))
	for retry := 0; retry < 3; retry++ {
		var resp *http.Response
		if resp, err = http.DefaultClient.Get(api + "?" + params.Encode()); err != nil {
			continue
		}
		var b []byte
		b, err = ioutil.ReadAll(resp.Body)
		resp.Body.Close()
		if err == nil && len(b) > 0 {
			result = &b
			break
		}
		// sleep 5s to respect the berserker rate limit before retrying
		time.Sleep(5 * time.Second)
	}
	// report a failure if every attempt returned an empty body without an error
	if result == nil && err == nil {
		err = fmt.Errorf("queryHDFS: empty response from %s", api)
	}
	return
}

// scanHDFSPath queries the file listing for the given suffix and decodes it
// into a model.HDFSResult.
func (d *Dao) scanHDFSPath(c context.Context, api string, key *conf.BerserkerKey, suffix string) (result *model.HDFSResult, err error) {
	b, err := d.queryHDFS(c, api, key, suffix)
	if err != nil {
		return
	}
	result = &model.HDFSResult{}
	err = jsoniter.Unmarshal(*b, result)
	return
}

// loadHDFSFile fetches a single HDFS file identified by suffix and returns its raw bytes.
func (d *Dao) loadHDFSFile(c context.Context, api string, key *conf.BerserkerKey, suffix string) (result *[]byte, err error) {
	return d.queryHDFS(c, api, key, suffix)
}
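
For context: queryHDFS signs each request with the berserker appKey/secret pair via d.berserkerSign, retries up to three times with a 5-second pause to respect the berserker rate limit, and returns the raw response body; scanHDFSPath decodes that body into a model.HDFSResult, while loadHDFSFile hands the bytes back untouched. Below is a minimal sketch of how the two helpers might be exercised from inside the dao package; the endpoint URL, the BerserkerKey values, and the New(conf.Conf) constructor used to build the Dao are illustrative assumptions, not taken from this file.

hdfs_example_test.go (illustrative sketch)

package dao

import (
	"context"
	"testing"

	"go-common/app/job/bbq/recall/internal/conf"
)

// TestScanAndLoadHDFS sketches the expected call pattern. The endpoint and
// credentials below are placeholders, and New(conf.Conf) is assumed to be the
// package's usual constructor.
func TestScanAndLoadHDFS(t *testing.T) {
	d := New(conf.Conf)                                                      // assumed constructor
	key := &conf.BerserkerKey{AppKey: "demo-app-key", Secret: "demo-secret"} // hypothetical credentials
	api := "http://berserker.example.com/avenger/api/job/submit"             // hypothetical endpoint

	// List HDFS files whose names match the given suffix.
	res, err := d.scanHDFSPath(context.Background(), api, key, ".txt")
	if err != nil {
		t.Fatalf("scanHDFSPath: %v", err)
	}
	t.Logf("scan result: %+v", res)

	// Download one file's raw contents.
	raw, err := d.loadHDFSFile(context.Background(), api, key, "part-00000")
	if err != nil {
		t.Fatalf("loadHDFSFile: %v", err)
	}
	t.Logf("loaded %d bytes", len(*raw))
}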