diff --git a/.github/workflows/Downstream.yml b/.github/workflows/Downstream.yml index 880220c..adbf164 100644 --- a/.github/workflows/Downstream.yml +++ b/.github/workflows/Downstream.yml @@ -31,6 +31,7 @@ jobs: Pkg.Registry.update() Pkg.activate(;temp=true) # force it to use this PR's version of the package + ENV["JULIA_PKG_DEVDIR"]= mktempdir() Pkg.develop([ PackageSpec(path="."), PackageSpec(name="${{ matrix.package }}"), diff --git a/.github/workflows/Upstream.yml b/.github/workflows/Upstream.yml new file mode 100644 index 0000000..137ec54 --- /dev/null +++ b/.github/workflows/Upstream.yml @@ -0,0 +1,36 @@ +name: Upstream +on: + push: + branches: [master] + tags: [v*] + pull_request: + +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: julia-actions/setup-julia@v2 + with: + version: 1 + arch: x64 + - uses: julia-actions/julia-buildpkg@latest + - name: Load the upstream packages and run the tests + shell: julia --color=yes {0} + run: | + using Pkg + Pkg.Registry.update() + Pkg.activate(;temp=true) + # force it to use this PR's version of the package + ENV["JULIA_PKG_DEVDIR"]= mktempdir() + Pkg.develop([ + PackageSpec(path="."), + PackageSpec(name="TranscodingStreams"), + ]) + Pkg.update() + Pkg.test("CodecZstd"; coverage=true) + - uses: julia-actions/julia-processcoverage@v1 + - uses: codecov/codecov-action@v4 + with: + token: ${{ secrets.CODECOV_TOKEN }} + files: lcov.info diff --git a/Project.toml b/Project.toml index fa3a41a..15efc4f 100644 --- a/Project.toml +++ b/Project.toml @@ -10,6 +10,6 @@ TranscodingStreams = "3bb67fe8-82b1-5028-8e26-92a6c54297fa" Zstd_jll = "3161d3a3-bdf6-5164-811a-617609db77b4" [compat] -TranscodingStreams = "0.9, 0.10, 0.11" +TranscodingStreams = "0.11.3" Zstd_jll = "1.5.5" -julia = "1.3" +julia = "1.6" diff --git a/src/compression.jl b/src/compression.jl index fc9361d..7476650 100644 --- a/src/compression.jl +++ b/src/compression.jl @@ -101,17 +101,31 @@ function TranscodingStreams.startproc(codec::ZstdCompressor, mode::Symbol, error return :error end end - code = reset!(codec.cstream, 0 #=unknown source size=#) + reset!(codec.cstream) + return :ok +end + +function TranscodingStreams.pledgeinsize(codec::ZstdCompressor, insize::Int64, error::Error)::Symbol + if codec.cstream.ptr == C_NULL + Base.error("`startproc` must be called before `pledgeinsize`") + end + srcsize = if signbit(insize) + ZSTD_CONTENTSIZE_UNKNOWN + else + Culonglong(insize) + end + code = LibZstd.ZSTD_CCtx_setPledgedSrcSize(codec.cstream, srcsize) if iserror(code) - error[] = ErrorException("zstd error") - return :error + error[] = ErrorException("zstd error setting pledged source size") + :error + else + :ok end - return :ok end function TranscodingStreams.process(codec::ZstdCompressor, input::Memory, output::Memory, error::Error) if codec.cstream.ptr == C_NULL - error("startproc must be called before process") + Base.error("`startproc` must be called before `process`") end cstream = codec.cstream ibuffer_starting_pos = UInt(0) diff --git a/src/libzstd.jl b/src/libzstd.jl index f9865b6..dbbbb4b 100644 --- a/src/libzstd.jl +++ b/src/libzstd.jl @@ -56,23 +56,15 @@ function initialize!(cstream::CStream, level::Integer) return LibZstd.ZSTD_initCStream(cstream, level) end -function reset!(cstream::CStream, srcsize::Integer) - # ZSTD_resetCStream is deprecated - # https://github.com/facebook/zstd/blob/9d2a45a705e22ad4817b41442949cd0f78597154/lib/zstd.h#L2253-L2272 +function reset!(cstream::CStream) res = LibZstd.ZSTD_CCtx_reset(cstream, LibZstd.ZSTD_reset_session_only) - if iserror(res) - return res - end - if srcsize == 0 - # From zstd.h: - # Note: ZSTD_resetCStream() interprets pledgedSrcSize == 0 as ZSTD_CONTENTSIZE_UNKNOWN, but - # ZSTD_CCtx_setPledgedSrcSize() does not do the same, so ZSTD_CONTENTSIZE_UNKNOWN must be - # explicitly specified. - srcsize = ZSTD_CONTENTSIZE_UNKNOWN - end reset!(cstream.ibuffer) reset!(cstream.obuffer) - return LibZstd.ZSTD_CCtx_setPledgedSrcSize(cstream, srcsize) + if iserror(res) + # According to zstd.h "Resetting session never fails" so this branch should be unreachable. + error("unreachable") + end + return end """ diff --git a/test/runtests.jl b/test/runtests.jl index 73cf82f..56b8d5a 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -130,9 +130,17 @@ include("utils.jl") @test CodecZstd.find_decompressed_size(v) == 22 codec = ZstdCompressor - buffer3 = transcode(codec, b"Hello") - buffer4 = transcode(codec, b"World!") + sink = IOBuffer() + s = TranscodingStream(codec(), sink; stop_on_end=true) + write(s, b"Hello") + close(s) + buffer3 = take!(sink) @test CodecZstd.find_decompressed_size(buffer3) == CodecZstd.ZSTD_CONTENTSIZE_UNKNOWN + sink = IOBuffer() + s = TranscodingStream(codec(), sink; stop_on_end=true) + write(s, b"Hello") + close(s) + buffer4 = take!(sink) @test CodecZstd.find_decompressed_size(buffer4) == CodecZstd.ZSTD_CONTENTSIZE_UNKNOWN write(iob, buffer1) @@ -156,6 +164,66 @@ include("utils.jl") @test CodecZstd.find_decompressed_size(v) == CodecZstd.ZSTD_CONTENTSIZE_ERROR end + @testset "pledgeinsize" begin + # when pledgeinsize is available transcode should save the + # decompressed size in a header + for n in [0:30; 1000; 1000000;] + v = transcode(ZstdCompressor, rand(UInt8, n)) + @test CodecZstd.find_decompressed_size(v) == n + end + + # Test what happens if pledgeinsize promise is broken + d1 = zeros(UInt8, 10000) + d2 = zeros(UInt8, 10000) + GC.@preserve d1 d2 begin + @testset "too many bytes" begin + m1 = TranscodingStreams.Memory(pointer(d1), 1000) + m2 = TranscodingStreams.Memory(pointer(d2), 1000) + codec = ZstdCompressor() + e = TranscodingStreams.Error() + @test TranscodingStreams.startproc(codec, :read, e) === :ok + @test TranscodingStreams.pledgeinsize(codec, Int64(10), e) === :ok + @test TranscodingStreams.process(codec, m1, m2, e) === (0, 0, :error) + @test e[] == ErrorException("zstd error: Src size is incorrect") + TranscodingStreams.finalize(codec) + end + @testset "too few bytes" begin + m1 = TranscodingStreams.Memory(pointer(d1), 10) + m2 = TranscodingStreams.Memory(pointer(d2), 1000) + codec = ZstdCompressor() + e = TranscodingStreams.Error() + @test TranscodingStreams.startproc(codec, :read, e) === :ok + @test TranscodingStreams.pledgeinsize(codec, Int64(10000), e) === :ok + @test TranscodingStreams.process(codec, m1, m2, e)[3] === :ok + m1 = TranscodingStreams.Memory(pointer(d1), 0) + @test TranscodingStreams.process(codec, m1, m2, e)[3] === :error + @test e[] == ErrorException("zstd error: Src size is incorrect") + TranscodingStreams.finalize(codec) + end + @testset "set pledgeinsize after process" begin + m1 = TranscodingStreams.Memory(pointer(d1), 1000) + m2 = TranscodingStreams.Memory(pointer(d2), 1000) + codec = ZstdCompressor() + e = TranscodingStreams.Error() + @test TranscodingStreams.startproc(codec, :read, e) === :ok + @test TranscodingStreams.process(codec, m1, m2, e)[3] === :ok + @test TranscodingStreams.pledgeinsize(codec, Int64(10000), e) === :error + @test e[] == ErrorException("zstd error setting pledged source size") + TranscodingStreams.finalize(codec) + end + @testset "set unknown pledgeinsize" begin + m1 = TranscodingStreams.Memory(pointer(d1), 1000) + m2 = TranscodingStreams.Memory(pointer(d2), 1000) + codec = ZstdCompressor() + e = TranscodingStreams.Error() + @test TranscodingStreams.startproc(codec, :read, e) === :ok + @test TranscodingStreams.pledgeinsize(codec, Int64(-1), e) === :ok + @test TranscodingStreams.process(codec, m1, m2, e)[3] === :ok + TranscodingStreams.finalize(codec) + end + end + end + include("compress_endOp.jl") include("static_only_tests.jl") @@ -195,6 +263,10 @@ include("utils.jl") TranscodingStreams.finalize(codec) data = [0x00,0x01] GC.@preserve data let m = TranscodingStreams.Memory(pointer(data), length(data)) + try + TranscodingStreams.pledgeinsize(codec, Int64(10), TranscodingStreams.Error()) + catch + end try TranscodingStreams.expectedsize(codec, m) catch