download.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550
  1. package oss
  2. import (
  3. "crypto/md5"
  4. "encoding/base64"
  5. "encoding/json"
  6. "errors"
  7. "hash"
  8. "hash/crc64"
  9. "io"
  10. "io/ioutil"
  11. "os"
  12. "strconv"
  13. )
  14. // DownloadFile downloads files with multipart download.
  15. //
  16. // objectKey the object key.
  17. // filePath the local file to download from objectKey in OSS.
  18. // partSize the part size in bytes.
  19. // options object's constraints, check out GetObject for the reference.
  20. //
  21. // error it's nil when the call succeeds, otherwise it's an error object.
  22. //
  23. func (bucket Bucket) DownloadFile(objectKey, filePath string, partSize int64, options ...Option) error {
  24. if partSize < 1 {
  25. return errors.New("oss: part size smaller than 1")
  26. }
  27. cpConf, err := getCpConfig(options, filePath)
  28. if err != nil {
  29. return err
  30. }
  31. uRange, err := getRangeConfig(options)
  32. if err != nil {
  33. return err
  34. }
  35. routines := getRoutines(options)
  36. if cpConf.IsEnable {
  37. return bucket.downloadFileWithCp(objectKey, filePath, partSize, options, cpConf.FilePath, routines, uRange)
  38. }
  39. return bucket.downloadFile(objectKey, filePath, partSize, options, routines, uRange)
  40. }
  41. // getRangeConfig gets the download range from the options.
  42. func getRangeConfig(options []Option) (*unpackedRange, error) {
  43. rangeOpt, err := findOption(options, HTTPHeaderRange, nil)
  44. if err != nil || rangeOpt == nil {
  45. return nil, err
  46. }
  47. return parseRange(rangeOpt.(string))
  48. }
// ----- concurrent download without checkpoint -----

// downloadWorkerArg is download worker's parameters
type downloadWorkerArg struct {
	bucket    *Bucket          // bucket the object is downloaded from
	key       string           // object key
	filePath  string           // local (temp) file the parts are written into
	options   []Option         // request options forwarded to GetObject
	hook      downloadPartHook // test hook invoked before each part download
	enableCRC bool             // whether to compute a CRC64 for each part
}
// downloadPartHook is hook for test
type downloadPartHook func(part downloadPart) error

// downloadPartHooker is the active hook; tests replace it to inject failures.
var downloadPartHooker downloadPartHook = defaultDownloadPartHook

// defaultDownloadPartHook is a no-op hook that always allows the part download.
func defaultDownloadPartHook(part downloadPart) error {
	return nil
}
// defaultDownloadProgressListener defines default ProgressListener, shields the ProgressListener in options of GetObject.
type defaultDownloadProgressListener struct {
}

// ProgressChanged no-ops; per-part GetObject progress is intentionally
// discarded so only the aggregate download progress is reported.
func (listener *defaultDownloadProgressListener) ProgressChanged(event *ProgressEvent) {
}
  71. // downloadWorker
  72. func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, results chan<- downloadPart, failed chan<- error, die <-chan bool) {
  73. for part := range jobs {
  74. if err := arg.hook(part); err != nil {
  75. failed <- err
  76. break
  77. }
  78. // Resolve options
  79. r := Range(part.Start, part.End)
  80. p := Progress(&defaultDownloadProgressListener{})
  81. opts := make([]Option, len(arg.options)+2)
  82. // Append orderly, can not be reversed!
  83. opts = append(opts, arg.options...)
  84. opts = append(opts, r, p)
  85. rd, err := arg.bucket.GetObject(arg.key, opts...)
  86. if err != nil {
  87. failed <- err
  88. break
  89. }
  90. // defer rd.Close()
  91. var crcCalc hash.Hash64
  92. if arg.enableCRC {
  93. crcCalc = crc64.New(crcTable())
  94. contentLen := part.End - part.Start + 1
  95. rd = ioutil.NopCloser(TeeReader(rd, crcCalc, contentLen, nil, nil))
  96. }
  97. // defer rd.Close()
  98. select {
  99. case <-die:
  100. return
  101. default:
  102. }
  103. fd, err := os.OpenFile(arg.filePath, os.O_WRONLY, FilePermMode)
  104. if err != nil {
  105. failed <- err
  106. break
  107. }
  108. _, err = fd.Seek(part.Start-part.Offset, io.SeekStart)
  109. if err != nil {
  110. fd.Close()
  111. failed <- err
  112. break
  113. }
  114. _, err = io.Copy(fd, rd)
  115. if err != nil {
  116. fd.Close()
  117. failed <- err
  118. break
  119. }
  120. rd.Close()
  121. if arg.enableCRC {
  122. part.CRC64 = crcCalc.Sum64()
  123. }
  124. fd.Close()
  125. results <- part
  126. }
  127. }
  128. // downloadScheduler
  129. func downloadScheduler(jobs chan downloadPart, parts []downloadPart) {
  130. for _, part := range parts {
  131. jobs <- part
  132. }
  133. close(jobs)
  134. }
