limiter_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. // Copyright (C) 2017 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package connections
  7. import (
  8. "bytes"
  9. "context"
  10. crand "crypto/rand"
  11. "io"
  12. "math/rand"
  13. "sync/atomic"
  14. "testing"
  15. "github.com/syncthing/syncthing/lib/config"
  16. "github.com/syncthing/syncthing/lib/events"
  17. "github.com/syncthing/syncthing/lib/protocol"
  18. "golang.org/x/time/rate"
  19. )
  20. var device1, device2, device3, device4 protocol.DeviceID
  21. var dev1Conf, dev2Conf, dev3Conf, dev4Conf config.DeviceConfiguration
  22. func init() {
  23. device1, _ = protocol.DeviceIDFromString("AIR6LPZ7K4PTTUXQSMUUCPQ5YWOEDFIIQJUG7772YQXXR5YD6AWQ")
  24. device2, _ = protocol.DeviceIDFromString("GYRZZQB-IRNPV4Z-T7TC52W-EQYJ3TT-FDQW6MW-DFLMU42-SSSU6EM-FBK2VAY")
  25. device3, _ = protocol.DeviceIDFromString("LGFPDIT-7SKNNJL-VJZA4FC-7QNCRKA-CE753K7-2BW5QDK-2FOZ7FR-FEP57QJ")
  26. device4, _ = protocol.DeviceIDFromString("P56IOI7-MZJNU2Y-IQGDREY-DM2MGTI-MGL3BXN-PQ6W5BM-TBBZ4TJ-XZWICQ2")
  27. }
  28. func newDeviceConfiguration(w config.Wrapper, id protocol.DeviceID, name string) config.DeviceConfiguration {
  29. cfg := w.DefaultDevice()
  30. cfg.DeviceID = id
  31. cfg.Name = name
  32. return cfg
  33. }
  34. func initConfig() (config.Wrapper, context.CancelFunc) {
  35. wrapper := config.Wrap("/dev/null", config.New(device1), device1, events.NoopLogger)
  36. dev1Conf = newDeviceConfiguration(wrapper, device1, "device1")
  37. dev2Conf = newDeviceConfiguration(wrapper, device2, "device2")
  38. dev3Conf = newDeviceConfiguration(wrapper, device3, "device3")
  39. dev4Conf = newDeviceConfiguration(wrapper, device4, "device4")
  40. ctx, cancel := context.WithCancel(context.Background())
  41. go wrapper.Serve(ctx)
  42. dev2Conf.MaxRecvKbps = rand.Int() % 100000
  43. dev2Conf.MaxSendKbps = rand.Int() % 100000
  44. waiter, _ := wrapper.Modify(func(cfg *config.Configuration) {
  45. cfg.SetDevices([]config.DeviceConfiguration{dev1Conf, dev2Conf, dev3Conf, dev4Conf})
  46. })
  47. waiter.Wait()
  48. return wrapper, cancel
  49. }
  50. func TestLimiterInit(t *testing.T) {
  51. wrapper, wrapperCancel := initConfig()
  52. defer wrapperCancel()
  53. lim := newLimiter(device1, wrapper)
  54. device2ReadLimit := dev2Conf.MaxRecvKbps
  55. device2WriteLimit := dev2Conf.MaxSendKbps
  56. expectedR := map[protocol.DeviceID]*rate.Limiter{
  57. device2: rate.NewLimiter(rate.Limit(device2ReadLimit*1024), limiterBurstSize),
  58. device3: rate.NewLimiter(rate.Inf, limiterBurstSize),
  59. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  60. }
  61. expectedW := map[protocol.DeviceID]*rate.Limiter{
  62. device2: rate.NewLimiter(rate.Limit(device2WriteLimit*1024), limiterBurstSize),
  63. device3: rate.NewLimiter(rate.Inf, limiterBurstSize),
  64. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  65. }
  66. actualR := lim.deviceReadLimiters
  67. actualW := lim.deviceWriteLimiters
  68. checkActualAndExpected(t, actualR, actualW, expectedR, expectedW)
  69. }
  70. func TestSetDeviceLimits(t *testing.T) {
  71. wrapper, wrapperCancel := initConfig()
  72. defer wrapperCancel()
  73. lim := newLimiter(device1, wrapper)
  74. // should still be inf/inf because this is local device
  75. dev1ReadLimit := rand.Int() % 100000
  76. dev1WriteLimit := rand.Int() % 100000
  77. dev1Conf.MaxRecvKbps = dev1ReadLimit
  78. dev1Conf.MaxSendKbps = dev1WriteLimit
  79. dev2ReadLimit := rand.Int() % 100000
  80. dev2WriteLimit := rand.Int() % 100000
  81. dev2Conf.MaxRecvKbps = dev2ReadLimit
  82. dev2Conf.MaxSendKbps = dev2WriteLimit
  83. dev3ReadLimit := rand.Int() % 10000
  84. dev3Conf.MaxRecvKbps = dev3ReadLimit
  85. waiter, _ := wrapper.Modify(func(cfg *config.Configuration) {
  86. cfg.SetDevices([]config.DeviceConfiguration{dev1Conf, dev2Conf, dev3Conf, dev4Conf})
  87. })
  88. waiter.Wait()
  89. expectedR := map[protocol.DeviceID]*rate.Limiter{
  90. device2: rate.NewLimiter(rate.Limit(dev2ReadLimit*1024), limiterBurstSize),
  91. device3: rate.NewLimiter(rate.Limit(dev3ReadLimit*1024), limiterBurstSize),
  92. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  93. }
  94. expectedW := map[protocol.DeviceID]*rate.Limiter{
  95. device2: rate.NewLimiter(rate.Limit(dev2WriteLimit*1024), limiterBurstSize),
  96. device3: rate.NewLimiter(rate.Inf, limiterBurstSize),
  97. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  98. }
  99. actualR := lim.deviceReadLimiters
  100. actualW := lim.deviceWriteLimiters
  101. checkActualAndExpected(t, actualR, actualW, expectedR, expectedW)
  102. }
  103. func TestRemoveDevice(t *testing.T) {
  104. wrapper, wrapperCancel := initConfig()
  105. defer wrapperCancel()
  106. lim := newLimiter(device1, wrapper)
  107. waiter, _ := wrapper.RemoveDevice(device3)
  108. waiter.Wait()
  109. expectedR := map[protocol.DeviceID]*rate.Limiter{
  110. device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxRecvKbps*1024), limiterBurstSize),
  111. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  112. }
  113. expectedW := map[protocol.DeviceID]*rate.Limiter{
  114. device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxSendKbps*1024), limiterBurstSize),
  115. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  116. }
  117. actualR := lim.deviceReadLimiters
  118. actualW := lim.deviceWriteLimiters
  119. checkActualAndExpected(t, actualR, actualW, expectedR, expectedW)
  120. }
  121. func TestAddDevice(t *testing.T) {
  122. wrapper, wrapperCancel := initConfig()
  123. defer wrapperCancel()
  124. lim := newLimiter(device1, wrapper)
  125. addedDevice, _ := protocol.DeviceIDFromString("XZJ4UNS-ENI7QGJ-J45DT6G-QSGML2K-6I4XVOG-NAZ7BF5-2VAOWNT-TFDOMQU")
  126. addDevConf := newDeviceConfiguration(wrapper, addedDevice, "addedDevice")
  127. addDevConf.MaxRecvKbps = 120
  128. addDevConf.MaxSendKbps = 240
  129. waiter, _ := wrapper.Modify(func(cfg *config.Configuration) {
  130. cfg.SetDevice(addDevConf)
  131. })
  132. waiter.Wait()
  133. expectedR := map[protocol.DeviceID]*rate.Limiter{
  134. device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxRecvKbps*1024), limiterBurstSize),
  135. device3: rate.NewLimiter(rate.Inf, limiterBurstSize),
  136. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  137. addedDevice: rate.NewLimiter(rate.Limit(addDevConf.MaxRecvKbps*1024), limiterBurstSize),
  138. }
  139. expectedW := map[protocol.DeviceID]*rate.Limiter{
  140. device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxSendKbps*1024), limiterBurstSize),
  141. device3: rate.NewLimiter(rate.Inf, limiterBurstSize),
  142. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  143. addedDevice: rate.NewLimiter(rate.Limit(addDevConf.MaxSendKbps*1024), limiterBurstSize),
  144. }
  145. actualR := lim.deviceReadLimiters
  146. actualW := lim.deviceWriteLimiters
  147. checkActualAndExpected(t, actualR, actualW, expectedR, expectedW)
  148. }
  149. func TestAddAndRemove(t *testing.T) {
  150. wrapper, wrapperCancel := initConfig()
  151. defer wrapperCancel()
  152. lim := newLimiter(device1, wrapper)
  153. addedDevice, _ := protocol.DeviceIDFromString("XZJ4UNS-ENI7QGJ-J45DT6G-QSGML2K-6I4XVOG-NAZ7BF5-2VAOWNT-TFDOMQU")
  154. addDevConf := newDeviceConfiguration(wrapper, addedDevice, "addedDevice")
  155. addDevConf.MaxRecvKbps = 120
  156. addDevConf.MaxSendKbps = 240
  157. waiter, _ := wrapper.Modify(func(cfg *config.Configuration) {
  158. cfg.SetDevice(addDevConf)
  159. })
  160. waiter.Wait()
  161. waiter, _ = wrapper.RemoveDevice(device3)
  162. waiter.Wait()
  163. expectedR := map[protocol.DeviceID]*rate.Limiter{
  164. device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxRecvKbps*1024), limiterBurstSize),
  165. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  166. addedDevice: rate.NewLimiter(rate.Limit(addDevConf.MaxRecvKbps*1024), limiterBurstSize),
  167. }
  168. expectedW := map[protocol.DeviceID]*rate.Limiter{
  169. device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxSendKbps*1024), limiterBurstSize),
  170. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  171. addedDevice: rate.NewLimiter(rate.Limit(addDevConf.MaxSendKbps*1024), limiterBurstSize),
  172. }
  173. actualR := lim.deviceReadLimiters
  174. actualW := lim.deviceWriteLimiters
  175. checkActualAndExpected(t, actualR, actualW, expectedR, expectedW)
  176. }
  177. func TestLimitedWriterWrite(t *testing.T) {
  178. // Check that the limited writer writes the correct data in the correct manner.
  179. // A buffer with random data that is larger than the write size and not
  180. // a precise multiple either.
  181. src := make([]byte, int(12.5*8192))
  182. if _, err := crand.Reader.Read(src); err != nil {
  183. t.Fatal(err)
  184. }
  185. // Write it to the destination using a limited writer, with a wrapper to
  186. // count the write calls. The defaults on the limited writer should mean
  187. // it is used (and doesn't take the fast path). In practice the limiter
  188. // won't delay the test as the burst size is large enough to accommodate
  189. // regardless of the rate.
  190. dst := new(bytes.Buffer)
  191. cw := &countingWriter{w: dst}
  192. lw := &limitedWriter{
  193. writer: cw,
  194. waiterHolder: waiterHolder{
  195. waiter: rate.NewLimiter(rate.Limit(42), limiterBurstSize),
  196. limitsLAN: new(atomic.Bool),
  197. isLAN: false, // enables limiting
  198. },
  199. }
  200. if _, err := io.Copy(lw, bytes.NewReader(src)); err != nil {
  201. t.Fatal(err)
  202. }
  203. // Verify there were lots of writes (we expect one kilobyte write size
  204. // for the very low rate in this test) and that the end result is
  205. // identical.
  206. if cw.writeCount < 10*8 {
  207. t.Error("expected lots of smaller writes")
  208. }
  209. if cw.writeCount > 15*8 {
  210. t.Error("expected fewer larger writes")
  211. }
  212. if !bytes.Equal(src, dst.Bytes()) {
  213. t.Error("results should be equal")
  214. }
  215. // Write it to the destination using a limited writer, with a wrapper to
  216. // count the write calls. Now we make sure the fast path is used.
  217. dst = new(bytes.Buffer)
  218. cw = &countingWriter{w: dst}
  219. lw = &limitedWriter{
  220. writer: cw,
  221. waiterHolder: waiterHolder{
  222. waiter: rate.NewLimiter(rate.Limit(42), limiterBurstSize),
  223. limitsLAN: new(atomic.Bool),
  224. isLAN: true, // disables limiting
  225. },
  226. }
  227. if _, err := io.Copy(lw, bytes.NewReader(src)); err != nil {
  228. t.Fatal(err)
  229. }
  230. // Verify there were a single write and that the end result is identical.
  231. if cw.writeCount != 1 {
  232. t.Error("expected just the one write")
  233. }
  234. if !bytes.Equal(src, dst.Bytes()) {
  235. t.Error("results should be equal")
  236. }
  237. // Once more, but making sure the fast path is used for an unlimited
  238. // rate, with multiple unlimited raters even (global and per-device).
  239. dst = new(bytes.Buffer)
  240. cw = &countingWriter{w: dst}
  241. lw = &limitedWriter{
  242. writer: cw,
  243. waiterHolder: waiterHolder{
  244. waiter: totalWaiter{rate.NewLimiter(rate.Inf, limiterBurstSize), rate.NewLimiter(rate.Inf, limiterBurstSize)},
  245. limitsLAN: new(atomic.Bool),
  246. isLAN: false, // enables limiting
  247. },
  248. }
  249. if _, err := io.Copy(lw, bytes.NewReader(src)); err != nil {
  250. t.Fatal(err)
  251. }
  252. // Verify there were a single write and that the end result is identical.
  253. if cw.writeCount != 1 {
  254. t.Error("expected just the one write")
  255. }
  256. if !bytes.Equal(src, dst.Bytes()) {
  257. t.Error("results should be equal")
  258. }
  259. // Once more, but making sure we *don't* take the fast path when there
  260. // is a combo of limited and unlimited writers.
  261. dst = new(bytes.Buffer)
  262. cw = &countingWriter{w: dst}
  263. lw = &limitedWriter{
  264. writer: cw,
  265. waiterHolder: waiterHolder{
  266. waiter: totalWaiter{
  267. rate.NewLimiter(rate.Inf, limiterBurstSize),
  268. rate.NewLimiter(rate.Limit(42), limiterBurstSize),
  269. rate.NewLimiter(rate.Inf, limiterBurstSize),
  270. },
  271. limitsLAN: new(atomic.Bool),
  272. isLAN: false, // enables limiting
  273. },
  274. }
  275. if _, err := io.Copy(lw, bytes.NewReader(src)); err != nil {
  276. t.Fatal(err)
  277. }
  278. // Verify there were lots of writes and that the end result is identical.
  279. if cw.writeCount < 10*8 {
  280. t.Error("expected lots of smaller writes")
  281. }
  282. if cw.writeCount > 15*8 {
  283. t.Error("expected fewer larger writes")
  284. }
  285. if !bytes.Equal(src, dst.Bytes()) {
  286. t.Error("results should be equal")
  287. }
  288. }
  289. func TestTotalWaiterLimit(t *testing.T) {
  290. cases := []struct {
  291. w waiter
  292. r rate.Limit
  293. }{
  294. {
  295. totalWaiter{},
  296. rate.Inf,
  297. },
  298. {
  299. totalWaiter{rate.NewLimiter(rate.Inf, 42)},
  300. rate.Inf,
  301. },
  302. {
  303. totalWaiter{rate.NewLimiter(rate.Inf, 42), rate.NewLimiter(rate.Inf, 42)},
  304. rate.Inf,
  305. },
  306. {
  307. totalWaiter{rate.NewLimiter(rate.Inf, 42), rate.NewLimiter(rate.Limit(12), 42), rate.NewLimiter(rate.Limit(15), 42)},
  308. rate.Limit(12),
  309. },
  310. }
  311. for _, tc := range cases {
  312. l := tc.w.Limit()
  313. if l != tc.r {
  314. t.Error("incorrect limit returned")
  315. }
  316. }
  317. }
  318. func checkActualAndExpected(t *testing.T, actualR, actualW, expectedR, expectedW map[protocol.DeviceID]*rate.Limiter) {
  319. t.Helper()
  320. if len(expectedW) != len(actualW) || len(expectedR) != len(actualR) {
  321. t.Errorf("Map lengths differ!")
  322. }
  323. for key, val := range expectedR {
  324. if _, ok := actualR[key]; !ok {
  325. t.Errorf("Device %s not found in limiter", key)
  326. }
  327. if val.Limit() != actualR[key].Limit() {
  328. t.Errorf("Read limits for device %s differ actual: %f, expected: %f", key, actualR[key].Limit(), val.Limit())
  329. }
  330. if expectedW[key].Limit() != actualW[key].Limit() {
  331. t.Errorf("Write limits for device %s differ actual: %f, expected: %f", key, actualW[key].Limit(), expectedW[key].Limit())
  332. }
  333. }
  334. }
  335. type countingWriter struct {
  336. w io.Writer
  337. writeCount int
  338. }
  339. func (w *countingWriter) Write(data []byte) (int, error) {
  340. w.writeCount++
  341. return w.w.Write(data)
  342. }