upload.go

// Upload large files for sharefile
//
// Docs - https://api.sharefile.com/rest/docs/resource.aspx?name=Items#Upload_File
package sharefile

import (
	"bytes"
	"context"
	"crypto/md5"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"strings"
	"sync"

	"github.com/rclone/rclone/backend/sharefile/api"
	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/fs/accounting"
	"github.com/rclone/rclone/lib/readers"
	"github.com/rclone/rclone/lib/rest"
)

// largeUpload is used to control the upload of large files which need chunking
type largeUpload struct {
	ctx      context.Context
	f        *Fs                      // parent Fs
	o        *Object                  // object being uploaded
	in       io.Reader                // read the data from here
	wrap     accounting.WrapFn        // account parts being transferred
	size     int64                    // total size
	parts    int64                    // calculated number of parts, if known
	info     *api.UploadSpecification // where to post chunks, etc.
	threads  int                      // number of threads to use in upload
	streamed bool                     // set if using streamed upload
}

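// Note: per the switch in newLargeUpload below, the server-chosen
// info.Method picks one of two modes: "streamed" sends chunks
// sequentially on a single connection, while "threaded" posts chunks
// concurrently, with concurrency capped at info.MaxNumberOfThreads.
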
// newLargeUpload starts an upload of object o from in with metadata in src
func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs.ObjectInfo, info *api.UploadSpecification) (up *largeUpload, err error) {
	size := src.Size()
	parts := int64(-1)
	if size >= 0 {
		parts = size / int64(o.fs.opt.ChunkSize)
		if size%int64(o.fs.opt.ChunkSize) != 0 {
			parts++
		}
	}

	var streamed bool
	switch strings.ToLower(info.Method) {
	case "streamed":
		streamed = true
	case "threaded":
		streamed = false
	default:
		return nil, fmt.Errorf("can't use method %q with newLargeUpload", info.Method)
	}

	threads := f.ci.Transfers
	if threads > info.MaxNumberOfThreads {
		threads = info.MaxNumberOfThreads
	}

	// unwrap the accounting from the input, we use wrap to put it
	// back on after the buffering
	in, wrap := accounting.UnWrap(in)
	up = &largeUpload{
		ctx:      ctx,
		f:        f,
		o:        o,
		in:       in,
		wrap:     wrap,
		size:     size,
		threads:  threads,
		info:     info,
		parts:    parts,
		streamed: streamed,
	}
	return up, nil
}

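// A call site would first obtain an api.UploadSpecification from the
// ShareFile Items/Upload endpoint and then drive the upload, roughly
// like this (a sketch; fetchUploadSpec is a hypothetical helper name):
//
//	info, err := o.fetchUploadSpec(ctx, size) // hypothetical helper
//	if err != nil {
//		return err
//	}
//	up, err := f.newLargeUpload(ctx, o, in, src, info)
//	if err != nil {
//		return err
//	}
//	return up.Upload(ctx)
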
// parse the api.UploadFinishResponse in respBody
func (up *largeUpload) parseUploadFinishResponse(respBody []byte) (err error) {
	var finish api.UploadFinishResponse
	err = json.Unmarshal(respBody, &finish)
	if err != nil {
		// Sometimes the unmarshal fails in which case return the body
		return fmt.Errorf("upload: bad response: %q", bytes.TrimSpace(respBody))
	}
	return up.o.checkUploadResponse(up.ctx, &finish)
}

// Transfer a chunk
func (up *largeUpload) transferChunk(ctx context.Context, part int64, offset int64, body []byte, fileHash string) error {
	md5sumRaw := md5.Sum(body)
	md5sum := hex.EncodeToString(md5sumRaw[:])
	size := int64(len(body))

	// Add some more parameters to the ChunkURI
	u := up.info.ChunkURI
	u += fmt.Sprintf("&index=%d&byteOffset=%d&hash=%s&fmt=json",
		part, offset, md5sum,
	)
	if fileHash != "" {
		u += fmt.Sprintf("&finish=true&fileSize=%d&fileHash=%s",
			offset+int64(len(body)),
			fileHash,
		)
	}
	opts := rest.Opts{
		Method:        "POST",
		RootURL:       u,
		ContentLength: &size,
	}
	var respBody []byte
	err := up.f.pacer.Call(func() (bool, error) {
		fs.Debugf(up.o, "Sending chunk %d length %d", part, len(body))
		opts.Body = up.wrap(bytes.NewReader(body))
		resp, err := up.f.srv.Call(ctx, &opts)
		if err != nil {
			fs.Debugf(up.o, "Error sending chunk %d: %v", part, err)
		} else {
			respBody, err = rest.ReadBody(resp)
		}
		// retry all errors now that the multipart upload has started
		return err != nil, err
	})
	if err != nil {
		fs.Debugf(up.o, "Error sending chunk %d: %v", part, err)
		return err
	}
	// If last chunk and using "streamed" transfer, get the response back now
	if up.streamed && fileHash != "" {
		return up.parseUploadFinishResponse(respBody)
	}
	fs.Debugf(up.o, "Done sending chunk %d", part)
	return nil
}

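// For reference, the chunk URI built above ends up looking like this
// (illustrative values, not taken from a real transfer):
//
//	<ChunkURI>&index=2&byteOffset=16777216&hash=<md5-of-chunk>&fmt=json
//
// with &finish=true&fileSize=<total>&fileHash=<md5-of-file> appended on
// the final chunk, which is also what lets a streamed upload return the
// finish response with the last chunk.
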
// finish closes off the large upload and reads the metadata
func (up *largeUpload) finish(ctx context.Context) error {
	fs.Debugf(up.o, "Finishing large file upload")
	// For a streamed transfer we will already have read the info
	if up.streamed {
		return nil
	}

	opts := rest.Opts{
		Method:  "POST",
		RootURL: up.info.FinishURI,
	}
	var respBody []byte
	err := up.f.pacer.Call(func() (bool, error) {
		resp, err := up.f.srv.Call(ctx, &opts)
		if err != nil {
			return shouldRetry(ctx, resp, err)
		}
		respBody, err = rest.ReadBody(resp)
		// retry all errors now that the multipart upload has started
		return err != nil, err
	})
	if err != nil {
		return err
	}
	return up.parseUploadFinishResponse(respBody)
}

// Upload uploads the chunks from the input
func (up *largeUpload) Upload(ctx context.Context) error {
	if up.parts >= 0 {
		fs.Debugf(up.o, "Starting upload of large file in %d chunks", up.parts)
	} else {
		fs.Debugf(up.o, "Starting streaming upload of large file")
	}
	var (
		offset        int64
		errs          = make(chan error, 1)
		wg            sync.WaitGroup
		err           error
		wholeFileHash = md5.New()
		eof           = false
	)
outer:
	for part := int64(0); !eof; part++ {
		// Check any errors
		select {
		case err = <-errs:
			break outer
		default:
		}

		// Get a block of memory
		buf := up.f.getUploadBlock()

		// Read the chunk
		var n int
		n, err = readers.ReadFill(up.in, buf)
		if err == io.EOF {
			eof = true
			buf = buf[:n]
			err = nil
		} else if err != nil {
			up.f.putUploadBlock(buf)
			break outer
		}

		// Hash it
		_, _ = io.Copy(wholeFileHash, bytes.NewBuffer(buf))

		// Get file hash if was last chunk
		fileHash := ""
		if eof {
			fileHash = hex.EncodeToString(wholeFileHash.Sum(nil))
		}

		// Transfer the chunk
		wg.Add(1)
		transferChunk := func(part, offset int64, buf []byte, fileHash string) {
			defer wg.Done()
			defer up.f.putUploadBlock(buf)
			err := up.transferChunk(ctx, part, offset, buf, fileHash)
			if err != nil {
				select {
				case errs <- err:
				default:
				}
			}
		}
		if up.streamed {
			transferChunk(part, offset, buf, fileHash) // streamed
		} else {
			go transferChunk(part, offset, buf, fileHash) // multithreaded
		}
		offset += int64(n)
	}
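
	// Note that errs has capacity 1 and the workers send with a
	// non-blocking select, so only the first error is collected and
	// later errors are deliberately dropped.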
	wg.Wait()
	// check size read is correct
	if eof && err == nil && up.size >= 0 && up.size != offset {
		err = fmt.Errorf("upload: short read: read %d bytes expected %d", offset, up.size)
	}
	// read any errors
	if err == nil {
		select {
		case err = <-errs:
		default:
		}
	}
	// finish regardless of errors
	finishErr := up.finish(ctx)
	if err == nil {
		err = finishErr
	}
	return err
}