filemonitor.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package main
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "log"
  7. "os"
  8. "path"
  9. "sync"
  10. "time"
  11. "github.com/calmh/syncthing/buffers"
  12. )
  13. type fileMonitor struct {
  14. name string // in-repo name
  15. path string // full path
  16. writeDone sync.WaitGroup
  17. model *Model
  18. global File
  19. localBlocks []Block
  20. copyError error
  21. writeError error
  22. }
  23. func (m *fileMonitor) FileBegins(cc <-chan content) error {
  24. if m.model.trace["file"] {
  25. log.Printf("FILE: FileBegins: " + m.name)
  26. }
  27. tmp := tempName(m.path, m.global.Modified)
  28. dir := path.Dir(tmp)
  29. _, err := os.Stat(dir)
  30. if err != nil && os.IsNotExist(err) {
  31. err = os.MkdirAll(dir, 0777)
  32. if err != nil {
  33. return err
  34. }
  35. }
  36. outFile, err := os.Create(tmp)
  37. if err != nil {
  38. return err
  39. }
  40. m.writeDone.Add(1)
  41. var writeWg sync.WaitGroup
  42. if len(m.localBlocks) > 0 {
  43. writeWg.Add(1)
  44. inFile, err := os.Open(m.path)
  45. if err != nil {
  46. return err
  47. }
  48. // Copy local blocks, close infile when done
  49. go m.copyLocalBlocks(inFile, outFile, &writeWg)
  50. }
  51. // Write remote blocks,
  52. writeWg.Add(1)
  53. go m.copyRemoteBlocks(cc, outFile, &writeWg)
  54. // Wait for both writing routines, then close the outfile
  55. go func() {
  56. writeWg.Wait()
  57. outFile.Close()
  58. m.writeDone.Done()
  59. }()
  60. return nil
  61. }
  62. func (m *fileMonitor) copyLocalBlocks(inFile, outFile *os.File, writeWg *sync.WaitGroup) {
  63. defer inFile.Close()
  64. defer writeWg.Done()
  65. var buf = buffers.Get(BlockSize)
  66. defer buffers.Put(buf)
  67. for _, lb := range m.localBlocks {
  68. buf = buf[:lb.Size]
  69. _, err := inFile.ReadAt(buf, lb.Offset)
  70. if err != nil {
  71. m.copyError = err
  72. return
  73. }
  74. _, err = outFile.WriteAt(buf, lb.Offset)
  75. if err != nil {
  76. m.copyError = err
  77. return
  78. }
  79. }
  80. }
  81. func (m *fileMonitor) copyRemoteBlocks(cc <-chan content, outFile *os.File, writeWg *sync.WaitGroup) {
  82. defer writeWg.Done()
  83. for content := range cc {
  84. _, err := outFile.WriteAt(content.data, content.offset)
  85. buffers.Put(content.data)
  86. if err != nil {
  87. m.writeError = err
  88. return
  89. }
  90. }
  91. }
  92. func (m *fileMonitor) FileDone() error {
  93. if m.model.trace["file"] {
  94. log.Printf("FILE: FileDone: " + m.name)
  95. }
  96. m.writeDone.Wait()
  97. tmp := tempName(m.path, m.global.Modified)
  98. defer os.Remove(tmp)
  99. if m.copyError != nil {
  100. return m.copyError
  101. }
  102. if m.writeError != nil {
  103. return m.writeError
  104. }
  105. err := hashCheck(tmp, m.global.Blocks)
  106. if err != nil {
  107. return err
  108. }
  109. err = os.Chtimes(tmp, time.Unix(m.global.Modified, 0), time.Unix(m.global.Modified, 0))
  110. if err != nil {
  111. return err
  112. }
  113. err = os.Chmod(tmp, os.FileMode(m.global.Flags&0777))
  114. if err != nil {
  115. return err
  116. }
  117. err = os.Rename(tmp, m.path)
  118. if err != nil {
  119. return err
  120. }
  121. m.model.updateLocal(m.global)
  122. return nil
  123. }
  124. func hashCheck(name string, correct []Block) error {
  125. rf, err := os.Open(name)
  126. if err != nil {
  127. return err
  128. }
  129. defer rf.Close()
  130. current, err := Blocks(rf, BlockSize)
  131. if err != nil {
  132. return err
  133. }
  134. if len(current) != len(correct) {
  135. return errors.New("incorrect number of blocks")
  136. }
  137. for i := range current {
  138. if bytes.Compare(current[i].Hash, correct[i].Hash) != 0 {
  139. return fmt.Errorf("hash mismatch: %x != %x", current[i], correct[i])
  140. }
  141. }
  142. return nil
  143. }