read.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507
  1. package vfs
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "os"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "github.com/rclone/rclone/fs"
  12. "github.com/rclone/rclone/fs/accounting"
  13. "github.com/rclone/rclone/fs/chunkedreader"
  14. "github.com/rclone/rclone/fs/hash"
  15. )
// ReadFileHandle is an open for read file handle on a File
//
// The underlying object is not opened when the handle is created; it is
// opened on the first read (see openPending). All mutable fields are
// protected by mu.
type ReadFileHandle struct {
	baseHandle
	done        func(ctx context.Context, err error) // completes the accounting transfer; set in openPending, called from close
	mu          sync.Mutex                           // protects the fields below
	cond        sync.Cond                            // cond lock for out of sequence reads (cond.L is &mu)
	r           *accounting.Account                  // accounted, buffered reader; only valid once opened is true
	size        int64                                // size of the object (0 for unknown length)
	offset      int64                                // offset of read of o
	roffset     int64                                // offset of Read() calls
	file        *File                                // the File this handle was opened on
	hash        *hash.MultiHasher                    // hashes the data read; nil if checksums are disabled or a seek has happened
	remote      string                               // remote name of the object, used as the subject of log messages
	closed      bool                                 // set if handle has been closed
	readCalled  bool                                 // set if read has been called
	noSeek      bool                                 // copied from the VFS NoSeek option; seeking returns ESPIPE when set
	sizeUnknown bool                                 // set if size of source is not known
	opened      bool                                 // set once openPending has opened the object
}
// Check interfaces
//
// These are compile time assertions that *ReadFileHandle satisfies the
// standard io interfaces listed; they have no effect at run time.
var (
	_ io.Reader   = (*ReadFileHandle)(nil)
	_ io.ReaderAt = (*ReadFileHandle)(nil)
	_ io.Seeker   = (*ReadFileHandle)(nil)
	_ io.Closer   = (*ReadFileHandle)(nil)
)
  42. func newReadFileHandle(f *File) (*ReadFileHandle, error) {
  43. var mhash *hash.MultiHasher
  44. var err error
  45. o := f.getObject()
  46. if !f.VFS().Opt.NoChecksum {
  47. hashes := hash.NewHashSet(o.Fs().Hashes().GetOne()) // just pick one hash
  48. mhash, err = hash.NewMultiHasherTypes(hashes)
  49. if err != nil {
  50. fs.Errorf(o.Fs(), "newReadFileHandle hash error: %v", err)
  51. }
  52. }
  53. fh := &ReadFileHandle{
  54. remote: o.Remote(),
  55. noSeek: f.VFS().Opt.NoSeek,
  56. file: f,
  57. hash: mhash,
  58. size: nonNegative(o.Size()),
  59. sizeUnknown: o.Size() < 0,
  60. }
  61. fh.cond = sync.Cond{L: &fh.mu}
  62. return fh, nil
  63. }
  64. // openPending opens the file if there is a pending open
  65. // call with the lock held
  66. func (fh *ReadFileHandle) openPending() (err error) {
  67. if fh.opened {
  68. return nil
  69. }
  70. o := fh.file.getObject()
  71. opt := &fh.file.VFS().Opt
  72. r, err := chunkedreader.New(context.TODO(), o, int64(opt.ChunkSize), int64(opt.ChunkSizeLimit), opt.ChunkStreams).Open()
  73. if err != nil {
  74. return err
  75. }
  76. tr := accounting.GlobalStats().NewTransfer(o, nil)
  77. fh.done = tr.Done
  78. fh.r = tr.Account(context.TODO(), r).WithBuffer() // account the transfer
  79. fh.opened = true
  80. return nil
  81. }
  82. // String converts it to printable
  83. func (fh *ReadFileHandle) String() string {
  84. if fh == nil {
  85. return "<nil *ReadFileHandle>"
  86. }
  87. fh.mu.Lock()
  88. defer fh.mu.Unlock()
  89. if fh.file == nil {
  90. return "<nil *ReadFileHandle.file>"
  91. }
  92. return fh.file.String() + " (r)"
  93. }
  94. // Node returns the Node associated with this - satisfies Noder interface
  95. func (fh *ReadFileHandle) Node() Node {
  96. fh.mu.Lock()
  97. defer fh.mu.Unlock()
  98. return fh.file
  99. }
  100. // seek to a new offset
  101. //
  102. // if reopen is true, then we won't attempt to use an io.Seeker interface
  103. //
  104. // Must be called with fh.mu held
  105. func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) {
  106. if fh.noSeek {
  107. return ESPIPE
  108. }
  109. fh.hash = nil
  110. if !reopen {
  111. ar := fh.r.GetAsyncReader()
  112. // try to fulfill the seek with buffer discard
  113. if ar != nil && ar.SkipBytes(int(offset-fh.offset)) {
  114. fh.offset = offset
  115. return nil
  116. }
  117. }
  118. fh.r.StopBuffering() // stop the background reading first
  119. oldReader := fh.r.GetReader()
  120. r, ok := oldReader.(chunkedreader.ChunkedReader)
  121. if !ok {
  122. fs.Logf(fh.remote, "ReadFileHandle.Read expected reader to be a ChunkedReader, got %T", oldReader)
  123. reopen = true
  124. }
  125. if !reopen {
  126. fs.Debugf(fh.remote, "ReadFileHandle.seek from %d to %d (fs.RangeSeeker)", fh.offset, offset)
  127. _, err = r.RangeSeek(context.TODO(), offset, io.SeekStart, -1)
  128. if err != nil {
  129. fs.Debugf(fh.remote, "ReadFileHandle.Read fs.RangeSeeker failed: %v", err)
  130. return err
  131. }
  132. } else {
  133. fs.Debugf(fh.remote, "ReadFileHandle.seek from %d to %d", fh.offset, offset)
  134. // close old one
  135. err = oldReader.Close()
  136. if err != nil {
  137. fs.Debugf(fh.remote, "ReadFileHandle.Read seek close old failed: %v", err)
  138. }
  139. // re-open with a seek
  140. o := fh.file.getObject()
  141. opt := &fh.file.VFS().Opt
  142. r = chunkedreader.New(context.TODO(), o, int64(opt.ChunkSize), int64(opt.ChunkSizeLimit), opt.ChunkStreams)
  143. _, err := r.Seek(offset, 0)
  144. if err != nil {
  145. fs.Debugf(fh.remote, "ReadFileHandle.Read seek failed: %v", err)
  146. return err
  147. }
  148. r, err = r.Open()
  149. if err != nil {
  150. fs.Debugf(fh.remote, "ReadFileHandle.Read seek failed: %v", err)
  151. return err
  152. }
  153. }
  154. fh.r.UpdateReader(context.TODO(), r)
  155. fh.offset = offset
  156. return nil
  157. }
  158. // Seek the file - returns ESPIPE if seeking isn't possible
  159. func (fh *ReadFileHandle) Seek(offset int64, whence int) (n int64, err error) {
  160. fh.mu.Lock()
  161. defer fh.mu.Unlock()
  162. if fh.noSeek {
  163. return 0, ESPIPE
  164. }
  165. size := fh.size
  166. switch whence {
  167. case io.SeekStart:
  168. fh.roffset = 0
  169. case io.SeekEnd:
  170. fh.roffset = size
  171. }
  172. fh.roffset += offset
  173. // we don't check the offset - the next Read will
  174. return fh.roffset, nil
  175. }
  176. // ReadAt reads len(p) bytes into p starting at offset off in the
  177. // underlying input source. It returns the number of bytes read (0 <=
  178. // n <= len(p)) and any error encountered.
  179. //
  180. // When ReadAt returns n < len(p), it returns a non-nil error
  181. // explaining why more bytes were not returned. In this respect,
  182. // ReadAt is stricter than Read.
  183. //
  184. // Even if ReadAt returns n < len(p), it may use all of p as scratch
  185. // space during the call. If some data is available but not len(p)
  186. // bytes, ReadAt blocks until either all the data is available or an
  187. // error occurs. In this respect ReadAt is different from Read.
  188. //
  189. // If the n = len(p) bytes returned by ReadAt are at the end of the
  190. // input source, ReadAt may return either err == EOF or err == nil.
  191. //
  192. // If ReadAt is reading from an input source with a seek offset,
  193. // ReadAt should not affect nor be affected by the underlying seek
  194. // offset.
  195. //
  196. // Clients of ReadAt can execute parallel ReadAt calls on the same
  197. // input source.
  198. //
  199. // Implementations must not retain p.
  200. func (fh *ReadFileHandle) ReadAt(p []byte, off int64) (n int, err error) {
  201. fh.mu.Lock()
  202. defer fh.mu.Unlock()
  203. return fh.readAt(p, off)
  204. }
