123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277 |
- // multipart upload for box
- package box
- import (
- "bytes"
- "context"
- "crypto/sha1"
- "encoding/base64"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "net/http"
- "strconv"
- "sync"
- "time"
- "github.com/rclone/rclone/backend/box/api"
- "github.com/rclone/rclone/fs"
- "github.com/rclone/rclone/fs/accounting"
- "github.com/rclone/rclone/lib/atexit"
- "github.com/rclone/rclone/lib/rest"
- )
- // createUploadSession creates an upload session for the object
- func (o *Object) createUploadSession(ctx context.Context, leaf, directoryID string, size int64) (response *api.UploadSessionResponse, err error) {
- opts := rest.Opts{
- Method: "POST",
- Path: "/files/upload_sessions",
- RootURL: uploadURL,
- }
- request := api.UploadSessionRequest{
- FileSize: size,
- }
- // If object has an ID then it is existing so create a new version
- if o.id != "" {
- opts.Path = "/files/" + o.id + "/upload_sessions"
- } else {
- opts.Path = "/files/upload_sessions"
- request.FolderID = directoryID
- request.FileName = o.fs.opt.Enc.FromStandardName(leaf)
- }
- var resp *http.Response
- err = o.fs.pacer.Call(func() (bool, error) {
- resp, err = o.fs.srv.CallJSON(ctx, &opts, &request, &response)
- return shouldRetry(ctx, resp, err)
- })
- return
- }
// sha1Digest renders a raw SHA-1 sum as an RFC 3230 instance-digest value:
// the "sha=" algorithm prefix followed by the standard (padded) base64
// encoding of the digest bytes. It is used as the Digest HTTP header value.
func sha1Digest(digest []byte) string {
	const algorithm = "sha="
	encoded := base64.StdEncoding.EncodeToString(digest)
	return algorithm + encoded
}
- // uploadPart uploads a part in an upload session
- func (o *Object) uploadPart(ctx context.Context, SessionID string, offset, totalSize int64, chunk []byte, wrap accounting.WrapFn, options ...fs.OpenOption) (response *api.UploadPartResponse, err error) {
- chunkSize := int64(len(chunk))
- sha1sum := sha1.Sum(chunk)
- opts := rest.Opts{
- Method: "PUT",
- Path: "/files/upload_sessions/" + SessionID,
- RootURL: uploadURL,
- ContentType: "application/octet-stream",
- ContentLength: &chunkSize,
- ContentRange: fmt.Sprintf("bytes %d-%d/%d", offset, offset+chunkSize-1, totalSize),
- Options: options,
- ExtraHeaders: map[string]string{
- "Digest": sha1Digest(sha1sum[:]),
- },
- }
- var resp *http.Response
- err = o.fs.pacer.Call(func() (bool, error) {
- opts.Body = wrap(bytes.NewReader(chunk))
- resp, err = o.fs.srv.CallJSON(ctx, &opts, nil, &response)
- return shouldRetry(ctx, resp, err)
- })
- if err != nil {
- return nil, err
- }
- return response, nil
- }
- // commitUpload finishes an upload session
- func (o *Object) commitUpload(ctx context.Context, SessionID string, parts []api.Part, modTime time.Time, sha1sum []byte) (result *api.FolderItems, err error) {
- opts := rest.Opts{
- Method: "POST",
- Path: "/files/upload_sessions/" + SessionID + "/commit",
- RootURL: uploadURL,
- ExtraHeaders: map[string]string{
- "Digest": sha1Digest(sha1sum),
- },
- }
- request := api.CommitUpload{
- Parts: parts,
- }
- request.Attributes.ContentModifiedAt = api.Time(modTime)
- request.Attributes.ContentCreatedAt = api.Time(modTime)
- var body []byte
- var resp *http.Response
- // For discussion of this value see:
- // https://github.com/rclone/rclone/issues/2054
- maxTries := o.fs.opt.CommitRetries
- const defaultDelay = 10
- var tries int
- outer:
- for tries = 0; tries < maxTries; tries++ {
- err = o.fs.pacer.Call(func() (bool, error) {
- resp, err = o.fs.srv.CallJSON(ctx, &opts, &request, nil)
- if err != nil {
- return shouldRetry(ctx, resp, err)
- }
- body, err = rest.ReadBody(resp)
- return shouldRetry(ctx, resp, err)
- })
- delay := defaultDelay
- var why string
- if err != nil {
- // Sometimes we get 400 Error with
- // parts_mismatch immediately after uploading
- // the last part. Ignore this error and wait.
- if boxErr, ok := err.(*api.Error); ok && boxErr.Code == "parts_mismatch" {
- why = err.Error()
- } else {
- return nil, err
- }
- } else {
- switch resp.StatusCode {
- case http.StatusOK, http.StatusCreated:
- break outer
- case http.StatusAccepted:
- why = "not ready yet"
- delayString := resp.Header.Get("Retry-After")
- if delayString != "" {
- delay, err = strconv.Atoi(delayString)
- if err != nil {
- fs.Debugf(o, "Couldn't decode Retry-After header %q: %v", delayString, err)
- delay = defaultDelay
- }
- }
- default:
- return nil, fmt.Errorf("unknown HTTP status return %q (%d)", resp.Status, resp.StatusCode)
- }
- }
- fs.Debugf(o, "commit multipart upload failed %d/%d - trying again in %d seconds (%s)", tries+1, maxTries, delay, why)
- time.Sleep(time.Duration(delay) * time.Second)
- }
- if tries >= maxTries {
- return nil, errors.New("too many tries to commit multipart upload - increase --low-level-retries")
- }
- err = json.Unmarshal(body, &result)
- if err != nil {
- return nil, fmt.Errorf("couldn't decode commit response: %q: %w", body, err)
- }
- return result, nil
- }
- // abortUpload cancels an upload session
- func (o *Object) abortUpload(ctx context.Context, SessionID string) (err error) {
- opts := rest.Opts{
- Method: "DELETE",
- Path: "/files/upload_sessions/" + SessionID,
- RootURL: uploadURL,
- NoResponse: true,
- }
- var resp *http.Response
- err = o.fs.pacer.Call(func() (bool, error) {
- resp, err = o.fs.srv.Call(ctx, &opts)
- return shouldRetry(ctx, resp, err)
- })
- return err
- }
- // uploadMultipart uploads a file using multipart upload
- func (o *Object) uploadMultipart(ctx context.Context, in io.Reader, leaf, directoryID string, size int64, modTime time.Time, options ...fs.OpenOption) (err error) {
- // Create upload session
- session, err := o.createUploadSession(ctx, leaf, directoryID, size)
- if err != nil {
- return fmt.Errorf("multipart upload create session failed: %w", err)
- }
- chunkSize := session.PartSize
- fs.Debugf(o, "Multipart upload session started for %d parts of size %v", session.TotalParts, fs.SizeSuffix(chunkSize))
- // Cancel the session if something went wrong
- defer atexit.OnError(&err, func() {
- fs.Debugf(o, "Cancelling multipart upload: %v", err)
- cancelErr := o.abortUpload(ctx, session.ID)
- if cancelErr != nil {
- fs.Logf(o, "Failed to cancel multipart upload: %v", cancelErr)
- }
- })()
- // unwrap the accounting from the input, we use wrap to put it
- // back on after the buffering
- in, wrap := accounting.UnWrap(in)
- // Upload the chunks
- remaining := size
- position := int64(0)
- parts := make([]api.Part, session.TotalParts)
- hash := sha1.New()
- errs := make(chan error, 1)
- var wg sync.WaitGroup
- outer:
- for part := 0; part < session.TotalParts; part++ {
- // Check any errors
- select {
- case err = <-errs:
- break outer
- default:
- }
- reqSize := remaining
- if reqSize >= chunkSize {
- reqSize = chunkSize
- }
- // Make a block of memory
- buf := make([]byte, reqSize)
- // Read the chunk
- _, err = io.ReadFull(in, buf)
- if err != nil {
- err = fmt.Errorf("multipart upload failed to read source: %w", err)
- break outer
- }
- // Make the global hash (must be done sequentially)
- _, _ = hash.Write(buf)
- // Transfer the chunk
- wg.Add(1)
- o.fs.uploadToken.Get()
- go func(part int, position int64) {
- defer wg.Done()
- defer o.fs.uploadToken.Put()
- fs.Debugf(o, "Uploading part %d/%d offset %v/%v part size %v", part+1, session.TotalParts, fs.SizeSuffix(position), fs.SizeSuffix(size), fs.SizeSuffix(chunkSize))
- partResponse, err := o.uploadPart(ctx, session.ID, position, size, buf, wrap, options...)
- if err != nil {
- err = fmt.Errorf("multipart upload failed to upload part: %w", err)
- select {
- case errs <- err:
- default:
- }
- return
- }
- parts[part] = partResponse.Part
- }(part, position)
- // ready for next block
- remaining -= chunkSize
- position += chunkSize
- }
- wg.Wait()
- if err == nil {
- select {
- case err = <-errs:
- default:
- }
- }
- if err != nil {
- return err
- }
- // Finalise the upload session
- result, err := o.commitUpload(ctx, session.ID, parts, modTime, hash.Sum(nil))
- if err != nil {
- return fmt.Errorf("multipart upload failed to finalize: %w", err)
- }
- if result.TotalCount != 1 || len(result.Entries) != 1 {
- return fmt.Errorf("multipart upload failed %v - not sure why", o)
- }
- return o.setMetaData(&result.Entries[0])
- }
|