Skip to content

Commit

Permalink
Replace zstandard with cramjam
Browse files Browse the repository at this point in the history
  • Loading branch information
ods committed Nov 19, 2023
1 parent f35aaa6 commit b9c8415
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 21 deletions.
31 changes: 14 additions & 17 deletions aiokafka/codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,6 @@
except ImportError:
cramjam = None

try:
import zstandard as zstd
except ImportError:
zstd = None

try:
import lz4.frame as lz4

Expand Down Expand Up @@ -48,7 +43,7 @@ def has_snappy():


def has_zstd():
return zstd is not None
return cramjam is not None


def has_lz4():
Expand Down Expand Up @@ -193,7 +188,7 @@ def snappy_decode(payload):
out.seek(0)
return out.read()
else:
return cramjam.snappy.decompress_raw(payload)
return bytes(cramjam.snappy.decompress_raw(payload))


if lz4:
Expand Down Expand Up @@ -230,18 +225,20 @@ def lz4f_decode(payload):
lz4_decode = None


def zstd_encode(payload):
if not zstd:
def zstd_encode(payload, level=None):
if not has_zstd():
raise NotImplementedError("Zstd codec is not available")
return zstd.ZstdCompressor().compress(payload)

if level is None:
# Default for kafka broker
# https://cwiki.apache.org/confluence/display/KAFKA/KIP-390%3A+Support+Compression+Level
level = 3

return bytes(cramjam.zstd.compress(payload, level=level))


def zstd_decode(payload):
if not zstd:
if not has_zstd():
raise NotImplementedError("Zstd codec is not available")
try:
return zstd.ZstdDecompressor().decompress(payload)
except zstd.ZstdError:
return zstd.ZstdDecompressor().decompress(
payload, max_output_size=ZSTD_MAX_OUTPUT_SIZE
)

return bytes(cramjam.zstd.decompress(payload))
1 change: 0 additions & 1 deletion requirements-ci.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,4 @@ docutils==0.17.1
Pygments==2.15.0
gssapi==1.8.2
async-timeout==4.0.1
zstandard==0.16.0
cramjam==2.7.0
1 change: 0 additions & 1 deletion requirements-win-test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,4 @@ pytest-mock==3.12.0
docker==6.0.1
chardet==4.0.0 # Until fixed requests is released
lz4==3.1.3
zstandard==0.16.0
cramjam==2.7.0
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def read(f):
extras_require = {
"snappy": ["cramjam"],
"lz4": ["lz4>=3.1.3"],
"zstd": ["zstandard"],
"zstd": ["cramjam"],
"gssapi": ["gssapi"],
}
extras_require["all"] = sum(extras_require.values(), [])
Expand Down
2 changes: 1 addition & 1 deletion tests/record/test_default_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
(DefaultRecordBatch.CODEC_GZIP, None),
(DefaultRecordBatch.CODEC_SNAPPY, 2171068483),
(DefaultRecordBatch.CODEC_LZ4, 462121143),
(DefaultRecordBatch.CODEC_ZSTD, 1679657554),
(DefaultRecordBatch.CODEC_ZSTD, 1714138923),
])
def test_read_write_serde_v2(compression_type, crc):
builder = DefaultRecordBatchBuilder(
Expand Down

0 comments on commit b9c8415

Please sign in to comment.