// downloadPart defines download part
type downloadPart struct {
	Index  int    // Part number, starting from 0
	Start  int64  // Start index (byte offset in the object)
	End    int64  // End index (inclusive byte offset in the object)
	Offset int64  // Offset: start of the requested range, used to position writes in the local file
	CRC64  uint64 // CRC check value of part
}
  143. // getDownloadParts gets download parts
  144. func getDownloadParts(bucket *Bucket, objectKey string, partSize int64, uRange *unpackedRange) ([]downloadPart, bool, uint64, error) {
  145. meta, err := bucket.GetObjectDetailedMeta(objectKey)
  146. if err != nil {
  147. return nil, false, 0, err
  148. }
  149. parts := []downloadPart{}
  150. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  151. if err != nil {
  152. return nil, false, 0, err
  153. }
  154. enableCRC := false
  155. crcVal := (uint64)(0)
  156. if bucket.getConfig().IsEnableCRC && meta.Get(HTTPHeaderOssCRC64) != "" {
  157. if uRange == nil || (!uRange.hasStart && !uRange.hasEnd) {
  158. enableCRC = true
  159. crcVal, _ = strconv.ParseUint(meta.Get(HTTPHeaderOssCRC64), 10, 0)
  160. }
  161. }
  162. part := downloadPart{}
  163. i := 0
  164. start, end := adjustRange(uRange, objectSize)
  165. for offset := start; offset < end; offset += partSize {
  166. part.Index = i
  167. part.Start = offset
  168. part.End = GetPartEnd(offset, end, partSize)
  169. part.Offset = start
  170. part.CRC64 = 0
  171. parts = append(parts, part)
  172. i++
  173. }
  174. return parts, enableCRC, crcVal, nil
  175. }
  176. // getObjectBytes gets object bytes length
  177. func getObjectBytes(parts []downloadPart) int64 {
  178. var ob int64
  179. for _, part := range parts {
  180. ob += (part.End - part.Start + 1)
  181. }
  182. return ob
  183. }
  184. // combineCRCInParts caculates the total CRC of continuous parts
  185. func combineCRCInParts(dps []downloadPart) uint64 {
  186. if len(dps) == 0 {
  187. return 0
  188. }
  189. crc := dps[0].CRC64
  190. for i := 1; i < len(dps); i++ {
  191. crc = CRC64Combine(crc, dps[i].CRC64, (uint64)(dps[i].End-dps[i].Start+1))
  192. }
  193. return crc
  194. }
  195. // downloadFile downloads file concurrently without checkpoint.
  196. func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, options []Option, routines int, uRange *unpackedRange) error {
  197. tempFilePath := filePath + TempFileSuffix
  198. listener := getProgressListener(options)
  199. // If the file does not exist, create one. If exists, the download will overwrite it.
  200. fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
  201. if err != nil {
  202. return err
  203. }
  204. fd.Close()
  205. // Get the parts of the file
  206. parts, enableCRC, expectedCRC, err := getDownloadParts(&bucket, objectKey, partSize, uRange)
  207. if err != nil {
  208. return err
  209. }
  210. jobs := make(chan downloadPart, len(parts))
  211. results := make(chan downloadPart, len(parts))
  212. failed := make(chan error)
  213. die := make(chan bool)
  214. var completedBytes int64
  215. totalBytes := getObjectBytes(parts)
  216. event := newProgressEvent(TransferStartedEvent, 0, totalBytes)
  217. publishProgress(listener, event)
  218. // Start the download workers
  219. arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, enableCRC}
  220. for w := 1; w <= routines; w++ {
  221. go downloadWorker(w, arg, jobs, results, failed, die)
  222. }
  223. // Download parts concurrently
  224. go downloadScheduler(jobs, parts)
  225. // Waiting for parts download finished
  226. completed := 0
  227. for completed < len(parts) {
  228. select {
  229. case part := <-results:
  230. completed++
  231. completedBytes += (part.End - part.Start + 1)
  232. parts[part.Index].CRC64 = part.CRC64
  233. event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes)
  234. publishProgress(listener, event)
  235. case err = <-failed:
  236. close(die)
  237. event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes)
  238. publishProgress(listener, event)
  239. return err
  240. }
  241. if completed >= len(parts) {
  242. break
  243. }
  244. }
  245. event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes)
  246. publishProgress(listener, event)
  247. if enableCRC {
  248. actualCRC := combineCRCInParts(parts)
  249. err = checkDownloadCRC(actualCRC, expectedCRC)
  250. if err != nil {
  251. return err
  252. }
  253. }
  254. return os.Rename(tempFilePath, filePath)
  255. }
