Skip to content

Commit

Permalink
[WIP] SuperBlock fix
Browse files Browse the repository at this point in the history
  • Loading branch information
terrelln committed Jan 10, 2020
1 parent d1cc9d2 commit 7d0bd17
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 86 deletions.
87 changes: 40 additions & 47 deletions lib/compress/zstd_compress.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
* Note that the result from this function is only compatible with the "normal"
* full-block strategy.
* When there are a lot of small blocks due to frequent flush in streaming mode
* or targetCBlockSize, the overhead of headers can make the compressed data to
* be larger than the return value of ZSTD_compressBound().
* the overhead of headers can make the compressed data to be larger than the
* return value of ZSTD_compressBound().
*/
size_t ZSTD_compressBound(size_t srcSize) {
return ZSTD_COMPRESSBOUND(srcSize);
Expand Down Expand Up @@ -2385,6 +2385,13 @@ static int ZSTD_isRLE(const BYTE *ip, size_t length) {
return 1;
}

static void ZSTD_confirmRepcodesAndEntropyTables(ZSTD_CCtx* zc)
{
ZSTD_compressedBlockState_t* const tmp = zc->blockState.prevCBlock;
zc->blockState.prevCBlock = zc->blockState.nextCBlock;
zc->blockState.nextCBlock = tmp;
}

static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc,
void* dst, size_t dstCapacity,
const void* src, size_t srcSize, U32 frame)
Expand Down Expand Up @@ -2435,10 +2442,7 @@ static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc,

out:
if (!ZSTD_isError(cSize) && cSize > 1) {
/* confirm repcodes and entropy tables when emitting a compressed block */
ZSTD_compressedBlockState_t* const tmp = zc->blockState.prevCBlock;
zc->blockState.prevCBlock = zc->blockState.nextCBlock;
zc->blockState.nextCBlock = tmp;
ZSTD_confirmRepcodesAndEntropyTables(zc);
}
/* We check that dictionaries have offset codes available for the first
* block. After the first block, the offcode table might not have large
Expand All @@ -2450,57 +2454,45 @@ static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc,
return cSize;
}

static void ZSTD_confirmRepcodesAndEntropyTables(ZSTD_CCtx* zc)
{
ZSTD_compressedBlockState_t* const tmp = zc->blockState.prevCBlock;
zc->blockState.prevCBlock = zc->blockState.nextCBlock;
zc->blockState.nextCBlock = tmp;
}

static size_t ZSTD_compressBlock_targetCBlockSize_body(ZSTD_CCtx* zc,
void* dst, size_t dstCapacity,
const void* src, size_t srcSize,
const size_t bss, U32 lastBlock)
{
DEBUGLOG(6, "Attempting ZSTD_compressSuperBlock()");
/* Attempt superblock compression and return early if successful */
if (bss == ZSTDbss_compress) {
/* Attempt superblock compression.
*
* Note that compressed size of ZSTD_compressSuperBlock() is not bound by the
* standard ZSTD_compressBound(). This is a problem, because even if we have
* space now, taking an extra byte now could cause us to run out of space later
* and violate ZSTD_compressBound().
*
* Define blockBound(blockSize) = blockSize + ZSTD_blockHeaderSize.
*
* In order to respect ZSTD_compressBound() we must attempt to emit a raw
* uncompressed block in these cases:
* * cSize == 0: Return code for an uncompressed block.
* * cSize == dstSize_tooSmall: We may have expanded beyond blockBound(srcSize).
* ZSTD_noCompressBlock() will return dstSize_tooSmall if we are really out of
* output space.
* * cSize >= blockBound(srcSize): We have expanded the block too much so
* emit an uncompressed block.
*/
size_t const cSize = ZSTD_compressSuperBlock(zc, dst, dstCapacity, lastBlock);
FORWARD_IF_ERROR(cSize);
if (cSize != 0) {
ZSTD_confirmRepcodesAndEntropyTables(zc);
return cSize;
}
}

DEBUGLOG(6, "Attempting ZSTD_noCompressSuperBlock()");
/* Superblock compression failed, attempt to emit noCompress superblocks
* and return early if that is successful and we have enough room for checksum */
{
size_t const cSize = ZSTD_noCompressSuperBlock(dst, dstCapacity, src, srcSize, zc->appliedParams.targetCBlockSize, lastBlock);
if (cSize != ERROR(dstSize_tooSmall) && (dstCapacity - cSize) >= 4)
return cSize;
}

