poolTests.c 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. /*
  2. * Copyright (c) 2016-2021, Yann Collet, Facebook, Inc.
  3. * All rights reserved.
  4. *
  5. * This source code is licensed under both the BSD-style license (found in the
  6. * LICENSE file in the root directory of this source tree) and the GPLv2 (found
  7. * in the COPYING file in the root directory of this source tree).
  8. * You may select, at your option, one of the above-listed licenses.
  9. */
  10. #include "pool.h"
  11. #include "threading.h"
  12. #include "util.h"
  13. #include "timefn.h"
  14. #include <stddef.h>
  15. #include <stdio.h>
  16. #define ASSERT_TRUE(p) \
  17. do { \
  18. if (!(p)) { \
  19. return 1; \
  20. } \
  21. } while (0)
  22. #define ASSERT_FALSE(p) ASSERT_TRUE(!(p))
  23. #define ASSERT_EQ(lhs, rhs) ASSERT_TRUE((lhs) == (rhs))
/* Shared state handed to fn() jobs by testOrder().
 * (testWait() also passes one to waitFn(), which ignores it.) */
struct data {
    ZSTD_pthread_mutex_t mutex;  /* held by fn() while it updates `data` and `i` */
    unsigned data[16];           /* fn() stores the value k into slot k */
    size_t i;                    /* next slot fn() will write; incremented once per job */
};
  29. static void fn(void *opaque)
  30. {
  31. struct data *data = (struct data *)opaque;
  32. ZSTD_pthread_mutex_lock(&data->mutex);
  33. data->data[data->i] = (unsigned)(data->i);
  34. ++data->i;
  35. ZSTD_pthread_mutex_unlock(&data->mutex);
  36. }
  37. static int testOrder(size_t numThreads, size_t queueSize)
  38. {
  39. struct data data;
  40. POOL_ctx* const ctx = POOL_create(numThreads, queueSize);
  41. ASSERT_TRUE(ctx);
  42. data.i = 0;
  43. ASSERT_FALSE(ZSTD_pthread_mutex_init(&data.mutex, NULL));
  44. { size_t i;
  45. for (i = 0; i < 16; ++i) {
  46. POOL_add(ctx, &fn, &data);
  47. }
  48. }
  49. POOL_free(ctx);
  50. ASSERT_EQ(16, data.i);
  51. { size_t i;
  52. for (i = 0; i < data.i; ++i) {
  53. ASSERT_EQ(i, data.data[i]);
  54. }
  55. }
  56. ZSTD_pthread_mutex_destroy(&data.mutex);
  57. return 0;
  58. }
  59. /* --- test deadlocks --- */
  60. static void waitFn(void *opaque) {
  61. (void)opaque;
  62. UTIL_sleepMilli(1);
  63. }
  64. /* Tests for deadlock */
  65. static int testWait(size_t numThreads, size_t queueSize) {
  66. struct data data;
  67. POOL_ctx* const ctx = POOL_create(numThreads, queueSize);
  68. ASSERT_TRUE(ctx);
  69. { size_t i;
  70. for (i = 0; i < 16; ++i) {
  71. POOL_add(ctx, &waitFn, &data);
  72. }
  73. }
  74. POOL_free(ctx);
  75. return 0;
  76. }
  77. /* --- test POOL_resize() --- */
/* Shared state for the thread-reduction test; updated by waitLongFn() jobs. */
typedef struct {
    ZSTD_pthread_mutex_t mut;   /* held by waitLongFn() while it updates the counters below */
    int countdown;              /* jobs still to finish; the job that brings it to 0 signals `cond` */
    int val;                    /* jobs currently between waitLongFn()'s increment and decrement */
    int max;                    /* highest value `val` has reached */
    ZSTD_pthread_cond_t cond;   /* signaled by waitLongFn() when `countdown` reaches 0 */
} poolTest_t;
  85. static void waitLongFn(void *opaque) {
  86. poolTest_t* const test = (poolTest_t*) opaque;
  87. ZSTD_pthread_mutex_lock(&test->mut);
  88. test->val++;
  89. if (test->val > test->max)
  90. test->max = test->val;
  91. ZSTD_pthread_mutex_unlock(&test->mut);
  92. UTIL_sleepMilli(10);
  93. ZSTD_pthread_mutex_lock(&test->mut);
  94. test->val--;
  95. test->countdown--;
  96. if (test->countdown == 0)
  97. ZSTD_pthread_cond_signal(&test->cond);
  98. ZSTD_pthread_mutex_unlock(&test->mut);
  99. }
  100. static int testThreadReduction_internal(POOL_ctx* ctx, poolTest_t test)
  101. {
  102. int const nbWaits = 16;
  103. test.countdown = nbWaits;
  104. test.val = 0;
  105. test.max = 0;
  106. { int i;
  107. for (i=0; i<nbWaits; i++)
  108. POOL_add(ctx, &waitLongFn, &test);
  109. }
  110. ZSTD_pthread_mutex_lock(&test.mut);
  111. while (test.countdown > 0)
  112. ZSTD_pthread_cond_wait(&test.cond, &test.mut);
  113. ASSERT_EQ(test.val, 0);
  114. ASSERT_EQ(test.max, 4);
  115. ZSTD_pthread_mutex_unlock(&test.mut);
  116. ASSERT_EQ( POOL_resize(ctx, 2/*nbThreads*/) , 0 );
  117. test.countdown = nbWaits;
  118. test.val = 0;
  119. test.max = 0;
  120. { int i;
  121. for (i=0; i<nbWaits; i++)
  122. POOL_add(ctx, &waitLongFn, &test);
  123. }
  124. ZSTD_pthread_mutex_lock(&test.mut);
  125. while (test.countdown > 0)
  126. ZSTD_pthread_cond_wait(&test.cond, &test.mut);
  127. ASSERT_EQ(test.val, 0);
  128. ASSERT_EQ(test.max, 2);
  129. ZSTD_pthread_mutex_unlock(&test.mut);
  130. return 0;
  131. }
  132. static int testThreadReduction(void) {
  133. int result;
  134. poolTest_t test;
  135. POOL_ctx* const ctx = POOL_create(4 /*nbThreads*/, 2 /*queueSize*/);
  136. ASSERT_TRUE(ctx);
  137. memset(&test, 0, sizeof(test));
  138. ASSERT_FALSE( ZSTD_pthread_mutex_init(&test.mut, NULL) );
  139. ASSERT_FALSE( ZSTD_pthread_cond_init(&test.cond, NULL) );
  140. result = testThreadReduction_internal(ctx, test);
  141. ZSTD_pthread_mutex_destroy(&test.mut);
  142. ZSTD_pthread_cond_destroy(&test.cond);
  143. POOL_free(ctx);
  144. return result;
  145. }
  146. /* --- test abrupt ending --- */
