Skip to content

Commit

Permalink
Merge branch 'master' into nz/Remove-buggy-memory-constructor
Browse files Browse the repository at this point in the history
  • Loading branch information
nhz2 authored Jun 23, 2024
2 parents 552d6c3 + 74ac5c8 commit ff4ea0c
Show file tree
Hide file tree
Showing 13 changed files with 412 additions and 180 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name = "TranscodingStreams"
uuid = "3bb67fe8-82b1-5028-8e26-92a6c54297fa"
license = "MIT"
authors = ["Kenta Sato <[email protected]>"]
version = "0.10.8"
version = "0.10.9"

[deps]
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
Expand Down
7 changes: 5 additions & 2 deletions docs/src/devnotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ default buffer size is 16KiB for each.
- `error`: exception returned by the codec (`<:Error`)
- `buffer1`: data buffer that is closer to the user (`<:Buffer`)
- `buffer2`: data buffer that is farther from the user (`<:Buffer`)
- `bytes_written_out`: number of bytes written to the underlying stream (`<:Int64`)

The `mode` field may be one of the following values:
- `:idle` : initial and intermediate mode, no buffered data
Expand Down Expand Up @@ -78,8 +79,10 @@ Shared buffers
Adjacent transcoding streams may share their buffers. This will reduce memory
allocation and eliminate data copy between buffers.

`readdata!(input::IO, output::Buffer)` and `writedata!(output::IO,
input::Buffer)` do the actual work of read/write data from/to the underlying
If `buffer2` is shared, the `stats` and `position` functions treat it as
owned by the underlying stream.

`readdata!(input::IO, output::Buffer)` and `flush_buffer2(stream::TranscodingStream)` do the actual work of reading/writing data from/to the underlying
stream. These methods have a special path for shared buffers.


Expand Down
2 changes: 1 addition & 1 deletion docs/src/examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ using CodecZlib

function decompress(input, output)
buffer = Vector{UInt8}(undef, 16 * 1024)
while !eof(input)
GC.@preserve buffer while !eof(input)
n = min(bytesavailable(input), length(buffer))
unsafe_read(input, pointer(buffer), n)
unsafe_write(output, pointer(buffer), n)
Expand Down
131 changes: 85 additions & 46 deletions fuzz/fuzz.jl
Original file line number Diff line number Diff line change
Expand Up @@ -91,44 +91,75 @@ read_methods = Data.SampledFrom([
end
])


@check function read_byte_data(
kws=read_codecs_kws,
data=datas,
)
stream = wrap_stream(kws, IOBuffer(data))
for i in eachindex(data)
read(stream, UInt8) == data[i] || return false
# Return true if the stats of a stream are self consistent
# This function assumes stream was never seeked.
function is_stats_consistent(stream)
if stream isa TranscodingStream
s = TranscodingStreams.stats(stream)
inner_pos = position(stream.stream)
pos = position(stream)
# event!("stats(stream)", s)
# event!("position(stream.stream)", inner_pos)
# event!("position(stream)", pos)
if isreadable(stream)
s.out == pos || return false
s.in == inner_pos || return false
else
iswritable(stream) || return false
s.in == pos || return false
s.out == inner_pos || return false
end
s.transcoded_in ≤ s.in || return false
s.transcoded_out ≥ s.out || return false
end
eof(stream)
end
@check function read_data(
kws=read_codecs_kws,
data=datas,
)
stream = wrap_stream(kws, IOBuffer(data))
read(stream) == data || return false
eof(stream)
true
end
@check function read_data_methods(
kws=read_codecs_kws,
data=datas,
rs=Data.Vectors(read_methods),
)
stream = wrap_stream(kws, IOBuffer(data))
x = UInt8[]
for r in rs
d = r(stream)
append!(x, d)
length(x) == position(stream) || return false

@testset "read" begin
# Property: reading a wrapped stream one byte at a time yields `data` in order,
# with `position` and the stream stats staying consistent before every read
# and after the last one. The property holds only if the stream ends at EOF.
@check function read_byte_data(
        kws=read_codecs_kws,
        data=datas,
    )
    stream = wrap_stream(kws, IOBuffer(data))
    for (nread, expected) in enumerate(data)
        # `position` is zero-based: before the n-th read, n-1 bytes were consumed.
        if position(stream) != nread - 1 || !is_stats_consistent(stream)
            return false
        end
        read(stream, UInt8) == expected || return false
    end
    return is_stats_consistent(stream) && eof(stream)