DEBUGLOG(6, "Attempting ZSTD_compressSequences() on superblock");
/* noCompress superblock emission failed. Attempt to compress normally
* and return early if that is successful */
{
size_t const cSize = ZSTD_compressSequences(&zc->seqStore,
&zc->blockState.prevCBlock->entropy, &zc->blockState.nextCBlock->entropy,
&zc->appliedParams, (BYTE*)dst+ZSTD_blockHeaderSize, dstCapacity-ZSTD_blockHeaderSize,
srcSize, zc->entropyWorkspace, HUF_WORKSPACE_SIZE, zc->bmi2);
FORWARD_IF_ERROR(cSize);
if (cSize != 0) {
U32 const cBlockHeader24 = lastBlock + (((U32)bt_compressed)<<1) + (U32)(cSize << 3);
MEM_writeLE24((BYTE*)dst, cBlockHeader24);
ZSTD_confirmRepcodesAndEntropyTables(zc);
return cSize + ZSTD_blockHeaderSize;
if (cSize != ERROR(dstSize_tooSmall)) {
FORWARD_IF_ERROR(cSize);
if (cSize != 0 && cSize < srcSize + ZSTD_blockHeaderSize) {
ZSTD_confirmRepcodesAndEntropyTables(zc);
return cSize;
}
}
}

DEBUGLOG(6, "Resorting to ZSTD_noCompressBlock() on superblock");
/* Everything failed. Just emit a regular noCompress block */
DEBUGLOG(6, "Resorting to ZSTD_noCompressBlock()");
/* Superblock compression failed, attempt to emit a single no compress block.
* The decoder will be able to stream this block since it is uncompressed.
*/
return ZSTD_noCompressBlock(dst, dstCapacity, src, srcSize, lastBlock);
}

