gcs.go 3.7 KB

/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
  13. package gcs
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "hash/crc32"
  19. "log"
  20. "net/url"
  21. "strings"
  22. "cloud.google.com/go/storage"
  23. "google.golang.org/api/option"
  24. )
  25. // ClientWithCreds returns a storage client, optionally authenticated with the specified .json creds
  26. func ClientWithCreds(ctx context.Context, creds ...string) (*storage.Client, error) {
  27. var options []option.ClientOption
  28. switch l := len(creds); l {
  29. case 0: // Do nothing
  30. case 1:
  31. options = append(options, option.WithCredentialsFile(creds[0]))
  32. default:
  33. return nil, fmt.Errorf("%d creds files unsupported (at most 1)", l)
  34. }
  35. return storage.NewClient(ctx, options...)
  36. }
  37. // Path parses gs://bucket/obj urls
  38. type Path struct {
  39. url url.URL
  40. }
  41. // String returns the gs://bucket/obj url
  42. func (g Path) String() string {
  43. return g.url.String()
  44. }
  45. // Set updates value from a gs://bucket/obj string, validating errors.
  46. func (g *Path) Set(v string) error {
  47. u, err := url.Parse(v)
  48. if err != nil {
  49. return fmt.Errorf("invalid gs:// url %s: %v", v, err)
  50. }
  51. return g.SetURL(u)
  52. }
  53. // SetURL updates value to the passed in gs://bucket/obj url
  54. func (g *Path) SetURL(u *url.URL) error {
  55. switch {
  56. case u == nil:
  57. return errors.New("nil url")
  58. case u.Scheme != "gs":
  59. return fmt.Errorf("must use a gs:// url: %s", u)
  60. case strings.Contains(u.Host, ":"):
  61. return fmt.Errorf("gs://bucket may not contain a port: %s", u)
  62. case u.Opaque != "":
  63. return fmt.Errorf("url must start with gs://: %s", u)
  64. case u.User != nil:
  65. return fmt.Errorf("gs://bucket may not contain an user@ prefix: %s", u)
  66. case u.RawQuery != "":
  67. return fmt.Errorf("gs:// url may not contain a ?query suffix: %s", u)
  68. case u.Fragment != "":
  69. return fmt.Errorf("gs:// url may not contain a #fragment suffix: %s", u)
  70. }
  71. g.url = *u
  72. return nil
  73. }
  74. // ResolveReference returns the path relative to the current path
  75. func (g Path) ResolveReference(ref *url.URL) (*Path, error) {
  76. var newP Path
  77. if err := newP.SetURL(g.url.ResolveReference(ref)); err != nil {
  78. return nil, err
  79. }
  80. return &newP, nil
  81. }
  82. // Bucket returns bucket in gs://bucket/obj
  83. func (g Path) Bucket() string {
  84. return g.url.Host
  85. }
  86. // Object returns path/to/something in gs://bucket/path/to/something
  87. func (g Path) Object() string {
  88. if g.url.Path == "" {
  89. return g.url.Path
  90. }
  91. return g.url.Path[1:]
  92. }
  93. func calcCRC(buf []byte) uint32 {
  94. return crc32.Checksum(buf, crc32.MakeTable(crc32.Castagnoli))
  95. }
  96. // Upload writes bytes to the specified Path
  97. func Upload(ctx context.Context, client *storage.Client, path Path, buf []byte) error {
  98. crc := calcCRC(buf)
  99. w := client.Bucket(path.Bucket()).Object(path.Object()).NewWriter(ctx)
  100. w.SendCRC32C = true
  101. // Send our CRC32 to ensure google received the same data we sent.
  102. // See checksum example at:
  103. // https://godoc.org/cloud.google.com/go/storage#Writer.Write
  104. w.ObjectAttrs.CRC32C = crc
  105. w.ProgressFunc = func(bytes int64) {
  106. log.Printf("Uploading %s: %d/%d...", path, bytes, len(buf))
  107. }
  108. if n, err := w.Write(buf); err != nil {
  109. return fmt.Errorf("writing %s failed: %v", path, err)
  110. } else if n != len(buf) {
  111. return fmt.Errorf("partial write of %s: %d < %d", path, n, len(buf))
  112. }
  113. if err := w.Close(); err != nil {
  114. return fmt.Errorf("closing %s failed: %v", path, err)
  115. }
  116. return nil
  117. }