diskstore.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. // Copyright (C) 2023 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package main
  7. import (
  8. "bytes"
  9. "compress/gzip"
  10. "context"
  11. "io"
  12. "log"
  13. "math"
  14. "os"
  15. "path/filepath"
  16. "sort"
  17. "time"
  18. )
  19. type diskStore struct {
  20. dir string
  21. inbox chan diskEntry
  22. maxBytes int64
  23. maxFiles int
  24. currentFiles []currentFile
  25. currentSize int64
  26. }
  27. type diskEntry struct {
  28. path string
  29. data []byte
  30. }
  31. type currentFile struct {
  32. path string
  33. size int64
  34. mtime int64
  35. }
  36. func (d *diskStore) Serve(ctx context.Context) {
  37. if err := os.MkdirAll(d.dir, 0o700); err != nil {
  38. log.Println("Creating directory:", err)
  39. return
  40. }
  41. if err := d.inventory(); err != nil {
  42. log.Println("Failed to inventory disk store:", err)
  43. }
  44. d.clean()
  45. cleanTimer := time.NewTicker(time.Minute)
  46. inventoryTimer := time.NewTicker(24 * time.Hour)
  47. buf := new(bytes.Buffer)
  48. gw := gzip.NewWriter(buf)
  49. for {
  50. select {
  51. case entry := <-d.inbox:
  52. path := d.fullPath(entry.path)
  53. if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil {
  54. log.Println("Creating directory:", err)
  55. continue
  56. }
  57. buf.Reset()
  58. gw.Reset(buf)
  59. if _, err := gw.Write(entry.data); err != nil {
  60. log.Println("Failed to compress crash report:", err)
  61. continue
  62. }
  63. if err := gw.Close(); err != nil {
  64. log.Println("Failed to compress crash report:", err)
  65. continue
  66. }
  67. if err := os.WriteFile(path, buf.Bytes(), 0o600); err != nil {
  68. log.Printf("Failed to write %s: %v", entry.path, err)
  69. _ = os.Remove(path)
  70. continue
  71. }
  72. d.currentSize += int64(buf.Len())
  73. d.currentFiles = append(d.currentFiles, currentFile{
  74. size: int64(len(entry.data)),
  75. path: path,
  76. })
  77. case <-cleanTimer.C:
  78. d.clean()
  79. case <-inventoryTimer.C:
  80. if err := d.inventory(); err != nil {
  81. log.Println("Failed to inventory disk store:", err)
  82. }
  83. case <-ctx.Done():
  84. return
  85. }
  86. }
  87. }
  88. func (d *diskStore) Put(path string, data []byte) bool {
  89. select {
  90. case d.inbox <- diskEntry{
  91. path: path,
  92. data: data,
  93. }:
  94. return true
  95. default:
  96. return false
  97. }
  98. }
  99. func (d *diskStore) Get(path string) ([]byte, error) {
  100. path = d.fullPath(path)
  101. bs, err := os.ReadFile(path)
  102. if err != nil {
  103. return nil, err
  104. }
  105. gr, err := gzip.NewReader(bytes.NewReader(bs))
  106. if err != nil {
  107. return nil, err
  108. }
  109. defer gr.Close()
  110. return io.ReadAll(gr)
  111. }
  112. func (d *diskStore) Exists(path string) bool {
  113. path = d.fullPath(path)
  114. _, err := os.Lstat(path)
  115. return err == nil
  116. }
  117. func (d *diskStore) clean() {
  118. for len(d.currentFiles) > 0 && (len(d.currentFiles) > d.maxFiles || d.currentSize > d.maxBytes) {
  119. f := d.currentFiles[0]
  120. log.Println("Removing", f.path)
  121. if err := os.Remove(f.path); err != nil {
  122. log.Println("Failed to remove file:", err)
  123. }
  124. d.currentFiles = d.currentFiles[1:]
  125. d.currentSize -= f.size
  126. }
  127. var oldest time.Duration
  128. if len(d.currentFiles) > 0 {
  129. oldest = time.Since(time.Unix(d.currentFiles[0].mtime, 0)).Truncate(time.Minute)
  130. }
  131. metricDiskstoreFilesTotal.Set(float64(len(d.currentFiles)))
  132. metricDiskstoreBytesTotal.Set(float64(d.currentSize))
  133. metricDiskstoreOldestAgeSeconds.Set(math.Round(oldest.Seconds()))
  134. log.Printf("Clean complete: %d files, %d MB, oldest is %v ago", len(d.currentFiles), d.currentSize>>20, oldest)
  135. }
  136. func (d *diskStore) inventory() error {
  137. d.currentFiles = nil
  138. d.currentSize = 0
  139. err := filepath.Walk(d.dir, func(path string, info os.FileInfo, err error) error {
  140. if err != nil {
  141. return err
  142. }
  143. if info.IsDir() {
  144. return nil
  145. }
  146. if filepath.Ext(path) != ".gz" {
  147. return nil
  148. }
  149. d.currentSize += info.Size()
  150. d.currentFiles = append(d.currentFiles, currentFile{
  151. path: path,
  152. size: info.Size(),
  153. mtime: info.ModTime().Unix(),
  154. })
  155. return nil
  156. })
  157. sort.Slice(d.currentFiles, func(i, j int) bool {
  158. return d.currentFiles[i].mtime < d.currentFiles[j].mtime
  159. })
  160. var oldest time.Duration
  161. if len(d.currentFiles) > 0 {
  162. oldest = time.Since(time.Unix(d.currentFiles[0].mtime, 0)).Truncate(time.Minute)
  163. }
  164. metricDiskstoreFilesTotal.Set(float64(len(d.currentFiles)))
  165. metricDiskstoreBytesTotal.Set(float64(d.currentSize))
  166. metricDiskstoreOldestAgeSeconds.Set(math.Round(oldest.Seconds()))
  167. log.Printf("Inventory complete: %d files, %d MB, oldest is %v ago", len(d.currentFiles), d.currentSize>>20, oldest)
  168. return err
  169. }
  170. func (d *diskStore) fullPath(path string) string {
  171. return filepath.Join(d.dir, path[0:2], path[2:]) + ".gz"
  172. }