  1. // multipart upload for box
  2. package box
  3. import (
  4. "bytes"
  5. "context"
  6. "crypto/sha1"
  7. "encoding/base64"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "io"
  12. "net/http"
  13. "strconv"
  14. "sync"
  15. "time"
  16. "github.com/rclone/rclone/backend/box/api"
  17. "github.com/rclone/rclone/fs"
  18. "github.com/rclone/rclone/fs/accounting"
  19. "github.com/rclone/rclone/lib/atexit"
  20. "github.com/rclone/rclone/lib/rest"
  21. )
  22. // createUploadSession creates an upload session for the object
  23. func (o *Object) createUploadSession(ctx context.Context, leaf, directoryID string, size int64) (response *api.UploadSessionResponse, err error) {
  24. opts := rest.Opts{
  25. Method: "POST",
  26. Path: "/files/upload_sessions",
  27. RootURL: uploadURL,
  28. }
  29. request := api.UploadSessionRequest{
  30. FileSize: size,
  31. }
  32. // If object has an ID then it is existing so create a new version
  33. if o.id != "" {
  34. opts.Path = "/files/" + o.id + "/upload_sessions"
  35. } else {
  36. opts.Path = "/files/upload_sessions"
  37. request.FolderID = directoryID
  38. request.FileName = o.fs.opt.Enc.FromStandardName(leaf)
  39. }
  40. var resp *http.Response
  41. err = o.fs.pacer.Call(func() (bool, error) {
  42. resp, err = o.fs.srv.CallJSON(ctx, &opts, &request, &response)
  43. return shouldRetry(ctx, resp, err)
  44. })
  45. return
  46. }
  47. // sha1Digest produces a digest using sha1 as per RFC3230
  48. func sha1Digest(digest []byte) string {
  49. return "sha=" + base64.StdEncoding.EncodeToString(digest)
  50. }
  51. // uploadPart uploads a part in an upload session
  52. func (o *Object) uploadPart(ctx context.Context, SessionID string, offset, totalSize int64, chunk []byte, wrap accounting.WrapFn, options ...fs.OpenOption) (response *api.UploadPartResponse, err error) {
  53. chunkSize := int64(len(chunk))
  54. sha1sum := sha1.Sum(chunk)
  55. opts := rest.Opts{
  56. Method: "PUT",
  57. Path: "/files/upload_sessions/" + SessionID,
  58. RootURL: uploadURL,
  59. ContentType: "application/octet-stream",
  60. ContentLength: &chunkSize,
  61. ContentRange: fmt.Sprintf("bytes %d-%d/%d", offset, offset+chunkSize-1, totalSize),
  62. Options: options,
  63. ExtraHeaders: map[string]string{
  64. "Digest": sha1Digest(sha1sum[:]),
  65. },
  66. }
  67. var resp *http.Response
  68. err = o.fs.pacer.Call(func() (bool, error) {
  69. opts.Body = wrap(bytes.NewReader(chunk))
  70. resp, err = o.fs.srv.CallJSON(ctx, &opts, nil, &response)
  71. return shouldRetry(ctx, resp, err)
  72. })
  73. if err != nil {
  74. return nil, err
  75. }
  76. return response, nil
  77. }
// commitUpload finishes an upload session.
//
// It POSTs the list of uploaded parts together with the whole-file
// SHA-1 (as an RFC 3230 Digest header) and the desired modification
// time.  Box may answer 202 Accepted while it is still assembling the
// parts, so the commit is retried up to o.fs.opt.CommitRetries times,
// honouring any Retry-After header the server supplies.  On success
// the decoded FolderItems from the response body is returned.
func (o *Object) commitUpload(ctx context.Context, SessionID string, parts []api.Part, modTime time.Time, sha1sum []byte) (result *api.FolderItems, err error) {
	opts := rest.Opts{
		Method:  "POST",
		Path:    "/files/upload_sessions/" + SessionID + "/commit",
		RootURL: uploadURL,
		ExtraHeaders: map[string]string{
			"Digest": sha1Digest(sha1sum),
		},
	}
	request := api.CommitUpload{
		Parts: parts,
	}
	// Both timestamps are set to the source modification time
	request.Attributes.ContentModifiedAt = api.Time(modTime)
	request.Attributes.ContentCreatedAt = api.Time(modTime)
	var body []byte
	var resp *http.Response
	// For discussion of this value see:
	// https://github.com/rclone/rclone/issues/2054
	maxTries := o.fs.opt.CommitRetries
	const defaultDelay = 10 // seconds to wait when no Retry-After is given
	var tries int
outer:
	for tries = 0; tries < maxTries; tries++ {
		err = o.fs.pacer.Call(func() (bool, error) {
			resp, err = o.fs.srv.CallJSON(ctx, &opts, &request, nil)
			if err != nil {
				return shouldRetry(ctx, resp, err)
			}
			// Capture the body here so it can be decoded once the
			// retry loop has finished with resp
			body, err = rest.ReadBody(resp)
			return shouldRetry(ctx, resp, err)
		})
		delay := defaultDelay
		var why string
		if err != nil {
			// Sometimes we get 400 Error with
			// parts_mismatch immediately after uploading
			// the last part. Ignore this error and wait.
			if boxErr, ok := err.(*api.Error); ok && boxErr.Code == "parts_mismatch" {
				why = err.Error()
			} else {
				return nil, err
			}
		} else {
			switch resp.StatusCode {
			case http.StatusOK, http.StatusCreated:
				// Commit succeeded - leave the retry loop
				break outer
			case http.StatusAccepted:
				// Server is still assembling the parts - wait and
				// retry, preferring the server-supplied delay
				why = "not ready yet"
				delayString := resp.Header.Get("Retry-After")
				if delayString != "" {
					delay, err = strconv.Atoi(delayString)
					if err != nil {
						// Unparseable header - fall back to the default
						fs.Debugf(o, "Couldn't decode Retry-After header %q: %v", delayString, err)
						delay = defaultDelay
					}
				}
			default:
				return nil, fmt.Errorf("unknown HTTP status return %q (%d)", resp.Status, resp.StatusCode)
			}
		}
		fs.Debugf(o, "commit multipart upload failed %d/%d - trying again in %d seconds (%s)", tries+1, maxTries, delay, why)
		time.Sleep(time.Duration(delay) * time.Second)
	}
	// Only reached via loop exhaustion (success uses break outer above)
	if tries >= maxTries {
		return nil, errors.New("too many tries to commit multipart upload - increase --low-level-retries")
	}
	err = json.Unmarshal(body, &result)
	if err != nil {
		return nil, fmt.Errorf("couldn't decode commit response: %q: %w", body, err)
	}
	return result, nil
}
  151. // abortUpload cancels an upload session
  152. func (o *Object) abortUpload(ctx context.Context, SessionID string) (err error) {
  153. opts := rest.Opts{
  154. Method: "DELETE",
  155. Path: "/files/upload_sessions/" + SessionID,
  156. RootURL: uploadURL,
  157. NoResponse: true,
  158. }
  159. var resp *http.Response
  160. err = o.fs.pacer.Call(func() (bool, error) {
  161. resp, err = o.fs.srv.Call(ctx, &opts)
  162. return shouldRetry(ctx, resp, err)
  163. })
  164. return err
  165. }
