StreamerTests.cpp 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670
  1. /*
  2. * Copyright (c) Contributors to the Open 3D Engine Project.
  3. * For complete copyright and license terms please see the LICENSE at the root of this distribution.
  4. *
  5. * SPDX-License-Identifier: Apache-2.0 OR MIT
  6. *
  7. */
  8. #include <AzCore/UnitTest/TestTypes.h>
  9. #include <FileIOBaseTestTypes.h>
  10. #include <AzCore/IO/CompressionBus.h>
  11. #include <AzCore/IO/FileIO.h>
  12. #include <AzCore/IO/Streamer/FileRequest.h>
  13. #include <AzCore/IO/Streamer/Streamer.h>
  14. #include <AzCore/IO/Streamer/StreamerComponent.h>
  15. #include <AzCore/IO/Streamer/StreamerConfiguration.h>
  16. #include <AzCore/std/containers/vector.h>
  17. #include <AzCore/std/parallel/atomic.h>
  18. #include <AzCore/std/parallel/binary_semaphore.h>
  19. #include <AzCore/std/parallel/thread.h>
  20. #include <AzCore/std/string/string.h>
  21. #include <AzTest/GemTestEnvironment.h>
  22. namespace AZ::IO
  23. {
  24. namespace Utils
  25. {
  26. //! Create a test file that stores 4 byte integers starting at 0 and incrementing.
  27. //! @filename The name of the file to write to.
  28. //! @filesize The size the new file needs to be in bytes. The stored values will continue till fileSize / 4.
  29. //! @paddingSize The amount of data to insert before and after the file. In total paddingSize / 4 integers
  30. //! will be added. The prefix will be marked with "0xdeadbeef" and the postfix with "0xd15ea5ed".
  31. static void CreateTestFile(const AZStd::string& name, size_t fileSize, size_t paddingSize)
  32. {
  33. constexpr size_t bufferByteSize = 1_mib;
  34. constexpr size_t bufferSize = bufferByteSize / sizeof(u32);
  35. u32* buffer = new u32[bufferSize];
  36. AZ_Assert(paddingSize < bufferByteSize, "Padding can't currently be larger than %i bytes.", bufferByteSize);
  37. size_t paddingCount = paddingSize / sizeof(u32);
  38. FileIOStream stream(name.c_str(), OpenMode::ModeWrite | OpenMode::ModeBinary);
  39. // Write pre-padding
  40. for (size_t i = 0; i < paddingCount; ++i)
  41. {
  42. buffer[i] = 0xdeadbeef;
  43. }
  44. stream.Write(paddingSize, buffer);
  45. // Write content
  46. u32 startIndex = 0;
  47. while (fileSize > bufferByteSize)
  48. {
  49. for (u32 i = 0; i < bufferSize; ++i)
  50. {
  51. buffer[i] = startIndex + i;
  52. }
  53. startIndex += bufferSize;
  54. stream.Write(bufferByteSize, buffer);
  55. fileSize -= bufferByteSize;
  56. }
  57. for (u32 i = 0; i < bufferSize; ++i)
  58. {
  59. buffer[i] = startIndex + i;
  60. }
  61. stream.Write(fileSize, buffer);
  62. // Write post-padding
  63. for (size_t i = 0; i < paddingCount; ++i)
  64. {
  65. buffer[i] = 0xd15ea5ed;
  66. }
  67. stream.Write(paddingSize, buffer);
  68. delete[] buffer;
  69. }
  70. }
  // Tag types used as the template argument of StreamerTest. Each tag names one combination of
  // cache setup (dedicated or global) and mock file kind (uncompressed or compressed).
  struct DedicatedCache_Uncompressed
  {
  };

  struct GlobalCache_Uncompressed
  {
  };

  struct DedicatedCache_Compressed
  {
  };

  struct GlobalCache_Compressed
  {
  };
  //! Selects whether a generated mock file should have padding written around its content.
  //! The underlying type is bool and the enumerators keep their declaration-order values, so Yes is
  //! false and No is true. Compare against the enumerators instead of casting to bool.
  enum class PadArchive : bool
  {
      Yes = false,
      No = true
  };
  80. class MockFileBase
  81. {
  82. public:
  83. virtual ~MockFileBase() = default;
  84. virtual void CreateTestFile(AZ::IO::PathView filePath, size_t fileSize, PadArchive padding) = 0;
  85. virtual AZ::IO::PathView GetFileName() const = 0;
  86. };
  87. class MockUncompressedFile
  88. : public MockFileBase
  89. {
  90. public:
  91. ~MockUncompressedFile() override
  92. {
  93. if (m_hasFile)
  94. {
  95. FileIOBase::GetInstance()->DestroyPath(m_filePath.c_str());
  96. }
  97. }
  98. void CreateTestFile(AZ::IO::PathView filePath, size_t fileSize, PadArchive) override
  99. {
  100. m_fileSize = fileSize;
  101. m_filePath = filePath;
  102. Utils::CreateTestFile(m_filePath.Native(), m_fileSize, 0);
  103. m_hasFile = true;
  104. }
  105. AZ::IO::PathView GetFileName() const override
  106. {
  107. return m_filePath;
  108. }
  109. private:
  110. AZ::IO::Path m_filePath;
  111. size_t m_fileSize = 0;
  112. bool m_hasFile = false;
  113. };
  114. class MockCompressedFile
  115. : public MockFileBase
  116. , public CompressionBus::Handler
  117. {
  118. public:
  119. static constexpr uint32_t s_tag = static_cast<uint32_t>('T') << 24 | static_cast<uint32_t>('E') << 16 | static_cast<uint32_t>('S') << 8 | static_cast<uint32_t>('T');
  120. static constexpr uint32_t s_paddingSize = 512; // Use this amount of bytes before and after a generated file as padding.
  121. ~MockCompressedFile() override
  122. {
  123. if (m_hasFile)
  124. {
  125. BusDisconnect();
  126. FileIOBase::GetInstance()->DestroyPath(m_filePath.c_str());
  127. }
  128. }
  129. void CreateTestFile(AZ::IO::PathView filePath, size_t fileSize, PadArchive padding) override
  130. {
  131. m_fileSize = fileSize;
  132. m_filePath = filePath;
  133. m_hasPadding = (padding == PadArchive::Yes);
  134. uint32_t paddingSize = s_paddingSize;
  135. Utils::CreateTestFile(m_filePath.Native(), m_fileSize / 2, m_hasPadding ? paddingSize : 0);
  136. m_hasFile = true;
  137. BusConnect();
  138. }
  139. AZ::IO::PathView GetFileName() const override
  140. {
  141. return m_filePath;
  142. }
  143. void Decompress(const AZ::IO::CompressionInfo& info, const void* compressed, size_t compressedSize,
  144. void* uncompressed, size_t uncompressedSize)
  145. {
  146. constexpr uint32_t tag = s_tag;
  147. ASSERT_EQ(info.m_compressionTag.m_code, tag);
  148. ASSERT_EQ(info.m_compressedSize, m_fileSize / 2);
  149. ASSERT_TRUE(info.m_isCompressed);
  150. uint32_t paddingSize = s_paddingSize;
  151. ASSERT_EQ(info.m_offset, m_hasPadding ? paddingSize : 0);
  152. ASSERT_EQ(info.m_uncompressedSize, m_fileSize);
  153. // Check the input
  154. ASSERT_EQ(compressedSize, m_fileSize / 2);
  155. const u32* values = reinterpret_cast<const u32*>(compressed);
  156. const size_t numValues = compressedSize / sizeof(u32);
  157. for (size_t i = 0; i < numValues; ++i)
  158. {
  159. EXPECT_EQ(values[i], i);
  160. }
  161. // Create the fake uncompressed data.
  162. ASSERT_EQ(uncompressedSize, m_fileSize);
  163. u32* output = reinterpret_cast<u32*>(uncompressed);
  164. size_t outputSize = uncompressedSize / sizeof(u32);
  165. for (size_t i = 0; i < outputSize; ++i)
  166. {
  167. output[i] = static_cast<u32>(i);
  168. }
  169. }
  170. //@{ CompressionBus Handler implementation.
  171. void FindCompressionInfo(bool& found, AZ::IO::CompressionInfo& info, const AZ::IO::PathView filePath) override
  172. {
  173. if (m_hasFile && m_filePath == filePath)
  174. {
  175. found = true;
  176. info.m_archiveFilename = RequestPath(m_filePath);
  177. ASSERT_TRUE(info.m_archiveFilename.IsValid());
  178. info.m_compressedSize = m_fileSize / 2;
  179. const uint32_t tag = s_tag;
  180. info.m_compressionTag.m_code = tag;
  181. info.m_isCompressed = true;
  182. uint32_t paddingSize = s_paddingSize;
  183. info.m_offset = m_hasPadding ? paddingSize : 0;
  184. info.m_uncompressedSize = m_fileSize;
  185. info.m_decompressor =
  186. [this](const AZ::IO::CompressionInfo& info, const void* compressed,
  187. size_t compressedSize, void* uncompressed, size_t uncompressedSize) -> bool
  188. {
  189. Decompress(info, compressed, compressedSize, uncompressed, uncompressedSize);
  190. return true;
  191. };
  192. }
  193. }
  194. //@}
  195. private:
  196. AZ::IO::Path m_filePath;
  197. size_t m_fileSize = 0;
  198. bool m_hasFile = false;
  199. bool m_hasPadding = false;
  200. };
  201. class GemTestApplication
  202. : public AZ::ComponentApplication
  203. {
  204. public:
  205. // ComponentApplication
  206. void SetSettingsRegistrySpecializations(SettingsRegistryInterface::Specializations& specializations) override
  207. {
  208. ComponentApplication::SetSettingsRegistrySpecializations(specializations);
  209. specializations.Append("test");
  210. specializations.Append("gemtest");
  211. }
  212. };
  213. class StreamerTestBase
  214. : public UnitTest::LeakDetectionFixture
  215. {
  216. public:
  217. void SetUp() override
  218. {
  219. LeakDetectionFixture::SetUp();
  220. m_prevFileIO = FileIOBase::GetInstance();
  221. FileIOBase::SetInstance(&m_fileIO);
  222. m_application = aznew GemTestApplication();
  223. AZ::ComponentApplication::Descriptor appDesc;
  224. appDesc.m_useExistingAllocator = true;
  225. auto m_systemEntity = m_application->Create(appDesc);
  226. m_systemEntity->AddComponent(aznew AZ::StreamerComponent());
  227. m_systemEntity->Init();
  228. m_systemEntity->Activate();
  229. m_streamer = Interface<IO::IStreamer>::Get();
  230. }
  231. void TearDown() override
  232. {
  233. m_streamer = nullptr;
  234. m_application->Destroy();
  235. delete m_application;
  236. m_application = nullptr;
  237. FileIOBase::SetInstance(m_prevFileIO);
  238. LeakDetectionFixture::TearDown();
  239. }
  240. //! Requests are typically completed by Streamer before it updates it's internal bookkeeping.
  241. //! If a test depends on getting status information such as if cache files have been cleared
  242. //! then call WaitForScheduler to give Steamers scheduler some time to update it's internal status.
  243. void WaitForScheduler()
  244. {
  245. AZStd::this_thread::sleep_for(AZStd::chrono::microseconds(1));
  246. }
  247. protected:
  248. virtual AZStd::unique_ptr<MockFileBase> CreateMockFile() = 0;
  249. virtual bool IsUsingArchive() const = 0;
  250. virtual bool CreateDedicatedCache() const = 0;
  251. //! Create a test file that stores 4 byte integers starting at 0 and incrementing.
  252. //! @filesize The size the new file needs to be in bytes. The stored values will continue till fileSize / 4.
  253. //! @return The name of the test file.
  254. AZStd::unique_ptr<MockFileBase> CreateTestFile(size_t fileSize, PadArchive padding)
  255. {
  256. AZStd::string name = AZStd::string::format("TestFile_%zu.test", m_testFileCount++);
  257. AZ::IO::Path testFullPath = m_tempDirectory.GetDirectory();
  258. testFullPath /= name;
  259. AZStd::unique_ptr<MockFileBase> result = CreateMockFile();
  260. result->CreateTestFile(testFullPath.c_str(), fileSize, padding);
  261. if (CreateDedicatedCache())
  262. {
  263. AZ::Interface<AZ::IO::IStreamer>::Get()->CreateDedicatedCache(name.c_str());
  264. }
  265. return result;
  266. }
  267. void VerifyTestFile(const void* buffer, size_t fileSize, size_t offset = 0)
  268. {
  269. size_t count = fileSize / sizeof(u32);
  270. size_t numOffset = offset / sizeof(u32);
  271. const u32* data = reinterpret_cast<const u32*>(buffer);
  272. for (size_t i = 0; i < count; ++i)
  273. {
  274. EXPECT_EQ(data[i], i + numOffset);
  275. }
  276. }
  277. void AssertTestFile(const void* buffer, size_t fileSize, size_t offset = 0)
  278. {
  279. size_t count = fileSize / sizeof(u32);
  280. size_t numOffset = offset / sizeof(u32);
  281. const u32* data = reinterpret_cast<const u32*>(buffer);
  282. for (size_t i = 0; i < count; ++i)
  283. {
  284. ASSERT_EQ(data[i], i + numOffset);
  285. }
  286. }
  287. void PeriodicallyCheckedRead(AZ::IO::PathView filePath, void* buffer, u64 fileSize, u64 offset, AZStd::chrono::seconds timeOut, bool& result)
  288. {
  289. AZStd::binary_semaphore sync;
  290. AZStd::atomic_bool readSuccessful = false;
  291. auto callback = [&readSuccessful, &sync](FileRequestHandle request)
  292. {
  293. auto streamer = AZ::Interface<AZ::IO::IStreamer>::Get();
  294. readSuccessful = streamer->GetRequestStatus(request) == IStreamerTypes::RequestStatus::Completed;
  295. sync.release();
  296. };
  297. FileRequestPtr request = this->m_streamer->Read(filePath.Native(), buffer, fileSize, fileSize,
  298. IStreamerTypes::s_deadlineNow, IStreamerTypes::s_priorityMedium, offset);
  299. this->m_streamer->SetRequestCompleteCallback(request, AZStd::move(callback));
  300. this->m_streamer->QueueRequest(AZStd::move(request));
  301. bool hasTimedOut = !sync.try_acquire_for(timeOut);
  302. result = readSuccessful && !hasTimedOut;
  303. ASSERT_FALSE(hasTimedOut);
  304. ASSERT_TRUE(readSuccessful);
  305. }
  306. AZ::Test::ScopedAutoTempDirectory m_tempDirectory;
  307. UnitTest::TestFileIOBase m_fileIO;
  308. FileIOBase* m_prevFileIO{ nullptr };
  309. IStreamer* m_streamer{ nullptr };
  310. AZ::ComponentApplication* m_application{ nullptr };
  311. size_t m_testFileCount{ 0 };
  312. };
  313. template<typename TestTag>
  314. class StreamerTest : public StreamerTestBase
  315. {
  316. protected:
  317. bool IsUsingArchive() const override
  318. {
  319. AZ_Assert(false, "Not correctly specialized.");
  320. return false;
  321. }
  322. bool CreateDedicatedCache() const override
  323. {
  324. AZ_Assert(false, "Not correctly specialized.");
  325. return false;
  326. }
  327. AZStd::unique_ptr<MockFileBase> CreateMockFile() override
  328. {
  329. AZ_Assert(false, "Not correctly specialized.");
  330. return nullptr;
  331. }
  332. };
  333. template<>
  334. class StreamerTest<DedicatedCache_Uncompressed> : public StreamerTestBase
  335. {
  336. protected:
  337. bool IsUsingArchive() const override { return false; }
  338. bool CreateDedicatedCache() const override { return true; }
  339. AZStd::unique_ptr<MockFileBase> CreateMockFile() override
  340. {
  341. return AZStd::make_unique<MockUncompressedFile>();
  342. }
  343. };
  344. template<>
  345. class StreamerTest<GlobalCache_Uncompressed> : public StreamerTestBase
  346. {
  347. protected:
  348. bool IsUsingArchive() const override { return false; }
  349. bool CreateDedicatedCache() const override { return false; }
  350. AZStd::unique_ptr<MockFileBase> CreateMockFile() override
  351. {
  352. return AZStd::make_unique<MockUncompressedFile>();
  353. }
  354. };
  355. template<>
  356. class StreamerTest<DedicatedCache_Compressed> : public StreamerTestBase
  357. {
  358. protected:
  359. bool IsUsingArchive() const override { return true; }
  360. bool CreateDedicatedCache() const override { return true; }
  361. AZStd::unique_ptr<MockFileBase> CreateMockFile() override
  362. {
  363. return AZStd::make_unique<MockCompressedFile>();
  364. }
  365. };
  366. template<>
  367. class StreamerTest<GlobalCache_Compressed> : public StreamerTestBase
  368. {
  369. protected:
  370. bool IsUsingArchive() const override { return true; }
  371. bool CreateDedicatedCache() const override { return false; }
  372. AZStd::unique_ptr<MockFileBase> CreateMockFile() override
  373. {
  374. return AZStd::make_unique<MockCompressedFile>();
  375. }
  376. };
  377. #if !AZ_TRAIT_DISABLE_FAILED_STREAMER_TESTS
  378. TYPED_TEST_CASE_P(StreamerTest);
  379. // Read a file that's smaller than the cache.
  380. TYPED_TEST_P(StreamerTest, Read_ReadSmallFileEntirely_FileFullyRead)
  381. {
  382. constexpr size_t fileSize = 50_kib;
  383. auto testFile = this->CreateTestFile(fileSize, PadArchive::No);
  384. char buffer[fileSize];
  385. bool readResult{ false };
  386. this->PeriodicallyCheckedRead(testFile->GetFileName(), buffer, fileSize, 0, AZStd::chrono::seconds(5), readResult);
  387. EXPECT_TRUE(readResult);
  388. if(readResult)
  389. {
  390. this->VerifyTestFile(buffer, fileSize);
  391. }
  392. }
  393. // Read a large file that will need to be broken into chunks.
  394. TYPED_TEST_P(StreamerTest, Read_ReadLargeFileEntirely_FileFullyRead)
  395. {
  396. constexpr size_t fileSize = 10_mib;
  397. auto testFile = this->CreateTestFile(fileSize, PadArchive::No);
  398. char* buffer = new char[fileSize];
  399. bool readResult{ false };
  400. this->PeriodicallyCheckedRead(testFile->GetFileName(), buffer, fileSize, 0, AZStd::chrono::seconds(5), readResult);
  401. EXPECT_TRUE(readResult);
  402. if(readResult)
  403. {
  404. this->VerifyTestFile(buffer, fileSize);
  405. }
  406. delete[] buffer;
  407. }
  408. // Reads multiple small pieces to make sure that the cache is hit, seeded and copied properly.
  409. TYPED_TEST_P(StreamerTest, Read_ReadMultiplePieces_AllReadRequestWereSuccessful)
  410. {
  411. constexpr size_t fileSize = 2_mib;
  412. // Deliberately not taking a multiple of the file size so at least one read will have a partial cache hit.
  413. #if defined(AZ_DEBUG_BUILD)
  414. constexpr size_t bufferSize = 4800;
  415. #else
  416. constexpr size_t bufferSize = 480;
  417. #endif
  418. constexpr size_t readBlock = bufferSize * sizeof(u32);
  419. auto testFile = this->CreateTestFile(fileSize, PadArchive::No);
  420. u32 buffer[bufferSize];
  421. size_t block = 0;
  422. size_t fileRemainder = fileSize;
  423. for (block = 0; block < fileSize; block += readBlock)
  424. {
  425. size_t blockSize = AZStd::min(readBlock, fileRemainder);
  426. bool readResult{ false };
  427. this->PeriodicallyCheckedRead(testFile->GetFileName(), buffer, blockSize, block, AZStd::chrono::seconds(5), readResult);
  428. EXPECT_TRUE(readResult);
  429. if (readResult)
  430. {
  431. this->AssertTestFile(buffer, blockSize, block);
  432. }
  433. fileRemainder -= blockSize;
  434. }
  435. }
  436. // Same as the previous test, but all requests are submitted in a single batch.
  437. TYPED_TEST_P(StreamerTest, Read_ReadMultiplePiecesWithBatch_AllReadRequestWereSuccessful)
  438. {
  439. constexpr size_t fileSize = 2_mib;
  440. // Deliberately not taking a multiple of the file size so at least one read will have a partial cache hit.
  441. #if defined(AZ_DEBUG_BUILD)
  442. constexpr size_t bufferSize = 4800 * sizeof(u32);
  443. #else
  444. constexpr size_t bufferSize = 480 * sizeof(u32);
  445. #endif
  446. constexpr size_t numRequests = (fileSize / bufferSize) + 1;
  447. auto testFile = this->CreateTestFile(fileSize, PadArchive::No);
  448. AZStd::vector<FileRequestPtr> requests;
  449. this->m_streamer->CreateRequestBatch(requests, numRequests);
  450. AZStd::binary_semaphore sync;
  451. AZStd::atomic_int remainingReads = numRequests;
  452. AZStd::atomic_bool readSuccessful = true;
  453. auto callback = [&readSuccessful, &sync, &remainingReads](FileRequestHandle request)
  454. {
  455. if (AZ::Interface<IStreamer>::Get()->GetRequestStatus(request) != IStreamerTypes::RequestStatus::Completed)
  456. {
  457. readSuccessful = false;
  458. }
  459. if (--remainingReads == 0)
  460. {
  461. sync.release();
  462. }
  463. };
  464. u8* buffer = new u8[fileSize];
  465. size_t block = 0;
  466. size_t fileRemainder = fileSize;
  467. size_t requestIndex = 0;
  468. for (block = 0; block < fileSize; block += bufferSize)
  469. {
  470. size_t blockSize = AZStd::min(bufferSize, fileRemainder);
  471. this->m_streamer->Read(requests[requestIndex], testFile->GetFileName().Native(), buffer + block, blockSize, blockSize,
  472. IStreamerTypes::s_deadlineNow, IStreamerTypes::s_priorityMedium, block);
  473. this->m_streamer->SetRequestCompleteCallback(requests[requestIndex], callback);
  474. fileRemainder -= blockSize;
  475. requestIndex++;
  476. }
  477. this->m_streamer->QueueRequestBatch(requests);
  478. bool hasTimedOut = !sync.try_acquire_for(AZStd::chrono::minutes(10)); // Especially in debug this can take a long time.
  479. EXPECT_FALSE(hasTimedOut);
  480. EXPECT_TRUE(readSuccessful);
  481. fileRemainder = fileSize;
  482. for (block = 0; block < fileSize; block += bufferSize)
  483. {
  484. size_t blockSize = AZStd::min(bufferSize, fileRemainder);
  485. this->AssertTestFile(buffer + block, blockSize, block);
  486. fileRemainder -= blockSize;
  487. }
  488. delete[] buffer;
  489. }
  490. // Queue a request on a suspended device, then resume to see if gets picked up again.
  491. TYPED_TEST_P(StreamerTest, SuspendProcessing_SuspendWhileFileIsQueued_FileIsNotReadUntilProcessingIsRestarted)
  492. {
  493. constexpr size_t fileSize = 50_kib;
  494. auto testFile = this->CreateTestFile(fileSize, PadArchive::No);
  495. AZStd::binary_semaphore sync;
  496. AZStd::atomic_bool readSuccessful = false;
  497. auto callback = [&readSuccessful, &sync](FileRequestHandle request)
  498. {
  499. readSuccessful = AZ::Interface<IStreamer>::Get()->GetRequestStatus(request) == IStreamerTypes::RequestStatus::Completed;
  500. sync.release();
  501. };
  502. char buffer[fileSize];
  503. FileRequestPtr request = this->m_streamer->Read(testFile->GetFileName().Native(), buffer, fileSize, fileSize);
  504. this->m_streamer->SetRequestCompleteCallback(request, AZStd::move(callback));
  505. this->m_streamer->SuspendProcessing();
  506. this->m_streamer->QueueRequest(AZStd::move(request));
  507. // Sleep for a short while to make sure the test doesn't outrun the Streamer.
  508. AZStd::this_thread::sleep_for(AZStd::chrono::seconds(1));
  509. EXPECT_EQ(IStreamerTypes::RequestStatus::Pending, this->m_streamer->GetRequestStatus(request));
  510. // Wait for a maximum of a few seconds for the request to complete. If it doesn't, the suspend is most likely stuck and the test should fail.
  511. this->m_streamer->ResumeProcessing();
  512. bool hasTimedOut = !sync.try_acquire_for(AZStd::chrono::seconds(5));
  513. EXPECT_FALSE(hasTimedOut);
  514. EXPECT_TRUE(readSuccessful);
  515. }
  516. TYPED_TEST_P(StreamerTest, FlushCaches_FlushAfterEveryRead_FilesAreReadCorrectly)
  517. {
  518. constexpr size_t fileSize = 4_mib;
  519. constexpr size_t fileCount = 128;
  520. AZStd::vector<AZStd::unique_ptr<MockFileBase>> testFiles;
  521. AZStd::vector<AZStd::unique_ptr<char[]>> testData;
  522. AZStd::vector<FileRequestPtr> requests;
  523. testFiles.reserve(fileCount);
  524. testData.reserve(fileCount);
  525. requests.reserve(fileCount * 2);
  526. AZStd::binary_semaphore sync;
  527. AZStd::atomic_bool readSuccessful = true;
  528. AZStd::atomic_int counter = fileCount * 2;
  529. auto callback = [&sync, &counter, &readSuccessful](FileRequestHandle request)
  530. {
  531. readSuccessful = readSuccessful && AZ::Interface<IStreamer>::Get()->GetRequestStatus(request) == IStreamerTypes::RequestStatus::Completed;
  532. counter--;
  533. if (counter == 0)
  534. {
  535. sync.release();
  536. }
  537. };
  538. for (size_t i = 0; i < fileCount; ++i)
  539. {
  540. auto testFile = this->CreateTestFile(fileSize, PadArchive::No);
  541. AZStd::unique_ptr<char[]> buffer(new char[fileSize]);
  542. auto readRequest = this->m_streamer->Read(testFile->GetFileName().Native(), buffer.get(), fileSize, fileSize);
  543. this->m_streamer->SetRequestCompleteCallback(readRequest, callback);
  544. auto flushRequest = this->m_streamer->FlushCaches();
  545. this->m_streamer->SetRequestCompleteCallback(flushRequest, callback);
  546. requests.push_back(AZStd::move(readRequest));
  547. requests.push_back(AZStd::move(flushRequest));
  548. testFiles.push_back(AZStd::move(testFile));
  549. testData.push_back(AZStd::move(buffer));
  550. }
  551. for (size_t i = 0; i < fileCount * 2; i += 2)
  552. {
  553. this->m_streamer->QueueRequest(requests[i]);
  554. this->m_streamer->QueueRequest(requests[i + 1]);
  555. AZStd::this_thread::yield();
  556. }
  557. bool hasTimedOut = !sync.try_acquire_for(AZStd::chrono::seconds(30));
  558. EXPECT_FALSE(hasTimedOut);
  559. EXPECT_TRUE(readSuccessful);
  560. }
  561. REGISTER_TYPED_TEST_CASE_P(StreamerTest,
  562. Read_ReadSmallFileEntirely_FileFullyRead,
  563. Read_ReadLargeFileEntirely_FileFullyRead,
  564. Read_ReadMultiplePieces_AllReadRequestWereSuccessful,
  565. Read_ReadMultiplePiecesWithBatch_AllReadRequestWereSuccessful,
  566. SuspendProcessing_SuspendWhileFileIsQueued_FileIsNotReadUntilProcessingIsRestarted,
  567. FlushCaches_FlushAfterEveryRead_FilesAreReadCorrectly);
  568. using StreamerTestCases = ::testing::Types<GlobalCache_Uncompressed, DedicatedCache_Uncompressed, GlobalCache_Compressed, DedicatedCache_Compressed>;
  569. INSTANTIATE_TYPED_TEST_CASE_P(StreamerTests, StreamerTest, StreamerTestCases);
  570. #endif // AZ_TRAIT_DISABLE_FAILED_STREAMER_TESTS
  571. } // namespace AZ::IO