123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272 |
- /*
- * Copyright (c) 2016-2021, Yann Collet, Facebook, Inc.
- * All rights reserved.
- *
- * This source code is licensed under both the BSD-style license (found in the
- * LICENSE file in the root directory of this source tree) and the GPLv2 (found
- * in the COPYING file in the root directory of this source tree).
- * You may select, at your option, one of the above-listed licenses.
- */
- #include "pool.h"
- #include "threading.h"
- #include "util.h"
- #include "timefn.h"
- #include <stddef.h>
- #include <stdio.h>
/*
 * Minimal assertion helpers for the test functions in this file.
 *
 * They may only be used inside a function returning int: when the checked
 * condition does not hold, the enclosing function returns 1 (failure).
 * Before returning, the failing expression and its location are written to
 * stderr so that a failed run identifies which check tripped.
 *
 * ASSERT_TRUE(p)       : fails unless p is non-zero.
 * ASSERT_FALSE(p)      : fails unless p is zero.
 * ASSERT_EQ(lhs, rhs)  : fails unless lhs == rhs.
 *
 * Each argument is evaluated exactly once.
 */
#define ASSERT_TRUE(p)                                                  \
    do {                                                                \
        if (!(p)) {                                                     \
            fprintf(stderr, "%s:%d: assertion failed: %s\n",            \
                    __FILE__, __LINE__, #p);                            \
            return 1;                                                   \
        }                                                               \
    } while (0)
#define ASSERT_FALSE(p) ASSERT_TRUE(!(p))
#define ASSERT_EQ(lhs, rhs) ASSERT_TRUE((lhs) == (rhs))
- struct data {
- ZSTD_pthread_mutex_t mutex;
- unsigned data[16];
- size_t i;
- };
- static void fn(void *opaque)
- {
- struct data *data = (struct data *)opaque;
- ZSTD_pthread_mutex_lock(&data->mutex);
- data->data[data->i] = (unsigned)(data->i);
- ++data->i;
- ZSTD_pthread_mutex_unlock(&data->mutex);
- }
- static int testOrder(size_t numThreads, size_t queueSize)
- {
- struct data data;
- POOL_ctx* const ctx = POOL_create(numThreads, queueSize);
- ASSERT_TRUE(ctx);
- data.i = 0;
- ASSERT_FALSE(ZSTD_pthread_mutex_init(&data.mutex, NULL));
- { size_t i;
- for (i = 0; i < 16; ++i) {
- POOL_add(ctx, &fn, &data);
- }
- }
- POOL_free(ctx);
- ASSERT_EQ(16, data.i);
- { size_t i;
- for (i = 0; i < data.i; ++i) {
- ASSERT_EQ(i, data.data[i]);
- }
- }
- ZSTD_pthread_mutex_destroy(&data.mutex);
- return 0;
- }
- /* --- test deadlocks --- */
/* Pool job that ignores its argument and just sleeps for 1 ms. */
static void waitFn(void *opaque)
{
    UTIL_sleepMilli(1);
    (void)opaque;   /* unused */
}
- /* Tests for deadlock */
- static int testWait(size_t numThreads, size_t queueSize) {
- struct data data;
- POOL_ctx* const ctx = POOL_create(numThreads, queueSize);
- ASSERT_TRUE(ctx);
- { size_t i;
- for (i = 0; i < 16; ++i) {
- POOL_add(ctx, &waitFn, &data);
- }
- }
- POOL_free(ctx);
- return 0;
- }
- /* --- test POOL_resize() --- */
- typedef struct {
- ZSTD_pthread_mutex_t mut;
- int countdown;
- int val;
- int max;
- ZSTD_pthread_cond_t cond;
- } poolTest_t;
- static void waitLongFn(void *opaque) {
- poolTest_t* const test = (poolTest_t*) opaque;
- ZSTD_pthread_mutex_lock(&test->mut);
- test->val++;
- if (test->val > test->max)
- test->max = test->val;
- ZSTD_pthread_mutex_unlock(&test->mut);
- UTIL_sleepMilli(10);
- ZSTD_pthread_mutex_lock(&test->mut);
- test->val--;
- test->countdown--;
- if (test->countdown == 0)
- ZSTD_pthread_cond_signal(&test->cond);
- ZSTD_pthread_mutex_unlock(&test->mut);
- }
- static int testThreadReduction_internal(POOL_ctx* ctx, poolTest_t test)
- {
- int const nbWaits = 16;
- test.countdown = nbWaits;
- test.val = 0;
- test.max = 0;
- { int i;
- for (i=0; i<nbWaits; i++)
- POOL_add(ctx, &waitLongFn, &test);
- }
- ZSTD_pthread_mutex_lock(&test.mut);
- while (test.countdown > 0)
- ZSTD_pthread_cond_wait(&test.cond, &test.mut);
- ASSERT_EQ(test.val, 0);
- ASSERT_EQ(test.max, 4);
- ZSTD_pthread_mutex_unlock(&test.mut);
- ASSERT_EQ( POOL_resize(ctx, 2/*nbThreads*/) , 0 );
- test.countdown = nbWaits;
- test.val = 0;
- test.max = 0;
- { int i;
- for (i=0; i<nbWaits; i++)
- POOL_add(ctx, &waitLongFn, &test);
- }
- ZSTD_pthread_mutex_lock(&test.mut);
- while (test.countdown > 0)
- ZSTD_pthread_cond_wait(&test.cond, &test.mut);
- ASSERT_EQ(test.val, 0);
- ASSERT_EQ(test.max, 2);
- ZSTD_pthread_mutex_unlock(&test.mut);
- return 0;
- }
- static int testThreadReduction(void) {
- int result;
- poolTest_t test;
- POOL_ctx* const ctx = POOL_create(4 /*nbThreads*/, 2 /*queueSize*/);
- ASSERT_TRUE(ctx);
- memset(&test, 0, sizeof(test));
- ASSERT_FALSE( ZSTD_pthread_mutex_init(&test.mut, NULL) );
- ASSERT_FALSE( ZSTD_pthread_cond_init(&test.cond, NULL) );
- result = testThreadReduction_internal(ctx, test);
- ZSTD_pthread_mutex_destroy(&test.mut);
- ZSTD_pthread_cond_destroy(&test.cond);
- POOL_free(ctx);
- return result;
- }
- /* --- test abrupt ending --- */
- typedef struct {
- ZSTD_pthread_mutex_t mut;
- int val;
- } abruptEndCanary_t;
- static void waitIncFn(void *opaque) {
- abruptEndCanary_t* test = (abruptEndCanary_t*) opaque;
- UTIL_sleepMilli(10);
- ZSTD_pthread_mutex_lock(&test->mut);
- test->val = test->val + 1;
- ZSTD_pthread_mutex_unlock(&test->mut);
- }
- static int testAbruptEnding_internal(abruptEndCanary_t test)
- {
- int const nbWaits = 16;
- POOL_ctx* const ctx = POOL_create(3 /*numThreads*/, nbWaits /*queueSize*/);
- ASSERT_TRUE(ctx);
- test.val = 0;
- { int i;
- for (i=0; i<nbWaits; i++)
- POOL_add(ctx, &waitIncFn, &test); /* all jobs pushed into queue */
- }
- ASSERT_EQ( POOL_resize(ctx, 1 /*numThreads*/) , 0 ); /* downsize numThreads, to try to break end condition */
- POOL_free(ctx); /* must finish all jobs in queue before giving back control */
- ASSERT_EQ(test.val, nbWaits);
- return 0;
- }
- static int testAbruptEnding(void) {
- int result;
- abruptEndCanary_t test;
- memset(&test, 0, sizeof(test));
- ASSERT_FALSE( ZSTD_pthread_mutex_init(&test.mut, NULL) );
- result = testAbruptEnding_internal(test);
- ZSTD_pthread_mutex_destroy(&test.mut);
- return result;
- }
- /* --- test launcher --- */
/*
 * Test launcher.
 * Runs, in order: the zero-thread creation check, testOrder() and testWait()
 * for every combination of 1..4 threads and queue sizes 0..2, then
 * testThreadReduction() and testAbruptEnding().
 * Progress is reported on stdout. Returns 1 at the first failure and 0 when
 * every test passes. Command-line arguments are ignored.
 */
int main(int argc, const char **argv) {
    size_t nbThreads;
    (void)argc;
    (void)argv;

    /* A pool with no worker thread should not be possible. */
    if (POOL_create(0, 1) != NULL) {
        printf("FAIL: should not create POOL with 0 threads\n");
        return 1;
    }

    for (nbThreads = 1; nbThreads <= 4; ++nbThreads) {
        size_t qSize;
        for (qSize = 0; qSize <= 2; ++qSize) {
            printf("queueSize==%u, numThreads=%u \n",
                   (unsigned)qSize, (unsigned)nbThreads);

            if (testOrder(nbThreads, qSize) != 0) {
                printf("FAIL: testOrder\n");
                return 1;
            }
            printf("SUCCESS: testOrder\n");

            if (testWait(nbThreads, qSize) != 0) {
                printf("FAIL: testWait\n");
                return 1;
            }
            printf("SUCCESS: testWait\n");
        }
    }

    if (testThreadReduction() != 0) {
        printf("FAIL: thread reduction not effective \n");
        return 1;
    }
    printf("SUCCESS: thread reduction effective \n");

    if (testAbruptEnding() != 0) {
        printf("FAIL: jobs in queue not completed on early end \n");
        return 1;
    }
    printf("SUCCESS: all jobs in queue completed on early end \n");

    printf("PASS: all POOL tests\n");
    return 0;
}
|