diff --git a/Project.toml b/Project.toml index e02dee0e..d4ddee32 100644 --- a/Project.toml +++ b/Project.toml @@ -2,7 +2,7 @@ name = "TranscodingStreams" uuid = "3bb67fe8-82b1-5028-8e26-92a6c54297fa" license = "MIT" authors = ["Kenta Sato "] -version = "0.10.8" +version = "0.10.9" [deps] Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" diff --git a/docs/src/devnotes.md b/docs/src/devnotes.md index 5fd968c8..ccae4124 100644 --- a/docs/src/devnotes.md +++ b/docs/src/devnotes.md @@ -41,6 +41,7 @@ default buffer size is 16KiB for each. - `error`: exception returned by the codec (`<:Error`) - `buffer1`: data buffer that is closer to the user (`<:Buffer`) - `buffer2`: data buffer that is farther to the user (`<:Buffer`) +- `bytes_written_out`: number of bytes written to the underlying stream (`<:Int64`) The `mode` field may be one of the following value: - `:idle` : initial and intermediate mode, no buffered data @@ -78,8 +79,10 @@ Shared buffers Adjacent transcoding streams may share their buffers. This will reduce memory allocation and eliminate data copy between buffers. -`readdata!(input::IO, output::Buffer)` and `writedata!(output::IO, -input::Buffer)` do the actual work of read/write data from/to the underlying +If `buffer2` is shared it is considered to be owned by the underlying stream +by the `stats` and `position` functions. + +`readdata!(input::IO, output::Buffer)` and `flush_buffer2(stream::TranscodingStream)` do the actual work of read/write data from/to the underlying stream. These methods have a special pass for shared buffers. diff --git a/docs/src/examples.md b/docs/src/examples.md index 5b366d81..3644c45a 100644 --- a/docs/src/examples.md +++ b/docs/src/examples.md @@ -178,7 +178,7 @@ using CodecZlib function decompress(input, output) buffer = Vector{UInt8}(undef, 16 * 1024) - while !eof(input) + GC.@preserve buffer while !eof(input) n = min(bytesavailable(input), length(buffer)) unsafe_read(input, pointer(buffer), n) unsafe_write(output, pointer(buffer), n) diff --git a/fuzz/fuzz.jl b/fuzz/fuzz.jl index ef36dcc5..7e9d82c1 100644 --- a/fuzz/fuzz.jl +++ b/fuzz/fuzz.jl @@ -91,44 +91,75 @@ read_methods = Data.SampledFrom([ end ]) - -@check function read_byte_data( - kws=read_codecs_kws, - data=datas, - ) - stream = wrap_stream(kws, IOBuffer(data)) - for i in eachindex(data) - read(stream, UInt8) == data[i] || return false +# Return true if the stats of a stream are self consistent +# This function assumes stream was never seeked. +function is_stats_consistent(stream) + if stream isa TranscodingStream + s = TranscodingStreams.stats(stream) + inner_pos = position(stream.stream) + pos = position(stream) + # event!("stats(stream)", s) + # event!("position(stream.stream)", inner_pos) + # event!("position(stream)", pos) + if isreadable(stream) + s.out == pos || return false + s.in == inner_pos || return false + else + iswritable(stream) || return false + s.in == pos || return false + s.out == inner_pos || return false + end + s.transcoded_in ≤ s.in || return false + s.transcoded_out ≥ s.out || return false end - eof(stream) -end -@check function read_data( - kws=read_codecs_kws, - data=datas, - ) - stream = wrap_stream(kws, IOBuffer(data)) - read(stream) == data || return false - eof(stream) + true end -@check function read_data_methods( - kws=read_codecs_kws, - data=datas, - rs=Data.Vectors(read_methods), - ) - stream = wrap_stream(kws, IOBuffer(data)) - x = UInt8[] - for r in rs - d = r(stream) - append!(x, d) - length(x) == position(stream) || return false + +@testset "read" begin + @check function read_byte_data( + kws=read_codecs_kws, + data=datas, + ) + stream = wrap_stream(kws, IOBuffer(data)) + for i in 1:length(data) + position(stream) == i-1 || return false + is_stats_consistent(stream) || return false + read(stream, UInt8) == data[i] || return false + end + is_stats_consistent(stream) || return false + eof(stream) + end + @check function read_data( + kws=read_codecs_kws, + data=datas, + ) + stream = wrap_stream(kws, IOBuffer(data)) + read(stream) == data || return false + is_stats_consistent(stream) || return false + eof(stream) + end + @check function read_data_methods( + kws=read_codecs_kws, + data=datas, + rs=Data.Vectors(read_methods), + ) + stream = wrap_stream(kws, IOBuffer(data)) + x = UInt8[] + for r in rs + d = r(stream) + append!(x, d) + length(x) == position(stream) || return false + end + is_stats_consistent(stream) || return false + x == data[eachindex(x)] end - x == data[eachindex(x)] end # flush all nested streams and return final data function take_all(stream) if stream isa Base.GenericIOBuffer - take!(stream) + seekstart(stream) + read(stream) else write(stream, TranscodingStreams.TOKEN_END) flush(stream) @@ -138,21 +169,29 @@ end const write_codecs_kws = map(reverse, read_codecs_kws) -@check function write_data( - kws=write_codecs_kws, - data=datas, - ) - stream = wrap_stream(kws, IOBuffer()) - write(stream, data) == length(data) || return false - take_all(stream) == data -end -@check function write_byte_data( - kws=write_codecs_kws, - data=datas, - ) - stream = wrap_stream(kws, IOBuffer()) - for i in 1:length(data) - write(stream, data[i]) == 1 || return false +@testset "write" begin + @check function write_data( + kws=write_codecs_kws, + data=datas, + ) + stream = wrap_stream(kws, IOBuffer()) + write(stream, data) == length(data) || return false + take_all(stream) == data || return false + is_stats_consistent(stream) || return false + true + end + @check function write_byte_data( + kws=write_codecs_kws, + data=datas, + ) + stream = wrap_stream(kws, IOBuffer()) + for i in 1:length(data) + position(stream) == i-1 || return false + is_stats_consistent(stream) || return false + write(stream, data[i]) == 1 || return false + end + take_all(stream) == data || return false + is_stats_consistent(stream) || return false + true end - take_all(stream) == data end \ No newline at end of file diff --git a/src/memory.jl b/src/memory.jl index d558076d..35c39497 100644 --- a/src/memory.jl +++ b/src/memory.jl @@ -11,10 +11,6 @@ struct Memory size::UInt end -function Memory(data::Vector{UInt8}) - return Memory(pointer(data), sizeof(data)) -end - @inline function Base.length(mem::Memory) return mem.size end diff --git a/src/noop.jl b/src/noop.jl index 8b64d91b..3b63205c 100644 --- a/src/noop.jl +++ b/src/noop.jl @@ -72,7 +72,7 @@ end function Base.seek(stream::NoopStream, pos::Integer) mode = stream.state.mode if mode === :write - flushbuffer(stream) + flush_buffer2(stream) end seek(stream.stream, pos) initbuffer!(stream.buffer1) @@ -82,7 +82,7 @@ end function Base.seekstart(stream::NoopStream) mode = stream.state.mode if mode === :write - flushbuffer(stream) + flush_buffer2(stream) end seekstart(stream.stream) initbuffer!(stream.buffer1) @@ -92,7 +92,7 @@ end function Base.seekend(stream::NoopStream) mode = stream.state.mode if mode === :write - flushbuffer(stream) + flush_buffer2(stream) end seekend(stream.stream) initbuffer!(stream.buffer1) @@ -103,19 +103,23 @@ function Base.write(stream::NoopStream, b::UInt8)::Int changemode!(stream, :write) if has_sharedbuf(stream) # directly write data to the underlying stream - n = Int(write(stream.stream, b)) - return n + write(stream.stream, b) + stream.state.bytes_written_out += 1 + else + buffer1 = stream.buffer1 + marginsize(buffer1) > 0 || flush_buffer2(stream) + writebyte!(buffer1, b) end - buffer1 = stream.buffer1 - marginsize(buffer1) > 0 || flushbuffer(stream) - return writebyte!(buffer1, b) + return 1 end function Base.unsafe_write(stream::NoopStream, input::Ptr{UInt8}, nbytes::UInt)::Int changemode!(stream, :write) + Int(nbytes) # Error if nbytes > typemax Int if has_sharedbuf(stream) # directly write data to the underlying stream n = Int(unsafe_write(stream.stream, input, nbytes)) + stream.state.bytes_written_out += n return n end buffer = stream.buffer1 @@ -123,9 +127,10 @@ function Base.unsafe_write(stream::NoopStream, input::Ptr{UInt8}, nbytes::UInt): copydata!(buffer, input, Int(nbytes)) return Int(nbytes) else - flushbuffer(stream) + flush_buffer2(stream) # directly write data to the underlying stream n = Int(unsafe_write(stream.stream, input, nbytes)) + stream.state.bytes_written_out += n return n end end @@ -148,17 +153,24 @@ function stats(stream::NoopStream) buffer = stream.buffer1 @assert buffer === stream.buffer2 if mode === :idle - consumed = supplied = 0 + in = out = 0 elseif mode === :read - supplied = buffer.transcoded - consumed = supplied - buffersize(buffer) + out = buffer.transcoded - buffersize(buffer) + if has_sharedbuf(stream) + in = out + else + in = buffer.transcoded + end elseif mode === :write - supplied = buffer.transcoded + buffersize(buffer) - consumed = buffer.transcoded + out = stream.state.bytes_written_out + in = out + if !has_sharedbuf(stream) + in += buffersize(buffer) + end else throw_invalid_mode(mode) end - return Stats(consumed, supplied, supplied, supplied) + return Stats(in, out, out, out) end @@ -190,24 +202,15 @@ function fillbuffer(stream::NoopStream; eager::Bool = false)::Int return nfilled end -function flushbuffer(stream::NoopStream, all::Bool=false) - changemode!(stream, :write) - buffer = stream.buffer1 - @assert buffer === stream.buffer2 - nflushed::Int = 0 - if all - while buffersize(buffer) > 0 - nflushed += writedata!(stream.stream, buffer) - end - else - nflushed += writedata!(stream.stream, buffer) - makemargin!(buffer, 0) - end - buffer.transcoded += nflushed - return nflushed +# Empty buffer1 by writing out data. +# `stream` must be in :write mode. +# Ensure there is margin available in buffer1 for at least one byte. +function flush_buffer1(stream::NoopStream)::Nothing + flush_buffer2(stream) end +# This is always called after `flush_buffer1(stream)` function flushuntilend(stream::NoopStream) - stream.buffer1.transcoded += writedata!(stream.stream, stream.buffer1) + @assert iszero(buffersize(stream.buffer1)) return end diff --git a/src/state.jl b/src/state.jl index 0cb54b6c..134b9495 100644 --- a/src/state.jl +++ b/src/state.jl @@ -24,8 +24,11 @@ mutable struct State buffer1::Buffer buffer2::Buffer + # Number of bytes written to the underlying stream + bytes_written_out::Int64 + function State(buffer1::Buffer, buffer2::Buffer) - return new(:idle, :ok, false, Error(), buffer1, buffer2) + return new(:idle, :ok, false, Error(), buffer1, buffer2, Int64(0)) end end diff --git a/src/stream.jl b/src/stream.jl index 22ef593f..c6c5073b 100644 --- a/src/stream.jl +++ b/src/stream.jl @@ -314,7 +314,8 @@ end # Read Functions # -------------- -function Base.read(stream::TranscodingStream, ::Type{UInt8}) +# needed for `peek(stream, Char)` to work +function Base.peek(stream::TranscodingStream, ::Type{UInt8})::UInt8 # eof and ready_to_read! are inlined here because ready_to_read! is very slow and eof is broken eof = buffersize(stream.buffer1) == 0 state = stream.state @@ -325,7 +326,14 @@ function Base.read(stream::TranscodingStream, ::Type{UInt8}) if eof && sloweof(stream) throw(EOFError()) end - return readbyte!(stream.buffer1) + buf = stream.buffer1 + return buf.data[buf.bufferpos] +end + +function Base.read(stream::TranscodingStream, ::Type{UInt8})::UInt8 + x = peek(stream) + consumed!(stream.buffer1, 1) + x end function Base.readuntil(stream::TranscodingStream, delim::UInt8; keep::Bool=false) @@ -335,16 +343,18 @@ function Base.readuntil(stream::TranscodingStream, delim::UInt8; keep::Bool=fals local ret::Vector{UInt8} filled = 0 while !eof(stream) - p = findbyte(buffer1, delim) - found = false - if p < marginptr(buffer1) - found = true - sz = Int(p + 1 - bufferptr(buffer1)) - if !keep - sz -= 1 + GC.@preserve buffer1 begin + p = findbyte(buffer1, delim) + found = false + if p < marginptr(buffer1) + found = true + sz = Int(p + 1 - bufferptr(buffer1)) + if !keep + sz -= 1 + end + else + sz = buffersize(buffer1) end - else - sz = buffersize(buffer1) end if @isdefined(ret) resize!(ret, filled + sz) @@ -494,21 +504,24 @@ function Base.write(stream::TranscodingStream) return 0 end -function Base.write(stream::TranscodingStream, b::UInt8) +function Base.write(stream::TranscodingStream, b::UInt8)::Int changemode!(stream, :write) - if marginsize(stream.buffer1) == 0 && flushbuffer(stream) == 0 - return 0 - end - return writebyte!(stream.buffer1, b) + buffer1 = stream.buffer1 + marginsize(buffer1) > 0 || flush_buffer1(stream) + writebyte!(buffer1, b) + return 1 end function Base.unsafe_write(stream::TranscodingStream, input::Ptr{UInt8}, nbytes::UInt) changemode!(stream, :write) - state = stream.state + Int(nbytes) # Error if nbytes > typemax Int buffer1 = stream.buffer1 p = input p_end = p + nbytes - while p < p_end && (marginsize(buffer1) > 0 || flushbuffer(stream) > 0) + while p < p_end + if marginsize(buffer1) ≤ 0 + flush_buffer1(stream) + end m = min(marginsize(buffer1), p_end - p) copydata!(buffer1, p, m) p += m @@ -535,7 +548,7 @@ const TOKEN_END = EndToken() function Base.write(stream::TranscodingStream, ::EndToken) changemode!(stream, :write) - flushbufferall(stream) + flush_buffer1(stream) flushuntilend(stream) return 0 end @@ -543,8 +556,8 @@ end function Base.flush(stream::TranscodingStream) checkmode(stream) if stream.state.mode == :write - flushbufferall(stream) - writedata!(stream.stream, stream.buffer2) + flush_buffer1(stream) + flush_buffer2(stream) end flush(stream.stream) end @@ -594,15 +607,23 @@ function stats(stream::TranscodingStream) if mode === :idle transcoded_in = transcoded_out = in = out = 0 elseif mode === :read || mode === :stop - transcoded_in = buffer2.transcoded transcoded_out = buffer1.transcoded - in = transcoded_in + buffersize(buffer2) out = transcoded_out - buffersize(buffer1) + if has_sharedbuf(stream) + transcoded_in = stats(stream.stream).out + in = transcoded_in + else + transcoded_in = buffer2.transcoded + in = transcoded_in + buffersize(buffer2) + end elseif mode === :write transcoded_in = buffer1.transcoded - transcoded_out = buffer2.transcoded + out = state.bytes_written_out + transcoded_out = out + if !has_sharedbuf(stream) + transcoded_out += buffersize(buffer2) + end in = transcoded_in + buffersize(buffer1) - out = transcoded_out - buffersize(buffer2) else throw_invalid_mode(mode) end @@ -633,38 +654,37 @@ function fillbuffer(stream::TranscodingStream; eager::Bool = false) return nfilled end -function flushbuffer(stream::TranscodingStream, all::Bool=false) - changemode!(stream, :write) +# Empty buffer1 by writing out data. +# `stream` must be in :write mode. +# Ensure there is margin available in buffer1 for at least one byte. +function flush_buffer1(stream::TranscodingStream)::Nothing state = stream.state buffer1 = stream.buffer1 buffer2 = stream.buffer2 - nflushed::Int = 0 - while (all ? buffersize(buffer1) != 0 : makemargin!(buffer1, 0) == 0) + while buffersize(buffer1) > 0 if state.code == :end callstartproc(stream, :write) end - writedata!(stream.stream, buffer2) - Δin, _ = callprocess(stream, buffer1, buffer2) - nflushed += Δin + flush_buffer2(stream) + callprocess(stream, buffer1, buffer2) end - return nflushed -end - -function flushbufferall(stream::TranscodingStream) - return flushbuffer(stream, true) + # move positions to the start of the buffer + @assert !iszero(makemargin!(buffer1, 0)) + return end +# This is always called after `flush_buffer1(stream)` function flushuntilend(stream::TranscodingStream) - changemode!(stream, :write) state = stream.state buffer1 = stream.buffer1 buffer2 = stream.buffer2 + @assert buffersize(buffer1) == 0 + @assert stream.state.mode === :write while state.code != :end - writedata!(stream.stream, buffer2) + flush_buffer2(stream) callprocess(stream, buffer1, buffer2) end - writedata!(stream.stream, buffer2) - @assert buffersize(buffer1) == 0 + flush_buffer2(stream) return end @@ -685,9 +705,11 @@ end # Call `process` with prologue and epilogue. function callprocess(stream::TranscodingStream, inbuf::Buffer, outbuf::Buffer) state = stream.state - input = buffermem(inbuf) - GC.@preserve inbuf makemargin!(outbuf, minoutsize(stream.codec, input)) - Δin, Δout, state.code = GC.@preserve inbuf outbuf process(stream.codec, input, marginmem(outbuf), state.error) + makemargin!( + outbuf, + GC.@preserve(inbuf, minoutsize(stream.codec, buffermem(inbuf))), + ) + Δin::Int, Δout::Int, state.code = GC.@preserve inbuf outbuf process(stream.codec, buffermem(inbuf), marginmem(outbuf), state.error) @debug( "called process()", code = state.code, @@ -696,8 +718,18 @@ function callprocess(stream::TranscodingStream, inbuf::Buffer, outbuf::Buffer) input_delta = Δin, output_delta = Δout, ) - consumed!(inbuf, Δin, transcode = true) - supplied!(outbuf, Δout, transcode = true) + consumed!(inbuf, Δin; + transcode = !has_sharedbuf(stream) || stream.state.mode === :write, + ) # inbuf is buffer1 if mode is :write + supplied!(outbuf, Δout; + transcode = !has_sharedbuf(stream) || stream.state.mode !== :write, + ) # outbuf is buffer1 if mode is not :write + if has_sharedbuf(stream) + if stream.state.mode === :write + # this must be updated before throwing any error if outbuf is shared. + stream.state.bytes_written_out += Δout + end + end if state.code == :error changemode!(stream, :panic) elseif state.code == :ok && Δin == Δout == 0 @@ -745,20 +777,28 @@ function readdata!(input::IO, output::Buffer)::Int end # Write all data to `output` from the buffer of `input`. -function writedata!(output::IO, input::Buffer) - if output isa TranscodingStream && output.buffer1 === input +function flush_buffer2(stream::TranscodingStream)::Nothing + output = stream.stream + buffer2 = stream.buffer2 + state = stream.state + @assert state.mode === :write + if has_sharedbuf(stream) # Delegate the operation to the underlying stream for shared buffers. - return flushbufferall(output) - end - nwritten::Int = 0 - while buffersize(input) > 0 - n = GC.@preserve input Base.unsafe_write(output, bufferptr(input), buffersize(input)) - consumed!(input, n) - nwritten += n + changemode!(output, :write) + flush_buffer1(output) + else + while buffersize(buffer2) > 0 + n::Int = GC.@preserve buffer2 Base.unsafe_write(output, bufferptr(buffer2), buffersize(buffer2)) + n ≤ 0 && error("short write") + consumed!(buffer2, n) + state.bytes_written_out += n + GC.safepoint() + end + # move positions to the start of the buffer + @assert !iszero(makemargin!(buffer2, 0)) GC.safepoint() end - GC.safepoint() - return nwritten + nothing end @@ -800,7 +840,7 @@ function changemode!(stream::TranscodingStream, newmode::Symbol) end elseif mode == :write if newmode == :close - flushbufferall(stream) + flush_buffer1(stream) flushuntilend(stream) state.mode = newmode finalize_codec(stream.codec, state.error) diff --git a/src/transcode.jl b/src/transcode.jl index 32cb5bee..fca33736 100644 --- a/src/transcode.jl +++ b/src/transcode.jl @@ -46,10 +46,10 @@ function Base.transcode(codec::Type{C}, src::String) where {C<:Codec} end _default_output_buffer(codec, input) = Buffer( - initial_output_size( + GC.@preserve(input, initial_output_size( codec, buffermem(input) - ) + )) ) """ @@ -125,8 +125,7 @@ function transcode!( Base.mightalias(input.data, output.data) && error( "input and outbut buffers must be independent" ) - # GC.@preserve since unsafe_transcode! may convert to raw pointers - GC.@preserve input output codec unsafe_transcode!(output, codec, input) + unsafe_transcode!(output, codec, input) end """ @@ -148,10 +147,10 @@ function unsafe_transcode!( if code === :error @goto error end - n = minoutsize(codec, buffermem(input)) + n = GC.@preserve input minoutsize(codec, buffermem(input)) @label process makemargin!(output, n) - Δin, Δout, code = process(codec, buffermem(input), marginmem(output), error) + Δin, Δout, code = GC.@preserve input output process(codec, buffermem(input), marginmem(output), error) @debug( "called process()", code = code, @@ -169,13 +168,13 @@ function unsafe_transcode!( if startproc(codec, :write, error) === :error @goto error end - n = minoutsize(codec, buffermem(input)) + n = GC.@preserve input minoutsize(codec, buffermem(input)) @goto process end resize!(output.data, output.marginpos - 1) return output.data else - n = max(Δout, minoutsize(codec, buffermem(input))) + n = GC.@preserve input max(Δout, minoutsize(codec, buffermem(input))) @goto process end @label error diff --git a/test/codecdoubleframe.jl b/test/codecdoubleframe.jl index 64b6d83a..7730d8a4 100644 --- a/test/codecdoubleframe.jl +++ b/test/codecdoubleframe.jl @@ -303,6 +303,99 @@ DoubleFrameDecoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameD end end + @testset "stats" begin + @testset "read" begin + stream = DoubleFrameEncoderStream(IOBuffer(b"foobar")) + stat = TranscodingStreams.stats(stream) + @test stat.in == 0 + @test stat.out == 0 + read(stream) + stat = TranscodingStreams.stats(stream) + @test stat.in == 6 + @test stat.transcoded_in == 6 + @test stat.transcoded_out == 16 + @test stat.out == 16 + close(stream) + + #nested Streams + stream = DoubleFrameDecoderStream(DoubleFrameEncoderStream(IOBuffer(b"foobar"))) + stat = TranscodingStreams.stats(stream) + @test stat.in == 0 + @test stat.out == 0 + read(stream) + stat = TranscodingStreams.stats(stream) + @test stat.in == 16 + @test stat.transcoded_in == 16 + @test stat.transcoded_out == 6 + @test stat.out == 6 + close(stream) + end + + @testset "write" begin + stream = DoubleFrameEncoderStream(IOBuffer()) + stat = TranscodingStreams.stats(stream) + @test stat.in == 0 + @test stat.out == 0 + write(stream, b"foobar") + stat = TranscodingStreams.stats(stream) + @test stat.in == 6 + @test stat.transcoded_in == 0 + @test stat.transcoded_out == 0 + @test stat.out == 0 + flush(stream) + stat = TranscodingStreams.stats(stream) + @test stat.in == 6 + @test stat.transcoded_in == 6 + @test stat.transcoded_out == 14 + @test stat.out == 14 + write(stream, TranscodingStreams.TOKEN_END) + stat = TranscodingStreams.stats(stream) + @test stat.in == 6 + @test stat.transcoded_in == 6 + @test stat.transcoded_out == 16 + @test stat.out == 16 + close(stream) + + #nested Streams + stream = DoubleFrameDecoderStream(DoubleFrameEncoderStream(IOBuffer())) + stat = TranscodingStreams.stats(stream) + @test stat.in == 0 + @test stat.out == 0 + write(stream, b"[ ffoooobbaarr ]") + stat = TranscodingStreams.stats(stream) + @test stat.in == 16 + @test stat.transcoded_in == 0 + @test stat.transcoded_out == 0 + @test stat.out == 0 + flush(stream) + stat = TranscodingStreams.stats(stream) + @test stat.in == 16 + @test stat.transcoded_in == 16 + @test stat.transcoded_out == 6 + @test stat.out == 6 + @test position(stream.stream) == 6 + close(stream) + end + end + + @testset "underlying stream fails" begin + sink = IOBuffer(;maxsize=4) + stream = DoubleFrameEncoderStream(sink) + @test write(stream, "abcd") == 4 + # make sure flush doesn't go into an infinite loop + @test_throws ErrorException("short write") flush(stream) + end + + @testset "peek" begin + stream = DoubleFrameDecoderStream(DoubleFrameEncoderStream(IOBuffer( + codeunits("こんにちは") + ))) + @test peek(stream) == 0xe3 + @test peek(stream, Char) == 'こ' + @test peek(stream, Int32) == -476872221 + close(stream) + end + test_roundtrip_read(DoubleFrameEncoderStream, DoubleFrameDecoderStream) test_roundtrip_write(DoubleFrameEncoderStream, DoubleFrameDecoderStream) test_roundtrip_lines(DoubleFrameEncoderStream, DoubleFrameDecoderStream) diff --git a/test/codecnoop.jl b/test/codecnoop.jl index 6203787b..9db6bc14 100644 --- a/test/codecnoop.jl +++ b/test/codecnoop.jl @@ -24,13 +24,14 @@ using FillArrays: Zeros stream = TranscodingStream(Noop(), IOBuffer()) @test_throws EOFError read(stream, UInt8) - @test_throws EOFError unsafe_read(stream, pointer(Vector{UInt8}(undef, 3)), 3) + data = Vector{UInt8}(undef, 3) + @test_throws EOFError GC.@preserve data unsafe_read(stream, pointer(data), 3) close(stream) stream = TranscodingStream(Noop(), IOBuffer("foobar"), bufsize=1) @test read(stream, UInt8) === UInt8('f') data = Vector{UInt8}(undef, 5) - unsafe_read(stream, pointer(data), 5) === nothing + GC.@preserve data unsafe_read(stream, pointer(data), 5) === nothing @test data == b"oobar" close(stream) @@ -122,7 +123,7 @@ using FillArrays: Zeros stream = TranscodingStream(Noop(), IOBuffer("foo")) out = zeros(UInt8, 3) @test bytesavailable(stream) == 0 - @test TranscodingStreams.unsafe_read(stream, pointer(out), 10) == 3 + @test GC.@preserve out TranscodingStreams.unsafe_read(stream, pointer(out), 10) == 3 @test out == b"foo" close(stream) @@ -171,22 +172,65 @@ using FillArrays: Zeros @test s1.buffer1 === s2.buffer1 === s3.buffer1 === s1.buffer2 === s2.buffer2 === s3.buffer2 - stream = TranscodingStream(Noop(), IOBuffer(b"foobar")) - @test TranscodingStreams.stats(stream).in === Int64(0) - @test TranscodingStreams.stats(stream).out === Int64(0) - read(stream) - @test TranscodingStreams.stats(stream).in === Int64(6) - @test TranscodingStreams.stats(stream).out === Int64(6) - close(stream) + @testset "stats" begin + stream = TranscodingStream(Noop(), IOBuffer(b"foobar")) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(0) + @test stat.out === Int64(0) + eof(stream) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(6) + @test stat.out === Int64(0) + read(stream) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(6) + @test stat.out === Int64(6) + close(stream) - stream = TranscodingStream(Noop(), IOBuffer()) - @test TranscodingStreams.stats(stream).in === Int64(0) - @test TranscodingStreams.stats(stream).out === Int64(0) - write(stream, b"foobar") - flush(stream) - @test TranscodingStreams.stats(stream).in === Int64(6) - @test TranscodingStreams.stats(stream).out === Int64(6) - close(stream) + #nested NoopStreams + stream = NoopStream(NoopStream(IOBuffer(b"foobar"))) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(0) + @test stat.out === Int64(0) + eof(stream) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(0) + @test stat.out === Int64(0) + read(stream) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(6) + @test stat.out === Int64(6) + close(stream) + + stream = TranscodingStream(Noop(), IOBuffer()) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(0) + @test stat.out === Int64(0) + write(stream, b"foobar") + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(6) + @test stat.out === Int64(0) + flush(stream) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(6) + @test stat.out === Int64(6) + close(stream) + + #nested NoopStreams + stream = NoopStream(NoopStream(IOBuffer())) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(0) + @test stat.out === Int64(0) + write(stream, b"foobar") + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(6) + @test stat.out === Int64(6) + flush(stream) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(6) + @test stat.out === Int64(6) + close(stream) + end stream = TranscodingStream(Noop(), IOBuffer()) @test stream.state.mode == :idle @@ -341,7 +385,8 @@ using FillArrays: Zeros @test eof(stream) stream = NoopStream(IOBuffer("foobar")) - @test_throws ArgumentError TranscodingStreams.unsafe_unread(stream, pointer(b"foo"), -1) + data = b"foo" + @test_throws ArgumentError GC.@preserve data TranscodingStreams.unsafe_unread(stream, pointer(data), -1) close(stream) stream = NoopStream(IOBuffer("foo")) @@ -498,4 +543,12 @@ using FillArrays: Zeros @test take!(sink) == b"abcd" end + @testset "peek" begin + stream = NoopStream(IOBuffer(codeunits("こんにちは"))) + @test peek(stream) == 0xe3 + @test peek(stream, Char) == 'こ' + @test peek(stream, Int32) == -476872221 + close(stream) + end + end diff --git a/test/codecquadruple.jl b/test/codecquadruple.jl index 31a0ef53..fe481b67 100644 --- a/test/codecquadruple.jl +++ b/test/codecquadruple.jl @@ -81,7 +81,8 @@ end close(stream2) stream = TranscodingStream(QuadrupleCodec(), IOBuffer("foo")) - @test_throws EOFError unsafe_read(stream, pointer(Vector{UInt8}(undef, 13)), 13) + data = Vector{UInt8}(undef, 13) + @test_throws EOFError GC.@preserve data unsafe_read(stream, pointer(data), 13) close(stream) @testset "position" begin diff --git a/test/runtests.jl b/test/runtests.jl index 38f8d5e4..2bca20af 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -26,13 +26,15 @@ using TranscodingStreams: data = Vector{UInt8}(b"foobar") buf = Buffer(data) - @test buf isa Buffer - @test bufferptr(buf) === pointer(data) - @test buffersize(buf) === 6 - @test buffermem(buf) === Memory(pointer(data), 6) - @test marginptr(buf) === pointer(data) + 6 - @test marginsize(buf) === 0 - @test marginmem(buf) === Memory(pointer(data)+6, 0) + GC.@preserve data buf begin + @test buf isa Buffer + @test bufferptr(buf) === pointer(data) + @test buffersize(buf) === 6 + @test buffermem(buf) === Memory(pointer(data), 6) + @test marginptr(buf) === pointer(data) + 6 + @test marginsize(buf) === 0 + @test marginmem(buf) === Memory(pointer(data)+6, 0) + end buf = Buffer(2) writebyte!(buf, 0x34) @@ -94,7 +96,7 @@ end end data = Vector{UInt8}(b"foobar") - GC.@preserve data let mem = TranscodingStreams.Memory(data) + GC.@preserve data let mem = TranscodingStreams.Memory(pointer(data), sizeof(data)) @test mem isa TranscodingStreams.Memory @test mem.ptr == pointer(data) @test mem.size == sizeof(data)