/*
 * Copyright (c) Contributors to the Open 3D Engine Project.
 * For complete copyright and license terms please see the LICENSE at the root of this distribution.
 *
 * SPDX-License-Identifier: Apache-2.0 OR MIT
 *
 */

#include "UserTypes.h"

#include <AzCore/std/parallel/atomic.h>
#include <AzCore/std/parallel/combinable.h>
#include <AzCore/std/parallel/mutex.h>
#include <AzCore/std/parallel/semaphore.h>
#include <AzCore/std/parallel/binary_semaphore.h>
#include <AzCore/std/parallel/spin_mutex.h>
#include <AzCore/std/parallel/lock.h>
#include <AzCore/std/parallel/shared_mutex.h>
#include <AzCore/std/parallel/condition_variable.h>
#include <AzCore/std/parallel/threadbus.h>
#include <AzCore/std/parallel/thread.h>

#include <AzCore/std/chrono/chrono.h>
#include <AzCore/Memory/SystemAllocator.h>

namespace UnitTest
{
    using namespace AZStd;
    using namespace UnitTestInternal;

    /**
     * Synchronization primitives test.
     */
    TEST(Parallel, Mutex)
    {
        mutex m;
        m.lock();
        m.unlock();
    }
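
    // Illustrative sketch (not part of the original suite): the same mutex managed
    // with RAII via lock_guard, which unlocks automatically on scope exit.
    TEST(Parallel, Mutex_LockGuardSketch)
    {
        mutex m;
        {
            lock_guard<mutex> l(m); // held for the duration of this block
        }
        EXPECT_TRUE(m.try_lock()); // released at scope exit, so locking succeeds again
        m.unlock();
    }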

    TEST(Parallel, RecursiveMutex)
    {
        recursive_mutex m1;
        m1.lock();
        EXPECT_TRUE(m1.try_lock()); // we should be able to lock it from the same thread again...
        m1.unlock();
        m1.unlock();

        {
            mutex m2;
            lock_guard<mutex> l(m2);
        }
    }

    TEST(Parallel, Semaphore_Sanity)
    {
        semaphore sema;
        sema.release(1);
        sema.acquire();
    }

    // MARGIN_OF_ERROR_MS: margin of error for waits.
    // This is necessary because timers are not exact.
    // Also, the failure conditions we are looking for are massive failures, for example, asking it to wait
    // 100ms and having it not wait at all!
    // Note that on most platforms, a margin of 2ms and wait time of 20ms was adequate, but
    // there are some platforms that have poor timer resolution. So we'll greatly increase the margin.
    constexpr AZStd::chrono::milliseconds MARGIN_OF_ERROR_MS(20);

    // This is how long we wait when asked to wait a FULL duration. This number should be as small as possible
    // for test efficiency while still being significant compared to the margin above.
    constexpr AZStd::chrono::milliseconds WAIT_TIME_MS(60);
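
    // Sanity check on the constants above (added for clarity, assuming constexpr
    // duration comparison is available): the wait time must exceed the margin, or
    // the "minimum duration minus margin" math in the tests below would go negative.
    static_assert(WAIT_TIME_MS > MARGIN_OF_ERROR_MS, "WAIT_TIME_MS must exceed MARGIN_OF_ERROR_MS");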

    TEST(Parallel, Semaphore_TryAcquireFor_WaitsMinimumTime)
    {
        // According to the standard, the duration passed to try_acquire_for is the
        // minimum amount of time it should wait for.
        semaphore sema;
        auto minDuration = AZStd::chrono::milliseconds(WAIT_TIME_MS);
        auto minDurationWithMarginForError = minDuration - AZStd::chrono::milliseconds(MARGIN_OF_ERROR_MS);
        auto startTime = AZStd::chrono::steady_clock::now();
        EXPECT_FALSE(sema.try_acquire_for(minDuration));
        auto actualDuration = AZStd::chrono::steady_clock::now() - startTime;
        EXPECT_GE(actualDuration, minDurationWithMarginForError);
    }

    TEST(Parallel, Semaphore_TryAcquireUntil_ActuallyWaits)
    {
        // try_acquire_until should not wake up until the time specified
        semaphore sema;
        auto minDuration = AZStd::chrono::milliseconds(WAIT_TIME_MS);
        auto minDurationWithMarginForError = minDuration - AZStd::chrono::milliseconds(MARGIN_OF_ERROR_MS);
        auto startTime = AZStd::chrono::steady_clock::now();
        auto absTime = startTime + minDuration;
        EXPECT_FALSE(sema.try_acquire_until(absTime));
        auto duration = AZStd::chrono::steady_clock::now() - startTime;
        EXPECT_GE(duration, minDurationWithMarginForError);
    }

    TEST(Parallel, Semaphore_TryAcquireFor_Signalled_DoesNotWait)
    {
        semaphore sema;
        // This duration should not matter: the semaphore is released before we try to
        // acquire it, so no waiting should occur and no error margin is needed.
        auto minDuration = AZStd::chrono::milliseconds(WAIT_TIME_MS);
        auto startTime = AZStd::chrono::steady_clock::now();
        sema.release();
        EXPECT_TRUE(sema.try_acquire_for(minDuration));
        auto durationSpent = AZStd::chrono::steady_clock::now() - startTime;
        EXPECT_LT(durationSpent, minDuration);
    }

    TEST(Parallel, Semaphore_TryAcquireUntil_Signalled_DoesNotWait)
    {
        semaphore sema;
        // We should not wait at all here, since we start with the semaphore already signalled.
        auto minDuration = AZStd::chrono::milliseconds(WAIT_TIME_MS);
        auto startTime = AZStd::chrono::steady_clock::now();
        auto absTime = startTime + minDuration;
        sema.release();
        EXPECT_TRUE(sema.try_acquire_until(absTime));
        auto duration = AZStd::chrono::steady_clock::now() - startTime;
        EXPECT_LT(duration, minDuration);
    }

    TEST(Parallel, BinarySemaphore)
    {
        binary_semaphore event;
        event.release();
        event.acquire();
    }
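
    // Illustrative sketch (not part of the original suite): a binary_semaphore used
    // as a one-shot handshake, where one thread blocks until another signals it.
    TEST(Parallel, BinarySemaphore_HandshakeSketch)
    {
        binary_semaphore event;
        bool signalled = false;
        AZStd::thread signaller([&]()
        {
            signalled = true; // happens-before the release
            event.release();
        });
        event.acquire(); // blocks until the signaller releases
        EXPECT_TRUE(signalled);
        signaller.join();
    }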

    TEST(Parallel, SpinMutex)
    {
        spin_mutex sm;
        sm.lock();
        sm.unlock();
    }
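
    // Illustrative sketch (not part of the original suite): a spin_mutex guarding a
    // shared counter across two threads; the iteration counts are arbitrary.
    TEST(Parallel, SpinMutex_GuardsSharedCounterSketch)
    {
        spin_mutex sm;
        int counter = 0;
        auto work = [&]()
        {
            for (int i = 0; i < 1000; ++i)
            {
                lock_guard<spin_mutex> guard(sm); // spin-waits until the lock is free
                ++counter;
            }
        };
        AZStd::thread t1(work);
        AZStd::thread t2(work);
        t1.join();
        t2.join();
        EXPECT_EQ(2000, counter); // no increments lost, because all were serialized
    }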

    /**
     * Thread test
     */
    class Parallel_Thread
        : public LeakDetectionFixture
    {
    protected:
        int m_data{};
        int m_dataMax{};

        static const int m_threadStackSize = 32 * 1024;
        thread_desc m_desc[3];
        int m_numThreadDesc = 0;
    public:
        void increment_data()
        {
            while (m_data < m_dataMax)
            {
                m_data++;
            }
        }

        void sleep_thread(chrono::milliseconds time)
        {
            this_thread::sleep_for(time);
        }

        void get_thread_id(AZStd::thread::id* id)
        {
            *id = this_thread::get_id();
        }

        class MfTest
        {
        public:
            mutable unsigned int m_hash{};

            MfTest() = default;

            int f0() { f1(17); return 0; }
            int g0() const { g1(17); return 0; }
            int f1(int a1) { m_hash = (m_hash * 17041 + a1) % 32768; return 0; }
            int g1(int a1) const { m_hash = (m_hash * 17041 + a1 * 2) % 32768; return 0; }
            int f2(int a1, int a2) { f1(a1); f1(a2); return 0; }
            int g2(int a1, int a2) const { g1(a1); g1(a2); return 0; }
            int f3(int a1, int a2, int a3) { f2(a1, a2); f1(a3); return 0; }
            int g3(int a1, int a2, int a3) const { g2(a1, a2); g1(a3); return 0; }
            int f4(int a1, int a2, int a3, int a4) { f3(a1, a2, a3); f1(a4); return 0; }
            int g4(int a1, int a2, int a3, int a4) const { g3(a1, a2, a3); g1(a4); return 0; }
            int f5(int a1, int a2, int a3, int a4, int a5) { f4(a1, a2, a3, a4); f1(a5); return 0; }
            int g5(int a1, int a2, int a3, int a4, int a5) const { g4(a1, a2, a3, a4); g1(a5); return 0; }
            int f6(int a1, int a2, int a3, int a4, int a5, int a6) { f5(a1, a2, a3, a4, a5); f1(a6); return 0; }
            int g6(int a1, int a2, int a3, int a4, int a5, int a6) const { g5(a1, a2, a3, a4, a5); g1(a6); return 0; }
            int f7(int a1, int a2, int a3, int a4, int a5, int a6, int a7) { f6(a1, a2, a3, a4, a5, a6); f1(a7); return 0; }
            int g7(int a1, int a2, int a3, int a4, int a5, int a6, int a7) const { g6(a1, a2, a3, a4, a5, a6); g1(a7); return 0; }
            int f8(int a1, int a2, int a3, int a4, int a5, int a6, int a7, int a8) { f7(a1, a2, a3, a4, a5, a6, a7); f1(a8); return 0; }
            int g8(int a1, int a2, int a3, int a4, int a5, int a6, int a7, int a8) const { g7(a1, a2, a3, a4, a5, a6, a7); g1(a8); return 0; }
        };

        void do_nothing_id(AZStd::thread::id* my_id)
        {
            *my_id = this_thread::get_id();
        }

        AZStd::thread make_thread(AZStd::thread::id* the_id)
        {
            const thread_desc desc1 = m_numThreadDesc ? m_desc[0] : thread_desc{};
            return AZStd::thread(desc1, [this](AZStd::thread::id* threadId) { do_nothing_id(threadId); }, the_id);
        }

        void simple_thread()
        {
            m_data = 999;
        }

        void comparison_thread(AZStd::thread::id parent)
        {
            AZStd::thread::id const my_id = this_thread::get_id();
            EXPECT_NE(parent, my_id);

            AZStd::thread::id const my_id2 = this_thread::get_id();
            EXPECT_EQ(my_id2, my_id);

            AZStd::thread::id const no_thread_id = AZStd::thread::id();
            EXPECT_NE(no_thread_id, my_id);
        }

        struct non_copyable_functor
        {
            unsigned value;

            non_copyable_functor()
                : value(0)
            {}

            void operator()()
            {
                value = 999;
            }
        private:
            non_copyable_functor(const non_copyable_functor&);
            non_copyable_functor& operator=(const non_copyable_functor&);
        };
    };

    TEST_F(Parallel_Thread, ThreadSanityTest)
    {
        const thread_desc desc1 = m_numThreadDesc ? m_desc[0] : thread_desc{};

        // We need to have at least one processor
        EXPECT_GE(AZStd::thread::hardware_concurrency(), 1);

        // Create a thread to increment the data until it reaches the maximum
        m_data = 0;
        m_dataMax = 10;
        AZStd::thread tr(
            desc1,
            [this]()
            {
                increment_data();
            });
        tr.join();
        EXPECT_EQ(m_dataMax, m_data);

        chrono::steady_clock::time_point startTime = chrono::steady_clock::now();
        {
            AZStd::thread tr1(desc1, [this](AZStd::chrono::milliseconds waitTime) { sleep_thread(waitTime); },
                chrono::milliseconds(100));
            tr1.join();
        }
        auto sleepTime = chrono::steady_clock::now() - startTime;
        //printf("\nSleeptime: %d Ms\n",(unsigned int) ());
        // On Windows this uses Sleep(), which depends on the multimedia (MM) timer resolution.
        // A tighter bound of 99000 can be used only if the OS supports 1 ms resolution
        // (timeGetDevCaps()) and it is enabled via timeBeginPeriod(1)/timeEndPeriod(1).
        EXPECT_GE(sleepTime.count(), 50000);
    }

    TEST_F(Parallel_Thread, ThreadCreation_Succeeds)
    {
        const thread_desc desc1 = m_numThreadDesc ? m_desc[0] : thread_desc{};
        m_data = 0;
        AZStd::thread t(
            desc1,
            [this]()
            {
                simple_thread();
            });
        t.join();
        EXPECT_EQ(999, m_data);
    }

    TEST_F(Parallel_Thread, ThreadCreationThroughRefWrapper_Succeeds)
    {
        const thread_desc desc1 = m_numThreadDesc ? m_desc[0] : thread_desc{};
        non_copyable_functor f;

        AZStd::thread thrd(desc1, AZStd::ref(f));
        thrd.join();
        EXPECT_EQ(999, f.value);
    }

    TEST_F(Parallel_Thread, ThreadIdIsComparable_Succeeds)
    {
        const thread_desc desc1 = m_numThreadDesc ? m_desc[0] : thread_desc{};
        m_data = 0;
        AZStd::thread t(
            desc1,
            [this]()
            {
                this->simple_thread();
            });
        t.join();
        EXPECT_EQ(999, m_data);
    }

    TEST_F(Parallel_Thread, TestSwapThread_Succeeds)
    {
        const thread_desc desc1 = m_numThreadDesc ? m_desc[0] : thread_desc{};
        const thread_desc desc2 = m_numThreadDesc ? m_desc[1] : thread_desc{};
        AZStd::thread t(desc1, [this]() { simple_thread(); });
        AZStd::thread t2(desc2, [this]() { simple_thread(); });
        AZStd::thread::id id1 = t.get_id();
        AZStd::thread::id id2 = t2.get_id();

        t.swap(t2);
        EXPECT_EQ(id2, t.get_id());
        EXPECT_EQ(id1, t2.get_id());

        swap(t, t2);
        EXPECT_EQ(id1, t.get_id());
        EXPECT_EQ(id2, t2.get_id());

        t.join();
        t2.join();
    }

    TEST_F(Parallel_Thread, ThreadIdIsDefaultConstructForThread_Succeeds)
    {
        AZStd::thread t;
        EXPECT_EQ(AZStd::thread::id(), t.get_id());
    }

    TEST_F(Parallel_Thread, ThreadIdForCurrentThread_IsNotDefaultConstructed_Succeeds)
    {
        const thread_desc desc = m_numThreadDesc ? m_desc[0] : thread_desc{};
        AZStd::thread t(desc, [](){});
        EXPECT_NE(AZStd::thread::id(), t.get_id());
        t.join();
    }

    TEST_F(Parallel_Thread, DifferentThreadsHaveDifferentThreadIds_Succeeds)
    {
        const thread_desc desc1 = m_numThreadDesc ? m_desc[0] : thread_desc{};
        const thread_desc desc2 = m_numThreadDesc ? m_desc[1] : thread_desc{};
        AZStd::thread t(desc1, [](){});
        AZStd::thread t2(desc2, [](){});
        EXPECT_NE(t.get_id(), t2.get_id());
        t.join();
        t2.join();
    }

    TEST_F(Parallel_Thread, ThreadIdsAreTotallyOrdered_Succeeds)
    {
        const thread_desc desc1 = m_numThreadDesc ? m_desc[0] : thread_desc{};
        const thread_desc desc2 = m_numThreadDesc ? m_desc[1] : thread_desc{};
        const thread_desc desc3 = m_numThreadDesc ? m_desc[2] : thread_desc{};
        AZStd::thread t(desc1, [](){});
        AZStd::thread t2(desc2, [](){});
        AZStd::thread t3(desc3, [](){});

        EXPECT_NE(t2.get_id(), t.get_id());
        EXPECT_NE(t3.get_id(), t.get_id());
        EXPECT_NE(t3.get_id(), t2.get_id());

        EXPECT_NE((t2.get_id() < t.get_id()), (t.get_id() < t2.get_id()));
        EXPECT_NE((t3.get_id() < t.get_id()), (t.get_id() < t3.get_id()));
        EXPECT_NE((t3.get_id() < t2.get_id()), (t2.get_id() < t3.get_id()));

        EXPECT_NE((t2.get_id() > t.get_id()), (t.get_id() > t2.get_id()));
        EXPECT_NE((t3.get_id() > t.get_id()), (t.get_id() > t3.get_id()));
        EXPECT_NE((t3.get_id() > t2.get_id()), (t2.get_id() > t3.get_id()));

        EXPECT_EQ((t2.get_id() > t.get_id()), (t.get_id() < t2.get_id()));
        EXPECT_EQ((t.get_id() > t2.get_id()), (t2.get_id() < t.get_id()));
        EXPECT_EQ((t3.get_id() > t.get_id()), (t.get_id() < t3.get_id()));
        EXPECT_EQ((t.get_id() > t3.get_id()), (t3.get_id() < t.get_id()));
        EXPECT_EQ((t3.get_id() > t2.get_id()), (t2.get_id() < t3.get_id()));
        EXPECT_EQ((t2.get_id() > t3.get_id()), (t3.get_id() < t2.get_id()));

        EXPECT_EQ((t2.get_id() >= t.get_id()), (t.get_id() < t2.get_id()));
        EXPECT_EQ((t.get_id() >= t2.get_id()), (t2.get_id() < t.get_id()));
        EXPECT_EQ((t3.get_id() >= t.get_id()), (t.get_id() < t3.get_id()));
        EXPECT_EQ((t.get_id() >= t3.get_id()), (t3.get_id() < t.get_id()));
        EXPECT_EQ((t3.get_id() >= t2.get_id()), (t2.get_id() < t3.get_id()));
        EXPECT_EQ((t2.get_id() >= t3.get_id()), (t3.get_id() < t2.get_id()));

        EXPECT_EQ((t2.get_id() > t.get_id()), (t.get_id() <= t2.get_id()));
        EXPECT_EQ((t.get_id() > t2.get_id()), (t2.get_id() <= t.get_id()));
        EXPECT_EQ((t3.get_id() > t.get_id()), (t.get_id() <= t3.get_id()));
        EXPECT_EQ((t.get_id() > t3.get_id()), (t3.get_id() <= t.get_id()));
        EXPECT_EQ((t3.get_id() > t2.get_id()), (t2.get_id() <= t3.get_id()));
        EXPECT_EQ((t2.get_id() > t3.get_id()), (t3.get_id() <= t2.get_id()));

        if ((t.get_id() < t2.get_id()) && (t2.get_id() < t3.get_id()))
        {
            EXPECT_LT(t.get_id(), t3.get_id());
        }
        else if ((t.get_id() < t3.get_id()) && (t3.get_id() < t2.get_id()))
        {
            EXPECT_LT(t.get_id(), t2.get_id());
        }
        else if ((t2.get_id() < t3.get_id()) && (t3.get_id() < t.get_id()))
        {
            EXPECT_LT(t2.get_id(), t.get_id());
        }
        else if ((t2.get_id() < t.get_id()) && (t.get_id() < t3.get_id()))
        {
            EXPECT_LT(t2.get_id(), t3.get_id());
        }
        else if ((t3.get_id() < t.get_id()) && (t.get_id() < t2.get_id()))
        {
            EXPECT_LT(t3.get_id(), t2.get_id());
        }
        else if ((t3.get_id() < t2.get_id()) && (t2.get_id() < t.get_id()))
        {
            EXPECT_LT(t3.get_id(), t.get_id());
        }
        else
        {
            GTEST_FAIL();
        }

        AZStd::thread::id default_id;

        EXPECT_LT(default_id, t.get_id());
        EXPECT_LT(default_id, t2.get_id());
        EXPECT_LT(default_id, t3.get_id());

        EXPECT_LE(default_id, t.get_id());
        EXPECT_LE(default_id, t2.get_id());
        EXPECT_LE(default_id, t3.get_id());

        EXPECT_FALSE(default_id > t.get_id());
        EXPECT_FALSE(default_id > t2.get_id());
        EXPECT_FALSE(default_id > t3.get_id());

        EXPECT_FALSE(default_id >= t.get_id());
        EXPECT_FALSE(default_id >= t2.get_id());
        EXPECT_FALSE(default_id >= t3.get_id());

        t.join();
        t2.join();
        t3.join();
    }

    TEST_F(Parallel_Thread, ThreadIdOfCurrentThreadReturnedByThisThreadId_Succeeds)
    {
        const thread_desc desc1 = m_numThreadDesc ? m_desc[0] : thread_desc{};
        AZStd::thread::id id;
        AZStd::thread t(desc1, [this, &id]() { get_thread_id(&id); });
        AZStd::thread::id t_id = t.get_id();
        t.join();
        EXPECT_EQ(t_id, id);
    }

    TEST_F(Parallel_Thread, ThreadInvokesMemberFunction_Succeeds)
    {
        const thread_desc desc1 = m_numThreadDesc ? m_desc[0] : thread_desc{};
        MfTest x;
        AZStd::function<void()> func = [xPtr = &x]()
        {
            xPtr->f0();
        };
        AZStd::thread(desc1, func).join();
        func = [&x]
        {
            x.f0();
        };
        AZStd::thread(desc1, func).join();
        func = [xPtr = &x]
        {
            xPtr->g0();
        };
        AZStd::thread(desc1, func).join();
        func = [x]
        {
            x.g0();
        };
        AZStd::thread(desc1, func).join();
        func = [&x]
        {
            x.g0();
        };
        AZStd::thread(desc1, func).join();
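
        // Note: of the five invocations above, the [x] lambda captured a copy, so its
        // g0() call mutated the copy's (mutable) m_hash, not x. The original object
        // therefore saw f1(17), f1(17), g1(17), g1(17), which hashes to 1366.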
        EXPECT_EQ(1366, x.m_hash);
    }

    TEST_F(Parallel_Thread, ThreadCanBeMovedAssigned_Succeeds)
    {
        const thread_desc desc1 = m_numThreadDesc ? m_desc[0] : thread_desc{};
        AZStd::thread::id the_id;
        AZStd::thread x;
        x = AZStd::thread(desc1, [this, &the_id]() { do_nothing_id(&the_id); });
        AZStd::thread::id x_id = x.get_id();
        x.join();
        EXPECT_EQ(x_id, the_id);
    }

    TEST_F(Parallel_Thread, ThreadMoveConstructorIsInvokedOnReturn_Succeeds)
    {
        AZStd::thread::id the_id;
        AZStd::thread x;
        x = make_thread(&the_id);
        AZStd::thread::id x_id = x.get_id();
        x.join();
        EXPECT_EQ(x_id, the_id);
    }

    TEST_F(Parallel_Thread, Hashable)
    {
        constexpr size_t ThreadCount = 100;

        // Make sure thread ids can be added to a map.
        AZStd::vector<AZStd::thread*> threadVector;
        AZStd::unordered_map<AZStd::thread_id, AZStd::thread*> threadMap;

        // Create a bunch of threads and add them to a map
        for (uint32_t i = 0; i < ThreadCount; ++i)
        {
            AZStd::thread* thread = new AZStd::thread([i]() { return i; });
            threadVector.push_back(thread);
            threadMap[thread->get_id()] = thread;
        }

        // Check and make sure the threads can be found by id and match the ones created.
        for (uint32_t i = 0; i < ThreadCount; ++i)
        {
            AZStd::thread* thread = threadVector.at(i);
            EXPECT_TRUE(threadMap.at(thread->get_id()) == thread);
        }

        // Clean up the threads
        AZStd::for_each(threadVector.begin(), threadVector.end(),
            [](AZStd::thread* thread)
            {
                thread->join();
                delete thread;
            }
        );
    }

    class Parallel_Combinable
        : public LeakDetectionFixture
    {
    public:
        void run()
        {
            //initialize with default value
            {
                combinable<TestStruct> c;
                TestStruct& s = c.local();
                EXPECT_EQ(42, s.m_x);
            }

            //detect first initialization
            {
                combinable<int> c;
                bool exists;
                int& v1 = c.local(exists);
                EXPECT_FALSE(exists);
                v1 = 42;

                int& v2 = c.local(exists);
                EXPECT_TRUE(exists);
                EXPECT_EQ(42, v2);

                int& v3 = c.local();
                EXPECT_EQ(42, v3);
            }

            //custom initializer
            {
                combinable<int> c(&Initializer);
                EXPECT_EQ(43, c.local());
            }

            //clear
            {
                combinable<int> c(&Initializer);
                bool exists;
                int& v1 = c.local(exists);
                EXPECT_EQ(43, v1);
                EXPECT_FALSE(exists);
                v1 = 44;

                c.clear();
                int& v2 = c.local(exists);
                EXPECT_EQ(43, v2);
                EXPECT_FALSE(exists);
            }

            //copy constructor and assignment
            {
                combinable<int> c1, c2;
                int& v = c1.local();
                v = 45;

                combinable<int> c3(c1);
                EXPECT_EQ(45, c3.local());

                c2 = c1;
                EXPECT_EQ(45, c2.local());
            }

            //combine
            {
                combinable<int> c(&Initializer);

                //default value when no other values
                EXPECT_EQ(43, c.combine(plus<int>()));

                c.local() = 50;
                EXPECT_EQ(50, c.combine(plus<int>()));
            }

            //combine_each
            {
                combinable<int> c(&Initializer);

                m_numCombinerCalls = 0;
                c.combine_each([this](int value) { MyCombiner(value); });
                EXPECT_EQ(0, m_numCombinerCalls);

                m_numCombinerCalls = 0;
                m_combinerTotal = 0;
                c.local() = 50;
                c.combine_each([this](int value) { MyCombiner(value); });
                EXPECT_EQ(1, m_numCombinerCalls);
                EXPECT_EQ(50, m_combinerTotal);
            }

            //multithread test
            {
                AZStd::thread_desc desc;
                desc.m_name = "Test Thread 1";
                AZStd::thread t1(desc, [this](int start, int end) { MyThreadFunc(start, end); }, 0, 10);
                desc.m_name = "Test Thread 2";
                AZStd::thread t2(desc, [this](int start, int end) { MyThreadFunc(start, end); }, 10, 20);
                desc.m_name = "Test Thread 3";
                AZStd::thread t3(desc, [this](int start, int end) { MyThreadFunc(start, end); }, 20, 500);
                desc.m_name = "Test Thread 4";
                AZStd::thread t4(desc, [this](int start, int end) { MyThreadFunc(start, end); }, 500, 510);
                desc.m_name = "Test Thread 5";
                AZStd::thread t5(desc, [this](int start, int end) { MyThreadFunc(start, end); }, 510, 2001);

                t1.join();
                t2.join();
                t3.join();
                t4.join();
                t5.join();

                m_numCombinerCalls = 0;
                m_combinerTotal = 0;
                m_threadCombinable.combine_each([this](int value) { MyCombiner(value); });
                EXPECT_EQ(5, m_numCombinerCalls);
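                // The five half-open ranges together cover 0..2000 inclusive, so the
                // combined total is 2000 * 2001 / 2 = 2001000.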
                EXPECT_EQ(2001000, m_combinerTotal);
                EXPECT_EQ(2001000, m_threadCombinable.combine(plus<int>()));

                m_threadCombinable.clear();
            }
        }

        static int Initializer()
        {
            return 43;
        }

        void MyThreadFunc(int start, int end)
        {
            int& v = m_threadCombinable.local();
            v = 0;
            for (int i = start; i < end; ++i)
            {
                v += i;
            }
        }

        void MyCombiner(int v)
        {
            ++m_numCombinerCalls;
            m_combinerTotal += v;
        }

        int m_numCombinerCalls;
        int m_combinerTotal;
        combinable<int> m_threadCombinable;

        struct TestStruct
        {
            TestStruct()
                : m_x(42) { }
            int m_x;
        };
    };

    TEST_F(Parallel_Combinable, Test)
    {
        run();
    }

    class Parallel_SharedMutex
        : public LeakDetectionFixture
    {
    public:
        static const int s_numOfReaders = 4;
        shared_mutex m_access;
        unsigned int m_readSum[s_numOfReaders];
        unsigned int m_currentValue;

        void Reader(int index)
        {
            unsigned int lastCurrentValue = 0;
            while (true)
            {
                {
                    // get shared access
                    shared_lock<shared_mutex> lock(m_access);

                    // now we have shared access
                    if (lastCurrentValue != m_currentValue)
                    {
                        lastCurrentValue = m_currentValue;

                        m_readSum[index] += lastCurrentValue;

                        if (m_currentValue == 100)
                        {
                            break;
                        }
                    }
                }

                this_thread::sleep_for(AZStd::chrono::milliseconds(1));
            }
        }

        void Writer()
        {
            while (m_currentValue < 100)
            {
                {
                    lock_guard<shared_mutex> lock(m_access);
                    // now we have exclusive access

                    // m_currentValue must be checked within the mutex, as it is possible that
                    // the other writer thread incremented m_currentValue to 100 between the check of
                    // the while loop condition and the acquiring of the shared_mutex exclusive lock
                    if (m_currentValue < 100)
                    {
                        unsigned int currentValue = m_currentValue;
                        m_currentValue = currentValue + 1;
                    }
                }

                this_thread::sleep_for(AZStd::chrono::milliseconds(10));
            }
        }

        void run()
        {
            // basic operations
            {
                shared_mutex rwlock;

                // try exclusive lock
                EXPECT_TRUE(rwlock.try_lock());
                rwlock.unlock();

                rwlock.lock(); // get the exclusive lock
                // while the exclusive lock is taken nobody else can get a lock
                EXPECT_FALSE(rwlock.try_lock());
                EXPECT_FALSE(rwlock.try_lock_shared());
                rwlock.unlock();

                // try shared lock
                EXPECT_TRUE(rwlock.try_lock_shared());
                rwlock.unlock_shared();

                rwlock.lock_shared(); // get the shared lock
                EXPECT_TRUE(rwlock.try_lock_shared()); // make sure we can have multiple shared locks
                rwlock.unlock_shared();
                rwlock.unlock_shared();
            }

            // spin up threads and test the validity of the operations
            {
                m_currentValue = 0;
                memset(m_readSum, 0, sizeof(unsigned int) * AZ_ARRAY_SIZE(m_readSum));

                AZStd::thread_desc desc;
                desc.m_name = "Test Reader 1";
                AZStd::thread t1(desc, [this](int index){ Reader(index); }, 0);
                desc.m_name = "Test Reader 2";
                AZStd::thread t2(desc, [this](int index){ Reader(index); }, 1);
                desc.m_name = "Test Reader 3";
                AZStd::thread t3(desc, [this](int index){ Reader(index); }, 2);
                desc.m_name = "Test Reader 4";
                AZStd::thread t4(desc, [this](int index){ Reader(index); }, 3);
                desc.m_name = "Test Writer 1";
                AZStd::thread t5(desc, [this](){ Writer(); });
                desc.m_name = "Test Writer 2";
                AZStd::thread t6(desc, [this](){ Writer(); });

                t1.join();
                t2.join();
                t3.join();
                t4.join();
                t5.join();
                t6.join();

                EXPECT_EQ(100, m_currentValue);

                // Check the range of the sums, as we don't guarantee that every reader observes every value.
                // The minimum possible sum for each reader thread is 100: this occurs when the reader threads
                // are all starved while the writer threads increment m_currentValue to 100; afterwards each
                // reader grabs the shared_mutex once, reads the value 100 from m_currentValue, and finishes.
                // The maximum is 5050 (1 + 2 + ... + 100), when a reader observes every intermediate value.
                EXPECT_GE(m_readSum[0], 100U);
                EXPECT_LE(m_readSum[0], 5050U);
                EXPECT_GE(m_readSum[1], 100U);
                EXPECT_LE(m_readSum[1], 5050U);
                EXPECT_GE(m_readSum[2], 100U);
                EXPECT_LE(m_readSum[2], 5050U);
                EXPECT_GE(m_readSum[3], 100U);
                EXPECT_LE(m_readSum[3], 5050U);
            }
        }
    };

    TEST_F(Parallel_SharedMutex, Test)
    {
        run();
    }

    class ConditionVariable
        : public LeakDetectionFixture
    {};

    TEST_F(ConditionVariable, NotifyOneSingleWait)
    {
        AZStd::condition_variable cv;
        AZStd::mutex cv_mutex;
        AZStd::atomic_int i(0);
        AZStd::atomic_bool done(false);
        auto wait = [&]()
        {
            AZStd::unique_lock<AZStd::mutex> lock(cv_mutex);
            cv.wait(lock, [&]{ return i == 1; });
            EXPECT_EQ(1, i);
            done = true;
        };

        auto signal = [&]()
        {
            cv.notify_one();
            EXPECT_EQ(0, i);
            EXPECT_FALSE(done);

            AZStd::unique_lock<AZStd::mutex> lock(cv_mutex);
            i = 1;
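            // Keep notifying in a loop: the waiter may not have reached its wait yet
            // when the first notify_one fired, so a single notification could be lost.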
            while (!done)
            {
                lock.unlock();
                cv.notify_one();
                lock.lock();
            }
        };

        EXPECT_EQ(0, i);
        EXPECT_FALSE(done);

        AZStd::thread waitThread1(wait);
        AZStd::thread signalThread(signal);
        waitThread1.join();
        signalThread.join();

        EXPECT_EQ(1, i);
        EXPECT_TRUE(done);
    }

    TEST_F(ConditionVariable, NotifyOneMultipleWait)
    {
        AZStd::condition_variable cv;
        AZStd::mutex cv_mutex;
        AZStd::atomic_int i(0);
        AZStd::atomic_bool done1(false);
        AZStd::atomic_bool done2(false);
        auto wait1 = [&]()
        {
            AZStd::unique_lock<AZStd::mutex> lock(cv_mutex);
            cv.wait(lock, [&] { return i == 1; });
            EXPECT_EQ(1, i);
            done1 = true;
        };

        auto wait2 = [&]()
        {
            AZStd::unique_lock<AZStd::mutex> lock(cv_mutex);
            cv.wait(lock, [&] { return i == 1; });
            EXPECT_EQ(1, i);
            done2 = true;
        };

        auto signal = [&]()
        {
            cv.notify_one();
            EXPECT_EQ(0, i);
            EXPECT_FALSE(done1);
            EXPECT_FALSE(done2);

            AZStd::unique_lock<AZStd::mutex> lock(cv_mutex);
            i = 1;
            while (!(done1 && done2))
            {
                lock.unlock();
                cv.notify_one();
                lock.lock();
            }
        };

        EXPECT_EQ(0, i);
        EXPECT_FALSE(done1);
        EXPECT_FALSE(done2);

        AZStd::thread waitThread1(wait1);
        AZStd::thread waitThread2(wait2);
        AZStd::thread signalThread(signal);
        waitThread1.join();
        waitThread2.join();
        signalThread.join();

        EXPECT_EQ(1, i);
        EXPECT_TRUE(done1);
        EXPECT_TRUE(done2);
    }

    TEST_F(ConditionVariable, NotifyAll)
    {
        AZStd::condition_variable cv;
        AZStd::mutex cv_mutex;
        AZStd::atomic_int i(0);

        auto wait = [&]()
        {
            AZStd::unique_lock<AZStd::mutex> lock(cv_mutex);
            cv.wait(lock, [&] { return i == 1; });
        };

        auto signal = [&]()
        {
            AZStd::this_thread::sleep_for(AZStd::chrono::milliseconds(1));
            {
                AZStd::lock_guard<AZStd::mutex> lock(cv_mutex);
                i = 0;
            }
            cv.notify_all();
            EXPECT_EQ(0, i);

            AZStd::this_thread::sleep_for(AZStd::chrono::milliseconds(1));

            {
                AZStd::unique_lock<AZStd::mutex> lock(cv_mutex);
                i = 1;
            }
            cv.notify_all();
        };

        EXPECT_EQ(0, i);

        AZStd::thread waitThreads[8];
        for (size_t threadIdx = 0; threadIdx < AZ_ARRAY_SIZE(waitThreads); ++threadIdx)
        {
            waitThreads[threadIdx] = AZStd::thread(wait);
        }
        AZStd::thread signalThread(signal);

        for (auto& thread : waitThreads)
        {
            thread.join();
        }
        signalThread.join();

        EXPECT_EQ(1, i);
    }

    // ensure that WaitUntil actually waits until the time specified instead of returning spuriously and instantly.
    TEST_F(ConditionVariable, Wait_Until_NoPredicate_ActuallyWaits)
    {
        AZStd::condition_variable cv;
        AZStd::mutex cv_mutex;
        AZStd::atomic<AZStd::cv_status> status = { AZStd::cv_status::no_timeout };
        AZStd::chrono::steady_clock::time_point startTime;

        // note that we capture the start and end time in the thread - this is because threads starting and stopping
        // can have unpredictable scheduling.
        AZStd::chrono::milliseconds timeSpent;

        auto wait = [&]()
        {
            AZStd::unique_lock<AZStd::mutex> lock(cv_mutex);
            auto waitDuration = AZStd::chrono::milliseconds(WAIT_TIME_MS);
            startTime = AZStd::chrono::steady_clock::now();
            auto waitUntilTime = startTime + waitDuration;
            status = cv.wait_until(lock, waitUntilTime);
            timeSpent = AZStd::chrono::duration_cast<AZStd::chrono::milliseconds>(AZStd::chrono::steady_clock::now() - startTime);
        };

        // we aren't going to signal it, and ensure the timeout was reached.
        AZStd::thread waitThread1(wait);
        waitThread1.join();

        // the duration given is a minimum time, for wait_until, so we should have timed out above
        EXPECT_GE(timeSpent, AZStd::chrono::milliseconds(WAIT_TIME_MS - MARGIN_OF_ERROR_MS));
        EXPECT_TRUE(status == AZStd::cv_status::timeout);
    }

    TEST_F(ConditionVariable, Wait_Until_TimeAlreadyPassed_DoesNotWait)
    {
        AZStd::condition_variable cv;
        AZStd::mutex cv_mutex;
        AZStd::atomic<AZStd::cv_status> status = { AZStd::cv_status::no_timeout };
        AZStd::chrono::steady_clock::time_point startTime;
        AZStd::chrono::milliseconds timeSpent;

        auto wait = [&]()
        {
            AZStd::unique_lock<AZStd::mutex> lock(cv_mutex);
            auto waitUntilTime = AZStd::chrono::steady_clock::now();
            startTime = waitUntilTime;
            status = cv.wait_until(lock, waitUntilTime);
            timeSpent = AZStd::chrono::duration_cast<AZStd::chrono::milliseconds>(AZStd::chrono::steady_clock::now() - startTime);
        };

        AZStd::thread waitThread1(wait);
        waitThread1.join();

        // we should have timed out immediately
        EXPECT_LT(timeSpent, AZStd::chrono::milliseconds(MARGIN_OF_ERROR_MS));
        EXPECT_TRUE(status == AZStd::cv_status::timeout);
    }

    TEST_F(ConditionVariable, Wait_Until_Predicate_TimeAlreadyPassed_DoesNotWait)
    {
        AZStd::condition_variable cv;
        AZStd::mutex cv_mutex;
        AZStd::atomic_bool status = { true };
        auto pred = [](){ return false; };
        AZStd::chrono::steady_clock::time_point startTime;
        AZStd::chrono::milliseconds timeSpent;

        auto wait = [&]()
        {
            AZStd::unique_lock<AZStd::mutex> lock(cv_mutex);
            auto waitUntilTime = AZStd::chrono::steady_clock::now();
            startTime = waitUntilTime;
            status = cv.wait_until(lock, waitUntilTime, pred);
            timeSpent = AZStd::chrono::duration_cast<AZStd::chrono::milliseconds>(AZStd::chrono::steady_clock::now() - startTime);
        };

        AZStd::thread waitThread1(wait);
        waitThread1.join();

        // we should have timed out immediately:
        EXPECT_LT(timeSpent, AZStd::chrono::milliseconds(MARGIN_OF_ERROR_MS));
        EXPECT_FALSE(status); // if the time has passed, the status should be false.
    }

    // ensure that WaitUntil actually waits until the time specified instead of returning spuriously and instantly.
    TEST_F(ConditionVariable, Wait_Until_FalsePredicate_ActuallyWaits)
    {
        AZStd::condition_variable cv;
        AZStd::mutex cv_mutex;
        AZStd::atomic_bool retVal = { true };
        auto pred = []() { return false; }; // should cause it to wait the entire duration
        AZStd::chrono::steady_clock::time_point startTime;
        AZStd::chrono::milliseconds timeSpent;

        auto wait = [&]()
        {
            AZStd::unique_lock<AZStd::mutex> lock(cv_mutex);
            auto waitDuration = AZStd::chrono::milliseconds(WAIT_TIME_MS);
            startTime = AZStd::chrono::steady_clock::now();
            auto waitUntilTime = startTime + waitDuration;
            retVal = cv.wait_until(lock, waitUntilTime, pred);
            timeSpent = AZStd::chrono::duration_cast<AZStd::chrono::milliseconds>(AZStd::chrono::steady_clock::now() - startTime);
        };

        // we aren't going to signal it, and ensure the timeout was reached.
        AZStd::thread waitThread1(wait);
        waitThread1.join();

        // the duration given is a minimum time for wait_until, so we should have timed out above
        EXPECT_GE(timeSpent, AZStd::chrono::milliseconds(WAIT_TIME_MS - MARGIN_OF_ERROR_MS));
        EXPECT_FALSE(retVal); // we didn't wake up
    }

    // ensure that WaitUntil with a predicate returns true when the predicate is true
    TEST_F(ConditionVariable, Wait_Until_TruePredicate_DoesNotWait)
    {
        AZStd::condition_variable cv;
        AZStd::mutex cv_mutex;
        AZStd::atomic_bool retVal = { true };
        AZStd::chrono::steady_clock::time_point startTime;
        AZStd::chrono::milliseconds timeSpent;
        auto pred = []() { return true; }; // should cause it to return immediately

        auto wait = [&]()
        {
            AZStd::unique_lock<AZStd::mutex> lock(cv_mutex);
            auto waitDuration = AZStd::chrono::milliseconds(WAIT_TIME_MS);
            startTime = AZStd::chrono::steady_clock::now();
            auto waitUntilTime = startTime + waitDuration;
            retVal = cv.wait_until(lock, waitUntilTime, pred);
            timeSpent = AZStd::chrono::duration_cast<AZStd::chrono::milliseconds>(AZStd::chrono::steady_clock::now() - startTime);
        };

        AZStd::thread waitThread1(wait);
        waitThread1.join();

        // we should NOT have reached the minimum time or in fact waited at all:
        EXPECT_LE(timeSpent, AZStd::chrono::milliseconds(MARGIN_OF_ERROR_MS));
        EXPECT_TRUE(retVal); // we didn't wake up but still returned true.
    }

    // ensure that WaitFor actually waits for a non-zero amount of time and that there are no assertions in it
    // (if there are, the listener will trigger)
    TEST_F(ConditionVariable, Wait_For_ActuallyWaits)
    {
        AZStd::condition_variable cv;
        AZStd::mutex cv_mutex;
        AZStd::atomic<AZStd::cv_status> status = { AZStd::cv_status::no_timeout };
        AZStd::chrono::steady_clock::time_point startTime;
        AZStd::chrono::milliseconds timeSpent;

        auto wait = [&]()
        {
            AZStd::unique_lock<AZStd::mutex> lock(cv_mutex);
            auto waitDuration = AZStd::chrono::milliseconds(WAIT_TIME_MS);
            startTime = AZStd::chrono::steady_clock::now();
            status = cv.wait_for(lock, waitDuration);
            timeSpent = AZStd::chrono::duration_cast<AZStd::chrono::milliseconds>(AZStd::chrono::steady_clock::now() - startTime);
        };

        // we aren't going to signal it, and ensure the timeout was reached.
        AZStd::thread waitThread1(wait);
        waitThread1.join();

        // note that wait_for is allowed to spuriously wake up on some platforms, but even when it does, it's likely
        // to have taken longer than the margin of error to do so. If the check below triggers, it's because wait_for
        // wasn't sleeping at all and there is an error in the implementation causing it to return without sleeping.
        EXPECT_GE(timeSpent, AZStd::chrono::milliseconds(MARGIN_OF_ERROR_MS));
        EXPECT_TRUE(status == AZStd::cv_status::timeout);
    }

    TEST_F(ConditionVariable, Wait_For_Predicate_ActuallyWaits)
    {
        AZStd::condition_variable cv;
        AZStd::mutex cv_mutex;
        AZStd::atomic_bool status = {true};
        auto pred = []() { return false; };
        AZStd::chrono::steady_clock::time_point startTime;
        AZStd::chrono::milliseconds timeSpent;

        auto wait = [&]()
        {
            AZStd::unique_lock<AZStd::mutex> lock(cv_mutex);
            startTime = AZStd::chrono::steady_clock::now();
            auto waitDuration = AZStd::chrono::milliseconds(WAIT_TIME_MS);
            status = cv.wait_for(lock, waitDuration, pred);
            timeSpent = AZStd::chrono::duration_cast<AZStd::chrono::milliseconds>(AZStd::chrono::steady_clock::now() - startTime);
        };

        // we aren't going to signal it, and ensure the timeout was reached.
        AZStd::thread waitThread1(wait);
        waitThread1.join();

        // wait_for with a false predicate should always wait the full time.
        EXPECT_GE(timeSpent, AZStd::chrono::milliseconds(WAIT_TIME_MS - MARGIN_OF_ERROR_MS));
        EXPECT_FALSE(status); // we get no signal, we return false.
    }

    TEST_F(ConditionVariable, WaitUntil_Signalled_WakesUp)
    {
        AZStd::condition_variable cv;
        AZStd::mutex cv_mutex;
        AZStd::atomic_int i(0);
        AZStd::atomic_bool done(false);
        AZStd::chrono::steady_clock::time_point startTime;
        AZStd::chrono::milliseconds timeSpent;
        constexpr AZStd::chrono::seconds waitTimeCrossThread(10);

        // normally we'd wait for WAIT_TIME_MS, but in this case, a completely different thread is doing the signalling,
        // and it could be very slow to start if the machine is under load. So instead, we wait for a long time.
        // In normal conditions, the wait will be very short (milliseconds), since we start the other thread that wakes
        // this one up immediately.
        auto wait = [&]()
        {
            AZStd::unique_lock<AZStd::mutex> lock(cv_mutex);
            auto waitDuration = waitTimeCrossThread;
            startTime = AZStd::chrono::steady_clock::now();
            auto waitUntilTime = startTime + waitDuration;
            // we expect the other thread to wake us up before the timeout expires, so the following should return true
            EXPECT_TRUE(cv.wait_until(lock, waitUntilTime, [&]{ return i == 1; }));
            timeSpent = AZStd::chrono::duration_cast<AZStd::chrono::milliseconds>(AZStd::chrono::steady_clock::now() - startTime);
            EXPECT_EQ(1, i);
            done = true;
        };

        auto signal = [&]()
        {
            cv.notify_one();
            EXPECT_EQ(0, i);
            EXPECT_FALSE(done);

            AZStd::unique_lock<AZStd::mutex> lock(cv_mutex);
            i = 1;
            while (!done)
            {
                lock.unlock();
                cv.notify_one();
                lock.lock();
            }
        };

        EXPECT_EQ(0, i);
        EXPECT_FALSE(done);

        AZStd::thread waitThread1(wait);
        AZStd::thread signalThread(signal);
        waitThread1.join();
        signalThread.join();

        // we expect this to resolve before the maximum timeout.
        EXPECT_LT(timeSpent, waitTimeCrossThread);

        EXPECT_EQ(1, i);
        EXPECT_TRUE(done);
    }

    // Fixture for thread-event-bus related calls;
    // exists only to categorize the tests.
    class ThreadEventsBus :
        public LeakDetectionFixture
    {
    };

    template <typename T> class ThreadEventCounter :
        public T
    {
    public:
        void Connect() { T::BusConnect(); }
        void Disconnect() { T::BusDisconnect(); }

        virtual ~ThreadEventCounter() {}

        int m_enterCount = 0;
        int m_exitCount = 0;

    protected:
        void OnThreadEnter(const AZStd::thread::id&, const AZStd::thread_desc*) override
        {
            ++m_enterCount;
        }

        void OnThreadExit([[maybe_unused]] const AZStd::thread::id& id) override
        {
            ++m_exitCount;
        }
    };

    TEST_F(ThreadEventsBus, Broadcasts_BothBusses)
    {
        ThreadEventCounter<AZStd::ThreadEventBus::Handler> eventBusCounter;
        auto thread_function = [&]()
        {
            ; // intentionally left blank
        };

        eventBusCounter.Connect();

        AZStd::thread starter = AZStd::thread(thread_function);
        starter.join();
        EXPECT_EQ(eventBusCounter.m_enterCount, 1);
        EXPECT_EQ(eventBusCounter.m_exitCount, 1);
        eventBusCounter.Disconnect();
    }

    // This class tests for deadlocks caused by multiple threads interacting with the ThreadEventBus.
    // Client code can connect to the ThreadEventBus and be told when threads are started and stopped.
    // A deadlock condition could be caused if they lock a mutex that another thread needs in order to proceed.
    // This test makes sure that using the ThreadEventBus does NOT cause a deadlock.
    // We will simulate this series of events by doing the following:
    // 1. The main thread listens on the ThreadEventBus
    // 2. OnThreadExit will lock a mutex, perform an allocation, unlock the mutex
    // 3. The thread itself will lock the mutex, perform an allocation, unlock the mutex.
    // As long as there is no cross talk between threads, the above operation should not deadlock.
    // If there is, then a deadlock can occur where one thread will be unable to perform
    // its allocation because the other is in OnThreadExit() and the other will not be able to perform
    // OnThreadExit() because it cannot lock the mutex.
    class ThreadEventsDeathTest :
        public LeakDetectionFixture
    {
    };

    class DeadlockCauser : public AZStd::ThreadEventBus::Handler
    {
    public:
        virtual ~DeadlockCauser() { }

        AZStd::recursive_mutex deadlock_mutex;

        void PerformTest()
        {
            BusConnect();

            auto thread_function = [&]()
            {
                // To cause the maximum amount of thread churn, yield between each instruction.
                AZStd::this_thread::yield();
                AZStd::lock_guard<AZStd::recursive_mutex> locker(this->deadlock_mutex);
                AZStd::this_thread::yield();
                char* mybuffer = (char*)azmalloc(1024 * 1024);
                AZStd::this_thread::yield();
                azfree(mybuffer);
                AZStd::this_thread::yield();
            };

            // IF there's crosstalk between the threads,
            // then this deadlocks, instantly, every time, even with just 2 threads.
            // To avoid killing our test CI system, we'll do this in another thread, and kill it
            // if it takes longer than a few seconds.
            AZStd::atomic_bool doneIt = {false};

            auto deadlocker_function = [&]()
            {
                // If the deadlock is present, this test fails 10 out of 10 times with a
                // retry_count of just 4; we quadruple it here to make sure.
                for (int retry_count = 0; retry_count < 16; ++retry_count)
                {
                    // cause some contention for the mutex -
                    // it takes only 2 threads to reproduce this 10/10 times; quadruple it
                    // to ensure reproduction.
                    AZStd::thread waitThreads[8];
                    for (size_t threadIdx = 0; threadIdx < AZ_ARRAY_SIZE(waitThreads); ++threadIdx)
                    {
                        AZStd::this_thread::yield();
                        waitThreads[threadIdx] = AZStd::thread(thread_function);
                        AZStd::this_thread::yield();
                    }

                    for (auto& thread : waitThreads)
                    {
                        thread.join();
                    }
                }
                doneIt.store(true);
            };

            AZStd::thread deadlocker_thread = AZStd::thread(deadlocker_function);

            chrono::steady_clock::time_point startTime = chrono::steady_clock::now();
            while (!doneIt.load())
            {
                AZStd::this_thread::yield();
                auto sleepTime = chrono::steady_clock::now() - startTime;
                // the test normally succeeds in under a second
                // but machines can be slow, so we'll give it 20x the
                // necessary time
                if (AZStd::chrono::duration_cast<AZStd::chrono::seconds>(sleepTime).count() > 20)
                {
                    // this should not take that long, we have deadlocked.
                    break;
                }
            }

            // if we're deadlocked, doneIt will be false,
            // and if that happens, we have to exit() because
            // everything will deadlock (forever) and this will jeopardize the CI system.
            EXPECT_TRUE(doneIt.load()) << "A test has deadlocked, aborting module";
            if (!doneIt.load())
            {
                exit(1);
            }

            BusDisconnect();
            deadlocker_thread.join();
        }

        void OnThreadExit(const AZStd::thread::id&) override
        {
            AZStd::this_thread::yield();
            AZStd::lock_guard<AZStd::recursive_mutex> locker(this->deadlock_mutex);
            AZStd::this_thread::yield();
            char* mybuffer = (char*)azmalloc(1024 * 1024);
            AZStd::this_thread::yield();
            azfree(mybuffer);
            AZStd::this_thread::yield();
        }

        void OnThreadEnter(const AZStd::thread::id& id, const AZStd::thread_desc*) override
        {
            OnThreadExit(id);
        }
    };

#if GTEST_HAS_DEATH_TEST
#if AZ_TRAIT_DISABLE_FAILED_DEATH_TESTS
    TEST_F(ThreadEventsDeathTest, DISABLED_UsingClientBus_AvoidsDeadlock)
#else
    TEST_F(ThreadEventsDeathTest, UsingClientBus_AvoidsDeadlock)
#endif
    {
        EXPECT_EXIT(
            {
                DeadlockCauser cause;
                cause.PerformTest();
                // you MUST exit for EXPECT_EXIT to function.
                exit(0); // this will cause spew, but it won't be considered to have failed.
            }
            , ::testing::ExitedWithCode(0), ".*");
    }
#endif // GTEST_HAS_DEATH_TEST
} // namespace UnitTest