bfs.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package bfs
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/hmac"
  6. "crypto/md5"
  7. "crypto/sha1"
  8. "encoding/base64"
  9. "encoding/hex"
  10. "fmt"
  11. "hash"
  12. "io"
  13. "net/http"
  14. "strconv"
  15. "time"
  16. "go-common/app/admin/main/feed/conf"
  17. "go-common/library/log"
  18. )
  19. const (
  20. _template = "%s\n%s\n\n%d\n"
  21. _method = "PUT"
  22. )
  23. // Dao is bfs dao.
  24. type Dao struct {
  25. c *conf.Config
  26. client *http.Client
  27. bucket string
  28. url string
  29. key string
  30. secret string
  31. }
  32. //New bfs dao.
  33. func New(c *conf.Config) (d *Dao) {
  34. d = &Dao{
  35. c: c,
  36. // http client
  37. client: &http.Client{
  38. Timeout: time.Duration(c.Bfs.Timeout),
  39. },
  40. bucket: c.Bfs.Bucket,
  41. url: c.Bfs.Addr,
  42. key: c.Bfs.Key,
  43. secret: c.Bfs.Secret,
  44. }
  45. return
  46. }
  47. // Upload upload bfs.
  48. func (d *Dao) Upload(c context.Context, fileType string, body io.Reader) (location string, err error) {
  49. req, err := http.NewRequest(_method, d.url, body)
  50. if err != nil {
  51. log.Error("http.NewRequest error (%v) | fileType(%s) body(%v)", err, fileType, body)
  52. return
  53. }
  54. expire := time.Now().Unix()
  55. authorization := authorize(d.key, d.secret, _method, d.bucket, expire)
  56. req.Header.Set("Host", d.url)
  57. req.Header.Add("Date", fmt.Sprint(expire))
  58. req.Header.Add("Authorization", authorization)
  59. req.Header.Add("Content-Type", fileType)
  60. log.Error("Authorization_:%v", authorization)
  61. // timeout
  62. c, cancel := context.WithTimeout(c, time.Duration(d.c.Bfs.Timeout))
  63. req = req.WithContext(c)
  64. defer cancel()
  65. resp, err := d.client.Do(req)
  66. if err != nil {
  67. log.Error("d.Client.Do error(%v) | _url(%s) req(%v)", err, d.url, req)
  68. err = fmt.Errorf("d.Client.Do error(%v) | _url(%s) req(%v)", err, d.url, req)
  69. return
  70. }
  71. if resp.StatusCode != http.StatusOK {
  72. log.Error("Upload http.StatusCode nq http.StatusOK (%d) | url(%s)", resp.StatusCode, d.url)
  73. err = fmt.Errorf("Upload http.StatusCode nq http.StatusOK (%d) | url(%s)", resp.StatusCode, d.url)
  74. return
  75. }
  76. header := resp.Header
  77. code := header.Get("Code")
  78. if code != strconv.Itoa(http.StatusOK) {
  79. log.Error("strconv.Itoa err, code(%s) | url(%s)", code, d.url)
  80. err = fmt.Errorf("strconv.Itoa err, code(%s) | url(%s)", code, d.url)
  81. return
  82. }
  83. location = header.Get("Location")
  84. return
  85. }
  86. // authorize returns authorization for upload file to bfs
  87. func authorize(key, secret, method, bucket string, expire int64) (authorization string) {
  88. var (
  89. content string
  90. mac hash.Hash
  91. signature string
  92. )
  93. content = fmt.Sprintf(_template, method, bucket, expire)
  94. mac = hmac.New(sha1.New, []byte(secret))
  95. mac.Write([]byte(content))
  96. signature = base64.StdEncoding.EncodeToString(mac.Sum(nil))
  97. authorization = fmt.Sprintf("%s:%s:%d", key, signature, expire)
  98. return
  99. }
  100. // FileMd5 calculates the local file's md5 and store it in a file
  101. func (d *Dao) FileMd5(content []byte) (md5Str string, err error) {
  102. md5hash := md5.New()
  103. if _, err = io.Copy(md5hash, bytes.NewReader(content)); err != nil {
  104. log.Error("FileMd5 is error (%v)", err)
  105. return
  106. }
  107. md5 := md5hash.Sum(nil)
  108. md5Str = hex.EncodeToString(md5[:])
  109. return
  110. }