Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pledgeinsize #64

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/Downstream.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ jobs:
Pkg.Registry.update()
Pkg.activate(;temp=true)
# force it to use this PR's version of the package
ENV["JULIA_PKG_DEVDIR"]= mktempdir()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ensures fresh versions of the developed packages, in case this is run locally.

Pkg.develop([
PackageSpec(path="."),
PackageSpec(name="${{ matrix.package }}"),
Expand Down
36 changes: 36 additions & 0 deletions .github/workflows/Upstream.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
name: Upstream
on:
push:
branches: [master]
tags: [v*]
pull_request:

jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: julia-actions/setup-julia@v2
with:
version: 1
arch: x64
- uses: julia-actions/julia-buildpkg@latest
- name: Load the upstream packages and run the tests
shell: julia --color=yes {0}
run: |
using Pkg
Pkg.Registry.update()
Pkg.activate(;temp=true)
# force it to use this PR's version of the package
ENV["JULIA_PKG_DEVDIR"]= mktempdir()
Pkg.develop([
PackageSpec(path="."),
PackageSpec(name="TranscodingStreams"),
])
Pkg.update()
Pkg.test("CodecZstd"; coverage=true)
- uses: julia-actions/julia-processcoverage@v1
- uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: lcov.info
31 changes: 26 additions & 5 deletions src/compression.jl
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,38 @@
return :error
end
end
code = reset!(codec.cstream, 0 #=unknown source size=#)
code = reset!(codec.cstream)
nhz2 marked this conversation as resolved.
Show resolved Hide resolved
if iserror(code)
error[] = ErrorException("zstd error")
return :error
error[] = ErrorException("zstd error resetting compression context")
nhz2 marked this conversation as resolved.
Show resolved Hide resolved
:error

Check warning on line 107 in src/compression.jl

View check run for this annotation

Codecov / codecov/patch

src/compression.jl#L106-L107

Added lines #L106 - L107 were not covered by tests
else
:ok
end
end

if isdefined(TranscodingStreams, :pledgeinsize)
nhz2 marked this conversation as resolved.
Show resolved Hide resolved
function TranscodingStreams.pledgeinsize(codec::ZstdCompressor, insize::Int64, error::Error)::Symbol
if codec.cstream.ptr == C_NULL
Base.error("`startproc` must be called before `pledgeinsize`")
end
srcsize = if signbit(insize)
ZSTD_CONTENTSIZE_UNKNOWN
else
Culonglong(insize)
end
code = LibZstd.ZSTD_CCtx_setPledgedSrcSize(codec.cstream, srcsize)
if iserror(code)
error[] = ErrorException("zstd error setting pledged source size")
:error
else
:ok
end
end
return :ok
end

function TranscodingStreams.process(codec::ZstdCompressor, input::Memory, output::Memory, error::Error)
if codec.cstream.ptr == C_NULL
error("startproc must be called before process")
Base.error("`startproc` must be called before `process`")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

error is the name of a local variable, so I need to specify Base.error here.

end
cstream = codec.cstream
ibuffer_starting_pos = UInt(0)
Expand Down
16 changes: 2 additions & 14 deletions src/libzstd.jl
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,11 @@ function initialize!(cstream::CStream, level::Integer)
return LibZstd.ZSTD_initCStream(cstream, level)
end

function reset!(cstream::CStream, srcsize::Integer)
# ZSTD_resetCStream is deprecated
# https://github.com/facebook/zstd/blob/9d2a45a705e22ad4817b41442949cd0f78597154/lib/zstd.h#L2253-L2272
function reset!(cstream::CStream)
res = LibZstd.ZSTD_CCtx_reset(cstream, LibZstd.ZSTD_reset_session_only)
if iserror(res)
nhz2 marked this conversation as resolved.
Show resolved Hide resolved
return res
end
if srcsize == 0
# From zstd.h:
# Note: ZSTD_resetCStream() interprets pledgedSrcSize == 0 as ZSTD_CONTENTSIZE_UNKNOWN, but
# ZSTD_CCtx_setPledgedSrcSize() does not do the same, so ZSTD_CONTENTSIZE_UNKNOWN must be
# explicitly specified.
srcsize = ZSTD_CONTENTSIZE_UNKNOWN
end
reset!(cstream.ibuffer)
reset!(cstream.obuffer)
return LibZstd.ZSTD_CCtx_setPledgedSrcSize(cstream, srcsize)
return res
end

"""
Expand Down
80 changes: 78 additions & 2 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,17 @@ include("utils.jl")
@test CodecZstd.find_decompressed_size(v) == 22

codec = ZstdCompressor
buffer3 = transcode(codec, b"Hello")
buffer4 = transcode(codec, b"World!")
sink = IOBuffer()
s = TranscodingStream(codec(), sink; stop_on_end=true)
write(s, b"Hello")
close(s)
buffer3 = take!(sink)
@test CodecZstd.find_decompressed_size(buffer3) == CodecZstd.ZSTD_CONTENTSIZE_UNKNOWN
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

transcode with ZstdCompressor now records the decompressed size.

sink = IOBuffer()
s = TranscodingStream(codec(), sink; stop_on_end=true)
write(s, b"Hello")
close(s)
buffer4 = take!(sink)
@test CodecZstd.find_decompressed_size(buffer4) == CodecZstd.ZSTD_CONTENTSIZE_UNKNOWN

write(iob, buffer1)
Expand All @@ -156,6 +164,68 @@ include("utils.jl")
@test CodecZstd.find_decompressed_size(v) == CodecZstd.ZSTD_CONTENTSIZE_ERROR
end

@testset "pledgeinsize" begin
if isdefined(TranscodingStreams, :pledgeinsize)
# when pledgeinsize is available transcode should save the
# decompressed size in a header
for n in [0:30; 1000; 1000000;]
v = transcode(ZstdCompressor, rand(UInt8, n))
@test CodecZstd.find_decompressed_size(v) == n
end

# Test what happens if pledgeinsize promise is broken
d1 = zeros(UInt8, 10000)
d2 = zeros(UInt8, 10000)
GC.@preserve d1 d2 begin
@testset "too many bytes" begin
m1 = TranscodingStreams.Memory(pointer(d1), 1000)
m2 = TranscodingStreams.Memory(pointer(d2), 1000)
codec = ZstdCompressor()
e = TranscodingStreams.Error()
@test TranscodingStreams.startproc(codec, :read, e) === :ok
@test TranscodingStreams.pledgeinsize(codec, 10, e) === :ok
@test TranscodingStreams.process(codec, m1, m2, e) === (0, 0, :error)
@test e[] == ErrorException("zstd error: Src size is incorrect")
TranscodingStreams.finalize(codec)
end
@testset "too few bytes" begin
m1 = TranscodingStreams.Memory(pointer(d1), 10)
m2 = TranscodingStreams.Memory(pointer(d2), 1000)
codec = ZstdCompressor()
e = TranscodingStreams.Error()
@test TranscodingStreams.startproc(codec, :read, e) === :ok
@test TranscodingStreams.pledgeinsize(codec, 10000, e) === :ok
@test TranscodingStreams.process(codec, m1, m2, e)[3] === :ok
m1 = TranscodingStreams.Memory(pointer(d1), 0)
@test TranscodingStreams.process(codec, m1, m2, e)[3] === :error
@test e[] == ErrorException("zstd error: Src size is incorrect")
TranscodingStreams.finalize(codec)
end
@testset "set pledgeinsize after process" begin
m1 = TranscodingStreams.Memory(pointer(d1), 1000)
m2 = TranscodingStreams.Memory(pointer(d2), 1000)
codec = ZstdCompressor()
e = TranscodingStreams.Error()
@test TranscodingStreams.startproc(codec, :read, e) === :ok
@test TranscodingStreams.process(codec, m1, m2, e)[3] === :ok
@test TranscodingStreams.pledgeinsize(codec, 10000, e) === :error
@test e[] == ErrorException("zstd error setting pledged source size")
TranscodingStreams.finalize(codec)
end
@testset "set unknown pledgeinsize" begin
m1 = TranscodingStreams.Memory(pointer(d1), 1000)
m2 = TranscodingStreams.Memory(pointer(d2), 1000)
codec = ZstdCompressor()
e = TranscodingStreams.Error()
@test TranscodingStreams.startproc(codec, :read, e) === :ok
@test TranscodingStreams.pledgeinsize(codec, -1, e) === :ok
@test TranscodingStreams.process(codec, m1, m2, e)[3] === :ok
TranscodingStreams.finalize(codec)
end
end
end
end

include("compress_endOp.jl")
include("static_only_tests.jl")

Expand Down Expand Up @@ -195,6 +265,12 @@ include("utils.jl")
TranscodingStreams.finalize(codec)
data = [0x00,0x01]
GC.@preserve data let m = TranscodingStreams.Memory(pointer(data), length(data))
if isdefined(TranscodingStreams, :pledgeinsize)
try
TranscodingStreams.pledgeinsize(codec, 10, TranscodingStreams.Error())
catch
end
end
try
TranscodingStreams.expectedsize(codec, m)
catch
Expand Down
Loading