// This waits for *poff to equal off or aborts after the timeout.
//
// what and remote are only used in debug messages. cond is the
// condition variable which is broadcast whenever *poff changes, maxWait
// is the longest time to wait, poff points at the offset being watched
// and off is the value it is hoped to reach.
//
// Nothing is returned: the caller must compare *poff with off afterwards
// to find out whether the wait succeeded or timed out.
//
// Waits here potentially affect all seeks so need to keep them short.
//
// Call with fh.mu Locked
func waitSequential(what string, remote string, cond *sync.Cond, maxWait time.Duration, poff *int64, off int64) {
	var (
		timeout = time.NewTimer(maxWait)
		done    = make(chan struct{}) // closed when the wait is over to release the goroutine below
		abort   atomic.Int32          // set to 1 by the goroutine below when the timer fires
	)
	// This goroutine ends either when the timer fires or when done is
	// closed, whichever it sees first.
	go func() {
		select {
		case <-timeout.C:
			// take the lock to make sure that cond.Wait() is called before
			// cond.Broadcast. NB cond.L == mu
			cond.L.Lock()
			// set abort flag and give all the waiting goroutines a kick on timeout
			abort.Store(1)
			fs.Debugf(remote, "aborting in-sequence %s wait, off=%d", what, off)
			cond.Broadcast()
			cond.L.Unlock()
		case <-done:
		}
	}()
	// cond.Wait releases cond.L while blocked and re-acquires it before
	// returning, so the condition is always checked with the lock held.
	for *poff != off && abort.Load() == 0 {
		fs.Debugf(remote, "waiting for in-sequence %s to %d for %v", what, off, maxWait)
		cond.Wait()
	}
	// tidy up end timer
	close(done)
	timeout.Stop()
	if *poff != off {
		fs.Debugf(remote, "failed to wait for in-sequence %s to %d", what, off)
	}
}
  241. // Implementation of ReadAt - call with lock held
  242. func (fh *ReadFileHandle) readAt(p []byte, off int64) (n int, err error) {
  243. // defer log.Trace(fh.remote, "p[%d], off=%d", len(p), off)("n=%d, err=%v", &n, &err)
  244. err = fh.openPending() // FIXME pending open could be more efficient in the presence of seek (and retries)
  245. if err != nil {
  246. return 0, err
  247. }
  248. // fs.Debugf(fh.remote, "ReadFileHandle.Read size %d offset %d", reqSize, off)
  249. if fh.closed {
  250. fs.Errorf(fh.remote, "ReadFileHandle.Read error: %v", EBADF)
  251. return 0, ECLOSED
  252. }
  253. maxBuf := 1024 * 1024
  254. if len(p) < maxBuf {
  255. maxBuf = len(p)
  256. }
  257. if gap := off - fh.offset; gap > 0 && gap < int64(8*maxBuf) {
  258. waitSequential("read", fh.remote, &fh.cond, time.Duration(fh.file.VFS().Opt.ReadWait), &fh.offset, off)
  259. }
  260. doSeek := off != fh.offset
  261. if doSeek && fh.noSeek {
  262. return 0, ESPIPE
  263. }
  264. var newOffset int64
  265. retries := 0
  266. reqSize := len(p)
  267. doReopen := false
  268. lowLevelRetries := fs.GetConfig(context.TODO()).LowLevelRetries
  269. for {
  270. if doSeek {
  271. // Are we attempting to seek beyond the end of the
  272. // file - if so just return EOF leaving the underlying
  273. // file in an unchanged state.
  274. if off >= fh.size {
  275. fs.Debugf(fh.remote, "ReadFileHandle.Read attempt to read beyond end of file: %d > %d", off, fh.size)
  276. return 0, io.EOF
  277. }
  278. // Otherwise do the seek
  279. err = fh.seek(off, doReopen)
  280. } else {
  281. err = nil
  282. }
  283. if err == nil {
  284. if reqSize > 0 {
  285. fh.readCalled = true
  286. }
  287. n, err = io.ReadFull(fh.r, p)
  288. newOffset = fh.offset + int64(n)
  289. // if err == nil && rand.Intn(10) == 0 {
  290. // err = errors.New("random error")
  291. // }
  292. if err == nil {
  293. break
  294. } else if (err == io.ErrUnexpectedEOF || err == io.EOF) && (newOffset == fh.size || fh.sizeUnknown) {
  295. if fh.sizeUnknown {
  296. // size is now known since we have read to the end
  297. fh.sizeUnknown = false
  298. fh.size = newOffset
  299. }
  300. // Have read to end of file - reset error
  301. err = nil
  302. break
  303. }
  304. }
  305. if retries >= lowLevelRetries {
  306. break
  307. }
  308. retries++
  309. fs.Errorf(fh.remote, "ReadFileHandle.Read error: low level retry %d/%d: %v", retries, lowLevelRetries, err)
  310. doSeek = true
  311. doReopen = true
  312. }
  313. if err != nil {
  314. fs.Errorf(fh.remote, "ReadFileHandle.Read error: %v", err)
  315. } else {
  316. fh.offset = newOffset
  317. // fs.Debugf(fh.remote, "ReadFileHandle.Read OK")
  318. if fh.hash != nil {
  319. _, err = fh.hash.Write(p[:n])
  320. if err != nil {
  321. fs.Errorf(fh.remote, "ReadFileHandle.Read HashError: %v", err)
  322. return 0, err
  323. }
  324. }
  325. // If we have no error and we didn't fill the buffer, must be EOF
  326. if n != len(p) {
  327. err = io.EOF
  328. }
  329. }
  330. fh.cond.Broadcast() // wake everyone up waiting for an in-sequence read
  331. return n, err
  332. }
  333. func (fh *ReadFileHandle) checkHash() error {
  334. if fh.hash == nil || !fh.readCalled || fh.offset < fh.size {
  335. return nil
  336. }
  337. o := fh.file.getObject()
  338. for hashType, dstSum := range fh.hash.Sums() {
  339. srcSum, err := o.Hash(context.TODO(), hashType)
  340. if err != nil {
  341. if errors.Is(err, os.ErrNotExist) {
  342. // if it was file not found then at
  343. // this point we don't care any more
  344. continue
  345. }
  346. return err
  347. }
  348. if !hash.Equals(dstSum, srcSum) {
  349. return fmt.Errorf("corrupted on transfer: %v hashes differ src %q vs dst %q", hashType, srcSum, dstSum)
  350. }
  351. }
  352. return nil
  353. }
  354. // Read reads up to len(p) bytes into p. It returns the number of bytes read (0
  355. // <= n <= len(p)) and any error encountered. Even if Read returns n < len(p),
  356. // it may use all of p as scratch space during the call. If some data is
  357. // available but not len(p) bytes, Read conventionally returns what is
  358. // available instead of waiting for more.
  359. //
  360. // When Read encounters an error or end-of-file condition after successfully
  361. // reading n > 0 bytes, it returns the number of bytes read. It may return the
  362. // (non-nil) error from the same call or return the error (and n == 0) from a
  363. // subsequent call. An instance of this general case is that a Reader returning
  364. // a non-zero number of bytes at the end of the input stream may return either
  365. // err == EOF or err == nil. The next Read should return 0, EOF.
  366. //
  367. // Callers should always process the n > 0 bytes returned before considering
  368. // the error err. Doing so correctly handles I/O errors that happen after
  369. // reading some bytes and also both of the allowed EOF behaviors.
  370. //
  371. // Implementations of Read are discouraged from returning a zero byte count
  372. // with a nil error, except when len(p) == 0. Callers should treat a return of
  373. // 0 and nil as indicating that nothing happened; in particular it does not
  374. // indicate EOF.
  375. //
  376. // Implementations must not retain p.
  377. func (fh *ReadFileHandle) Read(p []byte) (n int, err error) {
  378. fh.mu.Lock()
  379. defer fh.mu.Unlock()
  380. if fh.roffset >= fh.size && !fh.sizeUnknown {
  381. return 0, io.EOF
  382. }
  383. n, err = fh.readAt(p, fh.roffset)
  384. fh.roffset += int64(n)
  385. return n, err
  386. }
  387. // close the file handle returning EBADF if it has been
  388. // closed already.
  389. //
  390. // Must be called with fh.mu held
  391. func (fh *ReadFileHandle) close() error {
  392. if fh.closed {
  393. return ECLOSED
  394. }
  395. fh.closed = true
  396. if fh.opened {
  397. var err error
  398. defer func() {
  399. fh.done(context.TODO(), err)
  400. }()
  401. // Close first so that we have hashes
  402. err = fh.r.Close()
  403. if err != nil {
  404. return err
  405. }
  406. // Now check the hash
  407. err = fh.checkHash()
  408. if err != nil {
  409. return err
  410. }
  411. }
  412. return nil
  413. }
  414. // Close closes the file
  415. func (fh *ReadFileHandle) Close() error {
  416. fh.mu.Lock()
  417. defer fh.mu.Unlock()
  418. return fh.close()
  419. }
  420. // Flush is called each time the file or directory is closed.
  421. // Because there can be multiple file descriptors referring to a
  422. // single opened file, Flush can be called multiple times.
  423. func (fh *ReadFileHandle) Flush() error {
  424. fh.mu.Lock()
  425. defer fh.mu.Unlock()
  426. if !fh.opened {
  427. return nil
  428. }
  429. // fs.Debugf(fh.remote, "ReadFileHandle.Flush")
  430. if err := fh.checkHash(); err != nil {
  431. fs.Errorf(fh.remote, "ReadFileHandle.Flush error: %v", err)
  432. return err
  433. }
  434. // fs.Debugf(fh.remote, "ReadFileHandle.Flush OK")
  435. return nil
  436. }
  437. // Release is called when we are finished with the file handle
  438. //
  439. // It isn't called directly from userspace so the error is ignored by
  440. // the kernel
  441. func (fh *ReadFileHandle) Release() error {
  442. fh.mu.Lock()
  443. defer fh.mu.Unlock()
  444. if !fh.opened {
  445. return nil
  446. }
  447. if fh.closed {
  448. fs.Debugf(fh.remote, "ReadFileHandle.Release nothing to do")
  449. return nil
  450. }
  451. fs.Debugf(fh.remote, "ReadFileHandle.Release closing")
  452. err := fh.close()
  453. if err != nil {
  454. fs.Errorf(fh.remote, "ReadFileHandle.Release error: %v", err)
  455. //} else {
  456. // fs.Debugf(fh.remote, "ReadFileHandle.Release OK")
  457. }
  458. return err
  459. }
  460. // Name returns the name of the file from the underlying Object.
  461. func (fh *ReadFileHandle) Name() string {
  462. return fh.file.String()
  463. }
  464. // Size returns the size of the underlying file
  465. func (fh *ReadFileHandle) Size() int64 {
  466. fh.mu.Lock()
  467. defer fh.mu.Unlock()
  468. return fh.size
  469. }
  470. // Stat returns info about the file
  471. func (fh *ReadFileHandle) Stat() (os.FileInfo, error) {
  472. fh.mu.Lock()
  473. defer fh.mu.Unlock()
  474. return fh.file, nil
  475. }