// ----- Concurrent download with checkpoint -----

// downloadCpMagic identifies the checkpoint file format.
const downloadCpMagic = "92611BED-89E2-46B6-89E5-72F273D4B0A3"

// downloadCheckpoint is the state of a resumable download, persisted as JSON
// (see dump/load) and integrity-checked via the MD5 field.
type downloadCheckpoint struct {
	Magic    string         // Magic
	MD5      string         // Checkpoint content MD5
	FilePath string         // Local file
	Object   string         // Key
	ObjStat  objectStat     // Object status
	Parts    []downloadPart // All download parts
	PartStat []bool         // Parts' download status
	Start    int64          // Start point of the file
	End      int64          // End point of the file
	// enableCRC is unexported, so json.Marshal skips it: it is NOT persisted
	// in the checkpoint file and stays false after a resume from a valid
	// checkpoint. NOTE(review): that appears to skip the final CRC check on
	// resumed downloads — confirm whether this is intended.
	enableCRC bool   // Whether has CRC check
	CRC       uint64 // CRC check value
}
// objectStat is a snapshot of the remote object's metadata, used by isValid
// to detect whether the object changed since the checkpoint was written.
type objectStat struct {
	Size         int64  // Object size
	LastModified string // Last modified time
	Etag         string // Etag
}
  276. // isValid flags of checkpoint data is valid. It returns true when the data is valid and the checkpoint is valid and the object is not updated.
  277. func (cp downloadCheckpoint) isValid(bucket *Bucket, objectKey string, uRange *unpackedRange) (bool, error) {
  278. // Compare the CP's Magic and the MD5
  279. cpb := cp
  280. cpb.MD5 = ""
  281. js, _ := json.Marshal(cpb)
  282. sum := md5.Sum(js)
  283. b64 := base64.StdEncoding.EncodeToString(sum[:])
  284. if cp.Magic != downloadCpMagic || b64 != cp.MD5 {
  285. return false, nil
  286. }
  287. // Ensure the object is not updated.
  288. meta, err := bucket.GetObjectDetailedMeta(objectKey)
  289. if err != nil {
  290. return false, err
  291. }
  292. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  293. if err != nil {
  294. return false, err
  295. }
  296. // Compare the object size, last modified time and etag
  297. if cp.ObjStat.Size != objectSize ||
  298. cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
  299. cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
  300. return false, nil
  301. }
  302. // Check the download range
  303. if uRange != nil {
  304. start, end := adjustRange(uRange, objectSize)
  305. if start != cp.Start || end != cp.End {
  306. return false, nil
  307. }
  308. }
  309. return true, nil
  310. }
  311. // load checkpoint from local file
  312. func (cp *downloadCheckpoint) load(filePath string) error {
  313. contents, err := ioutil.ReadFile(filePath)
  314. if err != nil {
  315. return err
  316. }
  317. err = json.Unmarshal(contents, cp)
  318. return err
  319. }
  320. // dump funciton dumps to file
  321. func (cp *downloadCheckpoint) dump(filePath string) error {
  322. bcp := *cp
  323. // Calculate MD5
  324. bcp.MD5 = ""
  325. js, err := json.Marshal(bcp)
  326. if err != nil {
  327. return err
  328. }
  329. sum := md5.Sum(js)
  330. b64 := base64.StdEncoding.EncodeToString(sum[:])
  331. bcp.MD5 = b64
  332. // Serialize
  333. js, err = json.Marshal(bcp)
  334. if err != nil {
  335. return err
  336. }
  337. // Dump
  338. return ioutil.WriteFile(filePath, js, FilePermMode)
  339. }
  340. // todoParts gets unfinished parts
  341. func (cp downloadCheckpoint) todoParts() []downloadPart {
  342. dps := []downloadPart{}
  343. for i, ps := range cp.PartStat {
  344. if !ps {
  345. dps = append(dps, cp.Parts[i])
  346. }
  347. }
  348. return dps
  349. }
  350. // getCompletedBytes gets completed size
  351. func (cp downloadCheckpoint) getCompletedBytes() int64 {
  352. var completedBytes int64
  353. for i, part := range cp.Parts {
  354. if cp.PartStat[i] {
  355. completedBytes += (part.End - part.Start + 1)
  356. }
  357. }
  358. return completedBytes
  359. }
  360. // prepare initiates download tasks
  361. func (cp *downloadCheckpoint) prepare(bucket *Bucket, objectKey, filePath string, partSize int64, uRange *unpackedRange) error {
  362. // CP
  363. cp.Magic = downloadCpMagic
  364. cp.FilePath = filePath
  365. cp.Object = objectKey
  366. // Object
  367. meta, err := bucket.GetObjectDetailedMeta(objectKey)
  368. if err != nil {
  369. return err
  370. }
  371. objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
  372. if err != nil {
  373. return err
  374. }
  375. cp.ObjStat.Size = objectSize
  376. cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
  377. cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)
  378. // Parts
  379. cp.Parts, cp.enableCRC, cp.CRC, err = getDownloadParts(bucket, objectKey, partSize, uRange)
  380. if err != nil {
  381. return err
  382. }
  383. cp.PartStat = make([]bool, len(cp.Parts))
  384. for i := range cp.PartStat {
  385. cp.PartStat[i] = false
  386. }
  387. return nil
  388. }
