From b55ae009ac55620ca479b263eb484b35af6be67f Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Thu, 1 Oct 2020 13:45:29 -0700 Subject: [PATCH 1/7] [zstdmt] Remove singleBlockingThread mode This is already handled by zstd, so this logic is never used. --- lib/compress/zstdmt_compress.c | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index b809d602e61..fcb922e8982 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -819,7 +819,6 @@ struct ZSTDMT_CCtx_s { roundBuff_t roundBuff; serialState_t serial; rsyncState_t rsync; - unsigned singleBlockingThread; unsigned jobIDMask; unsigned doneJobID; unsigned nextJobID; @@ -1441,16 +1440,6 @@ size_t ZSTDMT_initCStream_internal( if (params.jobSize != 0 && params.jobSize < ZSTDMT_JOBSIZE_MIN) params.jobSize = ZSTDMT_JOBSIZE_MIN; if (params.jobSize > (size_t)ZSTDMT_JOBSIZE_MAX) params.jobSize = (size_t)ZSTDMT_JOBSIZE_MAX; - mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */ - if (mtctx->singleBlockingThread) { - ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(¶ms); - DEBUGLOG(5, "ZSTDMT_initCStream_internal: switch to single blocking thread mode"); - assert(singleThreadParams.nbWorkers == 0); - return ZSTD_initCStream_internal(mtctx->cctxPool->cctx[0], - dict, dictSize, cdict, - &singleThreadParams, pledgedSrcSize); - } - DEBUGLOG(4, "ZSTDMT_initCStream_internal: %u workers", params.nbWorkers); if (mtctx->allJobsCompleted == 0) { /* previous compression not correctly finished */ @@ -2031,10 +2020,6 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, assert(output->pos <= output->size); assert(input->pos <= input->size); - if (mtctx->singleBlockingThread) { /* delegate to single-thread (synchronous) */ - return ZSTD_compressStream2(mtctx->cctxPool->cctx[0], output, input, endOp); - } - if ((mtctx->frameEnded) && (endOp==ZSTD_e_continue)) { /* current frame being ended. Only flush/end are allowed */ return ERROR(stage_wrong); @@ -2138,15 +2123,11 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* ou size_t ZSTDMT_flushStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output) { DEBUGLOG(5, "ZSTDMT_flushStream"); - if (mtctx->singleBlockingThread) - return ZSTD_flushStream(mtctx->cctxPool->cctx[0], output); return ZSTDMT_flushStream_internal(mtctx, output, ZSTD_e_flush); } size_t ZSTDMT_endStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output) { DEBUGLOG(4, "ZSTDMT_endStream"); - if (mtctx->singleBlockingThread) - return ZSTD_endStream(mtctx->cctxPool->cctx[0], output); return ZSTDMT_flushStream_internal(mtctx, output, ZSTD_e_end); } From 1784c4b4ab408f15094575bc29b60ed9ada9fad7 Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Thu, 1 Oct 2020 14:29:13 -0700 Subject: [PATCH 2/7] [zstdmt] Remove single-pass shortcut Simplifies the code and removes blocking from zstdmt. At this point we could completely delete `ZSTDMT_compress_advanced_internal()`. However I'm leaving it in because I think we want to do that in the zstd-1.5.0 release, in case anyone is still using the ZSTDMT API, even though it is not installed by default. Fixes #2327. --- lib/compress/zstdmt_compress.c | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index fcb922e8982..9b583e5f6d1 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -2025,25 +2025,6 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, return ERROR(stage_wrong); } - /* single-pass shortcut (note : synchronous-mode) */ - if ( (!mtctx->params.rsyncable) /* rsyncable mode is disabled */ - && (mtctx->nextJobID == 0) /* just started */ - && (mtctx->inBuff.filled == 0) /* nothing buffered */ - && (!mtctx->jobReady) /* no job already created */ - && (endOp == ZSTD_e_end) /* end order */ - && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough space in dst */ - size_t const cSize = ZSTDMT_compress_advanced_internal(mtctx, - (char*)output->dst + output->pos, output->size - output->pos, - (const char*)input->src + input->pos, input->size - input->pos, - mtctx->cdict, mtctx->params); - if (ZSTD_isError(cSize)) return cSize; - input->pos = input->size; - output->pos += cSize; - mtctx->allJobsCompleted = 1; - mtctx->frameEnded = 1; - return 0; - } - /* fill input buffer */ if ( (!mtctx->jobReady) && (input->size > input->pos) ) { /* support NULL input */ From c51a9e79b9b810587146795c1123aaa62bd1174d Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Thu, 1 Oct 2020 18:47:54 -0700 Subject: [PATCH 3/7] [zstdmt] Rip out the zstdmt API This commit leaves only the functions used by zstd_compress.c. All other functions have been removed from the API. The ZSTDMT unit tests in fuzzer.c and zstreamtest.c have been rewritten to use the ZSTD API. And the --mt zstreamtest tests have been ripped out. --- lib/compress/zstdmt_compress.c | 321 +-------------------------------- lib/compress/zstdmt_compress.h | 133 +++----------- tests/Makefile | 1 - tests/fuzzer.c | 26 ++- tests/zstreamtest.c | 302 +------------------------------ 5 files changed, 48 insertions(+), 735 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 9b583e5f6d1..6f3e0622976 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -884,7 +884,7 @@ static size_t ZSTDMT_expandJobsTable (ZSTDMT_CCtx* mtctx, U32 nbWorkers) { /* ZSTDMT_CCtxParam_setNbWorkers(): * Internal use only */ -size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers) +static size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers) { return ZSTD_CCtxParams_setParameter(params, ZSTD_c_nbWorkers, (int)nbWorkers); } @@ -943,11 +943,6 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem, #endif } -ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbWorkers) -{ - return ZSTDMT_createCCtx_advanced(nbWorkers, ZSTD_defaultCMem, NULL); -} - /* ZSTDMT_releaseAllJobResources() : * note : ensure all workers are killed first ! */ @@ -1019,65 +1014,6 @@ size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx) + mtctx->roundBuff.capacity; } -/* Internal only */ -size_t -ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, - ZSTDMT_parameter parameter, - int value) -{ - DEBUGLOG(4, "ZSTDMT_CCtxParam_setMTCtxParameter"); - switch(parameter) - { - case ZSTDMT_p_jobSize : - DEBUGLOG(4, "ZSTDMT_CCtxParam_setMTCtxParameter : set jobSize to %i", value); - return ZSTD_CCtxParams_setParameter(params, ZSTD_c_jobSize, value); - case ZSTDMT_p_overlapLog : - DEBUGLOG(4, "ZSTDMT_p_overlapLog : %i", value); - return ZSTD_CCtxParams_setParameter(params, ZSTD_c_overlapLog, value); - case ZSTDMT_p_rsyncable : - DEBUGLOG(4, "ZSTD_p_rsyncable : %i", value); - return ZSTD_CCtxParams_setParameter(params, ZSTD_c_rsyncable, value); - default : - return ERROR(parameter_unsupported); - } -} - -size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, int value) -{ - DEBUGLOG(4, "ZSTDMT_setMTCtxParameter"); - return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value); -} - -size_t ZSTDMT_getMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, int* value) -{ - switch (parameter) { - case ZSTDMT_p_jobSize: - return ZSTD_CCtxParams_getParameter(&mtctx->params, ZSTD_c_jobSize, value); - case ZSTDMT_p_overlapLog: - return ZSTD_CCtxParams_getParameter(&mtctx->params, ZSTD_c_overlapLog, value); - case ZSTDMT_p_rsyncable: - return ZSTD_CCtxParams_getParameter(&mtctx->params, ZSTD_c_rsyncable, value); - default: - return ERROR(parameter_unsupported); - } -} - -/* Sets parameters relevant to the compression job, - * initializing others to default values. */ -static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(const ZSTD_CCtx_params* params) -{ - ZSTD_CCtx_params jobParams = *params; - /* Clear parameters related to multithreading */ - jobParams.forceWindow = 0; - jobParams.nbWorkers = 0; - jobParams.jobSize = 0; - jobParams.overlapLog = 0; - jobParams.rsyncable = 0; - ZSTD_memset(&jobParams.ldmParams, 0, sizeof(ldmParams_t)); - ZSTD_memset(&jobParams.customMem, 0, sizeof(ZSTD_customMem)); - return jobParams; -} - /* ZSTDMT_resize() : * @return : error code if fails, 0 on success */ @@ -1248,174 +1184,6 @@ static size_t ZSTDMT_computeOverlapSize(const ZSTD_CCtx_params* params) return (ovLog==0) ? 0 : (size_t)1 << ovLog; } -static unsigned -ZSTDMT_computeNbJobs(const ZSTD_CCtx_params* params, size_t srcSize, unsigned nbWorkers) -{ - assert(nbWorkers>0); - { size_t const jobSizeTarget = (size_t)1 << ZSTDMT_computeTargetJobLog(params); - size_t const jobMaxSize = jobSizeTarget << 2; - size_t const passSizeMax = jobMaxSize * nbWorkers; - unsigned const multiplier = (unsigned)(srcSize / passSizeMax) + 1; - unsigned const nbJobsLarge = multiplier * nbWorkers; - unsigned const nbJobsMax = (unsigned)(srcSize / jobSizeTarget) + 1; - unsigned const nbJobsSmall = MIN(nbJobsMax, nbWorkers); - return (multiplier>1) ? nbJobsLarge : nbJobsSmall; -} } - -/* ZSTDMT_compress_advanced_internal() : - * This is a blocking function : it will only give back control to caller after finishing its compression job. - */ -static size_t -ZSTDMT_compress_advanced_internal( - ZSTDMT_CCtx* mtctx, - void* dst, size_t dstCapacity, - const void* src, size_t srcSize, - const ZSTD_CDict* cdict, - ZSTD_CCtx_params params) -{ - ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(¶ms); - size_t const overlapSize = ZSTDMT_computeOverlapSize(¶ms); - unsigned const nbJobs = ZSTDMT_computeNbJobs(¶ms, srcSize, params.nbWorkers); - size_t const proposedJobSize = (srcSize + (nbJobs-1)) / nbJobs; - size_t const avgJobSize = (((proposedJobSize-1) & 0x1FFFF) < 0x7FFF) ? proposedJobSize + 0xFFFF : proposedJobSize; /* avoid too small last block */ - const char* const srcStart = (const char*)src; - size_t remainingSrcSize = srcSize; - unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbJobs : (unsigned)(dstCapacity / ZSTD_compressBound(avgJobSize)); /* presumes avgJobSize >= 256 KB, which should be the case */ - size_t frameStartPos = 0, dstBufferPos = 0; - assert(jobParams.nbWorkers == 0); - assert(mtctx->cctxPool->totalCCtx == params.nbWorkers); - - params.jobSize = (U32)avgJobSize; - DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbJobs=%2u (rawSize=%u bytes; fixedSize=%u) ", - nbJobs, (U32)proposedJobSize, (U32)avgJobSize); - - if ((nbJobs==1) | (params.nbWorkers<=1)) { /* fallback to single-thread mode : this is a blocking invocation anyway */ - ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0]; - DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: fallback to single-thread mode"); - if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, jobParams.fParams); - return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, &jobParams); - } - - assert(avgJobSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */ - ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) ); - /* LDM doesn't even try to load the dictionary in single-ingestion mode */ - if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, avgJobSize, NULL, 0, ZSTD_dct_auto)) - return ERROR(memory_allocation); - - FORWARD_IF_ERROR( ZSTDMT_expandJobsTable(mtctx, nbJobs) , ""); /* only expands if necessary */ - - { unsigned u; - for (u=0; ujobs[u].prefix.start = srcStart + frameStartPos - dictSize; - mtctx->jobs[u].prefix.size = dictSize; - mtctx->jobs[u].src.start = srcStart + frameStartPos; - mtctx->jobs[u].src.size = jobSize; assert(jobSize > 0); /* avoid job.src.size == 0 */ - mtctx->jobs[u].consumed = 0; - mtctx->jobs[u].cSize = 0; - mtctx->jobs[u].cdict = (u==0) ? cdict : NULL; - mtctx->jobs[u].fullFrameSize = srcSize; - mtctx->jobs[u].params = jobParams; - /* do not calculate checksum within sections, but write it in header for first section */ - mtctx->jobs[u].dstBuff = dstBuffer; - mtctx->jobs[u].cctxPool = mtctx->cctxPool; - mtctx->jobs[u].bufPool = mtctx->bufPool; - mtctx->jobs[u].seqPool = mtctx->seqPool; - mtctx->jobs[u].serial = &mtctx->serial; - mtctx->jobs[u].jobID = u; - mtctx->jobs[u].firstJob = (u==0); - mtctx->jobs[u].lastJob = (u==nbJobs-1); - - DEBUGLOG(5, "ZSTDMT_compress_advanced_internal: posting job %u (%u bytes)", u, (U32)jobSize); - DEBUG_PRINTHEX(6, mtctx->jobs[u].prefix.start, 12); - POOL_add(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[u]); - - frameStartPos += jobSize; - dstBufferPos += dstBufferCapacity; - remainingSrcSize -= jobSize; - } } - - /* collect result */ - { size_t error = 0, dstPos = 0; - unsigned jobID; - for (jobID=0; jobIDjobs[jobID].job_mutex); - while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].src.size) { - DEBUGLOG(5, "waiting for jobCompleted signal from job %u", jobID); - ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex); - } - ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex); - DEBUGLOG(5, "ready to write job %u ", jobID); - - { size_t const cSize = mtctx->jobs[jobID].cSize; - if (ZSTD_isError(cSize)) error = cSize; - if ((!error) && (dstPos + cSize > dstCapacity)) error = ERROR(dstSize_tooSmall); - if (jobID) { /* note : job 0 is written directly at dst, which is correct position */ - if (!error) - ZSTD_memmove((char*)dst + dstPos, mtctx->jobs[jobID].dstBuff.start, cSize); /* may overlap when job compressed within dst */ - if (jobID >= compressWithinDst) { /* job compressed into its own buffer, which must be released */ - DEBUGLOG(5, "releasing buffer %u>=%u", jobID, compressWithinDst); - ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff); - } } - mtctx->jobs[jobID].dstBuff = g_nullBuffer; - mtctx->jobs[jobID].cSize = 0; - dstPos += cSize ; - } - } /* for (jobID=0; jobIDserial.xxhState); - if (dstPos + 4 > dstCapacity) { - error = ERROR(dstSize_tooSmall); - } else { - DEBUGLOG(4, "writing checksum : %08X \n", checksum); - MEM_writeLE32((char*)dst + dstPos, checksum); - dstPos += 4; - } } - - if (!error) DEBUGLOG(4, "compressed size : %u ", (U32)dstPos); - return error ? error : dstPos; - } -} - -size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, - void* dst, size_t dstCapacity, - const void* src, size_t srcSize, - const ZSTD_CDict* cdict, - ZSTD_parameters params, - int overlapLog) -{ - ZSTD_CCtx_params cctxParams = mtctx->params; - cctxParams.cParams = params.cParams; - cctxParams.fParams = params.fParams; - assert(ZSTD_OVERLAPLOG_MIN <= overlapLog && overlapLog <= ZSTD_OVERLAPLOG_MAX); - cctxParams.overlapLog = overlapLog; - return ZSTDMT_compress_advanced_internal(mtctx, - dst, dstCapacity, - src, srcSize, - cdict, cctxParams); -} - - -size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, - void* dst, size_t dstCapacity, - const void* src, size_t srcSize, - int compressionLevel) -{ - ZSTD_parameters params = ZSTD_getParams(compressionLevel, srcSize, 0); - int const overlapLog = ZSTDMT_overlapLog_default(params.cParams.strategy); - params.fParams.contentSizeFlag = 1; - return ZSTDMT_compress_advanced(mtctx, dst, dstCapacity, src, srcSize, NULL, params, overlapLog); -} - - /* ====================================== */ /* ======= Streaming API ======= */ /* ====================================== */ @@ -1528,53 +1296,6 @@ size_t ZSTDMT_initCStream_internal( return 0; } -size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx, - const void* dict, size_t dictSize, - ZSTD_parameters params, - unsigned long long pledgedSrcSize) -{ - ZSTD_CCtx_params cctxParams = mtctx->params; /* retrieve sticky params */ - DEBUGLOG(4, "ZSTDMT_initCStream_advanced (pledgedSrcSize=%u)", (U32)pledgedSrcSize); - cctxParams.cParams = params.cParams; - cctxParams.fParams = params.fParams; - return ZSTDMT_initCStream_internal(mtctx, dict, dictSize, ZSTD_dct_auto, NULL, - cctxParams, pledgedSrcSize); -} - -size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx, - const ZSTD_CDict* cdict, - ZSTD_frameParameters fParams, - unsigned long long pledgedSrcSize) -{ - ZSTD_CCtx_params cctxParams = mtctx->params; - if (cdict==NULL) return ERROR(dictionary_wrong); /* method incompatible with NULL cdict */ - cctxParams.cParams = ZSTD_getCParamsFromCDict(cdict); - cctxParams.fParams = fParams; - return ZSTDMT_initCStream_internal(mtctx, NULL, 0 /*dictSize*/, ZSTD_dct_auto, cdict, - cctxParams, pledgedSrcSize); -} - - -/* ZSTDMT_resetCStream() : - * pledgedSrcSize can be zero == unknown (for the time being) - * prefer using ZSTD_CONTENTSIZE_UNKNOWN, - * as `0` might mean "empty" in the future */ -size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize) -{ - if (!pledgedSrcSize) pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN; - return ZSTDMT_initCStream_internal(mtctx, NULL, 0, ZSTD_dct_auto, 0, mtctx->params, - pledgedSrcSize); -} - -size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel) { - ZSTD_parameters const params = ZSTD_getParams(compressionLevel, ZSTD_CONTENTSIZE_UNKNOWN, 0); - ZSTD_CCtx_params cctxParams = mtctx->params; /* retrieve sticky params */ - DEBUGLOG(4, "ZSTDMT_initCStream (cLevel=%i)", compressionLevel); - cctxParams.cParams = params.cParams; - cctxParams.fParams = params.fParams; - return ZSTDMT_initCStream_internal(mtctx, NULL, 0, ZSTD_dct_auto, NULL, cctxParams, ZSTD_CONTENTSIZE_UNKNOWN); -} - /* ZSTDMT_writeLastEmptyBlock() * Write a single empty block with an end-of-frame to finish a frame. @@ -2072,43 +1793,3 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, return remainingToFlush; } } - - -size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input) -{ - FORWARD_IF_ERROR( ZSTDMT_compressStream_generic(mtctx, output, input, ZSTD_e_continue) , ""); - - /* recommended next input size : fill current input buffer */ - return mtctx->targetSectionSize - mtctx->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */ -} - - -static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_EndDirective endFrame) -{ - size_t const srcSize = mtctx->inBuff.filled; - DEBUGLOG(5, "ZSTDMT_flushStream_internal"); - - if ( mtctx->jobReady /* one job ready for a worker to pick up */ - || (srcSize > 0) /* still some data within input buffer */ - || ((endFrame==ZSTD_e_end) && !mtctx->frameEnded)) { /* need a last 0-size block to end frame */ - DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job (%u bytes, end:%u)", - (U32)srcSize, (U32)endFrame); - FORWARD_IF_ERROR( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) , ""); - } - - /* check if there is any data available to flush */ - return ZSTDMT_flushProduced(mtctx, output, 1 /* blockToFlush */, endFrame); -} - - -size_t ZSTDMT_flushStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output) -{ - DEBUGLOG(5, "ZSTDMT_flushStream"); - return ZSTDMT_flushStream_internal(mtctx, output, ZSTD_e_flush); -} - -size_t ZSTDMT_endStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output) -{ - DEBUGLOG(4, "ZSTDMT_endStream"); - return ZSTDMT_flushStream_internal(mtctx, output, ZSTD_e_end); -} diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index 8aad78c4f46..0a9e551c99b 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -19,24 +19,12 @@ /* Note : This is an internal API. * These APIs used to be exposed with ZSTDLIB_API, * because it used to be the only way to invoke MT compression. - * Now, it's recommended to use ZSTD_compress2 and ZSTD_compressStream2() - * instead. - * - * If you depend on these APIs and can't switch, then define - * ZSTD_LEGACY_MULTITHREADED_API when making the dynamic library. - * However, we may completely remove these functions in a future - * release, so please switch soon. + * Now, you must use ZSTD_compress2 and ZSTD_compressStream2() instead. * * This API requires ZSTD_MULTITHREAD to be defined during compilation, * otherwise ZSTDMT_createCCtx*() will fail. */ -#ifdef ZSTD_LEGACY_MULTITHREADED_API -# define ZSTDMT_API ZSTDLIB_API -#else -# define ZSTDMT_API -#endif - /* === Dependencies === */ #include "../common/zstd_deps.h" /* size_t */ #define ZSTD_STATIC_LINKING_ONLY /* ZSTD_parameters */ @@ -54,79 +42,34 @@ #define ZSTDMT_JOBSIZE_MAX (MEM_32bits() ? (512 MB) : (1024 MB)) +/* ======================================================== + * === Private interface, for use by ZSTD_compress.c === + * === Not exposed in libzstd. Never invoke directly === + * ======================================================== */ + /* === Memory management === */ typedef struct ZSTDMT_CCtx_s ZSTDMT_CCtx; /* Requires ZSTD_MULTITHREAD to be defined during compilation, otherwise it will return NULL. */ -ZSTDMT_API ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbWorkers); -/* Requires ZSTD_MULTITHREAD to be defined during compilation, otherwise it will return NULL. */ -ZSTDMT_API ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, - ZSTD_customMem cMem, - ZSTD_threadPool *pool); -ZSTDMT_API size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx); - -ZSTDMT_API size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx); - - -/* === Simple one-pass compression function === */ - -ZSTDMT_API size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, - void* dst, size_t dstCapacity, - const void* src, size_t srcSize, - int compressionLevel); - +ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, + ZSTD_customMem cMem, + ZSTD_threadPool *pool); +size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx); +size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx); /* === Streaming functions === */ -ZSTDMT_API size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel); -ZSTDMT_API size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize); /**< if srcSize is not known at reset time, use ZSTD_CONTENTSIZE_UNKNOWN. Note: for compatibility with older programs, 0 means the same as ZSTD_CONTENTSIZE_UNKNOWN, but it will change in the future to mean "empty" */ - -ZSTDMT_API size_t ZSTDMT_nextInputSizeHint(const ZSTDMT_CCtx* mtctx); -ZSTDMT_API size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input); - -ZSTDMT_API size_t ZSTDMT_flushStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */ -ZSTDMT_API size_t ZSTDMT_endStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */ - - -/* === Advanced functions and parameters === */ - -ZSTDMT_API size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, - void* dst, size_t dstCapacity, - const void* src, size_t srcSize, - const ZSTD_CDict* cdict, - ZSTD_parameters params, - int overlapLog); - -ZSTDMT_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx, - const void* dict, size_t dictSize, /* dict can be released after init, a local copy is preserved within zcs */ - ZSTD_parameters params, - unsigned long long pledgedSrcSize); /* pledgedSrcSize is optional and can be zero == unknown */ - -ZSTDMT_API size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx, - const ZSTD_CDict* cdict, - ZSTD_frameParameters fparams, - unsigned long long pledgedSrcSize); /* note : zero means empty */ - -/* ZSTDMT_parameter : - * List of parameters that can be set using ZSTDMT_setMTCtxParameter() */ -typedef enum { - ZSTDMT_p_jobSize, /* Each job is compressed in parallel. By default, this value is dynamically determined depending on compression parameters. Can be set explicitly here. */ - ZSTDMT_p_overlapLog, /* Each job may reload a part of previous job to enhance compression ratio; 0 == no overlap, 6(default) == use 1/8th of window, >=9 == use full window. This is a "sticky" parameter : its value will be re-used on next compression job */ - ZSTDMT_p_rsyncable /* Enables rsyncable mode. */ -} ZSTDMT_parameter; - -/* ZSTDMT_setMTCtxParameter() : - * allow setting individual parameters, one at a time, among a list of enums defined in ZSTDMT_parameter. - * The function must be called typically after ZSTD_createCCtx() but __before ZSTDMT_init*() !__ - * Parameters not explicitly reset by ZSTDMT_init*() remain the same in consecutive compression sessions. - * @return : 0, or an error code (which can be tested using ZSTD_isError()) */ -ZSTDMT_API size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, int value); - -/* ZSTDMT_getMTCtxParameter() : - * Query the ZSTDMT_CCtx for a parameter value. - * @return : 0, or an error code (which can be tested using ZSTD_isError()) */ -ZSTDMT_API size_t ZSTDMT_getMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, int* value); +size_t ZSTDMT_nextInputSizeHint(const ZSTDMT_CCtx* mtctx); +/*! ZSTDMT_initCStream_internal() : + * Private use only. Init streaming operation. + * expects params to be valid. + * must receive dict, or cdict, or none, but not both. + * @return : 0, or an error code */ +size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs, + const void* dict, size_t dictSize, ZSTD_dictContentType_e dictContentType, + const ZSTD_CDict* cdict, + ZSTD_CCtx_params params, unsigned long long pledgedSrcSize); /*! ZSTDMT_compressStream_generic() : * Combines ZSTDMT_compressStream() with optional ZSTDMT_flushStream() or ZSTDMT_endStream() @@ -135,16 +78,10 @@ ZSTDMT_API size_t ZSTDMT_getMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter * 0 if fully flushed * or an error code * note : needs to be init using any ZSTD_initCStream*() variant */ -ZSTDMT_API size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, - ZSTD_outBuffer* output, - ZSTD_inBuffer* input, - ZSTD_EndDirective endOp); - - -/* ======================================================== - * === Private interface, for use by ZSTD_compress.c === - * === Not exposed in libzstd. Never invoke directly === - * ======================================================== */ +size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, + ZSTD_outBuffer* output, + ZSTD_inBuffer* input, + ZSTD_EndDirective endOp); /*! ZSTDMT_toFlushNow() * Tell how many bytes are ready to be flushed immediately. @@ -154,15 +91,6 @@ ZSTDMT_API size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, * therefore flushing is limited by speed of oldest job. */ size_t ZSTDMT_toFlushNow(ZSTDMT_CCtx* mtctx); -/*! ZSTDMT_CCtxParam_setMTCtxParameter() - * like ZSTDMT_setMTCtxParameter(), but into a ZSTD_CCtx_Params */ -size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, ZSTDMT_parameter parameter, int value); - -/*! ZSTDMT_CCtxParam_setNbWorkers() - * Set nbWorkers, and clamp it. - * Also reset jobSize and overlapLog */ -size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers); - /*! ZSTDMT_updateCParams_whileCompressing() : * Updates only a selected set of compression parameters, to remain compatible with current frame. * New parameters will be applied to next compression job. */ @@ -175,17 +103,6 @@ void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_p ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx); -/*! ZSTDMT_initCStream_internal() : - * Private use only. Init streaming operation. - * expects params to be valid. - * must receive dict, or cdict, or none, but not both. - * @return : 0, or an error code */ -size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs, - const void* dict, size_t dictSize, ZSTD_dictContentType_e dictContentType, - const ZSTD_CDict* cdict, - ZSTD_CCtx_params params, unsigned long long pledgedSrcSize); - - #if defined (__cplusplus) } #endif diff --git a/tests/Makefile b/tests/Makefile index d347a948a6c..1a01442e988 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -380,7 +380,6 @@ test-zbuff32: zbufftest32 test-zstream: zstreamtest $(QEMU_SYS) ./zstreamtest -v $(ZSTREAM_TESTTIME) $(FUZZER_FLAGS) - $(QEMU_SYS) ./zstreamtest --mt -t1 $(ZSTREAM_TESTTIME) $(FUZZER_FLAGS) $(QEMU_SYS) ./zstreamtest --newapi -t1 $(ZSTREAM_TESTTIME) $(FUZZER_FLAGS) test-zstream32: zstreamtest32 diff --git a/tests/fuzzer.c b/tests/fuzzer.c index 8b10078accb..9a7b50fd8c5 100644 --- a/tests/fuzzer.c +++ b/tests/fuzzer.c @@ -32,7 +32,6 @@ #include "fse.h" #include "zstd.h" /* ZSTD_VERSION_STRING */ #include "zstd_errors.h" /* ZSTD_getErrorCode */ -#include "zstdmt_compress.h" #define ZDICT_STATIC_LINKING_ONLY #include "zdict.h" /* ZDICT_trainFromBuffer */ #include "mem.h" @@ -1352,19 +1351,20 @@ static int basicUnitTests(U32 const seed, double compressibility) /* ZSTDMT simple MT compression test */ DISPLAYLEVEL(3, "test%3i : create ZSTDMT CCtx : ", testNb++); - { ZSTDMT_CCtx* const mtctx = ZSTDMT_createCCtx(2); + { ZSTD_CCtx* const mtctx = ZSTD_createCCtx(); if (mtctx==NULL) { DISPLAY("mtctx : not enough memory, aborting \n"); testResult = 1; goto _end; } + CHECK( ZSTD_CCtx_setParameter(mtctx, ZSTD_c_nbWorkers, 2) ); + CHECK( ZSTD_CCtx_setParameter(mtctx, ZSTD_c_compressionLevel, 1) ); DISPLAYLEVEL(3, "OK \n"); DISPLAYLEVEL(3, "test%3u : compress %u bytes with 2 threads : ", testNb++, (unsigned)CNBuffSize); - CHECK_VAR(cSize, ZSTDMT_compressCCtx(mtctx, + CHECK_VAR(cSize, ZSTD_compress2(mtctx, compressedBuffer, compressedBufferSize, - CNBuffer, CNBuffSize, - 1) ); + CNBuffer, CNBuffSize) ); DISPLAYLEVEL(3, "OK (%u bytes : %.2f%%)\n", (unsigned)cSize, (double)cSize/CNBuffSize*100); DISPLAYLEVEL(3, "test%3i : decompressed size test : ", testNb++); @@ -1388,14 +1388,12 @@ static int basicUnitTests(U32 const seed, double compressibility) DISPLAYLEVEL(3, "OK \n"); DISPLAYLEVEL(3, "test%3i : compress -T2 with checksum : ", testNb++); - { ZSTD_parameters params = ZSTD_getParams(1, CNBuffSize, 0); - params.fParams.checksumFlag = 1; - params.fParams.contentSizeFlag = 1; - CHECK_VAR(cSize, ZSTDMT_compress_advanced(mtctx, - compressedBuffer, compressedBufferSize, - CNBuffer, CNBuffSize, - NULL, params, 3 /*overlapRLog*/) ); - } + CHECK( ZSTD_CCtx_setParameter(mtctx, ZSTD_c_checksumFlag, 1) ); + CHECK( ZSTD_CCtx_setParameter(mtctx, ZSTD_c_contentSizeFlag, 1) ); + CHECK( ZSTD_CCtx_setParameter(mtctx, ZSTD_c_overlapLog, 3) ); + CHECK_VAR(cSize, ZSTD_compress2(mtctx, + compressedBuffer, compressedBufferSize, + CNBuffer, CNBuffSize) ); DISPLAYLEVEL(3, "OK (%u bytes : %.2f%%)\n", (unsigned)cSize, (double)cSize/CNBuffSize*100); DISPLAYLEVEL(3, "test%3i : decompress %u bytes : ", testNb++, (unsigned)CNBuffSize); @@ -1403,7 +1401,7 @@ static int basicUnitTests(U32 const seed, double compressibility) if (r != CNBuffSize) goto _output_error; } DISPLAYLEVEL(3, "OK \n"); - ZSTDMT_freeCCtx(mtctx); + ZSTD_freeCCtx(mtctx); } DISPLAYLEVEL(3, "test%3u : compress empty string and decompress with small window log : ", testNb++); diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index 79d5a8281bb..d3136096da9 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -31,7 +31,6 @@ #define ZSTD_STATIC_LINKING_ONLY /* ZSTD_maxCLevel, ZSTD_customMem, ZSTD_getDictID_fromFrame */ #include "zstd.h" /* ZSTD_compressBound */ #include "zstd_errors.h" /* ZSTD_error_srcSize_wrong */ -#include "zstdmt_compress.h" #include "zdict.h" /* ZDICT_trainFromBuffer */ #include "datagen.h" /* RDG_genBuffer */ #define XXH_STATIC_LINKING_ONLY /* XXH64_state_t */ @@ -274,7 +273,7 @@ static int basicUnitTests(U32 seed, double compressibility) U32 coreSeed = 0; /* this name to conform with CHECK_Z macro display */ ZSTD_CStream* zc = ZSTD_createCStream(); ZSTD_DStream* zd = ZSTD_createDStream(); - ZSTDMT_CCtx* mtctx = ZSTDMT_createCCtx(2); + ZSTD_CCtx* mtctx = ZSTD_createCCtx(); ZSTD_inBuffer inBuff, inBuff2; ZSTD_outBuffer outBuff; @@ -283,12 +282,14 @@ static int basicUnitTests(U32 seed, double compressibility) unsigned dictID = 0; /* Create compressible test buffer */ - if (!CNBuffer || !compressedBuffer || !decodedBuffer || !zc || !zd) { + if (!CNBuffer || !compressedBuffer || !decodedBuffer || !zc || !zd || !mtctx) { DISPLAY("Not enough memory, aborting \n"); goto _output_error; } RDG_genBuffer(CNBuffer, CNBufferSize, compressibility, 0., seed); + CHECK_Z(ZSTD_CCtx_setParameter(mtctx, ZSTD_c_nbWorkers, 2)); + /* Create dictionary */ DISPLAYLEVEL(3, "creating dictionary for unit tests \n"); dictionary = FUZ_createDictionary(CNBuffer, CNBufferSize / 3, 16 KB, 48 KB); @@ -1144,12 +1145,10 @@ static int basicUnitTests(U32 seed, double compressibility) /* Basic multithreading compression test */ DISPLAYLEVEL(3, "test%3i : compress %u bytes with multiple threads : ", testNb++, COMPRESSIBLE_NOISE_LENGTH); - { ZSTD_parameters const params = ZSTD_getParams(1, 0, 0); - int jobSize; - CHECK_Z( ZSTDMT_getMTCtxParameter(mtctx, ZSTDMT_p_jobSize, &jobSize)); + { int jobSize; + CHECK_Z( ZSTD_CCtx_getParameter(mtctx, ZSTD_c_jobSize, &jobSize)); CHECK(jobSize != 0, "job size non-zero"); - CHECK_Z( ZSTDMT_initCStream_advanced(mtctx, CNBuffer, dictSize, params, CNBufferSize) ); - CHECK_Z( ZSTDMT_getMTCtxParameter(mtctx, ZSTDMT_p_jobSize, &jobSize)); + CHECK_Z( ZSTD_CCtx_getParameter(mtctx, ZSTD_c_jobSize, &jobSize)); CHECK(jobSize != 0, "job size non-zero"); } outBuff.dst = compressedBuffer; @@ -1158,7 +1157,7 @@ static int basicUnitTests(U32 seed, double compressibility) inBuff.src = CNBuffer; inBuff.size = CNBufferSize; inBuff.pos = 0; - { size_t const compressResult = ZSTDMT_compressStream_generic(mtctx, &outBuff, &inBuff, ZSTD_e_end); + { size_t const compressResult = ZSTD_compressStream2(mtctx, &outBuff, &inBuff, ZSTD_e_end); if (compressResult != 0) goto _output_error; /* compression must be completed in a single round */ } if (inBuff.pos != inBuff.size) goto _output_error; /* entire input should be consumed */ @@ -1516,7 +1515,7 @@ static int basicUnitTests(U32 seed, double compressibility) FUZ_freeDictionary(dictionary); ZSTD_freeCStream(zc); ZSTD_freeDStream(zd); - ZSTDMT_freeCCtx(mtctx); + ZSTD_freeCCtx(mtctx); free(CNBuffer); free(compressedBuffer); free(decodedBuffer); @@ -1826,283 +1825,6 @@ static int fuzzerTests(U32 seed, unsigned nbTests, unsigned startTest, double co goto _cleanup; } - -/* fuzzing ZSTDMT_* interface */ -static int fuzzerTests_MT(U32 seed, int nbTests, int startTest, - double compressibility, int bigTests) -{ - const U32 maxSrcLog = bigTests ? 24 : 22; - static const U32 maxSampleLog = 19; - size_t const srcBufferSize = (size_t)1<= testNb) { - DISPLAYUPDATE(2, "\r%6u/%6u ", testNb, nbTests); - } else { - DISPLAYUPDATE(2, "\r%6u ", testNb); - } - lseed = coreSeed ^ prime32; - - /* states full reset (deliberately not synchronized) */ - /* some issues can only happen when reusing states */ - if ((FUZ_rand(&lseed) & 0xFF) == 131) { - nbThreads = (FUZ_rand(&lseed) % nbThreadsMax) + 1; - DISPLAYLEVEL(5, "Creating new context with %u threads \n", nbThreads); - ZSTDMT_freeCCtx(zc); - zc = ZSTDMT_createCCtx(nbThreads); - CHECK(zc==NULL, "ZSTDMT_createCCtx allocation error") - } - if ((FUZ_rand(&lseed) & 0xFF) == 132) { - ZSTD_freeDStream(zd); - zd = ZSTD_createDStream(); - CHECK(zd==NULL, "ZSTDMT_createCCtx allocation error") - ZSTD_initDStream_usingDict(zd, NULL, 0); /* ensure at least one init */ - } - - /* srcBuffer selection [0-4] */ - { U32 buffNb = FUZ_rand(&lseed) & 0x7F; - if (buffNb & 7) buffNb=2; /* most common : compressible (P) */ - else { - buffNb >>= 3; - if (buffNb & 7) { - const U32 tnb[2] = { 1, 3 }; /* barely/highly compressible */ - buffNb = tnb[buffNb >> 3]; - } else { - const U32 tnb[2] = { 0, 4 }; /* not compressible / sparse */ - buffNb = tnb[buffNb >> 3]; - } } - srcBuffer = cNoiseBuffer[buffNb]; - } - - /* compression init */ - { U32 const testLog = FUZ_rand(&lseed) % maxSrcLog; - U32 const dictLog = FUZ_rand(&lseed) % maxSrcLog; - int const cLevelCandidate = ( FUZ_rand(&lseed) - % (ZSTD_maxCLevel() - (MAX(testLog, dictLog) / 2)) ) - + 1; - int const cLevelThreadAdjusted = cLevelCandidate - (nbThreads * 2) + 2; /* reduce cLevel when multiple threads to reduce memory consumption */ - int const cLevelMin = MAX(cLevelThreadAdjusted, 1); /* no negative cLevel yet */ - int const cLevel = MIN(cLevelMin, cLevelMax); - maxTestSize = FUZ_rLogLength(&lseed, testLog); - - if (FUZ_rand(&lseed)&1) { /* simple init */ - int const compressionLevel = (FUZ_rand(&lseed) % 5) + 1; - DISPLAYLEVEL(5, "Init with compression level = %i \n", compressionLevel); - CHECK_Z( ZSTDMT_initCStream(zc, compressionLevel) ); - } else { /* advanced init */ - /* random dictionary selection */ - dictSize = ((FUZ_rand(&lseed)&63)==1) ? FUZ_rLogLength(&lseed, dictLog) : 0; - { size_t const dictStart = FUZ_rand(&lseed) % (srcBufferSize - dictSize); - dict = srcBuffer + dictStart; - } - { U64 const pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? ZSTD_CONTENTSIZE_UNKNOWN : maxTestSize; - ZSTD_parameters params = ZSTD_getParams(cLevel, pledgedSrcSize, dictSize); - DISPLAYLEVEL(5, "Init with windowLog = %u, pledgedSrcSize = %u, dictSize = %u \n", - params.cParams.windowLog, (unsigned)pledgedSrcSize, (unsigned)dictSize); - params.fParams.checksumFlag = FUZ_rand(&lseed) & 1; - params.fParams.noDictIDFlag = FUZ_rand(&lseed) & 1; - params.fParams.contentSizeFlag = FUZ_rand(&lseed) & 1; - DISPLAYLEVEL(5, "checksumFlag : %u \n", params.fParams.checksumFlag); - CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_overlapLog, FUZ_rand(&lseed) % 12) ); - CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_jobSize, FUZ_rand(&lseed) % (2*maxTestSize+1)) ); /* custom job size */ - CHECK_Z( ZSTDMT_initCStream_advanced(zc, dict, dictSize, params, pledgedSrcSize) ); - } } } - - /* multi-segments compression test */ - XXH64_reset(&xxhState, 0); - { ZSTD_outBuffer outBuff = { cBuffer, cBufferSize, 0 } ; - U32 n; - for (n=0, cSize=0, totalTestSize=0 ; totalTestSize < maxTestSize ; n++) { - /* compress random chunks into randomly sized dst buffers */ - { size_t const randomSrcSize = FUZ_randomLength(&lseed, maxSampleLog); - size_t const srcSize = MIN (maxTestSize-totalTestSize, randomSrcSize); - size_t const srcStart = FUZ_rand(&lseed) % (srcBufferSize - srcSize); - size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog); - size_t const dstBuffSize = MIN(cBufferSize - cSize, randomDstSize); - ZSTD_inBuffer inBuff = { srcBuffer+srcStart, srcSize, 0 }; - outBuff.size = outBuff.pos + dstBuffSize; - - DISPLAYLEVEL(6, "Sending %u bytes to compress \n", (unsigned)srcSize); - CHECK_Z( ZSTDMT_compressStream(zc, &outBuff, &inBuff) ); - DISPLAYLEVEL(6, "%u bytes read by ZSTDMT_compressStream \n", (unsigned)inBuff.pos); - - XXH64_update(&xxhState, srcBuffer+srcStart, inBuff.pos); - memcpy(copyBuffer+totalTestSize, srcBuffer+srcStart, inBuff.pos); - totalTestSize += inBuff.pos; - } - - /* random flush operation, to mess around */ - if ((FUZ_rand(&lseed) & 15) == 0) { - size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog); - size_t const adjustedDstSize = MIN(cBufferSize - cSize, randomDstSize); - size_t const previousPos = outBuff.pos; - outBuff.size = outBuff.pos + adjustedDstSize; - DISPLAYLEVEL(5, "Flushing into dst buffer of size %u \n", (unsigned)adjustedDstSize); - CHECK_Z( ZSTDMT_flushStream(zc, &outBuff) ); - assert(outBuff.pos >= previousPos); - DISPLAYLEVEL(6, "%u bytes flushed by ZSTDMT_flushStream \n", (unsigned)(outBuff.pos-previousPos)); - } } - - /* final frame epilogue */ - { size_t remainingToFlush = (size_t)(-1); - while (remainingToFlush) { - size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog); - size_t const adjustedDstSize = MIN(cBufferSize - cSize, randomDstSize); - size_t const previousPos = outBuff.pos; - outBuff.size = outBuff.pos + adjustedDstSize; - DISPLAYLEVEL(5, "Ending into dst buffer of size %u \n", (unsigned)adjustedDstSize); - remainingToFlush = ZSTDMT_endStream(zc, &outBuff); - CHECK (ZSTD_isError(remainingToFlush), "ZSTDMT_endStream error : %s", ZSTD_getErrorName(remainingToFlush)); - assert(outBuff.pos >= previousPos); - DISPLAYLEVEL(6, "%u bytes flushed by ZSTDMT_endStream \n", (unsigned)(outBuff.pos-previousPos)); - DISPLAYLEVEL(5, "endStream : remainingToFlush : %u \n", (unsigned)remainingToFlush); - } } - crcOrig = XXH64_digest(&xxhState); - cSize = outBuff.pos; - DISPLAYLEVEL(5, "Frame completed : %u bytes compressed into %u bytes \n", - (unsigned)totalTestSize, (unsigned)cSize); - } - - /* multi - fragments decompression test */ - assert(totalTestSize < dstBufferSize); - memset(dstBuffer, 170, totalTestSize); /* init dest area */ - if (!dictSize /* don't reset if dictionary : could be different */ && (FUZ_rand(&lseed) & 1)) { - CHECK_Z( ZSTD_resetDStream(zd) ); - } else { - CHECK_Z( ZSTD_initDStream_usingDict(zd, dict, dictSize) ); - } - { size_t decompressionResult = 1; - ZSTD_inBuffer inBuff = { cBuffer, cSize, 0 }; - ZSTD_outBuffer outBuff= { dstBuffer, dstBufferSize, 0 }; - for (totalGenSize = 0 ; decompressionResult ; ) { - size_t const readCSrcSize = FUZ_randomLength(&lseed, maxSampleLog); - size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog); - size_t const dstBuffSize = MIN(dstBufferSize - totalGenSize, randomDstSize); - inBuff.size = inBuff.pos + readCSrcSize; - outBuff.size = outBuff.pos + dstBuffSize; - DISPLAYLEVEL(6, "ZSTD_decompressStream input %u bytes into outBuff %u bytes \n", - (unsigned)readCSrcSize, (unsigned)dstBuffSize); - decompressionResult = ZSTD_decompressStream(zd, &outBuff, &inBuff); - if (ZSTD_isError(decompressionResult)) { - DISPLAY("ZSTD_decompressStream error : %s \n", ZSTD_getErrorName(decompressionResult)); - findDiff(copyBuffer, dstBuffer, totalTestSize); - } - CHECK (ZSTD_isError(decompressionResult), "decompression error : %s", ZSTD_getErrorName(decompressionResult)); - DISPLAYLEVEL(6, "total ingested (inBuff.pos) = %u and produced (outBuff.pos) = %u \n", - (unsigned)inBuff.pos, (unsigned)outBuff.pos); - } - CHECK (outBuff.pos != totalTestSize, - "decompressed data : wrong size (%u != %u)", - (unsigned)outBuff.pos, (unsigned)totalTestSize ); - CHECK (inBuff.pos != cSize, - "compressed data should be fully read (%u != %u)", - (unsigned)inBuff.pos, (unsigned)cSize ); - { U64 const crcDest = XXH64(dstBuffer, totalTestSize, 0); - if (crcDest!=crcOrig) findDiff(copyBuffer, dstBuffer, totalTestSize); - CHECK (crcDest!=crcOrig, "decompressed data corrupted"); - } } - - /*===== noisy/erroneous src decompression test =====*/ - - /* add some noise */ - { U32 const nbNoiseChunks = (FUZ_rand(&lseed) & 7) + 2; - U32 nn; for (nn=0; nn Date: Thu, 1 Oct 2020 15:02:15 -0700 Subject: [PATCH 4/7] [zstreamtest] Add compression determinism tests * Run compression twice and check the compressed data is byte-identical. The compression loop had to be rewritten to ensure deteriminism. It is guaranteed by always making maximal forward progress. * When nbWorkers > 0, change the number of workers 1/8 of the time. * Run in single-pass mode 1/4 of the time. I've run a few hundred thousand iterations of zstreamtest and have seen no deteriminism issues so far. Before the zstdmt fix that skips the single-pass shortcut non-determinism showed up in a few hundred iterations. --- tests/zstreamtest.c | 132 +++++++++++++++++++++++++++++++------------- 1 file changed, 94 insertions(+), 38 deletions(-) diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index d3136096da9..e4b9f0dec31 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -1901,6 +1901,8 @@ static int fuzzerTests_newAPI(U32 seed, int nbTests, int startTest, U32 resetAllowed = 1; size_t maxTestSize; ZSTD_parameters savedParams; + int isRefPrefix = 0; + U64 pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN; /* init */ if (nbTests >= testNb) { DISPLAYUPDATE(2, "\r%6u/%6u ", testNb, nbTests); } @@ -1971,8 +1973,8 @@ static int fuzzerTests_newAPI(U32 seed, int nbTests, int startTest, dict = srcBuffer + dictStart; if (!dictSize) dict=NULL; } - { U64 const pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? ZSTD_CONTENTSIZE_UNKNOWN : maxTestSize; - ZSTD_compressionParameters cParams = ZSTD_getCParams(cLevel, pledgedSrcSize, dictSize); + pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? ZSTD_CONTENTSIZE_UNKNOWN : maxTestSize; + { ZSTD_compressionParameters cParams = ZSTD_getCParams(cLevel, pledgedSrcSize, dictSize); const U32 windowLogMax = bigTests ? 24 : 20; const U32 searchLogMax = bigTests ? 15 : 13; if (dictSize) @@ -2028,6 +2030,8 @@ static int fuzzerTests_newAPI(U32 seed, int nbTests, int startTest, if (FUZ_rand(&lseed) & 1) { DISPLAYLEVEL(5, "t%u: pledgedSrcSize : %u \n", testNb, (unsigned)pledgedSrcSize); CHECK_Z( ZSTD_CCtx_setPledgedSrcSize(zc, pledgedSrcSize) ); + } else { + pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN; } /* multi-threading parameters. Only adjust occasionally for small tests. */ @@ -2061,6 +2065,7 @@ static int fuzzerTests_newAPI(U32 seed, int nbTests, int startTest, CHECK_Z( ZSTD_CCtx_loadDictionary_byReference(zc, dict, dictSize) ); } } else { + isRefPrefix = 1; CHECK_Z( ZSTD_CCtx_refPrefix(zc, dict, dictSize) ); } } } @@ -2068,45 +2073,96 @@ static int fuzzerTests_newAPI(U32 seed, int nbTests, int startTest, CHECK_Z(getCCtxParams(zc, &savedParams)); /* multi-segments compression test */ - XXH64_reset(&xxhState, 0); - { ZSTD_outBuffer outBuff = { cBuffer, cBufferSize, 0 } ; - for (cSize=0, totalTestSize=0 ; (totalTestSize < maxTestSize) ; ) { - /* compress random chunks into randomly sized dst buffers */ - size_t const randomSrcSize = FUZ_randomLength(&lseed, maxSampleLog); - size_t const srcSize = MIN(maxTestSize-totalTestSize, randomSrcSize); - size_t const srcStart = FUZ_rand(&lseed) % (srcBufferSize - srcSize); - size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog+1); - size_t const dstBuffSize = MIN(cBufferSize - cSize, randomDstSize); - ZSTD_EndDirective const flush = (FUZ_rand(&lseed) & 15) ? ZSTD_e_continue : ZSTD_e_flush; - ZSTD_inBuffer inBuff = { srcBuffer+srcStart, srcSize, 0 }; - outBuff.size = outBuff.pos + dstBuffSize; + { int iter; + int const startSeed = lseed; + XXH64_hash_t compressedCrcs[2]; + for (iter = 0; iter < 2; ++iter, lseed = startSeed) { + ZSTD_outBuffer outBuff = { cBuffer, cBufferSize, 0 } ; + int const singlePass = (FUZ_rand(&lseed) & 3) == 0; + int nbWorkers; + + XXH64_reset(&xxhState, 0); + + CHECK_Z( ZSTD_CCtx_setPledgedSrcSize(zc, pledgedSrcSize) ); + if (isRefPrefix) { + DISPLAYLEVEL(6, "t%u: Reloading prefix\n", testNb); + /* Need to reload the prefix because it gets dropped after one compression */ + CHECK_Z( ZSTD_CCtx_refPrefix(zc, dict, dictSize) ); + } - CHECK_Z( ZSTD_compressStream2(zc, &outBuff, &inBuff, flush) ); - DISPLAYLEVEL(6, "t%u: compress consumed %u bytes (total : %u) ; flush: %u (total : %u) \n", - testNb, (unsigned)inBuff.pos, (unsigned)(totalTestSize + inBuff.pos), (unsigned)flush, (unsigned)outBuff.pos); + /* Adjust number of workers occassionally - result must be deterministic independent of nbWorkers */ + CHECK_Z(ZSTD_CCtx_getParameter(zc, ZSTD_c_nbWorkers, &nbWorkers)); + if (nbWorkers > 0 && (FUZ_rand(&lseed) & 7) == 0) { + DISPLAYLEVEL(6, "t%u: Modify nbWorkers: %d -> %d \n", testNb, nbWorkers, nbWorkers + iter); + CHECK_Z(ZSTD_CCtx_setParameter(zc, ZSTD_c_nbWorkers, nbWorkers + iter)); + } - XXH64_update(&xxhState, srcBuffer+srcStart, inBuff.pos); - memcpy(copyBuffer+totalTestSize, srcBuffer+srcStart, inBuff.pos); - totalTestSize += inBuff.pos; - } + if (singlePass) { + ZSTD_inBuffer inBuff = { srcBuffer, maxTestSize, 0 }; + CHECK_Z(ZSTD_compressStream2(zc, &outBuff, &inBuff, ZSTD_e_end)); + DISPLAYLEVEL(6, "t%u: Single pass compression: consumed %u bytes ; produced %u bytes \n", + testNb, (unsigned)inBuff.pos, (unsigned)outBuff.pos); + CHECK(inBuff.pos != inBuff.size, "Input not consumed!"); + crcOrig = XXH64(srcBuffer, maxTestSize, 0); + totalTestSize = maxTestSize; + } else { + outBuff.size = 0; + for (totalTestSize=0 ; (totalTestSize < maxTestSize) ; ) { + /* compress random chunks into randomly sized dst buffers */ + size_t const randomSrcSize = FUZ_randomLength(&lseed, maxSampleLog); + size_t const srcSize = MIN(maxTestSize-totalTestSize, randomSrcSize); + size_t const srcStart = FUZ_rand(&lseed) % (srcBufferSize - srcSize); + ZSTD_EndDirective const flush = (FUZ_rand(&lseed) & 15) ? ZSTD_e_continue : ZSTD_e_flush; + ZSTD_inBuffer inBuff = { srcBuffer+srcStart, srcSize, 0 }; + int forwardProgress; + do { + size_t const ipos = inBuff.pos; + size_t const opos = outBuff.pos; + size_t ret; + if (outBuff.pos == outBuff.size) { + size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog+1); + size_t const dstBuffSize = MIN(cBufferSize - outBuff.pos, randomDstSize); + outBuff.size = outBuff.pos + dstBuffSize; + } + CHECK_Z( ret = ZSTD_compressStream2(zc, &outBuff, &inBuff, flush) ); + DISPLAYLEVEL(6, "t%u: compress consumed %u bytes (total : %u) ; flush: %u (total : %u) \n", + testNb, (unsigned)inBuff.pos, (unsigned)(totalTestSize + inBuff.pos), (unsigned)flush, (unsigned)outBuff.pos); + + /* We've completed the flush */ + if (flush == ZSTD_e_flush && ret == 0) + break; + + /* Ensure maximal forward progress for determinism */ + forwardProgress = (inBuff.pos != ipos) || (outBuff.pos != opos); + } while (forwardProgress); + + XXH64_update(&xxhState, srcBuffer+srcStart, inBuff.pos); + memcpy(copyBuffer+totalTestSize, srcBuffer+srcStart, inBuff.pos); + totalTestSize += inBuff.pos; + } - /* final frame epilogue */ - { size_t remainingToFlush = 1; - while (remainingToFlush) { - ZSTD_inBuffer inBuff = { NULL, 0, 0 }; - size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog+1); - size_t const adjustedDstSize = MIN(cBufferSize - cSize, randomDstSize); - outBuff.size = outBuff.pos + adjustedDstSize; - DISPLAYLEVEL(6, "t%u: End-flush into dst buffer of size %u \n", testNb, (unsigned)adjustedDstSize); - remainingToFlush = ZSTD_compressStream2(zc, &outBuff, &inBuff, ZSTD_e_end); - DISPLAYLEVEL(6, "t%u: Total flushed so far : %u bytes \n", testNb, (unsigned)outBuff.pos); - CHECK( ZSTD_isError(remainingToFlush), - "ZSTD_compressStream2 w/ ZSTD_e_end error : %s", - ZSTD_getErrorName(remainingToFlush) ); - } } - crcOrig = XXH64_digest(&xxhState); - cSize = outBuff.pos; - DISPLAYLEVEL(5, "Frame completed : %zu bytes \n", cSize); + /* final frame epilogue */ + { size_t remainingToFlush = 1; + while (remainingToFlush) { + ZSTD_inBuffer inBuff = { NULL, 0, 0 }; + size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog+1); + size_t const adjustedDstSize = MIN(cBufferSize - outBuff.pos, randomDstSize); + outBuff.size = outBuff.pos + adjustedDstSize; + DISPLAYLEVEL(6, "t%u: End-flush into dst buffer of size %u \n", testNb, (unsigned)adjustedDstSize); + /* ZSTD_e_end guarantees maximal forward progress */ + remainingToFlush = ZSTD_compressStream2(zc, &outBuff, &inBuff, ZSTD_e_end); + DISPLAYLEVEL(6, "t%u: Total flushed so far : %u bytes \n", testNb, (unsigned)outBuff.pos); + CHECK( ZSTD_isError(remainingToFlush), + "ZSTD_compressStream2 w/ ZSTD_e_end error : %s", + ZSTD_getErrorName(remainingToFlush) ); + } } + crcOrig = XXH64_digest(&xxhState); + } + cSize = outBuff.pos; + compressedCrcs[iter] = XXH64(cBuffer, cSize, 0); + DISPLAYLEVEL(5, "Frame completed : %zu bytes \n", cSize); + } + CHECK(!(compressedCrcs[0] == compressedCrcs[1]), "Compression is not deterministic!"); } CHECK(badParameters(zc, savedParams), "CCtx params are wrong"); From ede4f97153324bf640cdc9020eed1a85edd18ad4 Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Mon, 5 Oct 2020 17:37:19 -0700 Subject: [PATCH 5/7] [zstdmt] Fix bug where extra empty blocks are emitted When zstdmt cannot get a buffer and `ZSTD_e_end` is passed an empty compression job can be created. Additionally, `mtctx->frameEnded` can be set to 1, which could potentially cause problems like unterminated blocks. The fix is to adjust to `ZSTD_e_flush` even when we can't get a buffer. --- lib/compress/zstdmt_compress.c | 12 ++++++++++-- tests/zstreamtest.c | 6 +++++- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 6f3e0622976..98c4ce0bbb1 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -1773,8 +1773,16 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, mtctx->inBuff.filled += syncPoint.toLoad; forwardInputProgress = syncPoint.toLoad>0; } - if ((input->pos < input->size) && (endOp == ZSTD_e_end)) - endOp = ZSTD_e_flush; /* can't end now : not all input consumed */ + } + if ((input->pos < input->size) && (endOp == ZSTD_e_end)) { + /* Can't end yet because the input is not fully consumed. + * We are in one of these cases: + * - Input buffer is NULL & empty + * - We filled the input buffer + * - We hit a synchronization point + */ + assert(mtctx->inBuff.filled == 0 || mtctx->inBuff.filled == mtctx->targetSectionSize || mtctx->params.rsyncable); + endOp = ZSTD_e_flush; } if ( (mtctx->jobReady) diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index e4b9f0dec31..13c1530cd80 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -2048,7 +2048,11 @@ static int fuzzerTests_newAPI(U32 seed, int nbTests, int startTest, } } /* Enable rsyncable mode 1 in 4 times. */ - setCCtxParameter(zc, cctxParams, ZSTD_c_rsyncable, (FUZ_rand(&lseed) % 4 == 0), opaqueAPI); + { + int const rsyncable = (FUZ_rand(&lseed) % 4 == 0); + DISPLAYLEVEL(5, "t%u: rsyncable : %d \n", testNb, rsyncable); + setCCtxParameter(zc, cctxParams, ZSTD_c_rsyncable, rsyncable, opaqueAPI); + } if (FUZ_rand(&lseed) & 1) CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_c_forceMaxWindow, FUZ_rand(&lseed) & 1, opaqueAPI) ); From efff5d8b2df02e2eed5decea1b321faef6b14931 Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Mon, 5 Oct 2020 19:14:19 -0700 Subject: [PATCH 6/7] [zstdmt] Fix determinism issue with rsyncable mode The problem occurs in this scenario: 1. We find a synchronization point. 2. We attmept to create the job. 3. We fail because the job table is full: `mtctx->nextJobID > mtctx->doneJobID + mtctx->jobIDMask`. 4. We call `ZSTDMT_compressStream_generic` again. 5. We forget that we're at a sync point already, and we continue looking for the next sync point. This fix is to detect if we're currently paused at a sync point, and if we are then don't load any more input. Caught by zstreamtest. I modified it to make the bug occur more often (~1/100K -> ~1/200) and verified that it is fixed after. I then ran a few hundred thousand unmodified zstreamtest iterations to verify. --- lib/compress/zstdmt_compress.c | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 98c4ce0bbb1..eeb805b3582 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -1687,6 +1687,16 @@ findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input) pos = 0; prev = (BYTE const*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled - RSYNC_LENGTH; hash = ZSTD_rollingHash_compute(prev, RSYNC_LENGTH); + if ((hash & hitMask) == hitMask) { + /* We're already at a sync point so don't load any more until + * we're able to flush this sync point. + * This likely happened because the job table was full so we + * couldn't add our job. + */ + syncPoint.toLoad = 0; + syncPoint.flush = 1; + return syncPoint; + } } else { /* We don't have enough bytes buffered to initialize the hash, but * we know we have at least RSYNC_LENGTH bytes total. From 441ce4178ff138fc827e7d49a216677fe7ac9ca8 Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Mon, 12 Oct 2020 12:58:13 -0700 Subject: [PATCH 7/7] [zstdmt] Clarify a comment --- lib/compress/zstdmt_compress.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index eeb805b3582..b7be2d13b9a 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -1787,9 +1787,9 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, if ((input->pos < input->size) && (endOp == ZSTD_e_end)) { /* Can't end yet because the input is not fully consumed. * We are in one of these cases: - * - Input buffer is NULL & empty - * - We filled the input buffer - * - We hit a synchronization point + * - mtctx->inBuff is NULL & empty: we couldn't get an input buffer so don't create a new job. + * - We filled the input buffer: flush this job but don't end the frame. + * - We hit a synchronization point: flush this job but don't end the frame. */ assert(mtctx->inBuff.filled == 0 || mtctx->inBuff.filled == mtctx->targetSectionSize || mtctx->params.rsyncable); endOp = ZSTD_e_flush;