// uploadMultipart uploads a file using multipart upload.
//
// It creates an upload session, reads the input sequentially into
// chunks of the server-chosen part size, uploads the chunks
// concurrently (bounded by o.fs.uploadToken), then commits the session
// with the whole-file SHA-1.  On any error the session is aborted via
// the atexit handler so partial uploads are cleaned up.
func (o *Object) uploadMultipart(ctx context.Context, in io.Reader, leaf, directoryID string, size int64, modTime time.Time, options ...fs.OpenOption) (err error) {
	// Create upload session
	session, err := o.createUploadSession(ctx, leaf, directoryID, size)
	if err != nil {
		return fmt.Errorf("multipart upload create session failed: %w", err)
	}
	// The server dictates the part size; all parts except possibly the
	// last must be exactly this big
	chunkSize := session.PartSize
	fs.Debugf(o, "Multipart upload session started for %d parts of size %v", session.TotalParts, fs.SizeSuffix(chunkSize))
	// Cancel the session if something went wrong
	defer atexit.OnError(&err, func() {
		fs.Debugf(o, "Cancelling multipart upload: %v", err)
		cancelErr := o.abortUpload(ctx, session.ID)
		if cancelErr != nil {
			fs.Logf(o, "Failed to cancel multipart upload: %v", cancelErr)
		}
	})()
	// unwrap the accounting from the input, we use wrap to put it
	// back on after the buffering
	in, wrap := accounting.UnWrap(in)
	// Upload the chunks
	remaining := size
	position := int64(0)
	// Each goroutine writes only its own index, so no locking is needed
	parts := make([]api.Part, session.TotalParts)
	hash := sha1.New()
	// Buffered so a failing uploader never blocks; extra errors are dropped
	errs := make(chan error, 1)
	var wg sync.WaitGroup
outer:
	for part := 0; part < session.TotalParts; part++ {
		// Check any errors
		select {
		case err = <-errs:
			break outer
		default:
		}
		// The final part may be smaller than chunkSize
		reqSize := remaining
		if reqSize >= chunkSize {
			reqSize = chunkSize
		}
		// Make a block of memory
		buf := make([]byte, reqSize)
		// Read the chunk
		_, err = io.ReadFull(in, buf)
		if err != nil {
			err = fmt.Errorf("multipart upload failed to read source: %w", err)
			break outer
		}
		// Make the global hash (must be done sequentially)
		_, _ = hash.Write(buf)
		// Transfer the chunk - uploadToken bounds the number of
		// concurrent uploaders
		wg.Add(1)
		o.fs.uploadToken.Get()
		go func(part int, position int64) {
			defer wg.Done()
			defer o.fs.uploadToken.Put()
			fs.Debugf(o, "Uploading part %d/%d offset %v/%v part size %v", part+1, session.TotalParts, fs.SizeSuffix(position), fs.SizeSuffix(size), fs.SizeSuffix(chunkSize))
			partResponse, err := o.uploadPart(ctx, session.ID, position, size, buf, wrap, options...)
			if err != nil {
				err = fmt.Errorf("multipart upload failed to upload part: %w", err)
				// Report the first error only; drop the rest
				select {
				case errs <- err:
				default:
				}
				return
			}
			parts[part] = partResponse.Part
		}(part, position)
		// ready for next block
		remaining -= chunkSize
		position += chunkSize
	}
	wg.Wait()
	// Pick up an error from a goroutine if the loop itself succeeded
	if err == nil {
		select {
		case err = <-errs:
		default:
		}
	}
	if err != nil {
		return err
	}
	// Finalise the upload session
	result, err := o.commitUpload(ctx, session.ID, parts, modTime, hash.Sum(nil))
	if err != nil {
		return fmt.Errorf("multipart upload failed to finalize: %w", err)
	}
	if result.TotalCount != 1 || len(result.Entries) != 1 {
		return fmt.Errorf("multipart upload failed %v - not sure why", o)
	}
	return o.setMetaData(&result.Entries[0])
}