end
# Property: a single bulk `read` of the wrapped stream returns exactly `data`,
# leaves the stats consistent, and ends with the stream at EOF.
@check function read_data(
        kws=read_codecs_kws,
        data=datas,
    )
    stream = wrap_stream(kws, IOBuffer(data))
    roundtrip_ok = read(stream) == data
    return roundtrip_ok && is_stats_consistent(stream) && eof(stream)
end
# Property: applying an arbitrary sequence of read methods to the wrapped
# stream accumulates a prefix of `data`, and after every step the number of
# bytes collected equals `position(stream)`.
@check function read_data_methods(
        kws=read_codecs_kws,
        data=datas,
        rs=Data.Vectors(read_methods),
    )
    stream = wrap_stream(kws, IOBuffer(data))
    collected = UInt8[]
    for read_method in rs
        append!(collected, read_method(stream))
        if length(collected) != position(stream)
            return false
        end
    end
    is_stats_consistent(stream) || return false
    # Only the bytes read so far are compared; the stream need not be at EOF.
    return collected == data[eachindex(collected)]
end
x == data[eachindex(x)]
end

# flush all nested streams and return final data
function take_all(stream)
if stream isa Base.GenericIOBuffer
take!(stream)
seekstart(stream)
read(stream)
else
write(stream, TranscodingStreams.TOKEN_END)
flush(stream)
Expand All @@ -138,21 +169,29 @@ end

const write_codecs_kws = map(reverse, read_codecs_kws)

@check function write_data(
kws=write_codecs_kws,
data=datas,
)
stream = wrap_stream(kws, IOBuffer())
write(stream, data) == length(data) || return false
take_all(stream) == data
end
@check function write_byte_data(
kws=write_codecs_kws,
data=datas,
)
stream = wrap_stream(kws, IOBuffer())
for i in 1:length(data)
write(stream, data[i]) == 1 || return false
@testset "write" begin
# Property: writing `data` in one call reports `length(data)` bytes written,
# the flushed output equals `data`, and the stats are consistent afterwards.
@check function write_data(
        kws=write_codecs_kws,
        data=datas,
    )
    stream = wrap_stream(kws, IOBuffer())
    nwritten = write(stream, data)
    nwritten == length(data) || return false
    # `take_all` must run before the final stats check, as in the byte-wise test.
    return take_all(stream) == data && is_stats_consistent(stream)
end
# Property: writing `data` one byte at a time advances `position` by one per
# write, keeps the stats consistent throughout, and the flushed output
# equals `data`.
@check function write_byte_data(
        kws=write_codecs_kws,
        data=datas,
    )
    stream = wrap_stream(kws, IOBuffer())
    for (count, byte) in enumerate(data)
        # `position` is zero-based: before the n-th write, n-1 bytes were written.
        position(stream) == count - 1 || return false
        is_stats_consistent(stream) || return false
        write(stream, byte) == 1 || return false
    end
    return take_all(stream) == data && is_stats_consistent(stream)
end
take_all(stream) == data
end
4 changes: 0 additions & 4 deletions src/memory.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@ struct Memory
size::UInt
end

# Build a `Memory` from the raw pointer and byte size of `data`.
# NOTE(review): `pointer(data)` is taken without `GC.@preserve`; presumably the
# caller must keep `data` alive while the returned `Memory` is in use — confirm.
Memory(data::Vector{UInt8}) = Memory(pointer(data), sizeof(data))