/* State for the abrupt-ending test: counts waitIncFn() jobs that have run. */
typedef struct {
    ZSTD_pthread_mutex_t mut;  /* held by waitIncFn() while it increments `val` */
    int val;                   /* incremented once by each waitIncFn() job */
} abruptEndCanary_t;
  151. static void waitIncFn(void *opaque) {
  152. abruptEndCanary_t* test = (abruptEndCanary_t*) opaque;
  153. UTIL_sleepMilli(10);
  154. ZSTD_pthread_mutex_lock(&test->mut);
  155. test->val = test->val + 1;
  156. ZSTD_pthread_mutex_unlock(&test->mut);
  157. }
  158. static int testAbruptEnding_internal(abruptEndCanary_t test)
  159. {
  160. int const nbWaits = 16;
  161. POOL_ctx* const ctx = POOL_create(3 /*numThreads*/, nbWaits /*queueSize*/);
  162. ASSERT_TRUE(ctx);
  163. test.val = 0;
  164. { int i;
  165. for (i=0; i<nbWaits; i++)
  166. POOL_add(ctx, &waitIncFn, &test); /* all jobs pushed into queue */
  167. }
  168. ASSERT_EQ( POOL_resize(ctx, 1 /*numThreads*/) , 0 ); /* downsize numThreads, to try to break end condition */
  169. POOL_free(ctx); /* must finish all jobs in queue before giving back control */
  170. ASSERT_EQ(test.val, nbWaits);
  171. return 0;
  172. }
  173. static int testAbruptEnding(void) {
  174. int result;
  175. abruptEndCanary_t test;
  176. memset(&test, 0, sizeof(test));
  177. ASSERT_FALSE( ZSTD_pthread_mutex_init(&test.mut, NULL) );
  178. result = testAbruptEnding_internal(test);
  179. ZSTD_pthread_mutex_destroy(&test.mut);
  180. return result;
  181. }
  182. /* --- test launcher --- */
  183. int main(int argc, const char **argv) {
  184. size_t numThreads;
  185. (void)argc;
  186. (void)argv;
  187. if (POOL_create(0, 1)) { /* should not be possible */
  188. printf("FAIL: should not create POOL with 0 threads\n");
  189. return 1;
  190. }
  191. for (numThreads = 1; numThreads <= 4; ++numThreads) {
  192. size_t queueSize;
  193. for (queueSize = 0; queueSize <= 2; ++queueSize) {
  194. printf("queueSize==%u, numThreads=%u \n",
  195. (unsigned)queueSize, (unsigned)numThreads);
  196. if (testOrder(numThreads, queueSize)) {
  197. printf("FAIL: testOrder\n");
  198. return 1;
  199. }
  200. printf("SUCCESS: testOrder\n");
  201. if (testWait(numThreads, queueSize)) {
  202. printf("FAIL: testWait\n");
  203. return 1;
  204. }
  205. printf("SUCCESS: testWait\n");
  206. }
  207. }
  208. if (testThreadReduction()) {
  209. printf("FAIL: thread reduction not effective \n");
  210. return 1;
  211. } else {
  212. printf("SUCCESS: thread reduction effective \n");
  213. }
  214. if (testAbruptEnding()) {
  215. printf("FAIL: jobs in queue not completed on early end \n");
  216. return 1;
  217. } else {
  218. printf("SUCCESS: all jobs in queue completed on early end \n");
  219. }
  220. printf("PASS: all POOL tests\n");
  221. return 0;
  222. }