upload_memory.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package quatrix
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/rclone/rclone/fs"
  6. )
  // UploadMemoryManager dynamically calculates the chunk size for every transfer and increases or
  // decreases it depending on the upload speed. This shortens the overall upload time, because
  // faster transfers do not have to wait for slower ones to finish uploading.
  //
  // Every chunk may always use the reserved amount; anything above that is borrowed from a shared
  // pool and tracked per file ID so that it can be handed back later.
  type UploadMemoryManager struct {
  	m              sync.Mutex       // guards shared and fileUsage
  	useDynamicSize bool             // when false, chunks are capped at reserved and the pool is not used
  	shared         int64            // memory currently available for borrowing
  	reserved       int64            // amount a chunk may use without borrowing
  	effectiveTime  time.Duration    // multiplied by the upload speed to get the preferred chunk size
  	fileUsage      map[string]int64 // memory currently borrowed from shared, keyed by file ID
  }
  18. // NewUploadMemoryManager is a constructor for UploadMemoryManager
  19. func NewUploadMemoryManager(ci *fs.ConfigInfo, opt *Options) *UploadMemoryManager {
  20. useDynamicSize := true
  21. sharedMemory := int64(opt.MaximalSummaryChunkSize) - int64(opt.MinimalChunkSize)*int64(ci.Transfers)
  22. if sharedMemory <= 0 {
  23. sharedMemory = 0
  24. useDynamicSize = false
  25. }
  26. return &UploadMemoryManager{
  27. useDynamicSize: useDynamicSize,
  28. shared: sharedMemory,
  29. reserved: int64(opt.MinimalChunkSize),
  30. effectiveTime: time.Duration(opt.EffectiveUploadTime),
  31. fileUsage: map[string]int64{},
  32. }
  33. }
  34. // Consume -- decide amount of memory to consume
  35. func (u *UploadMemoryManager) Consume(fileID string, neededMemory int64, speed float64) int64 {
  36. if !u.useDynamicSize {
  37. if neededMemory < u.reserved {
  38. return neededMemory
  39. }
  40. return u.reserved
  41. }
  42. u.m.Lock()
  43. defer u.m.Unlock()
  44. borrowed, found := u.fileUsage[fileID]
  45. if found {
  46. u.shared += borrowed
  47. borrowed = 0
  48. }
  49. defer func() { u.fileUsage[fileID] = borrowed }()
  50. effectiveChunkSize := int64(speed * u.effectiveTime.Seconds())
  51. if effectiveChunkSize < u.reserved {
  52. effectiveChunkSize = u.reserved
  53. }
  54. if neededMemory < effectiveChunkSize {
  55. effectiveChunkSize = neededMemory
  56. }
  57. if effectiveChunkSize <= u.reserved {
  58. return effectiveChunkSize
  59. }
  60. toBorrow := effectiveChunkSize - u.reserved
  61. if toBorrow <= u.shared {
  62. u.shared -= toBorrow
  63. borrowed = toBorrow
  64. return effectiveChunkSize
  65. }
  66. borrowed = u.shared
  67. u.shared = 0
  68. return borrowed + u.reserved
  69. }
  70. // Return returns consumed memory for the previous chunk upload to the memory pool
  71. func (u *UploadMemoryManager) Return(fileID string) {
  72. if !u.useDynamicSize {
  73. return
  74. }
  75. u.m.Lock()
  76. defer u.m.Unlock()
  77. borrowed, found := u.fileUsage[fileID]
  78. if !found {
  79. return
  80. }
  81. u.shared += borrowed
  82. delete(u.fileUsage, fileID)
  83. }