item.go 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454
  1. package vfscache
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "os"
  9. "sync"
  10. "time"
  11. "github.com/rclone/rclone/fs"
  12. "github.com/rclone/rclone/fs/fserrors"
  13. "github.com/rclone/rclone/fs/operations"
  14. "github.com/rclone/rclone/lib/file"
  15. "github.com/rclone/rclone/lib/ranges"
  16. "github.com/rclone/rclone/vfs/vfscache/downloaders"
  17. "github.com/rclone/rclone/vfs/vfscache/writeback"
  18. )
  19. // NB as Cache and Item are tightly linked it is necessary to have a
  20. // total lock ordering between them. So Cache.mu must always be
  21. // taken before Item.mu to avoid deadlocks.
  22. //
  23. // Cache may call into Item but care is needed if Item calls Cache
  24. //
  25. // A lot of the Cache methods do not require locking, these include
  26. //
  27. // - Cache.toOSPath
  28. // - Cache.toOSPathMeta
  29. // - Cache.createItemDir
  30. // - Cache.objectFingerprint
  31. // - Cache.AddVirtual
  32. // NB Item and downloader are tightly linked so it is necessary to
  33. // have a total lock ordering between them. downloader.mu must always
  34. // be taken before Item.mu. downloader may call into Item but Item may
  35. // **not** call downloader methods with Item.mu held
  36. // NB Item and writeback are tightly linked so it is necessary to
  37. // have a total lock ordering between them. writeback.mu must always
  38. // be taken before Item.mu. writeback may call into Item but Item may
  39. // **not** call writeback methods with Item.mu held
  40. // LL Item reset is invoked by cache cleaner for synchronous recovery
  41. // from ENOSPC errors. The reset operation removes the cache file and
  42. // closes/reopens the downloaders. Although most parts of reset and
  43. // other item operations are done with the item mutex held, the mutex
  44. // is released during fd.WriteAt and downloaders calls. We use preAccess
  45. // and postAccess calls to serialize reset and other item operations.
  46. // Item is stored in the item map
  47. //
  48. // The Info field is written to the backing store to store status
  49. type Item struct {
  50. // read only
  51. c *Cache // cache this is part of
  52. mu sync.Mutex // protect the variables
  53. cond sync.Cond // synchronize with cache cleaner
  54. name string // name in the VFS
  55. opens int // number of times file is open
  56. downloaders *downloaders.Downloaders // a record of the downloaders in action - may be nil
  57. o fs.Object // object we are caching - may be nil
  58. fd *os.File // handle we are using to read and write to the file
  59. info Info // info about the file to persist to backing store
  60. writeBackID writeback.Handle // id of any writebacks in progress
  61. pendingAccesses int // number of threads - cache reset not allowed if not zero
  62. modified bool // set if the file has been modified since the last Open
  63. beingReset bool // cache cleaner is resetting the cache file, access not allowed
  64. }
  65. // Info is persisted to backing store
  66. type Info struct {
  67. ModTime time.Time // last time file was modified
  68. ATime time.Time // last time file was accessed
  69. Size int64 // size of the file
  70. Rs ranges.Ranges // which parts of the file are present
  71. Fingerprint string // fingerprint of remote object
  72. Dirty bool // set if the backing file has been modified
  73. }
  74. // Items are a slice of *Item ordered by ATime
  75. type Items []*Item
  76. // ResetResult reports the actual action taken in the Reset function and reason
  77. type ResetResult int
  78. // Constants used to report actual action taken in the Reset function and reason
  79. const (
  80. SkippedDirty ResetResult = iota // Dirty item cannot be reset
  81. SkippedPendingAccess // Reset pending access can lead to deadlock
  82. SkippedEmpty // Reset empty item does not save space
  83. RemovedNotInUse // Item not used. Remove instead of reset
  84. ResetFailed // Reset failed with an error
  85. ResetComplete // Reset completed successfully
  86. )
  87. func (rr ResetResult) String() string {
  88. return [...]string{"Dirty item skipped", "In-access item skipped", "Empty item skipped",
  89. "Not-in-use item removed", "Item reset failed", "Item reset completed"}[rr]
  90. }
  91. func (v Items) Len() int { return len(v) }
  92. func (v Items) Swap(i, j int) { v[i], v[j] = v[j], v[i] }
  93. func (v Items) Less(i, j int) bool {
  94. if i == j {
  95. return false
  96. }
  97. iItem := v[i]
  98. jItem := v[j]
  99. iItem.mu.Lock()
  100. defer iItem.mu.Unlock()
  101. jItem.mu.Lock()
  102. defer jItem.mu.Unlock()
  103. return iItem.info.ATime.Before(jItem.info.ATime)
  104. }
  105. // clean the item after its cache file has been deleted
  106. func (info *Info) clean() {
  107. *info = Info{}
  108. info.ModTime = time.Now()
  109. info.ATime = info.ModTime
  110. }
  111. // StoreFn is called back with an object after it has been uploaded
  112. type StoreFn func(fs.Object)
  113. // newItem returns an item for the cache
  114. func newItem(c *Cache, name string) (item *Item) {
  115. now := time.Now()
  116. item = &Item{
  117. c: c,
  118. name: name,
  119. info: Info{
  120. ModTime: now,
  121. ATime: now,
  122. },
  123. }
  124. item.cond = sync.Cond{L: &item.mu}
  125. // check the cache file exists
  126. osPath := c.toOSPath(name)
  127. fi, statErr := os.Stat(osPath)
  128. if statErr != nil {
  129. if os.IsNotExist(statErr) {
  130. item._removeMeta("cache file doesn't exist")
  131. } else {
  132. item.remove(fmt.Sprintf("failed to stat cache file: %v", statErr))
  133. }
  134. }
  135. // Try to load the metadata
  136. exists, err := item.load()
  137. if !exists {
  138. item._removeFile("metadata doesn't exist")
  139. } else if err != nil {
  140. item.remove(fmt.Sprintf("failed to load metadata: %v", err))
  141. }
  142. // Get size estimate (which is best we can do until Open() called)
  143. if statErr == nil {
  144. item.info.Size = fi.Size()
  145. }
  146. return item
  147. }
  148. // inUse returns true if the item is open or dirty
  149. func (item *Item) inUse() bool {
  150. item.mu.Lock()
  151. defer item.mu.Unlock()
  152. return item.opens != 0 || item.info.Dirty
  153. }
  154. // getDiskSize returns the size on disk (approximately) of the item
  155. //
  156. // We return the sizes of the chunks we have fetched, however there is
  157. // likely to be some overhead which we are not taking into account.
  158. func (item *Item) getDiskSize() int64 {
  159. item.mu.Lock()
  160. defer item.mu.Unlock()
  161. return item.info.Rs.Size()
  162. }
  163. // load reads an item from the disk or returns nil if not found
  164. func (item *Item) load() (exists bool, err error) {
  165. item.mu.Lock()
  166. defer item.mu.Unlock()
  167. osPathMeta := item.c.toOSPathMeta(item.name) // No locking in Cache
  168. in, err := os.Open(osPathMeta)
  169. if err != nil {
  170. if os.IsNotExist(err) {
  171. return false, err
  172. }
  173. return true, fmt.Errorf("vfs cache item: failed to read metadata: %w", err)
  174. }
  175. defer fs.CheckClose(in, &err)
  176. decoder := json.NewDecoder(in)
  177. err = decoder.Decode(&item.info)
  178. if err != nil {
  179. return true, fmt.Errorf("vfs cache item: corrupt metadata: %w", err)
  180. }
  181. return true, nil
  182. }
  183. // save writes an item to the disk
  184. //
  185. // call with the lock held
  186. func (item *Item) _save() (err error) {
  187. osPathMeta := item.c.toOSPathMeta(item.name) // No locking in Cache
  188. out, err := os.Create(osPathMeta)
  189. if err != nil {
  190. return fmt.Errorf("vfs cache item: failed to write metadata: %w", err)
  191. }
  192. defer fs.CheckClose(out, &err)
  193. encoder := json.NewEncoder(out)
  194. encoder.SetIndent("", "\t")
  195. err = encoder.Encode(item.info)
  196. if err != nil {
  197. return fmt.Errorf("vfs cache item: failed to encode metadata: %w", err)
  198. }
  199. return nil
  200. }
  201. // truncate the item to the given size, creating it if necessary
  202. //
  203. // this does not mark the object as dirty
  204. //
  205. // call with the lock held
  206. func (item *Item) _truncate(size int64) (err error) {
  207. if size < 0 {
  208. // FIXME ignore unknown length files
  209. return nil
  210. }
  211. // Use open handle if available
  212. fd := item.fd
  213. if fd == nil {
  214. // If the metadata says we have some blocks cached then the
  215. // file should exist, so open without O_CREATE
  216. oFlags := os.O_WRONLY
  217. if item.info.Rs.Size() == 0 {
  218. oFlags |= os.O_CREATE
  219. }
  220. osPath := item.c.toOSPath(item.name) // No locking in Cache
  221. fd, err = file.OpenFile(osPath, oFlags, 0600)
  222. if err != nil && os.IsNotExist(err) {
  223. // If the metadata has info but the file doesn't
  224. // not exist then it has been externally removed
  225. fs.Errorf(item.name, "vfs cache: detected external removal of cache file")
  226. item.info.Rs = nil // show we have no blocks cached
  227. item.info.Dirty = false // file can't be dirty if it doesn't exist
  228. item._removeMeta("cache file externally deleted")
  229. fd, err = file.OpenFile(osPath, os.O_CREATE|os.O_WRONLY, 0600)
  230. }
  231. if err != nil {
  232. return fmt.Errorf("vfs cache: truncate: failed to open cache file: %w", err)
  233. }
  234. defer fs.CheckClose(fd, &err)
  235. err = file.SetSparse(fd)
  236. if err != nil {
  237. fs.Errorf(item.name, "vfs cache: truncate: failed to set as a sparse file: %v", err)
  238. }
  239. }
  240. // Check to see what the current size is, and don't truncate
  241. // if it is already the correct size.
  242. //
  243. // Apparently Windows Defender likes to check executables each
  244. // time they are modified, and truncating a file to its
  245. // existing size is enough to trigger the Windows Defender
  246. // scan. This was causing a big slowdown for operations which
  247. // opened and closed the file a lot, such as looking at
  248. // properties on an executable.
  249. fi, err := fd.Stat()
  250. if err == nil && fi.Size() == size {
  251. fs.Debugf(item.name, "vfs cache: truncate to size=%d (not needed as size correct)", size)
  252. } else {
  253. fs.Debugf(item.name, "vfs cache: truncate to size=%d", size)
  254. err = fd.Truncate(size)
  255. if err != nil {
  256. return fmt.Errorf("vfs cache: truncate: %w", err)
  257. }
  258. }
  259. item.info.Size = size
  260. return nil
  261. }
  262. // Truncate the item to the current size, creating if necessary
  263. //
  264. // This does not mark the object as dirty.
  265. //
  266. // call with the lock held
  267. func (item *Item) _truncateToCurrentSize() (err error) {
  268. size, err := item._getSize()
  269. if err != nil && !errors.Is(err, os.ErrNotExist) {
  270. return fmt.Errorf("truncate to current size: %w", err)
  271. }
  272. if size < 0 {
  273. // FIXME ignore unknown length files
  274. return nil
  275. }
  276. err = item._truncate(size)
  277. if err != nil {
  278. return err
  279. }
  280. return nil
  281. }
  282. // Truncate the item to the given size, creating it if necessary
  283. //
  284. // If the new size is shorter than the existing size then the object
  285. // will be shortened and marked as dirty.
  286. //
  287. // If the new size is longer than the old size then the object will be
  288. // extended and the extended data will be filled with zeros. The
  289. // object will be marked as dirty in this case also.
  290. func (item *Item) Truncate(size int64) (err error) {
  291. item.preAccess()
  292. defer item.postAccess()
  293. item.mu.Lock()
  294. defer item.mu.Unlock()
  295. if item.fd == nil {
  296. return errors.New("vfs cache item truncate: internal error: didn't Open file")
  297. }
  298. // Read old size
  299. oldSize, err := item._getSize()
  300. if err != nil {
  301. if !errors.Is(err, os.ErrNotExist) {
  302. return fmt.Errorf("truncate failed to read size: %w", err)
  303. }
  304. oldSize = 0
  305. }
  306. err = item._truncate(size)
  307. if err != nil {
  308. return err
  309. }
  310. changed := true
  311. if size > oldSize {
  312. // Truncate extends the file in which case all new bytes are
  313. // read as zeros. In this case we must show we have written to
  314. // the new parts of the file.
  315. item._written(oldSize, size)
  316. } else if size < oldSize {
  317. // Truncate shrinks the file so clip the downloaded ranges
  318. item.info.Rs = item.info.Rs.Intersection(ranges.Range{Pos: 0, Size: size})
  319. } else {
  320. changed = item.o == nil
  321. }
  322. if changed {
  323. item._dirty()
  324. }
  325. return nil
  326. }
  327. // _stat gets the current stat of the backing file
  328. //
  329. // Call with mutex held
  330. func (item *Item) _stat() (fi os.FileInfo, err error) {
  331. if item.fd != nil {
  332. return item.fd.Stat()
  333. }
  334. osPath := item.c.toOSPath(item.name) // No locking in Cache
  335. return os.Stat(osPath)
  336. }
  337. // _getSize gets the current size of the item and updates item.info.Size
  338. //
  339. // Call with mutex held
  340. func (item *Item) _getSize() (size int64, err error) {
  341. fi, err := item._stat()
  342. if err != nil {
  343. if os.IsNotExist(err) && item.o != nil {
  344. size = item.o.Size()
  345. err = nil
  346. }
  347. } else {
  348. size = fi.Size()
  349. }
  350. if err == nil {
  351. item.info.Size = size
  352. }
  353. return size, err
  354. }
  355. // GetName gets the vfs name of the item
  356. func (item *Item) GetName() (name string) {
  357. item.mu.Lock()
  358. defer item.mu.Unlock()
  359. return item.name
  360. }
  361. // GetSize gets the current size of the item
  362. func (item *Item) GetSize() (size int64, err error) {
  363. item.mu.Lock()
  364. defer item.mu.Unlock()
  365. return item._getSize()
  366. }
  367. // _exists returns whether the backing file for the item exists or not
  368. //
  369. // call with mutex held
  370. func (item *Item) _exists() bool {
  371. osPath := item.c.toOSPath(item.name) // No locking in Cache
  372. _, err := os.Stat(osPath)
  373. return err == nil
  374. }
  375. // Exists returns whether the backing file for the item exists or not
  376. func (item *Item) Exists() bool {
  377. item.mu.Lock()
  378. defer item.mu.Unlock()
  379. return item._exists()
  380. }
  381. // _dirty marks the item as changed and needing writeback
  382. //
  383. // call with lock held
  384. func (item *Item) _dirty() {
  385. item.info.ModTime = time.Now()
  386. item.info.ATime = item.info.ModTime
  387. if !item.modified {
  388. item.modified = true
  389. item.mu.Unlock()
  390. item.c.writeback.Remove(item.writeBackID)
  391. item.mu.Lock()
  392. }
  393. if !item.info.Dirty {
  394. item.info.Dirty = true
  395. err := item._save()
  396. if err != nil {
  397. fs.Errorf(item.name, "vfs cache: failed to save item info: %v", err)
  398. }
  399. }
  400. }
  401. // Dirty marks the item as changed and needing writeback
  402. func (item *Item) Dirty() {
  403. item.preAccess()
  404. defer item.postAccess()
  405. item.mu.Lock()
  406. item._dirty()
  407. item.mu.Unlock()
  408. }
  409. // IsDirty returns true if the item data is dirty
  410. func (item *Item) IsDirty() bool {
  411. item.mu.Lock()
  412. defer item.mu.Unlock()
  413. return item.info.Dirty
  414. }
  415. // Create the cache file and store the metadata on disk
  416. // Called with item.mu locked
  417. func (item *Item) _createFile(osPath string) (err error) {
  418. if item.fd != nil {
  419. return errors.New("vfs cache item: internal error: didn't Close file")
  420. }
  421. item.modified = false
  422. // t0 := time.Now()
  423. fd, err := file.OpenFile(osPath, os.O_RDWR, 0600)
  424. // fs.Debugf(item.name, "OpenFile took %v", time.Since(t0))
  425. if err != nil {
  426. return fmt.Errorf("vfs cache item: open failed: %w", err)
  427. }
  428. err = file.SetSparse(fd)
  429. if err != nil {
  430. fs.Errorf(item.name, "vfs cache: failed to set as a sparse file: %v", err)
  431. }
  432. item.fd = fd
  433. err = item._save()
  434. if err != nil {
  435. closeErr := item.fd.Close()
  436. if closeErr != nil {
  437. fs.Errorf(item.name, "vfs cache: item.fd.Close: closeErr: %v", err)
  438. }
  439. item.fd = nil
  440. return fmt.Errorf("vfs cache item: _save failed: %w", err)
  441. }
  442. return err
  443. }
  444. // Open the local file from the object passed in. Wraps open()
  445. // to provide recovery from out of space error.
  446. func (item *Item) Open(o fs.Object) (err error) {
  447. for retries := 0; retries < fs.GetConfig(context.TODO()).LowLevelRetries; retries++ {
  448. item.preAccess()
  449. err = item.open(o)
  450. item.postAccess()
  451. if err == nil {
  452. break
  453. }
  454. fs.Errorf(item.name, "vfs cache: failed to open item: %v", err)
  455. if !fserrors.IsErrNoSpace(err) && err.Error() != "no space left on device" {
  456. fs.Errorf(item.name, "Non-out-of-space error encountered during open")
  457. break
  458. }
  459. item.c.KickCleaner()
  460. }
  461. return err
  462. }
  463. // Open the local file from the object passed in (which may be nil)
  464. // which implies we are about to create the file
  465. func (item *Item) open(o fs.Object) (err error) {
  466. // defer log.Trace(o, "item=%p", item)("err=%v", &err)
  467. item.mu.Lock()
  468. defer item.mu.Unlock()
  469. item.info.ATime = time.Now()
  470. osPath, err := item.c.createItemDir(item.name) // No locking in Cache
  471. if err != nil {
  472. return fmt.Errorf("vfs cache item: createItemDir failed: %w", err)
  473. }
  474. err = item._checkObject(o)
  475. if err != nil {
  476. return fmt.Errorf("vfs cache item: check object failed: %w", err)
  477. }
  478. item.opens++
  479. if item.opens != 1 {
  480. return nil
  481. }
  482. err = item._createFile(osPath)
  483. if err != nil {
  484. item._remove("item.open failed on _createFile, remove cache data/metadata files")
  485. item.fd = nil
  486. item.opens--
  487. return fmt.Errorf("vfs cache item: create cache file failed: %w", err)
  488. }
  489. // Unlock the Item.mu so we can call some methods which take Cache.mu
  490. item.mu.Unlock()
  491. // Ensure this item is in the cache. It is possible a cache
  492. // expiry has run and removed the item if it had no opens so
  493. // we put it back here. If there was an item with opens
  494. // already then return an error. This shouldn't happen because
  495. // there should only be one vfs.File with a pointer to this
  496. // item in at a time.
  497. oldItem := item.c.put(item.name, item) // LOCKING in Cache method
  498. if oldItem != nil {
  499. oldItem.mu.Lock()
  500. if oldItem.opens != 0 {
  501. // Put the item back and return an error
  502. item.c.put(item.name, oldItem) // LOCKING in Cache method
  503. err = fmt.Errorf("internal error: item %q already open in the cache", item.name)
  504. }
  505. oldItem.mu.Unlock()
  506. }
  507. // Relock the Item.mu for the return
  508. item.mu.Lock()
  509. // Create the downloaders
  510. if item.o != nil {
  511. item.downloaders = downloaders.New(item, item.c.opt, item.name, item.o)
  512. }
  513. return err
  514. }
  515. // Calls f with mu unlocked, re-locking mu if a panic is raised
  516. //
  517. // mu must be locked when calling this function
  518. func unlockMutexForCall(mu *sync.Mutex, f func()) {
  519. mu.Unlock()
  520. defer mu.Lock()
  521. f()
  522. }
  523. // Store stores the local cache file to the remote object, returning
  524. // the new remote object. objOld is the old object if known.
  525. //
  526. // Call with lock held
  527. func (item *Item) _store(ctx context.Context, storeFn StoreFn) (err error) {
  528. // defer log.Trace(item.name, "item=%p", item)("err=%v", &err)
  529. // Transfer the temp file to the remote
  530. cacheObj, err := item.c.fcache.NewObject(ctx, item.name)
  531. if err != nil && err != fs.ErrorObjectNotFound {
  532. return fmt.Errorf("vfs cache: failed to find cache file: %w", err)
  533. }
  534. // Object has disappeared if cacheObj == nil
  535. if cacheObj != nil {
  536. o, name := item.o, item.name
  537. unlockMutexForCall(&item.mu, func() {
  538. o, err = operations.Copy(ctx, item.c.fremote, o, name, cacheObj)
  539. })
  540. if err != nil {
  541. if errors.Is(err, fs.ErrorCantUploadEmptyFiles) {
  542. fs.Errorf(name, "Writeback failed: %v", err)
  543. return nil
  544. }
  545. return fmt.Errorf("vfs cache: failed to transfer file from cache to remote: %w", err)
  546. }
  547. item.o = o
  548. item._updateFingerprint()
  549. }
  550. // Write the object back to the VFS layer before we mark it as
  551. // clean, otherwise it will become eligible for removal which
  552. // can cause a deadlock
  553. if storeFn != nil && item.o != nil {
  554. fs.Debugf(item.name, "vfs cache: writeback object to VFS layer")
  555. // Write the object back to the VFS layer last with mutex unlocked
  556. o := item.o
  557. item.mu.Unlock()
  558. storeFn(o)
  559. item.mu.Lock()
  560. }
  561. // Show item is clean and is eligible for cache removal
  562. item.info.Dirty = false
  563. err = item._save()
  564. if err != nil {
  565. fs.Errorf(item.name, "vfs cache: failed to write metadata file: %v", err)
  566. }
  567. return nil
  568. }
  569. // Store stores the local cache file to the remote object, returning
  570. // the new remote object. objOld is the old object if known.
  571. func (item *Item) store(ctx context.Context, storeFn StoreFn) (err error) {
  572. item.mu.Lock()
  573. defer item.mu.Unlock()
  574. return item._store(ctx, storeFn)
  575. }
  576. // Close the cache file
  577. func (item *Item) Close(storeFn StoreFn) (err error) {
  578. // defer log.Trace(item.o, "Item.Close")("err=%v", &err)
  579. item.preAccess()
  580. defer item.postAccess()
  581. var (
  582. downloaders *downloaders.Downloaders
  583. syncWriteBack = item.c.opt.WriteBack <= 0
  584. )
  585. item.mu.Lock()
  586. defer item.mu.Unlock()
  587. item.info.ATime = time.Now()
  588. item.opens--
  589. if item.opens < 0 {
  590. return os.ErrClosed
  591. } else if item.opens > 0 {
  592. return nil
  593. }
  594. // Update the size on close
  595. _, _ = item._getSize()
  596. // If the file is dirty ensure any segments not transferred
  597. // are brought in first.
  598. //
  599. // FIXME It would be nice to do this asynchronously however it
  600. // would require keeping the downloaders alive after the item
  601. // has been closed
  602. if item.info.Dirty && item.o != nil {
  603. err = item._ensure(0, item.info.Size)
  604. if err != nil {
  605. return fmt.Errorf("vfs cache: failed to download missing parts of cache file: %w", err)
  606. }
  607. }
  608. // Accumulate and log errors
  609. checkErr := func(e error) {
  610. if e != nil {
  611. fs.Errorf(item.o, "vfs cache: item close failed: %v", e)
  612. if err == nil {
  613. err = e
  614. }
  615. }
  616. }
  617. // Close the downloaders
  618. if downloaders = item.downloaders; downloaders != nil {
  619. item.downloaders = nil
  620. // FIXME need to unlock to kill downloader - should we
  621. // re-arrange locking so this isn't necessary? maybe
  622. // downloader should use the item mutex for locking? or put a
  623. // finer lock on Rs?
  624. //
  625. // downloader.Write calls ensure which needs the lock
  626. // close downloader with mutex unlocked
  627. item.mu.Unlock()
  628. checkErr(downloaders.Close(nil))
  629. item.mu.Lock()
  630. }
  631. // close the file handle
  632. if item.fd == nil {
  633. checkErr(errors.New("vfs cache item: internal error: didn't Open file"))
  634. } else {
  635. checkErr(item.fd.Close())
  636. item.fd = nil
  637. }
  638. // save the metadata once more since it may be dirty
  639. // after the downloader
  640. checkErr(item._save())
  641. // if the item hasn't been changed but has been completed then
  642. // set the modtime from the object otherwise set it from the info
  643. if item._exists() {
  644. if !item.info.Dirty && item.o != nil {
  645. item._setModTime(item.o.ModTime(context.Background()))
  646. } else {
  647. item._setModTime(item.info.ModTime)
  648. }
  649. }
  650. // upload the file to backing store if changed
  651. if item.info.Dirty {
  652. fs.Infof(item.name, "vfs cache: queuing for upload in %v", item.c.opt.WriteBack)
  653. if syncWriteBack {
  654. // do synchronous writeback
  655. checkErr(item._store(context.Background(), storeFn))
  656. } else {
  657. // asynchronous writeback
  658. item.c.writeback.SetID(&item.writeBackID)
  659. id := item.writeBackID
  660. item.mu.Unlock()
  661. item.c.writeback.Add(id, item.name, item.info.Size, item.modified, func(ctx context.Context) error {
  662. return item.store(ctx, storeFn)
  663. })
  664. item.mu.Lock()
  665. }
  666. }
  667. // mark as not modified now we have uploaded or queued for upload
  668. item.modified = false
  669. return err
  670. }
  671. // reload is called with valid items recovered from a cache reload.
  672. //
  673. // If they are dirty then it makes sure they get uploaded.
  674. //
  675. // it is called before the cache has started so opens will be 0 and
  676. // metaDirty will be false.
  677. func (item *Item) reload(ctx context.Context) error {
  678. item.mu.Lock()
  679. dirty := item.info.Dirty
  680. item.mu.Unlock()
  681. if !dirty {
  682. return nil
  683. }
  684. // see if the object still exists
  685. obj, _ := item.c.fremote.NewObject(ctx, item.name)
  686. // open the file with the object (or nil)
  687. err := item.Open(obj)
  688. if err != nil {
  689. return err
  690. }
  691. // close the file to execute the writeback if needed
  692. err = item.Close(nil)
  693. if err != nil {
  694. return err
  695. }
  696. // put the file into the directory listings
  697. size, err := item._getSize()
  698. if err != nil {
  699. return fmt.Errorf("reload: failed to read size: %w", err)
  700. }
  701. err = item.c.AddVirtual(item.name, size, false)
  702. if err != nil {
  703. return fmt.Errorf("reload: failed to add virtual dir entry: %w", err)
  704. }
  705. return nil
  706. }
  707. // check the fingerprint of an object and update the item or delete
  708. // the cached file accordingly
  709. //
  710. // If we have local modifications then they take precedence
  711. // over a change in the remote
  712. //
  713. // It ensures the file is the correct size for the object.
  714. //
  715. // call with lock held
  716. func (item *Item) _checkObject(o fs.Object) error {
  717. if o == nil {
  718. if item.info.Fingerprint != "" {
  719. // no remote object && local object
  720. // remove local object unless dirty
  721. if !item.info.Dirty {
  722. item._remove("stale (remote deleted)")
  723. } else {
  724. fs.Debugf(item.name, "vfs cache: remote object has gone but local object modified - keeping it")
  725. }
  726. //} else {
  727. // no remote object && no local object
  728. // OK
  729. }
  730. } else {
  731. remoteFingerprint := fs.Fingerprint(context.TODO(), o, item.c.opt.FastFingerprint)
  732. fs.Debugf(item.name, "vfs cache: checking remote fingerprint %q against cached fingerprint %q", remoteFingerprint, item.info.Fingerprint)
  733. if item.info.Fingerprint != "" {
  734. // remote object && local object
  735. if remoteFingerprint != item.info.Fingerprint {
  736. if !item.info.Dirty {
  737. fs.Debugf(item.name, "vfs cache: removing cached entry as stale (remote fingerprint %q != cached fingerprint %q)", remoteFingerprint, item.info.Fingerprint)
  738. item._remove("stale (remote is different)")
  739. item.info.Fingerprint = remoteFingerprint
  740. } else {
  741. fs.Debugf(item.name, "vfs cache: remote object has changed but local object modified - keeping it (remote fingerprint %q != cached fingerprint %q)", remoteFingerprint, item.info.Fingerprint)
  742. }
  743. }
  744. } else {
  745. // remote object && no local object
  746. // Set fingerprint
  747. item.info.Fingerprint = remoteFingerprint
  748. }
  749. item.info.Size = o.Size()
  750. }
  751. item.o = o
  752. err := item._truncateToCurrentSize()
  753. if err != nil {
  754. return fmt.Errorf("vfs cache item: open truncate failed: %w", err)
  755. }
  756. return nil
  757. }
  758. // WrittenBack checks to see if the item has been written back or not
  759. func (item *Item) WrittenBack() bool {
  760. item.mu.Lock()
  761. defer item.mu.Unlock()
  762. return item.info.Fingerprint != ""
  763. }
  764. // remove the cached file
  765. //
  766. // call with lock held
  767. func (item *Item) _removeFile(reason string) {
  768. osPath := item.c.toOSPath(item.name) // No locking in Cache
  769. err := os.Remove(osPath)
  770. if err != nil {
  771. if !os.IsNotExist(err) {
  772. fs.Errorf(item.name, "vfs cache: failed to remove cache file as %s: %v", reason, err)
  773. }
  774. } else {
  775. fs.Infof(item.name, "vfs cache: removed cache file as %s", reason)
  776. }
  777. }
  778. // remove the metadata
  779. //
  780. // call with lock held
  781. func (item *Item) _removeMeta(reason string) {
  782. osPathMeta := item.c.toOSPathMeta(item.name) // No locking in Cache
  783. err := os.Remove(osPathMeta)
  784. if err != nil {
  785. if !os.IsNotExist(err) {
  786. fs.Errorf(item.name, "vfs cache: failed to remove metadata from cache as %s: %v", reason, err)
  787. }
  788. } else {
  789. fs.Debugf(item.name, "vfs cache: removed metadata from cache as %s", reason)
  790. }
  791. }
  792. // remove the cached file and empty the metadata
  793. //
  794. // This returns true if the file was in the transfer queue so may not
  795. // have completely uploaded yet.
  796. //
  797. // call with lock held
  798. func (item *Item) _remove(reason string) (wasWriting bool) {
  799. // Cancel writeback, if any
  800. item.mu.Unlock()
  801. wasWriting = item.c.writeback.Remove(item.writeBackID)
  802. item.mu.Lock()
  803. item.info.clean()
  804. item._removeFile(reason)
  805. item._removeMeta(reason)
  806. return wasWriting
  807. }
  808. // remove the cached file and empty the metadata
  809. //
  810. // This returns true if the file was in the transfer queue so may not
  811. // have completely uploaded yet.
  812. func (item *Item) remove(reason string) (wasWriting bool) {
  813. item.mu.Lock()
  814. defer item.mu.Unlock()
  815. return item._remove(reason)
  816. }
  817. // RemoveNotInUse is called to remove cache file that has not been accessed recently
  818. // It may also be called for removing empty cache files too when the quota is already reached.
  819. func (item *Item) RemoveNotInUse(maxAge time.Duration, emptyOnly bool) (removed bool, spaceFreed int64) {
  820. item.mu.Lock()
  821. defer item.mu.Unlock()
  822. spaceFreed = 0
  823. removed = false
  824. if item.opens != 0 || item.info.Dirty {
  825. return
  826. }
  827. removeIt := false
  828. if maxAge == 0 {
  829. removeIt = true // quota-driven removal
  830. }
  831. if maxAge != 0 {
  832. cutoff := time.Now().Add(-maxAge)
  833. // If not locked and access time too long ago - delete the file
  834. accessTime := item.info.ATime
  835. if accessTime.Sub(cutoff) <= 0 {
  836. removeIt = true
  837. }
  838. }
  839. if removeIt {
  840. spaceUsed := item.info.Rs.Size()
  841. if !emptyOnly || spaceUsed == 0 {
  842. spaceFreed = spaceUsed
  843. removed = true
  844. if item._remove("Removing old cache file not in use") {
  845. fs.Errorf(item.name, "item removed when it was writing/uploaded")
  846. }
  847. }
  848. }
  849. return
  850. }
  851. // Reset is called by the cache purge functions only to reset (empty the contents) cache files that
  852. // are not dirty. It is used when cache space runs out and we see some ENOSPC error.
  853. func (item *Item) Reset() (rr ResetResult, spaceFreed int64, err error) {
  854. item.mu.Lock()
  855. defer item.mu.Unlock()
  856. // The item is not being used now. Just remove it instead of resetting it.
  857. if item.opens == 0 && !item.info.Dirty {
  858. spaceFreed = item.info.Rs.Size()
  859. if item._remove("Removing old cache file not in use") {
  860. fs.Errorf(item.name, "item removed when it was writing/uploaded")
  861. }
  862. return RemovedNotInUse, spaceFreed, nil
  863. }
  864. // do not reset dirty file
  865. if item.info.Dirty {
  866. return SkippedDirty, 0, nil
  867. }
  868. /* A wait on pendingAccessCnt to become 0 can lead to deadlock when an item.Open bumps
  869. up the pendingAccesses count, calls item.open, which calls cache.put. The cache.put
  870. operation needs the cache mutex, which is held here. We skip this file now. The
  871. caller (the cache cleaner thread) may retry resetting this item if the cache size does
  872. not reduce below quota. */
  873. if item.pendingAccesses > 0 {
  874. return SkippedPendingAccess, 0, nil
  875. }
  876. /* Do not need to reset an empty cache file unless it was being reset and the reset failed.
  877. Some thread(s) may be waiting on the reset's successful completion in that case. */
  878. if item.info.Rs.Size() == 0 && !item.beingReset {
  879. return SkippedEmpty, 0, nil
  880. }
  881. item.beingReset = true
  882. /* Error handling from this point on (setting item.fd and item.beingReset):
  883. Since Reset is called by the cache cleaner thread, there is no direct way to return
  884. the error to the io threads. Set item.fd to nil upon internal errors, so that the
  885. io threads will return internal errors seeing a nil fd. In the case when the error
  886. is ENOSPC, keep the item in isBeingReset state and that will keep the item.ReadAt
  887. waiting at its beginning. The cache purge loop will try to redo the reset after cache
  888. space is made available again. This recovery design should allow most io threads to
  889. eventually go through, unless large files are written/overwritten concurrently and
  890. the total size of these files exceed the cache storage limit. */
  891. // Close the downloaders
  892. // Accumulate and log errors
  893. checkErr := func(e error) {
  894. if e != nil {
  895. fs.Errorf(item.o, "vfs cache: item reset failed: %v", e)
  896. if err == nil {
  897. err = e
  898. }
  899. }
  900. }
  901. if downloaders := item.downloaders; downloaders != nil {
  902. item.downloaders = nil
  903. // FIXME need to unlock to kill downloader - should we
  904. // re-arrange locking so this isn't necessary? maybe
  905. // downloader should use the item mutex for locking? or put a
  906. // finer lock on Rs?
  907. //
  908. // downloader.Write calls ensure which needs the lock
  909. // close downloader with mutex unlocked
  910. item.mu.Unlock()
  911. checkErr(downloaders.Close(nil))
  912. item.mu.Lock()
  913. }
  914. // close the file handle
  915. // fd can be nil if we tried Reset and failed before because of ENOSPC during reset
  916. if item.fd != nil {
  917. checkErr(item.fd.Close())
  918. if err != nil {
  919. // Could not close the cache file
  920. item.beingReset = false
  921. item.cond.Broadcast()
  922. return ResetFailed, 0, err
  923. }
  924. item.fd = nil
  925. }
  926. spaceFreed = item.info.Rs.Size()
  927. // This should not be possible. We get here only if cache data is not dirty.
  928. if item._remove("cache out of space, item is clean") {
  929. fs.Errorf(item.o, "vfs cache item removed when it was writing/uploaded")
  930. }
  931. // can we have an item with no dirty data (so that we can get here) and nil item.o at the same time?
  932. fso := item.o
  933. checkErr(item._checkObject(fso))
  934. if err != nil {
  935. item.beingReset = false
  936. item.cond.Broadcast()
  937. return ResetFailed, spaceFreed, err
  938. }
  939. osPath := item.c.toOSPath(item.name)
  940. checkErr(item._createFile(osPath))
  941. if err != nil {
  942. item._remove("cache reset failed on _createFile, removed cache data file")
  943. item.fd = nil // This allows a new Reset redo to have a clean state to deal with
  944. if !fserrors.IsErrNoSpace(err) {
  945. item.beingReset = false
  946. item.cond.Broadcast()
  947. }
  948. return ResetFailed, spaceFreed, err
  949. }
  950. // Create the downloaders
  951. if item.o != nil {
  952. item.downloaders = downloaders.New(item, item.c.opt, item.name, item.o)
  953. }
  954. /* The item will stay in the beingReset state if we get an error that prevents us from
  955. reaching this point. The cache purge loop will redo the failed Reset. */
  956. item.beingReset = false
  957. item.cond.Broadcast()
  958. return ResetComplete, spaceFreed, err
  959. }
  960. // ProtectCache either waits for an ongoing cache reset to finish or increases pendingReads
  961. // to protect against cache reset on this item while the thread potentially uses the cache file
  962. // Cache cleaner waits until pendingReads is zero before resetting cache.
  963. func (item *Item) preAccess() {
  964. item.mu.Lock()
  965. defer item.mu.Unlock()
  966. if item.beingReset {
  967. for {
  968. item.cond.Wait()
  969. if !item.beingReset {
  970. break
  971. }
  972. }
  973. }
  974. item.pendingAccesses++
  975. }
  976. // postAccess reduces the pendingReads count enabling cache reset upon ENOSPC
  977. func (item *Item) postAccess() {
  978. item.mu.Lock()
  979. defer item.mu.Unlock()
  980. item.pendingAccesses--
  981. item.cond.Broadcast()
  982. }
  983. // _present returns true if the whole file has been downloaded
  984. //
  985. // call with the lock held
  986. func (item *Item) _present() bool {
  987. return item.info.Rs.Present(ranges.Range{Pos: 0, Size: item.info.Size})
  988. }
  989. // present returns true if the whole file has been downloaded
  990. func (item *Item) present() bool {
  991. item.mu.Lock()
  992. defer item.mu.Unlock()
  993. return item._present()
  994. }
  995. // HasRange returns true if the current ranges entirely include range
  996. func (item *Item) HasRange(r ranges.Range) bool {
  997. item.mu.Lock()
  998. defer item.mu.Unlock()
  999. return item.info.Rs.Present(r)
  1000. }
  1001. // FindMissing adjusts r returning a new ranges.Range which only
  1002. // contains the range which needs to be downloaded. This could be
  1003. // empty - check with IsEmpty. It also adjust this to make sure it is
  1004. // not larger than the file.
  1005. func (item *Item) FindMissing(r ranges.Range) (outr ranges.Range) {
  1006. item.mu.Lock()
  1007. defer item.mu.Unlock()
  1008. outr = item.info.Rs.FindMissing(r)
  1009. // Clip returned block to size of file
  1010. outr.Clip(item.info.Size)
  1011. return outr
  1012. }
  1013. // ensure the range from offset, size is present in the backing file
  1014. //
  1015. // call with the item lock held
  1016. func (item *Item) _ensure(offset, size int64) (err error) {
  1017. // defer log.Trace(item.name, "offset=%d, size=%d", offset, size)("err=%v", &err)
  1018. if offset+size > item.info.Size {
  1019. size = item.info.Size - offset
  1020. }
  1021. r := ranges.Range{Pos: offset, Size: size}
  1022. present := item.info.Rs.Present(r)
  1023. /* This statement simulates a cache space error for test purpose */
  1024. /* if present != true && item.info.Rs.Size() > 32*1024*1024 {
  1025. return errors.New("no space left on device")
  1026. } */
  1027. fs.Debugf(nil, "vfs cache: looking for range=%+v in %+v - present %v", r, item.info.Rs, present)
  1028. item.mu.Unlock()
  1029. defer item.mu.Lock()
  1030. if present {
  1031. // This is a file we are writing so no downloaders needed
  1032. if item.downloaders == nil {
  1033. return nil
  1034. }
  1035. // Otherwise start the downloader for the future if required
  1036. return item.downloaders.EnsureDownloader(r)
  1037. }
  1038. if item.downloaders == nil {
  1039. // Downloaders can be nil here if the file has been
  1040. // renamed, so need to make some more downloaders
  1041. // OK to call downloaders constructor with item.mu held
  1042. // item.o can also be nil under some circumstances
  1043. // See: https://github.com/rclone/rclone/issues/6190
  1044. // See: https://github.com/rclone/rclone/issues/6235
  1045. if item.o == nil {
  1046. o, err := item.c.fremote.NewObject(context.Background(), item.name)
  1047. if err != nil {
  1048. return err
  1049. }
  1050. item.o = o
  1051. }
  1052. item.downloaders = downloaders.New(item, item.c.opt, item.name, item.o)
  1053. }
  1054. return item.downloaders.Download(r)
  1055. }
  1056. // _written marks the (offset, size) as present in the backing file
  1057. //
  1058. // This is called by the downloader downloading file segments and the
  1059. // vfs layer writing to the file.
  1060. //
  1061. // This doesn't mark the item as Dirty - that the responsibility
  1062. // of the caller as we don't know here whether we are adding reads or
  1063. // writes to the cache file.
  1064. //
  1065. // call with lock held
  1066. func (item *Item) _written(offset, size int64) {
  1067. // defer log.Trace(item.name, "offset=%d, size=%d", offset, size)("")
  1068. item.info.Rs.Insert(ranges.Range{Pos: offset, Size: size})
  1069. }
  1070. // update the fingerprint of the object if any
  1071. //
  1072. // call with lock held
  1073. func (item *Item) _updateFingerprint() {
  1074. if item.o == nil {
  1075. return
  1076. }
  1077. oldFingerprint := item.info.Fingerprint
  1078. item.info.Fingerprint = fs.Fingerprint(context.TODO(), item.o, item.c.opt.FastFingerprint)
  1079. if oldFingerprint != item.info.Fingerprint {
  1080. fs.Debugf(item.o, "vfs cache: fingerprint now %q", item.info.Fingerprint)
  1081. }
  1082. }
  1083. // setModTime of the cache file
  1084. //
  1085. // call with lock held
  1086. func (item *Item) _setModTime(modTime time.Time) {
  1087. fs.Debugf(item.name, "vfs cache: setting modification time to %v", modTime)
  1088. osPath := item.c.toOSPath(item.name) // No locking in Cache
  1089. err := os.Chtimes(osPath, modTime, modTime)
  1090. if err != nil {
  1091. fs.Errorf(item.name, "vfs cache: failed to set modification time of cached file: %v", err)
  1092. }
  1093. }
  1094. // setModTime of the cache file and in the Item
  1095. func (item *Item) setModTime(modTime time.Time) {
  1096. // defer log.Trace(item.name, "modTime=%v", modTime)("")
  1097. item.mu.Lock()
  1098. item._updateFingerprint()
  1099. item._setModTime(modTime)
  1100. item.info.ModTime = modTime
  1101. err := item._save()
  1102. if err != nil {
  1103. fs.Errorf(item.name, "vfs cache: setModTime: failed to save item info: %v", err)
  1104. }
  1105. item.mu.Unlock()
  1106. }
  1107. // GetModTime of the cache file
  1108. func (item *Item) GetModTime() (modTime time.Time, err error) {
  1109. // defer log.Trace(item.name, "modTime=%v", modTime)("")
  1110. item.mu.Lock()
  1111. defer item.mu.Unlock()
  1112. fi, err := item._stat()
  1113. if err == nil {
  1114. modTime = fi.ModTime()
  1115. }
  1116. return modTime, nil
  1117. }
  1118. // ReadAt bytes from the file at off
  1119. func (item *Item) ReadAt(b []byte, off int64) (n int, err error) {
  1120. n = 0
  1121. var expBackOff int
  1122. for retries := 0; retries < fs.GetConfig(context.TODO()).LowLevelRetries; retries++ {
  1123. item.preAccess()
  1124. n, err = item.readAt(b, off)
  1125. item.postAccess()
  1126. if err == nil || err == io.EOF {
  1127. break
  1128. }
  1129. fs.Errorf(item.name, "vfs cache: failed to _ensure cache %v", err)
  1130. if !fserrors.IsErrNoSpace(err) && err.Error() != "no space left on device" {
  1131. fs.Debugf(item.name, "vfs cache: failed to _ensure cache %v is not out of space", err)
  1132. break
  1133. }
  1134. item.c.KickCleaner()
  1135. expBackOff = 2 << uint(retries)
  1136. time.Sleep(time.Duration(expBackOff) * time.Millisecond) // Exponential back-off the retries
  1137. }
  1138. if fserrors.IsErrNoSpace(err) {
  1139. fs.Errorf(item.name, "vfs cache: failed to _ensure cache after retries %v", err)
  1140. }
  1141. return n, err
  1142. }
  1143. // ReadAt bytes from the file at off
  1144. func (item *Item) readAt(b []byte, off int64) (n int, err error) {
  1145. item.mu.Lock()
  1146. if item.fd == nil {
  1147. item.mu.Unlock()
  1148. return 0, errors.New("vfs cache item ReadAt: internal error: didn't Open file")
  1149. }
  1150. if off < 0 {
  1151. item.mu.Unlock()
  1152. return 0, io.EOF
  1153. }
  1154. defer item.mu.Unlock()
  1155. err = item._ensure(off, int64(len(b)))
  1156. if err != nil {
  1157. return 0, err
  1158. }
  1159. // Check to see if object has shrunk - if so don't read too much.
  1160. if item.o != nil && !item.info.Dirty && item.o.Size() != item.info.Size {
  1161. fs.Debugf(item.o, "Size has changed from %d to %d", item.info.Size, item.o.Size())
  1162. err = item._truncate(item.o.Size())
  1163. if err != nil {
  1164. return 0, err
  1165. }
  1166. }
  1167. item.info.ATime = time.Now()
  1168. // Do the reading with Item.mu unlocked and cache protected by preAccess
  1169. n, err = item.fd.ReadAt(b, off)
  1170. return n, err
  1171. }
  1172. // WriteAt bytes to the file at off
  1173. func (item *Item) WriteAt(b []byte, off int64) (n int, err error) {
  1174. item.preAccess()
  1175. defer item.postAccess()
  1176. item.mu.Lock()
  1177. if item.fd == nil {
  1178. item.mu.Unlock()
  1179. return 0, errors.New("vfs cache item WriteAt: internal error: didn't Open file")
  1180. }
  1181. item.mu.Unlock()
  1182. // Do the writing with Item.mu unlocked
  1183. n, err = item.fd.WriteAt(b, off)
  1184. if err == nil && n != len(b) {
  1185. err = fmt.Errorf("short write: tried to write %d but only %d written", len(b), n)
  1186. }
  1187. item.mu.Lock()
  1188. item._written(off, int64(n))
  1189. if n > 0 {
  1190. item._dirty()
  1191. }
  1192. end := off + int64(n)
  1193. // Writing off the end of the file so need to make some
  1194. // zeroes. we do this by showing that we have written to the
  1195. // new parts of the file.
  1196. if off > item.info.Size {
  1197. item._written(item.info.Size, off-item.info.Size)
  1198. item._dirty()
  1199. }
  1200. // Update size
  1201. if end > item.info.Size {
  1202. item.info.Size = end
  1203. }
  1204. item.mu.Unlock()
  1205. return n, err
  1206. }
  1207. // WriteAtNoOverwrite writes b to the file, but will not overwrite
  1208. // already present ranges.
  1209. //
  1210. // This is used by the downloader to write bytes to the file.
  1211. //
  1212. // It returns n the total bytes processed and skipped the number of
  1213. // bytes which were processed but not actually written to the file.
  1214. func (item *Item) WriteAtNoOverwrite(b []byte, off int64) (n int, skipped int, err error) {
  1215. item.mu.Lock()
  1216. var (
  1217. // Range we wish to write
  1218. r = ranges.Range{Pos: off, Size: int64(len(b))}
  1219. // Ranges that we need to write
  1220. foundRanges = item.info.Rs.FindAll(r)
  1221. // Length of each write
  1222. nn int
  1223. )
  1224. // Write the range out ignoring already written chunks
  1225. // fs.Debugf(item.name, "Ranges = %v", item.info.Rs)
  1226. for i := range foundRanges {
  1227. foundRange := &foundRanges[i]
  1228. // fs.Debugf(item.name, "foundRange[%d] = %v", i, foundRange)
  1229. if foundRange.R.Pos != off {
  1230. err = errors.New("internal error: offset of range is wrong")
  1231. break
  1232. }
  1233. size := int(foundRange.R.Size)
  1234. if foundRange.Present {
  1235. // if present want to skip this range
  1236. // fs.Debugf(item.name, "skip chunk offset=%d size=%d", off, size)
  1237. nn = size
  1238. skipped += size
  1239. } else {
  1240. // if range not present then we want to write it
  1241. // fs.Debugf(item.name, "write chunk offset=%d size=%d", off, size)
  1242. nn, err = item.fd.WriteAt(b[:size], off)
  1243. if err == nil && nn != size {
  1244. err = fmt.Errorf("downloader: short write: tried to write %d but only %d written", size, nn)
  1245. }
  1246. item._written(off, int64(nn))
  1247. }
  1248. off += int64(nn)
  1249. b = b[nn:]
  1250. n += nn
  1251. if err != nil {
  1252. break
  1253. }
  1254. }
  1255. item.mu.Unlock()
  1256. return n, skipped, err
  1257. }
  1258. // Sync commits the current contents of the file to stable storage. Typically,
  1259. // this means flushing the file system's in-memory copy of recently written
  1260. // data to disk.
  1261. func (item *Item) Sync() (err error) {
  1262. item.preAccess()
  1263. defer item.postAccess()
  1264. item.mu.Lock()
  1265. defer item.mu.Unlock()
  1266. if item.fd == nil {
  1267. return errors.New("vfs cache item sync: internal error: didn't Open file")
  1268. }
  1269. // sync the file and the metadata to disk
  1270. err = item.fd.Sync()
  1271. if err != nil {
  1272. return fmt.Errorf("vfs cache item sync: failed to sync file: %w", err)
  1273. }
  1274. err = item._save()
  1275. if err != nil {
  1276. return fmt.Errorf("vfs cache item sync: failed to sync metadata: %w", err)
  1277. }
  1278. return nil
  1279. }
  1280. // rename the item
  1281. func (item *Item) rename(name string, newName string, newObj fs.Object) (err error) {
  1282. item.preAccess()
  1283. defer item.postAccess()
  1284. item.mu.Lock()
  1285. // stop downloader
  1286. downloaders := item.downloaders
  1287. item.downloaders = nil
  1288. // id for writeback cancel
  1289. id := item.writeBackID
  1290. // Set internal state
  1291. item.name = newName
  1292. item.o = newObj
  1293. // Rename cache file if it exists
  1294. err = rename(item.c.toOSPath(name), item.c.toOSPath(newName)) // No locking in Cache
  1295. // Rename meta file if it exists
  1296. err2 := rename(item.c.toOSPathMeta(name), item.c.toOSPathMeta(newName)) // No locking in Cache
  1297. if err2 != nil {
  1298. err = err2
  1299. }
  1300. item.mu.Unlock()
  1301. // close downloader and cancel writebacks with mutex unlocked
  1302. if downloaders != nil {
  1303. _ = downloaders.Close(nil)
  1304. }
  1305. item.c.writeback.Rename(id, newName)
  1306. return err
  1307. }