Expand Down Expand Up @@ -2593,6 +2585,8 @@ static size_t ZSTD_compress_frameChunk (ZSTD_CCtx* cctx,
if (ZSTD_useTargetCBlockSize(&cctx->appliedParams)) {
cSize = ZSTD_compressBlock_targetCBlockSize(cctx, op, dstCapacity, ip, blockSize, lastBlock);
FORWARD_IF_ERROR(cSize);
assert(cSize > 0);
assert(cSize <= blockSize + ZSTD_blockHeaderSize);
} else {
cSize = ZSTD_compressBlock_internal(cctx,
op+ZSTD_blockHeaderSize, dstCapacity-ZSTD_blockHeaderSize,
Expand Down Expand Up @@ -3796,7 +3790,6 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs,

case zcss_load:
if ( (flushMode == ZSTD_e_end)
&& !ZSTD_useTargetCBlockSize(&zcs->appliedParams)
&& ((size_t)(oend-op) >= ZSTD_compressBound(iend-ip)) /* enough dstCapacity */
&& (zcs->inBuffPos == 0) ) {
/* shortcut to compression pass directly into output buffer */
Expand Down
26 changes: 1 addition & 25 deletions lib/compress/zstd_compress_superblock.c
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ static size_t ZSTD_compressSubBlock_multi(const seqStore_t* seqStorePtr,
/* I think there is an optimization opportunity here.
* Calling ZSTD_estimateSubBlockSize for every sequence can be wasteful
* since it recalculates estimate from scratch.
* For example, it would recount literal distribution and symbol codes everytime.
* For example, it would recount literal distribution and symbol codes everytime.
*/
cBlockSizeEstimate = ZSTD_estimateSubBlockSize(lp, litSize, ofCodePtr, llCodePtr, mlCodePtr, seqCount,
entropy, entropyMetadata,
Expand Down Expand Up @@ -716,27 +716,3 @@ size_t ZSTD_compressSuperBlock(ZSTD_CCtx* zc,
zc->bmi2, lastBlock,
zc->entropyWorkspace, HUF_WORKSPACE_SIZE /* statically allocated in resetCCtx */);
}

size_t ZSTD_noCompressSuperBlock(void* dst, size_t dstCapacity,
const void* src, size_t srcSize,
size_t targetCBlockSize,
unsigned lastBlock) {
const BYTE* const istart = (const BYTE*)src;
const BYTE* const iend = istart + srcSize;
const BYTE* ip = istart;
BYTE* const ostart = (BYTE*)dst;
BYTE* const oend = ostart + dstCapacity;
BYTE* op = ostart;
DEBUGLOG(5, "ZSTD_noCompressSuperBlock (dstCapacity=%zu, srcSize=%zu, targetCBlockSize=%zu)",
dstCapacity, srcSize, targetCBlockSize);
while (ip < iend) {
size_t remaining = iend-ip;
unsigned lastSubBlock = remaining <= targetCBlockSize;
size_t blockSize = lastSubBlock ? remaining : targetCBlockSize;
size_t cSize = ZSTD_noCompressBlock(op, oend-op, ip, blockSize, lastSubBlock && lastBlock);
FORWARD_IF_ERROR(cSize);
ip += blockSize;
op += cSize;
}
return op-ostart;
}
10 changes: 0 additions & 10 deletions lib/compress/zstd_compress_superblock.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,4 @@ size_t ZSTD_compressSuperBlock(ZSTD_CCtx* zc,
void* dst, size_t dstCapacity,
unsigned lastBlock);

/* ZSTD_noCompressSuperBlock() :
* Used to break a super block into multiple uncompressed sub blocks
* when targetCBlockSize is being used.
* The given block will be broken into multiple uncompressed sub blocks that are
* around targetCBlockSize. */
size_t ZSTD_noCompressSuperBlock(void* dst, size_t dstCapacity,
const void* src, size_t srcSize,
size_t targetCBlockSize,
unsigned lastBlock);

#endif /* ZSTD_COMPRESS_ADVANCED_H */
39 changes: 35 additions & 4 deletions lib/decompress/zstd_decompress.c
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,24 @@ size_t ZSTD_decompress(void* dst, size_t dstCapacity, const void* src, size_t sr
****************************************/
size_t ZSTD_nextSrcSizeToDecompress(ZSTD_DCtx* dctx) { return dctx->expected; }

/**
* Similar to ZSTD_nextSrcSizeToDecompress(), but when when a block input can be streamed,
* we allow taking a partial block as the input. Currently only raw uncompressed blocks can
* be streamed.
*
* For blocks that can be streamed, this allows us to reduce the latency until we produce
* output, and avoid copying the input.
*
* @param inputSize - The total amount of input that the caller currently has.
*/
static size_t ZSTD_nextSrcSizeToDecompressWithInputSize(ZSTD_DCtx* dctx, size_t inputSize) {
if (!(dctx->stage == ZSTDds_decompressBlock || dctx->stage == ZSTDds_decompressLastBlock))
return dctx->expected;
if (dctx->bType != bt_raw)
return dctx->expected;
return MIN(MAX(inputSize, 1), dctx->expected);
}

ZSTD_nextInputType_e ZSTD_nextInputType(ZSTD_DCtx* dctx) {
switch(dctx->stage)
{
Expand Down Expand Up @@ -877,7 +895,7 @@ size_t ZSTD_decompressContinue(ZSTD_DCtx* dctx, void* dst, size_t dstCapacity, c
{
DEBUGLOG(5, "ZSTD_decompressContinue (srcSize:%u)", (unsigned)srcSize);
/* Sanity check */
RETURN_ERROR_IF(srcSize != dctx->expected, srcSize_wrong, "not allowed");
RETURN_ERROR_IF(srcSize != ZSTD_nextSrcSizeToDecompressWithInputSize(dctx, srcSize), srcSize_wrong, "not allowed");
if (dstCapacity) ZSTD_checkContinuity(dctx, dst);

switch (dctx->stage)
Expand Down Expand Up @@ -944,22 +962,34 @@ size_t ZSTD_decompressContinue(ZSTD_DCtx* dctx, void* dst, size_t dstCapacity, c
case bt_compressed:
DEBUGLOG(5, "ZSTD_decompressContinue: case bt_compressed");
rSize = ZSTD_decompressBlock_internal(dctx, dst, dstCapacity, src, srcSize, /* frame */ 1);
dctx->expected = 0; /* Streaming not supported */
break;
case bt_raw :
assert(srcSize <= dctx->expected);
rSize = ZSTD_copyRawBlock(dst, dstCapacity, src, srcSize);
FORWARD_IF_ERROR(rSize);
assert(rSize == srcSize);
dctx->expected -= rSize;
break;
case bt_rle :
rSize = ZSTD_setRleBlock(dst, dstCapacity, *(const BYTE*)src, dctx->rleSize);
dctx->expected = 0; /* Streaming not supported */
break;
case bt_reserved : /* should never happen */
default:
RETURN_ERROR(corruption_detected);
}
if (ZSTD_isError(rSize)) return rSize;
FORWARD_IF_ERROR(rSize);
RETURN_ERROR_IF(rSize > dctx->fParams.blockSizeMax, corruption_detected, "Decompressed Block Size Exceeds Maximum");
DEBUGLOG(5, "ZSTD_decompressContinue: decoded size from block : %u", (unsigned)rSize);
dctx->decodedSize += rSize;
if (dctx->fParams.checksumFlag) XXH64_update(&dctx->xxhState, dst, rSize);
dctx->previousDstEnd = (char*)dst + rSize;

/* Stay on the same stage until we are finished streaming the block. */
if (dctx->expected > 0) {
return rSize;
}

if (dctx->stage == ZSTDds_decompressLastBlock) { /* end of frame */
DEBUGLOG(4, "ZSTD_decompressContinue: decoded size from frame : %u", (unsigned)dctx->decodedSize);
Expand All @@ -977,7 +1007,6 @@ size_t ZSTD_decompressContinue(ZSTD_DCtx* dctx, void* dst, size_t dstCapacity, c
} else {
dctx->stage = ZSTDds_decodeBlockHeader;
dctx->expected = ZSTD_blockHeaderSize;
dctx->previousDstEnd = (char*)dst + rSize;
}
return rSize;
}
Expand Down Expand Up @@ -1645,7 +1674,7 @@ size_t ZSTD_decompressStream(ZSTD_DStream* zds, ZSTD_outBuffer* output, ZSTD_inB

case zdss_read:
DEBUGLOG(5, "stage zdss_read");
{ size_t const neededInSize = ZSTD_nextSrcSizeToDecompress(zds);
{ size_t const neededInSize = ZSTD_nextSrcSizeToDecompressWithInputSize(zds, iend - ip);
DEBUGLOG(5, "neededInSize = %u", (U32)neededInSize);
if (neededInSize==0) { /* end of frame */
zds->streamStage = zdss_init;
Expand Down Expand Up @@ -1673,6 +1702,8 @@ size_t ZSTD_decompressStream(ZSTD_DStream* zds, ZSTD_outBuffer* output, ZSTD_inB
size_t const toLoad = neededInSize - zds->inPos;
int const isSkipFrame = ZSTD_isSkipFrame(zds);
size_t loadedSize;
/* At this point we shouldn't be decompressing a block that we can stream. */
assert(neededInSize == ZSTD_nextSrcSizeToDecompressWithInputSize(zds, iend - ip));
if (isSkipFrame) {
loadedSize = MIN(toLoad, (size_t)(iend-ip));
} else {
Expand Down
38 changes: 38 additions & 0 deletions tests/zstreamtest.c
Original file line number Diff line number Diff line change
Expand Up @@ -1273,6 +1273,44 @@ static int basicUnitTests(U32 seed, double compressibility)
}
DISPLAYLEVEL(3, "OK \n");

DISPLAYLEVEL(3, "test%3i : raw block can be streamed: ", testNb++);
{ size_t const inputSize = 10000;
size_t const compCapacity = ZSTD_compressBound(inputSize);
BYTE* const input = (BYTE*)malloc(inputSize);
BYTE* const comp = (BYTE*)malloc(compCapacity);
BYTE* const decomp = (BYTE*)malloc(inputSize);

CHECK(input == NULL || comp == NULL || decomp == NULL, "failed to alloc buffers");

RDG_genBuffer(input, inputSize, 0.0, 0.0, seed);
{ size_t const compSize = ZSTD_compress(comp, compCapacity, input, inputSize, -(int)inputSize);
ZSTD_inBuffer in = { comp, 0, 0 };
ZSTD_outBuffer out = { decomp, 0, 0 };
CHECK_Z(compSize);
while (in.size < compSize) {
in.size = MIN(in.size + 100, compSize);
while (in.pos < in.size) {
size_t const outPos = out.pos;
if (out.pos == out.size) {
out.size = MIN(out.size + 10, inputSize);
}
CHECK_Z( ZSTD_decompressStream(zd, &out, &in) );
CHECK(!(out.pos > outPos), "We are not streaming (no output generated)");
}
}
CHECK(in.pos != in.size, "Not all input consumedX %zu %zu %zu", in.pos, in.size, compSize);
CHECK(out.pos != out.size, "Not all output producedX");
CHECK(in.pos != compSize, "Not all input consumed!");
CHECK(out.pos != inputSize, "Not all output produced!");
}
CHECK(memcmp(input, decomp, inputSize), "round trip failed!");

free(input);
free(comp);
free(decomp);
}
DISPLAYLEVEL(3, "OK \n");

DISPLAYLEVEL(3, "test%3i : dictionary + uncompressible block + reusing tables checks offset table validity: ", testNb++);
{ ZSTD_CDict* const cdict = ZSTD_createCDict_advanced(
dictionary.start, dictionary.filled,
Expand Down

0 comments on commit 7d0bd17

Please sign in to comment.