Skip to content

Commit

Permalink
Fix position of underlying stream when sharing buffers (#222)
Browse files Browse the repository at this point in the history
  • Loading branch information
nhz2 authored Jun 18, 2024
1 parent b5b03e6 commit 6b13d1f
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 29 deletions.
55 changes: 36 additions & 19 deletions fuzz/fuzz.jl
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,41 @@ read_methods = Data.SampledFrom([
end
])

# Return true if the stats of a stream are self consistent
# This function assumes stream was never seeked.
function is_stats_consistent(stream)
if stream isa TranscodingStream
s = TranscodingStreams.stats(stream)
inner_pos = position(stream.stream)
pos = position(stream)
# event!("stats(stream)", s)
# event!("position(stream.stream)", inner_pos)
# event!("position(stream)", pos)
if isreadable(stream)
s.out == pos || return false
s.in == inner_pos || return false
else
iswritable(stream) || return false
s.in == pos || return false
s.out == inner_pos || return false
end
s.transcoded_in s.in || return false
s.transcoded_out s.out || return false
end
true
end

@check function read_byte_data(
kws=read_codecs_kws,
data=datas,
)
stream = wrap_stream(kws, IOBuffer(data))
for i in eachindex(data)
for i in 1:length(data)
position(stream) == i-1 || return false
is_stats_consistent(stream) || return false
read(stream, UInt8) == data[i] || return false
end
is_stats_consistent(stream) || return false
eof(stream)
end
@check function read_data(
Expand All @@ -108,6 +134,7 @@ end
)
stream = wrap_stream(kws, IOBuffer(data))
read(stream) == data || return false
is_stats_consistent(stream) || return false
eof(stream)
end
@check function read_data_methods(
Expand All @@ -122,13 +149,15 @@ end
append!(x, d)
length(x) == position(stream) || return false
end
is_stats_consistent(stream) || return false
x == data[eachindex(x)]
end

# flush all nested streams and return final data
function take_all(stream)
if stream isa Base.GenericIOBuffer
take!(stream)
seekstart(stream)
read(stream)
else
write(stream, TranscodingStreams.TOKEN_END)
flush(stream)
Expand All @@ -144,7 +173,9 @@ const write_codecs_kws = map(reverse, read_codecs_kws)
)
stream = wrap_stream(kws, IOBuffer())
write(stream, data) == length(data) || return false
take_all(stream) == data
take_all(stream) == data || return false
is_stats_consistent(stream) || return false
true
end
@check function write_byte_data(
kws=write_codecs_kws,
Expand All @@ -153,24 +184,10 @@ end
stream = wrap_stream(kws, IOBuffer())
for i in 1:length(data)
position(stream) == i-1 || return false
if stream isa TranscodingStream
s = TranscodingStreams.stats(stream)
s.in == i-1 || return false
# TODO fix position(stream.stream)
# s.out == position(stream.stream) || return false
# s.transcoded_in == s.out || return false
# s.transcoded_out == s.out || return false
end
is_stats_consistent(stream) || return false
write(stream, data[i]) == 1 || return false
end
take_all(stream) == data || return false
if stream isa TranscodingStream
s = TranscodingStreams.stats(stream)
s.in == length(data) || return false
# TODO fix position(stream.stream)
# s.out == position(stream.stream) || return false
# s.transcoded_in == s.out || return false
# s.transcoded_out == s.out || return false
end
is_stats_consistent(stream) || return false
true
end
8 changes: 6 additions & 2 deletions src/noop.jl
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,12 @@ function stats(stream::NoopStream)
if mode === :idle
in = out = 0
elseif mode === :read
in = buffer.transcoded
out = in - buffersize(buffer)
out = buffer.transcoded - buffersize(buffer)
if has_sharedbuf(stream)
in = out
else
in = buffer.transcoded
end
elseif mode === :write
out = stream.state.bytes_written_out
in = out
Expand Down
17 changes: 13 additions & 4 deletions src/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -605,10 +605,15 @@ function stats(stream::TranscodingStream)
if mode === :idle
transcoded_in = transcoded_out = in = out = 0
elseif mode === :read || mode === :stop
transcoded_in = buffer2.transcoded
transcoded_out = buffer1.transcoded
in = transcoded_in + buffersize(buffer2)
out = transcoded_out - buffersize(buffer1)
if has_sharedbuf(stream)
transcoded_in = stats(stream.stream).out
in = transcoded_in
else
transcoded_in = buffer2.transcoded
in = transcoded_in + buffersize(buffer2)
end
elseif mode === :write
transcoded_in = buffer1.transcoded
out = state.bytes_written_out
Expand Down Expand Up @@ -709,8 +714,12 @@ function callprocess(stream::TranscodingStream, inbuf::Buffer, outbuf::Buffer)
input_delta = Δin,
output_delta = Δout,
)
consumed!(inbuf, Δin, transcode = true)
supplied!(outbuf, Δout, transcode = true)
consumed!(inbuf, Δin;
transcode = !has_sharedbuf(stream) || stream.state.mode === :write,
) # inbuf is buffer1 if mode is :write
supplied!(outbuf, Δout;
transcode = !has_sharedbuf(stream) || stream.state.mode !== :write,
) # outbuf is buffer1 if mode is not :write
if has_sharedbuf(stream)
if stream.state.mode === :write
# this must be updated before throwing any error if outbuf is shared.
Expand Down
6 changes: 3 additions & 3 deletions test/codecdoubleframe.jl
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,8 @@ DoubleFrameDecoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameD
@test stat.out == 0
read(stream)
stat = TranscodingStreams.stats(stream)
@test_broken stat.in == 16
@test_broken stat.transcoded_in == 16
@test stat.in == 16
@test stat.transcoded_in == 16
@test stat.transcoded_out == 6
@test stat.out == 6
close(stream)
Expand Down Expand Up @@ -373,7 +373,7 @@ DoubleFrameDecoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameD
@test stat.transcoded_in == 16
@test stat.transcoded_out == 6
@test stat.out == 6
@test_broken position(stream.stream) == 6
@test position(stream.stream) == 6
close(stream)
end
end
Expand Down
2 changes: 1 addition & 1 deletion test/codecnoop.jl
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ using FillArrays: Zeros
@test stat.out === Int64(0)
eof(stream)
stat = TranscodingStreams.stats(stream)
@test_broken stat.in === Int64(0)
@test stat.in === Int64(0)
@test stat.out === Int64(0)
read(stream)
stat = TranscodingStreams.stats(stream)
Expand Down

0 comments on commit 6b13d1f

Please sign in to comment.