# Return the stored `size` of `mem` (a `UInt`).
@inline Base.length(mem::Memory) = mem.size
Expand Down
65 changes: 34 additions & 31 deletions src/noop.jl
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ end
function Base.seek(stream::NoopStream, pos::Integer)
mode = stream.state.mode
if mode === :write
flushbuffer(stream)
flush_buffer2(stream)
end
seek(stream.stream, pos)
initbuffer!(stream.buffer1)
Expand All @@ -82,7 +82,7 @@ end
function Base.seekstart(stream::NoopStream)
mode = stream.state.mode
if mode === :write
flushbuffer(stream)
flush_buffer2(stream)
end
seekstart(stream.stream)
initbuffer!(stream.buffer1)
Expand All @@ -92,7 +92,7 @@ end
function Base.seekend(stream::NoopStream)
mode = stream.state.mode
if mode === :write
flushbuffer(stream)
flush_buffer2(stream)
end
seekend(stream.stream)
initbuffer!(stream.buffer1)
Expand All @@ -103,29 +103,34 @@ function Base.write(stream::NoopStream, b::UInt8)::Int
changemode!(stream, :write)
if has_sharedbuf(stream)
# directly write data to the underlying stream
n = Int(write(stream.stream, b))
return n
write(stream.stream, b)
stream.state.bytes_written_out += 1
else
buffer1 = stream.buffer1
marginsize(buffer1) > 0 || flush_buffer2(stream)
writebyte!(buffer1, b)
end
buffer1 = stream.buffer1
marginsize(buffer1) > 0 || flushbuffer(stream)
return writebyte!(buffer1, b)
return 1
end

function Base.unsafe_write(stream::NoopStream, input::Ptr{UInt8}, nbytes::UInt)::Int
changemode!(stream, :write)
Int(nbytes) # Error if nbytes > typemax Int
if has_sharedbuf(stream)
# directly write data to the underlying stream
n = Int(unsafe_write(stream.stream, input, nbytes))
stream.state.bytes_written_out += n
return n
end
buffer = stream.buffer1
if marginsize(buffer) ≥ nbytes
copydata!(buffer, input, Int(nbytes))
return Int(nbytes)
else
flushbuffer(stream)
flush_buffer2(stream)
# directly write data to the underlying stream
n = Int(unsafe_write(stream.stream, input, nbytes))
stream.state.bytes_written_out += n
return n
end
end
Expand All @@ -148,17 +153,24 @@ function stats(stream::NoopStream)
buffer = stream.buffer1
@assert buffer === stream.buffer2
if mode === :idle
consumed = supplied = 0
in = out = 0
elseif mode === :read
supplied = buffer.transcoded
consumed = supplied - buffersize(buffer)
out = buffer.transcoded - buffersize(buffer)
if has_sharedbuf(stream)
in = out
else
in = buffer.transcoded
end
elseif mode === :write
supplied = buffer.transcoded + buffersize(buffer)
consumed = buffer.transcoded
out = stream.state.bytes_written_out
in = out
if !has_sharedbuf(stream)
in += buffersize(buffer)
end
else
throw_invalid_mode(mode)
end
return Stats(consumed, supplied, supplied, supplied)
return Stats(in, out, out, out)
end


Expand Down Expand Up @@ -190,24 +202,15 @@ function fillbuffer(stream::NoopStream; eager::Bool = false)::Int
return nfilled
end

function flushbuffer(stream::NoopStream, all::Bool=false)
changemode!(stream, :write)
buffer = stream.buffer1
@assert buffer === stream.buffer2
nflushed::Int = 0
if all
while buffersize(buffer) > 0
nflushed += writedata!(stream.stream, buffer)
end
else
nflushed += writedata!(stream.stream, buffer)
makemargin!(buffer, 0)
end
buffer.transcoded += nflushed
return nflushed
# Drain buffer1 by writing its data out.
# Precondition: `stream` is in :write mode.
# Postcondition: buffer1 has margin for at least one more byte.
# For a NoopStream this simply delegates to `flush_buffer2`.
function flush_buffer1(stream::NoopStream)::Nothing
    return flush_buffer2(stream)
end

# This is always called after `flush_buffer1(stream)`
function flushuntilend(stream::NoopStream)
stream.buffer1.transcoded += writedata!(stream.stream, stream.buffer1)
@assert iszero(buffersize(stream.buffer1))
return
end
5 changes: 4 additions & 1 deletion src/state.jl
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ mutable struct State
buffer1::Buffer
buffer2::Buffer

# Number of bytes written to the underlying stream
bytes_written_out::Int64

function State(buffer1::Buffer, buffer2::Buffer)
return new(:idle, :ok, false, Error(), buffer1, buffer2)
return new(:idle, :ok, false, Error(), buffer1, buffer2, Int64(0))
end
end

Expand Down
Loading

0 comments on commit ff4ea0c

Please sign in to comment.