From 7988463c9369b16c91d8012c9eec1387ff383783 Mon Sep 17 00:00:00 2001 From: Nathan Zimmerberg <39104088+nhz2@users.noreply.github.com> Date: Fri, 31 May 2024 10:23:12 -0400 Subject: [PATCH 1/6] Bump version to 0.10.9 (#220) --- Project.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Project.toml b/Project.toml index e02dee0e..d4ddee32 100644 --- a/Project.toml +++ b/Project.toml @@ -2,7 +2,7 @@ name = "TranscodingStreams" uuid = "3bb67fe8-82b1-5028-8e26-92a6c54297fa" license = "MIT" authors = ["Kenta Sato "] -version = "0.10.8" +version = "0.10.9" [deps] Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" From d92fd8b9fb0b9314d82b40a96774f7f745b4dab1 Mon Sep 17 00:00:00 2001 From: Nathan Zimmerberg <39104088+nhz2@users.noreply.github.com> Date: Sat, 8 Jun 2024 23:36:11 +0200 Subject: [PATCH 2/6] Avoid double counting in `stats` `out` when writing to nested streams (#204) --- docs/src/devnotes.md | 7 ++- fuzz/fuzz.jl | 20 +++++++- src/noop.jl | 61 ++++++++++++----------- src/state.jl | 5 +- src/stream.jl | 101 +++++++++++++++++++++++---------------- test/codecdoubleframe.jl | 83 ++++++++++++++++++++++++++++++++ test/codecnoop.jl | 73 ++++++++++++++++++++++------ 7 files changed, 259 insertions(+), 91 deletions(-) diff --git a/docs/src/devnotes.md b/docs/src/devnotes.md index 5fd968c8..ccae4124 100644 --- a/docs/src/devnotes.md +++ b/docs/src/devnotes.md @@ -41,6 +41,7 @@ default buffer size is 16KiB for each. - `error`: exception returned by the codec (`<:Error`) - `buffer1`: data buffer that is closer to the user (`<:Buffer`) - `buffer2`: data buffer that is farther to the user (`<:Buffer`) +- `bytes_written_out`: number of bytes written to the underlying stream (`<:Int64`) The `mode` field may be one of the following value: - `:idle` : initial and intermediate mode, no buffered data @@ -78,8 +79,10 @@ Shared buffers Adjacent transcoding streams may share their buffers. This will reduce memory allocation and eliminate data copy between buffers. -`readdata!(input::IO, output::Buffer)` and `writedata!(output::IO, -input::Buffer)` do the actual work of read/write data from/to the underlying +If `buffer2` is shared it is considered to be owned by the underlying stream +by the `stats` and `position` functions. + +`readdata!(input::IO, output::Buffer)` and `flush_buffer2(stream::TranscodingStream)` do the actual work of read/write data from/to the underlying stream. These methods have a special pass for shared buffers. diff --git a/fuzz/fuzz.jl b/fuzz/fuzz.jl index ef36dcc5..8710e0ee 100644 --- a/fuzz/fuzz.jl +++ b/fuzz/fuzz.jl @@ -152,7 +152,25 @@ end ) stream = wrap_stream(kws, IOBuffer()) for i in 1:length(data) + position(stream) == i-1 || return false + if stream isa TranscodingStream + s = TranscodingStreams.stats(stream) + s.in == i-1 || return false + # TODO fix position(stream.stream) + # s.out == position(stream.stream) || return false + # s.transcoded_in == s.out || return false + # s.transcoded_out == s.out || return false + end write(stream, data[i]) == 1 || return false end - take_all(stream) == data + take_all(stream) == data || return false + if stream isa TranscodingStream + s = TranscodingStreams.stats(stream) + s.in == length(data) || return false + # TODO fix position(stream.stream) + # s.out == position(stream.stream) || return false + # s.transcoded_in == s.out || return false + # s.transcoded_out == s.out || return false + end + true end \ No newline at end of file diff --git a/src/noop.jl b/src/noop.jl index 8b64d91b..cbdcf097 100644 --- a/src/noop.jl +++ b/src/noop.jl @@ -72,7 +72,7 @@ end function Base.seek(stream::NoopStream, pos::Integer) mode = stream.state.mode if mode === :write - flushbuffer(stream) + flush_buffer2(stream) end seek(stream.stream, pos) initbuffer!(stream.buffer1) @@ -82,7 +82,7 @@ end function Base.seekstart(stream::NoopStream) mode = stream.state.mode if mode === :write - flushbuffer(stream) + flush_buffer2(stream) end seekstart(stream.stream) initbuffer!(stream.buffer1) @@ -92,7 +92,7 @@ end function Base.seekend(stream::NoopStream) mode = stream.state.mode if mode === :write - flushbuffer(stream) + flush_buffer2(stream) end seekend(stream.stream) initbuffer!(stream.buffer1) @@ -103,19 +103,23 @@ function Base.write(stream::NoopStream, b::UInt8)::Int changemode!(stream, :write) if has_sharedbuf(stream) # directly write data to the underlying stream - n = Int(write(stream.stream, b)) - return n + write(stream.stream, b) + stream.state.bytes_written_out += 1 + else + buffer1 = stream.buffer1 + marginsize(buffer1) > 0 || flush_buffer2(stream) + writebyte!(buffer1, b) end - buffer1 = stream.buffer1 - marginsize(buffer1) > 0 || flushbuffer(stream) - return writebyte!(buffer1, b) + return 1 end function Base.unsafe_write(stream::NoopStream, input::Ptr{UInt8}, nbytes::UInt)::Int changemode!(stream, :write) + Int(nbytes) # Error if nbytes > typemax Int if has_sharedbuf(stream) # directly write data to the underlying stream n = Int(unsafe_write(stream.stream, input, nbytes)) + stream.state.bytes_written_out += n return n end buffer = stream.buffer1 @@ -123,9 +127,10 @@ function Base.unsafe_write(stream::NoopStream, input::Ptr{UInt8}, nbytes::UInt): copydata!(buffer, input, Int(nbytes)) return Int(nbytes) else - flushbuffer(stream) + flush_buffer2(stream) # directly write data to the underlying stream n = Int(unsafe_write(stream.stream, input, nbytes)) + stream.state.bytes_written_out += n return n end end @@ -148,17 +153,20 @@ function stats(stream::NoopStream) buffer = stream.buffer1 @assert buffer === stream.buffer2 if mode === :idle - consumed = supplied = 0 + in = out = 0 elseif mode === :read - supplied = buffer.transcoded - consumed = supplied - buffersize(buffer) + in = buffer.transcoded + out = in - buffersize(buffer) elseif mode === :write - supplied = buffer.transcoded + buffersize(buffer) - consumed = buffer.transcoded + out = stream.state.bytes_written_out + in = out + if !has_sharedbuf(stream) + in += buffersize(buffer) + end else throw_invalid_mode(mode) end - return Stats(consumed, supplied, supplied, supplied) + return Stats(in, out, out, out) end @@ -190,24 +198,15 @@ function fillbuffer(stream::NoopStream; eager::Bool = false)::Int return nfilled end -function flushbuffer(stream::NoopStream, all::Bool=false) - changemode!(stream, :write) - buffer = stream.buffer1 - @assert buffer === stream.buffer2 - nflushed::Int = 0 - if all - while buffersize(buffer) > 0 - nflushed += writedata!(stream.stream, buffer) - end - else - nflushed += writedata!(stream.stream, buffer) - makemargin!(buffer, 0) - end - buffer.transcoded += nflushed - return nflushed +# Empty buffer1 by writing out data. +# `stream` must be in :write mode. +# Ensure there is margin available in buffer1 for at least one byte. +function flush_buffer1(stream::NoopStream)::Nothing + flush_buffer2(stream) end +# This is always called after `flush_buffer1(stream)` function flushuntilend(stream::NoopStream) - stream.buffer1.transcoded += writedata!(stream.stream, stream.buffer1) + @assert iszero(buffersize(stream.buffer1)) return end diff --git a/src/state.jl b/src/state.jl index 0cb54b6c..134b9495 100644 --- a/src/state.jl +++ b/src/state.jl @@ -24,8 +24,11 @@ mutable struct State buffer1::Buffer buffer2::Buffer + # Number of bytes written to the underlying stream + bytes_written_out::Int64 + function State(buffer1::Buffer, buffer2::Buffer) - return new(:idle, :ok, false, Error(), buffer1, buffer2) + return new(:idle, :ok, false, Error(), buffer1, buffer2, Int64(0)) end end diff --git a/src/stream.jl b/src/stream.jl index 22ef593f..ff7f0fda 100644 --- a/src/stream.jl +++ b/src/stream.jl @@ -494,21 +494,24 @@ function Base.write(stream::TranscodingStream) return 0 end -function Base.write(stream::TranscodingStream, b::UInt8) +function Base.write(stream::TranscodingStream, b::UInt8)::Int changemode!(stream, :write) - if marginsize(stream.buffer1) == 0 && flushbuffer(stream) == 0 - return 0 - end - return writebyte!(stream.buffer1, b) + buffer1 = stream.buffer1 + marginsize(buffer1) > 0 || flush_buffer1(stream) + writebyte!(buffer1, b) + return 1 end function Base.unsafe_write(stream::TranscodingStream, input::Ptr{UInt8}, nbytes::UInt) changemode!(stream, :write) - state = stream.state + Int(nbytes) # Error if nbytes > typemax Int buffer1 = stream.buffer1 p = input p_end = p + nbytes - while p < p_end && (marginsize(buffer1) > 0 || flushbuffer(stream) > 0) + while p < p_end + if marginsize(buffer1) ≤ 0 + flush_buffer1(stream) + end m = min(marginsize(buffer1), p_end - p) copydata!(buffer1, p, m) p += m @@ -535,7 +538,7 @@ const TOKEN_END = EndToken() function Base.write(stream::TranscodingStream, ::EndToken) changemode!(stream, :write) - flushbufferall(stream) + flush_buffer1(stream) flushuntilend(stream) return 0 end @@ -543,8 +546,8 @@ end function Base.flush(stream::TranscodingStream) checkmode(stream) if stream.state.mode == :write - flushbufferall(stream) - writedata!(stream.stream, stream.buffer2) + flush_buffer1(stream) + flush_buffer2(stream) end flush(stream.stream) end @@ -600,9 +603,12 @@ function stats(stream::TranscodingStream) out = transcoded_out - buffersize(buffer1) elseif mode === :write transcoded_in = buffer1.transcoded - transcoded_out = buffer2.transcoded + out = state.bytes_written_out + transcoded_out = out + if !has_sharedbuf(stream) + transcoded_out += buffersize(buffer2) + end in = transcoded_in + buffersize(buffer1) - out = transcoded_out - buffersize(buffer2) else throw_invalid_mode(mode) end @@ -633,38 +639,37 @@ function fillbuffer(stream::TranscodingStream; eager::Bool = false) return nfilled end -function flushbuffer(stream::TranscodingStream, all::Bool=false) - changemode!(stream, :write) +# Empty buffer1 by writing out data. +# `stream` must be in :write mode. +# Ensure there is margin available in buffer1 for at least one byte. +function flush_buffer1(stream::TranscodingStream)::Nothing state = stream.state buffer1 = stream.buffer1 buffer2 = stream.buffer2 - nflushed::Int = 0 - while (all ? buffersize(buffer1) != 0 : makemargin!(buffer1, 0) == 0) + while buffersize(buffer1) > 0 if state.code == :end callstartproc(stream, :write) end - writedata!(stream.stream, buffer2) - Δin, _ = callprocess(stream, buffer1, buffer2) - nflushed += Δin + flush_buffer2(stream) + callprocess(stream, buffer1, buffer2) end - return nflushed -end - -function flushbufferall(stream::TranscodingStream) - return flushbuffer(stream, true) + # move positions to the start of the buffer + @assert !iszero(makemargin!(buffer1, 0)) + return end +# This is always called after `flush_buffer1(stream)` function flushuntilend(stream::TranscodingStream) - changemode!(stream, :write) state = stream.state buffer1 = stream.buffer1 buffer2 = stream.buffer2 + @assert buffersize(buffer1) == 0 + @assert stream.state.mode === :write while state.code != :end - writedata!(stream.stream, buffer2) + flush_buffer2(stream) callprocess(stream, buffer1, buffer2) end - writedata!(stream.stream, buffer2) - @assert buffersize(buffer1) == 0 + flush_buffer2(stream) return end @@ -687,7 +692,7 @@ function callprocess(stream::TranscodingStream, inbuf::Buffer, outbuf::Buffer) state = stream.state input = buffermem(inbuf) GC.@preserve inbuf makemargin!(outbuf, minoutsize(stream.codec, input)) - Δin, Δout, state.code = GC.@preserve inbuf outbuf process(stream.codec, input, marginmem(outbuf), state.error) + Δin::Int, Δout::Int, state.code = GC.@preserve inbuf outbuf process(stream.codec, input, marginmem(outbuf), state.error) @debug( "called process()", code = state.code, @@ -698,6 +703,12 @@ function callprocess(stream::TranscodingStream, inbuf::Buffer, outbuf::Buffer) ) consumed!(inbuf, Δin, transcode = true) supplied!(outbuf, Δout, transcode = true) + if has_sharedbuf(stream) + if stream.state.mode === :write + # this must be updated before throwing any error if outbuf is shared. + stream.state.bytes_written_out += Δout + end + end if state.code == :error changemode!(stream, :panic) elseif state.code == :ok && Δin == Δout == 0 @@ -745,20 +756,28 @@ function readdata!(input::IO, output::Buffer)::Int end # Write all data to `output` from the buffer of `input`. -function writedata!(output::IO, input::Buffer) - if output isa TranscodingStream && output.buffer1 === input +function flush_buffer2(stream::TranscodingStream)::Nothing + output = stream.stream + buffer2 = stream.buffer2 + state = stream.state + @assert state.mode === :write + if has_sharedbuf(stream) # Delegate the operation to the underlying stream for shared buffers. - return flushbufferall(output) - end - nwritten::Int = 0 - while buffersize(input) > 0 - n = GC.@preserve input Base.unsafe_write(output, bufferptr(input), buffersize(input)) - consumed!(input, n) - nwritten += n + changemode!(output, :write) + flush_buffer1(output) + else + while buffersize(buffer2) > 0 + n::Int = GC.@preserve buffer2 Base.unsafe_write(output, bufferptr(buffer2), buffersize(buffer2)) + n ≤ 0 && error("short write") + consumed!(buffer2, n) + state.bytes_written_out += n + GC.safepoint() + end + # move positions to the start of the buffer + @assert !iszero(makemargin!(buffer2, 0)) GC.safepoint() end - GC.safepoint() - return nwritten + nothing end @@ -800,7 +819,7 @@ function changemode!(stream::TranscodingStream, newmode::Symbol) end elseif mode == :write if newmode == :close - flushbufferall(stream) + flush_buffer1(stream) flushuntilend(stream) state.mode = newmode finalize_codec(stream.codec, state.error) diff --git a/test/codecdoubleframe.jl b/test/codecdoubleframe.jl index 64b6d83a..ca0c8187 100644 --- a/test/codecdoubleframe.jl +++ b/test/codecdoubleframe.jl @@ -303,6 +303,89 @@ DoubleFrameDecoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameD end end + @testset "stats" begin + @testset "read" begin + stream = DoubleFrameEncoderStream(IOBuffer(b"foobar")) + stat = TranscodingStreams.stats(stream) + @test stat.in == 0 + @test stat.out == 0 + read(stream) + stat = TranscodingStreams.stats(stream) + @test stat.in == 6 + @test stat.transcoded_in == 6 + @test stat.transcoded_out == 16 + @test stat.out == 16 + close(stream) + + #nested Streams + stream = DoubleFrameDecoderStream(DoubleFrameEncoderStream(IOBuffer(b"foobar"))) + stat = TranscodingStreams.stats(stream) + @test stat.in == 0 + @test stat.out == 0 + read(stream) + stat = TranscodingStreams.stats(stream) + @test_broken stat.in == 16 + @test_broken stat.transcoded_in == 16 + @test stat.transcoded_out == 6 + @test stat.out == 6 + close(stream) + end + + @testset "write" begin + stream = DoubleFrameEncoderStream(IOBuffer()) + stat = TranscodingStreams.stats(stream) + @test stat.in == 0 + @test stat.out == 0 + write(stream, b"foobar") + stat = TranscodingStreams.stats(stream) + @test stat.in == 6 + @test stat.transcoded_in == 0 + @test stat.transcoded_out == 0 + @test stat.out == 0 + flush(stream) + stat = TranscodingStreams.stats(stream) + @test stat.in == 6 + @test stat.transcoded_in == 6 + @test stat.transcoded_out == 14 + @test stat.out == 14 + write(stream, TranscodingStreams.TOKEN_END) + stat = TranscodingStreams.stats(stream) + @test stat.in == 6 + @test stat.transcoded_in == 6 + @test stat.transcoded_out == 16 + @test stat.out == 16 + close(stream) + + #nested Streams + stream = DoubleFrameDecoderStream(DoubleFrameEncoderStream(IOBuffer())) + stat = TranscodingStreams.stats(stream) + @test stat.in == 0 + @test stat.out == 0 + write(stream, b"[ ffoooobbaarr ]") + stat = TranscodingStreams.stats(stream) + @test stat.in == 16 + @test stat.transcoded_in == 0 + @test stat.transcoded_out == 0 + @test stat.out == 0 + flush(stream) + stat = TranscodingStreams.stats(stream) + @test stat.in == 16 + @test stat.transcoded_in == 16 + @test stat.transcoded_out == 6 + @test stat.out == 6 + @test_broken position(stream.stream) == 6 + close(stream) + end + end + + @testset "underlying stream fails" begin + sink = IOBuffer(;maxsize=4) + stream = DoubleFrameEncoderStream(sink) + @test write(stream, "abcd") == 4 + # make sure flush doesn't go into an infinite loop + @test_throws ErrorException("short write") flush(stream) + end + test_roundtrip_read(DoubleFrameEncoderStream, DoubleFrameDecoderStream) test_roundtrip_write(DoubleFrameEncoderStream, DoubleFrameDecoderStream) test_roundtrip_lines(DoubleFrameEncoderStream, DoubleFrameDecoderStream) diff --git a/test/codecnoop.jl b/test/codecnoop.jl index 6203787b..abcd2751 100644 --- a/test/codecnoop.jl +++ b/test/codecnoop.jl @@ -171,22 +171,65 @@ using FillArrays: Zeros @test s1.buffer1 === s2.buffer1 === s3.buffer1 === s1.buffer2 === s2.buffer2 === s3.buffer2 - stream = TranscodingStream(Noop(), IOBuffer(b"foobar")) - @test TranscodingStreams.stats(stream).in === Int64(0) - @test TranscodingStreams.stats(stream).out === Int64(0) - read(stream) - @test TranscodingStreams.stats(stream).in === Int64(6) - @test TranscodingStreams.stats(stream).out === Int64(6) - close(stream) + @testset "stats" begin + stream = TranscodingStream(Noop(), IOBuffer(b"foobar")) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(0) + @test stat.out === Int64(0) + eof(stream) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(6) + @test stat.out === Int64(0) + read(stream) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(6) + @test stat.out === Int64(6) + close(stream) - stream = TranscodingStream(Noop(), IOBuffer()) - @test TranscodingStreams.stats(stream).in === Int64(0) - @test TranscodingStreams.stats(stream).out === Int64(0) - write(stream, b"foobar") - flush(stream) - @test TranscodingStreams.stats(stream).in === Int64(6) - @test TranscodingStreams.stats(stream).out === Int64(6) - close(stream) + #nested NoopStreams + stream = NoopStream(NoopStream(IOBuffer(b"foobar"))) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(0) + @test stat.out === Int64(0) + eof(stream) + stat = TranscodingStreams.stats(stream) + @test_broken stat.in === Int64(0) + @test stat.out === Int64(0) + read(stream) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(6) + @test stat.out === Int64(6) + close(stream) + + stream = TranscodingStream(Noop(), IOBuffer()) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(0) + @test stat.out === Int64(0) + write(stream, b"foobar") + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(6) + @test stat.out === Int64(0) + flush(stream) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(6) + @test stat.out === Int64(6) + close(stream) + + #nested NoopStreams + stream = NoopStream(NoopStream(IOBuffer())) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(0) + @test stat.out === Int64(0) + write(stream, b"foobar") + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(6) + @test stat.out === Int64(6) + flush(stream) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(6) + @test stat.out === Int64(6) + close(stream) + end stream = TranscodingStream(Noop(), IOBuffer()) @test stream.state.mode == :idle From b5b03e6f027ff48b07e4e19b0598d50a0a317737 Mon Sep 17 00:00:00 2001 From: Nathan Zimmerberg <39104088+nhz2@users.noreply.github.com> Date: Mon, 17 Jun 2024 21:58:10 -0400 Subject: [PATCH 3/6] Fix peek Char (#225) --- src/stream.jl | 12 ++++++++++-- test/codecdoubleframe.jl | 10 ++++++++++ test/codecnoop.jl | 8 ++++++++ 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/src/stream.jl b/src/stream.jl index ff7f0fda..ee9b7c6a 100644 --- a/src/stream.jl +++ b/src/stream.jl @@ -314,7 +314,8 @@ end # Read Functions # -------------- -function Base.read(stream::TranscodingStream, ::Type{UInt8}) +# needed for `peek(stream, Char)` to work +function Base.peek(stream::TranscodingStream, ::Type{UInt8})::UInt8 # eof and ready_to_read! are inlined here because ready_to_read! is very slow and eof is broken eof = buffersize(stream.buffer1) == 0 state = stream.state @@ -325,7 +326,14 @@ function Base.read(stream::TranscodingStream, ::Type{UInt8}) if eof && sloweof(stream) throw(EOFError()) end - return readbyte!(stream.buffer1) + buf = stream.buffer1 + return buf.data[buf.bufferpos] +end + +function Base.read(stream::TranscodingStream, ::Type{UInt8})::UInt8 + x = peek(stream) + consumed!(stream.buffer1, 1) + x end function Base.readuntil(stream::TranscodingStream, delim::UInt8; keep::Bool=false) diff --git a/test/codecdoubleframe.jl b/test/codecdoubleframe.jl index ca0c8187..5d0feaf1 100644 --- a/test/codecdoubleframe.jl +++ b/test/codecdoubleframe.jl @@ -386,6 +386,16 @@ DoubleFrameDecoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameD @test_throws ErrorException("short write") flush(stream) end + @testset "peek" begin + stream = DoubleFrameDecoderStream(DoubleFrameEncoderStream(IOBuffer( + codeunits("こんにちは") + ))) + @test peek(stream) == 0xe3 + @test peek(stream, Char) == 'こ' + @test peek(stream, Int32) == -476872221 + close(stream) + end + test_roundtrip_read(DoubleFrameEncoderStream, DoubleFrameDecoderStream) test_roundtrip_write(DoubleFrameEncoderStream, DoubleFrameDecoderStream) test_roundtrip_lines(DoubleFrameEncoderStream, DoubleFrameDecoderStream) diff --git a/test/codecnoop.jl b/test/codecnoop.jl index abcd2751..bdbfae21 100644 --- a/test/codecnoop.jl +++ b/test/codecnoop.jl @@ -541,4 +541,12 @@ using FillArrays: Zeros @test take!(sink) == b"abcd" end + @testset "peek" begin + stream = NoopStream(IOBuffer(codeunits("こんにちは"))) + @test peek(stream) == 0xe3 + @test peek(stream, Char) == 'こ' + @test peek(stream, Int32) == -476872221 + close(stream) + end + end From 6b13d1f4caa6dca4e21565019617840e1bbb6b62 Mon Sep 17 00:00:00 2001 From: Nathan Zimmerberg <39104088+nhz2@users.noreply.github.com> Date: Tue, 18 Jun 2024 10:40:55 -0400 Subject: [PATCH 4/6] Fix position of underlying stream when sharing buffers (#222) --- fuzz/fuzz.jl | 55 ++++++++++++++++++++++++++-------------- src/noop.jl | 8 ++++-- src/stream.jl | 17 ++++++++++--- test/codecdoubleframe.jl | 6 ++--- test/codecnoop.jl | 2 +- 5 files changed, 59 insertions(+), 29 deletions(-) diff --git a/fuzz/fuzz.jl b/fuzz/fuzz.jl index 8710e0ee..795a5784 100644 --- a/fuzz/fuzz.jl +++ b/fuzz/fuzz.jl @@ -91,15 +91,41 @@ read_methods = Data.SampledFrom([ end ]) +# Return true if the stats of a stream are self consistent +# This function assumes stream was never seeked. +function is_stats_consistent(stream) + if stream isa TranscodingStream + s = TranscodingStreams.stats(stream) + inner_pos = position(stream.stream) + pos = position(stream) + # event!("stats(stream)", s) + # event!("position(stream.stream)", inner_pos) + # event!("position(stream)", pos) + if isreadable(stream) + s.out == pos || return false + s.in == inner_pos || return false + else + iswritable(stream) || return false + s.in == pos || return false + s.out == inner_pos || return false + end + s.transcoded_in ≤ s.in || return false + s.transcoded_out ≥ s.out || return false + end + true +end @check function read_byte_data( kws=read_codecs_kws, data=datas, ) stream = wrap_stream(kws, IOBuffer(data)) - for i in eachindex(data) + for i in 1:length(data) + position(stream) == i-1 || return false + is_stats_consistent(stream) || return false read(stream, UInt8) == data[i] || return false end + is_stats_consistent(stream) || return false eof(stream) end @check function read_data( @@ -108,6 +134,7 @@ end ) stream = wrap_stream(kws, IOBuffer(data)) read(stream) == data || return false + is_stats_consistent(stream) || return false eof(stream) end @check function read_data_methods( @@ -122,13 +149,15 @@ end append!(x, d) length(x) == position(stream) || return false end + is_stats_consistent(stream) || return false x == data[eachindex(x)] end # flush all nested streams and return final data function take_all(stream) if stream isa Base.GenericIOBuffer - take!(stream) + seekstart(stream) + read(stream) else write(stream, TranscodingStreams.TOKEN_END) flush(stream) @@ -144,7 +173,9 @@ const write_codecs_kws = map(reverse, read_codecs_kws) ) stream = wrap_stream(kws, IOBuffer()) write(stream, data) == length(data) || return false - take_all(stream) == data + take_all(stream) == data || return false + is_stats_consistent(stream) || return false + true end @check function write_byte_data( kws=write_codecs_kws, @@ -153,24 +184,10 @@ end stream = wrap_stream(kws, IOBuffer()) for i in 1:length(data) position(stream) == i-1 || return false - if stream isa TranscodingStream - s = TranscodingStreams.stats(stream) - s.in == i-1 || return false - # TODO fix position(stream.stream) - # s.out == position(stream.stream) || return false - # s.transcoded_in == s.out || return false - # s.transcoded_out == s.out || return false - end + is_stats_consistent(stream) || return false write(stream, data[i]) == 1 || return false end take_all(stream) == data || return false - if stream isa TranscodingStream - s = TranscodingStreams.stats(stream) - s.in == length(data) || return false - # TODO fix position(stream.stream) - # s.out == position(stream.stream) || return false - # s.transcoded_in == s.out || return false - # s.transcoded_out == s.out || return false - end + is_stats_consistent(stream) || return false true end \ No newline at end of file diff --git a/src/noop.jl b/src/noop.jl index cbdcf097..3b63205c 100644 --- a/src/noop.jl +++ b/src/noop.jl @@ -155,8 +155,12 @@ function stats(stream::NoopStream) if mode === :idle in = out = 0 elseif mode === :read - in = buffer.transcoded - out = in - buffersize(buffer) + out = buffer.transcoded - buffersize(buffer) + if has_sharedbuf(stream) + in = out + else + in = buffer.transcoded + end elseif mode === :write out = stream.state.bytes_written_out in = out diff --git a/src/stream.jl b/src/stream.jl index ee9b7c6a..105210ed 100644 --- a/src/stream.jl +++ b/src/stream.jl @@ -605,10 +605,15 @@ function stats(stream::TranscodingStream) if mode === :idle transcoded_in = transcoded_out = in = out = 0 elseif mode === :read || mode === :stop - transcoded_in = buffer2.transcoded transcoded_out = buffer1.transcoded - in = transcoded_in + buffersize(buffer2) out = transcoded_out - buffersize(buffer1) + if has_sharedbuf(stream) + transcoded_in = stats(stream.stream).out + in = transcoded_in + else + transcoded_in = buffer2.transcoded + in = transcoded_in + buffersize(buffer2) + end elseif mode === :write transcoded_in = buffer1.transcoded out = state.bytes_written_out @@ -709,8 +714,12 @@ function callprocess(stream::TranscodingStream, inbuf::Buffer, outbuf::Buffer) input_delta = Δin, output_delta = Δout, ) - consumed!(inbuf, Δin, transcode = true) - supplied!(outbuf, Δout, transcode = true) + consumed!(inbuf, Δin; + transcode = !has_sharedbuf(stream) || stream.state.mode === :write, + ) # inbuf is buffer1 if mode is :write + supplied!(outbuf, Δout; + transcode = !has_sharedbuf(stream) || stream.state.mode !== :write, + ) # outbuf is buffer1 if mode is not :write if has_sharedbuf(stream) if stream.state.mode === :write # this must be updated before throwing any error if outbuf is shared. diff --git a/test/codecdoubleframe.jl b/test/codecdoubleframe.jl index 5d0feaf1..7730d8a4 100644 --- a/test/codecdoubleframe.jl +++ b/test/codecdoubleframe.jl @@ -324,8 +324,8 @@ DoubleFrameDecoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameD @test stat.out == 0 read(stream) stat = TranscodingStreams.stats(stream) - @test_broken stat.in == 16 - @test_broken stat.transcoded_in == 16 + @test stat.in == 16 + @test stat.transcoded_in == 16 @test stat.transcoded_out == 6 @test stat.out == 6 close(stream) @@ -373,7 +373,7 @@ DoubleFrameDecoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameD @test stat.transcoded_in == 16 @test stat.transcoded_out == 6 @test stat.out == 6 - @test_broken position(stream.stream) == 6 + @test position(stream.stream) == 6 close(stream) end end diff --git a/test/codecnoop.jl b/test/codecnoop.jl index bdbfae21..02a6bd48 100644 --- a/test/codecnoop.jl +++ b/test/codecnoop.jl @@ -193,7 +193,7 @@ using FillArrays: Zeros @test stat.out === Int64(0) eof(stream) stat = TranscodingStreams.stats(stream) - @test_broken stat.in === Int64(0) + @test stat.in === Int64(0) @test stat.out === Int64(0) read(stream) stat = TranscodingStreams.stats(stream) From bcad9e0de83d76ff7f0df0ec7f247577c670a44a Mon Sep 17 00:00:00 2001 From: Nathan Zimmerberg <39104088+nhz2@users.noreply.github.com> Date: Thu, 20 Jun 2024 10:24:23 -0400 Subject: [PATCH 5/6] Fix printing of fuzz.jl results (#226) --- fuzz/fuzz.jl | 112 ++++++++++++++++++++++++++------------------------- 1 file changed, 58 insertions(+), 54 deletions(-) diff --git a/fuzz/fuzz.jl b/fuzz/fuzz.jl index 795a5784..7e9d82c1 100644 --- a/fuzz/fuzz.jl +++ b/fuzz/fuzz.jl @@ -115,42 +115,44 @@ function is_stats_consistent(stream) true end -@check function read_byte_data( - kws=read_codecs_kws, - data=datas, - ) - stream = wrap_stream(kws, IOBuffer(data)) - for i in 1:length(data) - position(stream) == i-1 || return false +@testset "read" begin + @check function read_byte_data( + kws=read_codecs_kws, + data=datas, + ) + stream = wrap_stream(kws, IOBuffer(data)) + for i in 1:length(data) + position(stream) == i-1 || return false + is_stats_consistent(stream) || return false + read(stream, UInt8) == data[i] || return false + end is_stats_consistent(stream) || return false - read(stream, UInt8) == data[i] || return false + eof(stream) end - is_stats_consistent(stream) || return false - eof(stream) -end -@check function read_data( - kws=read_codecs_kws, - data=datas, - ) - stream = wrap_stream(kws, IOBuffer(data)) - read(stream) == data || return false - is_stats_consistent(stream) || return false - eof(stream) -end -@check function read_data_methods( - kws=read_codecs_kws, - data=datas, - rs=Data.Vectors(read_methods), - ) - stream = wrap_stream(kws, IOBuffer(data)) - x = UInt8[] - for r in rs - d = r(stream) - append!(x, d) - length(x) == position(stream) || return false + @check function read_data( + kws=read_codecs_kws, + data=datas, + ) + stream = wrap_stream(kws, IOBuffer(data)) + read(stream) == data || return false + is_stats_consistent(stream) || return false + eof(stream) + end + @check function read_data_methods( + kws=read_codecs_kws, + data=datas, + rs=Data.Vectors(read_methods), + ) + stream = wrap_stream(kws, IOBuffer(data)) + x = UInt8[] + for r in rs + d = r(stream) + append!(x, d) + length(x) == position(stream) || return false + end + is_stats_consistent(stream) || return false + x == data[eachindex(x)] end - is_stats_consistent(stream) || return false - x == data[eachindex(x)] end # flush all nested streams and return final data @@ -167,27 +169,29 @@ end const write_codecs_kws = map(reverse, read_codecs_kws) -@check function write_data( - kws=write_codecs_kws, - data=datas, - ) - stream = wrap_stream(kws, IOBuffer()) - write(stream, data) == length(data) || return false - take_all(stream) == data || return false - is_stats_consistent(stream) || return false - true -end -@check function write_byte_data( - kws=write_codecs_kws, - data=datas, - ) - stream = wrap_stream(kws, IOBuffer()) - for i in 1:length(data) - position(stream) == i-1 || return false +@testset "write" begin + @check function write_data( + kws=write_codecs_kws, + data=datas, + ) + stream = wrap_stream(kws, IOBuffer()) + write(stream, data) == length(data) || return false + take_all(stream) == data || return false is_stats_consistent(stream) || return false - write(stream, data[i]) == 1 || return false + true + end + @check function write_byte_data( + kws=write_codecs_kws, + data=datas, + ) + stream = wrap_stream(kws, IOBuffer()) + for i in 1:length(data) + position(stream) == i-1 || return false + is_stats_consistent(stream) || return false + write(stream, data[i]) == 1 || return false + end + take_all(stream) == data || return false + is_stats_consistent(stream) || return false + true end - take_all(stream) == data || return false - is_stats_consistent(stream) || return false - true end \ No newline at end of file From 74ac5c8dd74d877d1f5a90d0a9706e2ea2f9f8a9 Mon Sep 17 00:00:00 2001 From: Nathan Zimmerberg <39104088+nhz2@users.noreply.github.com> Date: Thu, 20 Jun 2024 17:19:04 -0400 Subject: [PATCH 6/6] Add `GC.@preserve` when using pointers (#221) --- docs/src/examples.md | 2 +- src/memory.jl | 1 + src/stream.jl | 28 ++++++++++-------- src/transcode.jl | 15 +++++----- test/codecnoop.jl | 10 ++++--- test/codecquadruple.jl | 3 +- test/runtests.jl | 64 ++++++++++++++++++++++-------------------- 7 files changed, 67 insertions(+), 56 deletions(-) diff --git a/docs/src/examples.md b/docs/src/examples.md index 5b366d81..3644c45a 100644 --- a/docs/src/examples.md +++ b/docs/src/examples.md @@ -178,7 +178,7 @@ using CodecZlib function decompress(input, output) buffer = Vector{UInt8}(undef, 16 * 1024) - while !eof(input) + GC.@preserve buffer while !eof(input) n = min(bytesavailable(input), length(buffer)) unsafe_read(input, pointer(buffer), n) unsafe_write(output, pointer(buffer), n) diff --git a/src/memory.jl b/src/memory.jl index 357df640..81eceb81 100644 --- a/src/memory.jl +++ b/src/memory.jl @@ -11,6 +11,7 @@ struct Memory size::UInt end +# TODO remove this method function Memory(data::ByteData) return Memory(pointer(data), sizeof(data)) end diff --git a/src/stream.jl b/src/stream.jl index 105210ed..c6c5073b 100644 --- a/src/stream.jl +++ b/src/stream.jl @@ -343,16 +343,18 @@ function Base.readuntil(stream::TranscodingStream, delim::UInt8; keep::Bool=fals local ret::Vector{UInt8} filled = 0 while !eof(stream) - p = findbyte(buffer1, delim) - found = false - if p < marginptr(buffer1) - found = true - sz = Int(p + 1 - bufferptr(buffer1)) - if !keep - sz -= 1 + GC.@preserve buffer1 begin + p = findbyte(buffer1, delim) + found = false + if p < marginptr(buffer1) + found = true + sz = Int(p + 1 - bufferptr(buffer1)) + if !keep + sz -= 1 + end + else + sz = buffersize(buffer1) end - else - sz = buffersize(buffer1) end if @isdefined(ret) resize!(ret, filled + sz) @@ -703,9 +705,11 @@ end # Call `process` with prologue and epilogue. function callprocess(stream::TranscodingStream, inbuf::Buffer, outbuf::Buffer) state = stream.state - input = buffermem(inbuf) - GC.@preserve inbuf makemargin!(outbuf, minoutsize(stream.codec, input)) - Δin::Int, Δout::Int, state.code = GC.@preserve inbuf outbuf process(stream.codec, input, marginmem(outbuf), state.error) + makemargin!( + outbuf, + GC.@preserve(inbuf, minoutsize(stream.codec, buffermem(inbuf))), + ) + Δin::Int, Δout::Int, state.code = GC.@preserve inbuf outbuf process(stream.codec, buffermem(inbuf), marginmem(outbuf), state.error) @debug( "called process()", code = state.code, diff --git a/src/transcode.jl b/src/transcode.jl index 32cb5bee..fca33736 100644 --- a/src/transcode.jl +++ b/src/transcode.jl @@ -46,10 +46,10 @@ function Base.transcode(codec::Type{C}, src::String) where {C<:Codec} end _default_output_buffer(codec, input) = Buffer( - initial_output_size( + GC.@preserve(input, initial_output_size( codec, buffermem(input) - ) + )) ) """ @@ -125,8 +125,7 @@ function transcode!( Base.mightalias(input.data, output.data) && error( "input and outbut buffers must be independent" ) - # GC.@preserve since unsafe_transcode! may convert to raw pointers - GC.@preserve input output codec unsafe_transcode!(output, codec, input) + unsafe_transcode!(output, codec, input) end """ @@ -148,10 +147,10 @@ function unsafe_transcode!( if code === :error @goto error end - n = minoutsize(codec, buffermem(input)) + n = GC.@preserve input minoutsize(codec, buffermem(input)) @label process makemargin!(output, n) - Δin, Δout, code = process(codec, buffermem(input), marginmem(output), error) + Δin, Δout, code = GC.@preserve input output process(codec, buffermem(input), marginmem(output), error) @debug( "called process()", code = code, @@ -169,13 +168,13 @@ function unsafe_transcode!( if startproc(codec, :write, error) === :error @goto error end - n = minoutsize(codec, buffermem(input)) + n = GC.@preserve input minoutsize(codec, buffermem(input)) @goto process end resize!(output.data, output.marginpos - 1) return output.data else - n = max(Δout, minoutsize(codec, buffermem(input))) + n = GC.@preserve input max(Δout, minoutsize(codec, buffermem(input))) @goto process end @label error diff --git a/test/codecnoop.jl b/test/codecnoop.jl index 02a6bd48..9db6bc14 100644 --- a/test/codecnoop.jl +++ b/test/codecnoop.jl @@ -24,13 +24,14 @@ using FillArrays: Zeros stream = TranscodingStream(Noop(), IOBuffer()) @test_throws EOFError read(stream, UInt8) - @test_throws EOFError unsafe_read(stream, pointer(Vector{UInt8}(undef, 3)), 3) + data = Vector{UInt8}(undef, 3) + @test_throws EOFError GC.@preserve data unsafe_read(stream, pointer(data), 3) close(stream) stream = TranscodingStream(Noop(), IOBuffer("foobar"), bufsize=1) @test read(stream, UInt8) === UInt8('f') data = Vector{UInt8}(undef, 5) - unsafe_read(stream, pointer(data), 5) === nothing + GC.@preserve data unsafe_read(stream, pointer(data), 5) === nothing @test data == b"oobar" close(stream) @@ -122,7 +123,7 @@ using FillArrays: Zeros stream = TranscodingStream(Noop(), IOBuffer("foo")) out = zeros(UInt8, 3) @test bytesavailable(stream) == 0 - @test TranscodingStreams.unsafe_read(stream, pointer(out), 10) == 3 + @test GC.@preserve out TranscodingStreams.unsafe_read(stream, pointer(out), 10) == 3 @test out == b"foo" close(stream) @@ -384,7 +385,8 @@ using FillArrays: Zeros @test eof(stream) stream = NoopStream(IOBuffer("foobar")) - @test_throws ArgumentError TranscodingStreams.unsafe_unread(stream, pointer(b"foo"), -1) + data = b"foo" + @test_throws ArgumentError GC.@preserve data TranscodingStreams.unsafe_unread(stream, pointer(data), -1) close(stream) stream = NoopStream(IOBuffer("foo")) diff --git a/test/codecquadruple.jl b/test/codecquadruple.jl index 31a0ef53..fe481b67 100644 --- a/test/codecquadruple.jl +++ b/test/codecquadruple.jl @@ -81,7 +81,8 @@ end close(stream2) stream = TranscodingStream(QuadrupleCodec(), IOBuffer("foo")) - @test_throws EOFError unsafe_read(stream, pointer(Vector{UInt8}(undef, 13)), 13) + data = Vector{UInt8}(undef, 13) + @test_throws EOFError GC.@preserve data unsafe_read(stream, pointer(data), 13) close(stream) @testset "position" begin diff --git a/test/runtests.jl b/test/runtests.jl index 310da504..2bca20af 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -26,13 +26,15 @@ using TranscodingStreams: data = Vector{UInt8}(b"foobar") buf = Buffer(data) - @test buf isa Buffer - @test bufferptr(buf) === pointer(data) - @test buffersize(buf) === 6 - @test buffermem(buf) === Memory(pointer(data), 6) - @test marginptr(buf) === pointer(data) + 6 - @test marginsize(buf) === 0 - @test marginmem(buf) === Memory(pointer(data)+6, 0) + GC.@preserve data buf begin + @test buf isa Buffer + @test bufferptr(buf) === pointer(data) + @test buffersize(buf) === 6 + @test buffermem(buf) === Memory(pointer(data), 6) + @test marginptr(buf) === pointer(data) + 6 + @test marginsize(buf) === 0 + @test marginmem(buf) === Memory(pointer(data)+6, 0) + end buf = Buffer(2) writebyte!(buf, 0x34) @@ -72,31 +74,33 @@ end @testset "Memory" begin data = Vector{UInt8}(b"foobar") - mem = TranscodingStreams.Memory(pointer(data), sizeof(data)) - @test mem isa TranscodingStreams.Memory - @test mem.ptr === pointer(data) - @test mem.size === length(mem) === UInt(sizeof(data)) - @test lastindex(mem) === 6 - @test mem[1] === UInt8('f') - @test mem[2] === UInt8('o') - @test mem[3] === UInt8('o') - @test mem[4] === UInt8('b') - @test mem[5] === UInt8('a') - @test mem[6] === UInt8('r') - @test_throws BoundsError mem[7] - @test_throws BoundsError mem[0] - mem[1] = UInt8('z') - @test mem[1] === UInt8('z') - mem[3] = UInt8('!') - @test mem[3] === UInt8('!') - @test_throws BoundsError mem[7] = 0x00 - @test_throws BoundsError mem[0] = 0x00 + GC.@preserve data let mem = TranscodingStreams.Memory(pointer(data), sizeof(data)) + @test mem isa TranscodingStreams.Memory + @test mem.ptr === pointer(data) + @test mem.size === length(mem) === UInt(sizeof(data)) + @test lastindex(mem) === 6 + @test mem[1] === UInt8('f') + @test mem[2] === UInt8('o') + @test mem[3] === UInt8('o') + @test mem[4] === UInt8('b') + @test mem[5] === UInt8('a') + @test mem[6] === UInt8('r') + @test_throws BoundsError mem[7] + @test_throws BoundsError mem[0] + mem[1] = UInt8('z') + @test mem[1] === UInt8('z') + mem[3] = UInt8('!') + @test mem[3] === UInt8('!') + @test_throws BoundsError mem[7] = 0x00 + @test_throws BoundsError mem[0] = 0x00 + end data = Vector{UInt8}(b"foobar") - mem = TranscodingStreams.Memory(data) - @test mem isa TranscodingStreams.Memory - @test mem.ptr == pointer(data) - @test mem.size == sizeof(data) + GC.@preserve data let mem = TranscodingStreams.Memory(pointer(data), sizeof(data)) + @test mem isa TranscodingStreams.Memory + @test mem.ptr == pointer(data) + @test mem.size == sizeof(data) + end end @testset "Stats" begin