Skip to content

Commit

Permalink
decompressionobj: refactor decompress()
Browse files Browse the repository at this point in the history
The loop termination logic and handling of `unused_data` were a bit
convoluted. The logic was subtle enough that it warrants documenting,
which I did in the CFFI backend.

We now only set `unused_data` after a frame is decoded. I'm not sure
if this actually changed meaningful behavior. But that is the semantic
intent of the attribute and we should have the logic mirror that.

We also change a loop termination condition to avoid a
`ZSTD_decompressStream()` call on exhausted input in the case of a partially
filled output buffer. I believe this is safe per the zstd API docs.

One change here is we now append chunks before testing `zresult == 0`.
If chunk append raises, this could change behavior so we no longer mark
the decompressor as finalized. I believe the behavior was sufficiently
undefined to not explicitly document in the changelog. Hopefully
Hyrum's Law doesn't manifest.
  • Loading branch information
indygreg committed Oct 29, 2022
1 parent 30bf0bf commit a1deff5
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 26 deletions.
18 changes: 10 additions & 8 deletions c-ext/decompressobj.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,12 @@ static PyObject *DecompressionObj_decompress(ZstdDecompressionObj *self,
ZSTD_decompressStream(self->decompressor->dctx, &output, &input);
Py_END_ALLOW_THREADS

if (ZSTD_isError(zresult)) {
if (ZSTD_isError(zresult)) {
PyErr_Format(ZstdError, "zstd decompressor error: %s",
ZSTD_getErrorName(zresult));
goto except;
}

if (0 == zresult) {
self->finished = 1;
}

if (output.pos) {
if (result) {
resultSize = PyBytes_GET_SIZE(result);
Expand All @@ -93,15 +89,21 @@ static PyObject *DecompressionObj_decompress(ZstdDecompressionObj *self,
}
}

if (zresult == 0 || (input.pos == input.size && output.pos == 0)) {
if (0 == zresult) {
self->finished = 1;

/* We should only get here at most once. */
assert(!self->unused_data);
self->unused_data = PyBytes_FromStringAndSize((char *)(input.src) + input.pos, input.size - input.pos);

break;
}

output.pos = 0;
else if (input.pos == input.size && output.pos == 0) {
break;
}
else {
output.pos = 0;
}
}

if (!result) {
Expand Down
4 changes: 4 additions & 0 deletions docs/news.rst
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ Changes
and previous release(s) likely worked with 3.11 without any changes.
* CFFI's build system now respects distutils's ``compiler.preprocessor`` if it
is set. (#179)
* The internal logic of ``ZstdDecompressionObj.decompress()`` was refactored.
This may have fixed unconfirmed issues where ``unused_data`` was set
prematurely. The new logic will also avoid an extra call to
``ZSTD_decompressStream()`` in some scenarios, possibly improving performance.

0.18.0 (released 2022-06-20)
============================
Expand Down
17 changes: 9 additions & 8 deletions rust-ext/src/decompressionobj.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,27 +62,28 @@ impl ZstdDecompressionObj {
.decompress_into_vec(&mut dest_buffer, &mut in_buffer)
.map_err(|msg| ZstdError::new_err(format!("zstd decompress error: {}", msg)))?;

if zresult == 0 {
self.finished = true;
// TODO clear out decompressor?
}

if !dest_buffer.is_empty() {
// TODO avoid buffer copy.
let chunk = PyBytes::new(py, &dest_buffer);
chunks.append(chunk)?;
}

if zresult == 0 || (in_buffer.pos == in_buffer.size && dest_buffer.is_empty()) {
if zresult == 0 {
self.finished = true;
// TODO clear out decompressor?

if let Some(data) = data.as_slice(py) {
let unused = &data[in_buffer.pos..in_buffer.size];
self.unused_data = unused.iter().map(|x| x.get()).collect::<Vec<_>>();
}

break;
} else if in_buffer.pos == in_buffer.size && dest_buffer.len() < dest_buffer.capacity()
{
break;
} else {
dest_buffer.clear();
}

dest_buffer.clear();
}

let empty = PyBytes::new(py, &[]);
Expand Down
27 changes: 17 additions & 10 deletions zstandard/backend_cffi.py
Original file line number Diff line number Diff line change
Expand Up @@ -2974,22 +2974,29 @@ def decompress(self, data):
"zstd decompressor error: %s" % _zstd_error(zresult)
)

if zresult == 0:
self._finished = True
self._decompressor = None

# Always record any output from decompressor.
if out_buffer.pos:
chunks.append(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])

if zresult == 0 or (
in_buffer.pos == in_buffer.size and out_buffer.pos == 0
):
# Preserve any remaining input to be exposed via `unused_data`.
# 0 is only seen when a frame is fully decoded *and* fully flushed.
# But there may be extra input data: make that available to
# `unused_input`.
if zresult == 0:
self._finished = True
self._decompressor = None
self._unused_input = data[in_buffer.pos : in_buffer.size]

break

out_buffer.pos = 0
# We're not at the end of the frame *or* we're not fully flushed.

# The decompressor will write out all the bytes it can to the output
# buffer. So if the output buffer is partially filled and the input
# is exhausted, there's nothing more to write. So we've done all we
# can.
elif in_buffer.pos == in_buffer.size and out_buffer.pos < out_buffer.size:
break
else:
out_buffer.pos = 0

return b"".join(chunks)

Expand Down

0 comments on commit a1deff5

Please sign in to comment.