// complete finishes the download: it removes the checkpoint file and renames
// the temporary download file to its final destination. The Remove error is
// deliberately ignored — a leftover checkpoint file is harmless.
func (cp *downloadCheckpoint) complete(cpFilePath, downFilepath string) error {
	os.Remove(cpFilePath)
	return os.Rename(downFilepath, cp.FilePath)
}
  393. // downloadFileWithCp downloads files with checkpoint.
  394. func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int, uRange *unpackedRange) error {
  395. tempFilePath := filePath + TempFileSuffix
  396. listener := getProgressListener(options)
  397. // Load checkpoint data.
  398. dcp := downloadCheckpoint{}
  399. err := dcp.load(cpFilePath)
  400. if err != nil {
  401. os.Remove(cpFilePath)
  402. }
  403. // Load error or data invalid. Re-initialize the download.
  404. valid, err := dcp.isValid(&bucket, objectKey, uRange)
  405. if err != nil || !valid {
  406. if err = dcp.prepare(&bucket, objectKey, filePath, partSize, uRange); err != nil {
  407. return err
  408. }
  409. os.Remove(cpFilePath)
  410. }
  411. // Create the file if not exists. Otherwise the parts download will overwrite it.
  412. fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
  413. if err != nil {
  414. return err
  415. }
  416. fd.Close()
  417. // Unfinished parts
  418. parts := dcp.todoParts()
  419. jobs := make(chan downloadPart, len(parts))
  420. results := make(chan downloadPart, len(parts))
  421. failed := make(chan error)
  422. die := make(chan bool)
  423. completedBytes := dcp.getCompletedBytes()
  424. event := newProgressEvent(TransferStartedEvent, completedBytes, dcp.ObjStat.Size)
  425. publishProgress(listener, event)
  426. // Start the download workers routine
  427. arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, dcp.enableCRC}
  428. for w := 1; w <= routines; w++ {
  429. go downloadWorker(w, arg, jobs, results, failed, die)
  430. }
  431. // Concurrently downloads parts
  432. go downloadScheduler(jobs, parts)
  433. // Wait for the parts download finished
  434. completed := 0
  435. for completed < len(parts) {
  436. select {
  437. case part := <-results:
  438. completed++
  439. dcp.PartStat[part.Index] = true
  440. dcp.Parts[part.Index].CRC64 = part.CRC64
  441. dcp.dump(cpFilePath)
  442. completedBytes += (part.End - part.Start + 1)
  443. event = newProgressEvent(TransferDataEvent, completedBytes, dcp.ObjStat.Size)
  444. publishProgress(listener, event)
  445. case err = <-failed:
  446. close(die)
  447. event = newProgressEvent(TransferFailedEvent, completedBytes, dcp.ObjStat.Size)
  448. publishProgress(listener, event)
  449. return err
  450. }
  451. if completed >= len(parts) {
  452. break
  453. }
  454. }
  455. event = newProgressEvent(TransferCompletedEvent, completedBytes, dcp.ObjStat.Size)
  456. publishProgress(listener, event)
  457. if dcp.enableCRC {
  458. actualCRC := combineCRCInParts(dcp.Parts)
  459. err = checkDownloadCRC(actualCRC, dcp.CRC)
  460. if err != nil {
  461. return err
  462. }
  463. }
  464. return dcp.complete(cpFilePath, tempFilePath)
  465. }