blockqueue.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package scanner
  7. import (
  8. "context"
  9. "errors"
  10. "github.com/syncthing/syncthing/lib/fs"
  11. "github.com/syncthing/syncthing/lib/protocol"
  12. "github.com/syncthing/syncthing/lib/sync"
  13. )
  14. // HashFile hashes the files and returns a list of blocks representing the file.
  15. func HashFile(ctx context.Context, folderID string, fs fs.Filesystem, path string, blockSize int, counter Counter, useWeakHashes bool) ([]protocol.BlockInfo, error) {
  16. fd, err := fs.Open(path)
  17. if err != nil {
  18. l.Debugln("open:", err)
  19. return nil, err
  20. }
  21. defer fd.Close()
  22. // Get the size and modtime of the file before we start hashing it.
  23. fi, err := fd.Stat()
  24. if err != nil {
  25. l.Debugln("stat before:", err)
  26. return nil, err
  27. }
  28. size := fi.Size()
  29. modTime := fi.ModTime()
  30. // Hash the file. This may take a while for large files.
  31. blocks, err := Blocks(ctx, fd, blockSize, size, counter, useWeakHashes)
  32. if err != nil {
  33. l.Debugln("blocks:", err)
  34. return nil, err
  35. }
  36. metricHashedBytes.WithLabelValues(folderID).Add(float64(size))
  37. // Recheck the size and modtime again. If they differ, the file changed
  38. // while we were reading it and our hash results are invalid.
  39. fi, err = fd.Stat()
  40. if err != nil {
  41. l.Debugln("stat after:", err)
  42. return nil, err
  43. }
  44. if size != fi.Size() || !modTime.Equal(fi.ModTime()) {
  45. return nil, errors.New("file changed during hashing")
  46. }
  47. return blocks, nil
  48. }
  49. // The parallel hasher reads FileInfo structures from the inbox, hashes the
  50. // file to populate the Blocks element and sends it to the outbox. A number of
  51. // workers are used in parallel. The outbox will become closed when the inbox
  52. // is closed and all items handled.
  53. type parallelHasher struct {
  54. folderID string
  55. fs fs.Filesystem
  56. outbox chan<- ScanResult
  57. inbox <-chan protocol.FileInfo
  58. counter Counter
  59. done chan<- struct{}
  60. wg sync.WaitGroup
  61. }
  62. func newParallelHasher(ctx context.Context, folderID string, fs fs.Filesystem, workers int, outbox chan<- ScanResult, inbox <-chan protocol.FileInfo, counter Counter, done chan<- struct{}) {
  63. ph := &parallelHasher{
  64. folderID: folderID,
  65. fs: fs,
  66. outbox: outbox,
  67. inbox: inbox,
  68. counter: counter,
  69. done: done,
  70. wg: sync.NewWaitGroup(),
  71. }
  72. ph.wg.Add(workers)
  73. for i := 0; i < workers; i++ {
  74. go ph.hashFiles(ctx)
  75. }
  76. go ph.closeWhenDone()
  77. }
  78. func (ph *parallelHasher) hashFiles(ctx context.Context) {
  79. defer ph.wg.Done()
  80. for {
  81. select {
  82. case f, ok := <-ph.inbox:
  83. if !ok {
  84. return
  85. }
  86. l.Debugln("started hashing:", f)
  87. if f.IsDirectory() || f.IsDeleted() {
  88. panic("Bug. Asked to hash a directory or a deleted file.")
  89. }
  90. blocks, err := HashFile(ctx, ph.folderID, ph.fs, f.Name, f.BlockSize(), ph.counter, true)
  91. if err != nil {
  92. handleError(ctx, "hashing", f.Name, err, ph.outbox)
  93. continue
  94. }
  95. f.Blocks = blocks
  96. f.BlocksHash = protocol.BlocksHash(blocks)
  97. // The size we saw when initially deciding to hash the file
  98. // might not have been the size it actually had when we hashed
  99. // it. Update the size from the block list.
  100. f.Size = 0
  101. for _, b := range blocks {
  102. f.Size += int64(b.Size)
  103. }
  104. l.Debugln("completed hashing:", f)
  105. select {
  106. case ph.outbox <- ScanResult{File: f}:
  107. case <-ctx.Done():
  108. return
  109. }
  110. case <-ctx.Done():
  111. return
  112. }
  113. }
  114. }
  115. func (ph *parallelHasher) closeWhenDone() {
  116. ph.wg.Wait()
  117. // In case the hasher aborted on context, wait for filesystem
  118. // walking/progress routine to finish.
  119. for range ph.inbox {
  120. }
  121. if ph.done != nil {
  122. close(ph.done)
  123. }
  124. close(ph.outbox)
  125. }