Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Breaking position and eof fixes #196

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions docs/src/devnotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,10 @@ The `mode` field may be one of the following values:
- `:idle` : initial and intermediate mode, no buffered data
- `:read` : being ready to read data, data may be buffered
- `:write`: being ready to write data, data may be buffered
- `:stop` : transcoding is stopped after read, data may be buffered
- `:close`: closed, no buffered data
- `:panic`: an exception has been thrown in codec, data may be buffered but we
cannot do anything

Note that `mode=:stop` does not mean there is no data available in the stream.
This is because transcoded data may be left in the buffer.

The initial mode is `:idle` and mode transition happens as shown in the
following diagram:
Expand Down
3 changes: 1 addition & 2 deletions fuzz/fuzz.jl
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,7 @@ end
for r in rs
d = r(stream)
append!(x, d)
# TODO fix position
# length(x) == position(stream) || return false
length(x) == position(stream) || return false
end
x == data[eachindex(x)]
end
Expand Down
59 changes: 34 additions & 25 deletions src/buffer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# position 1 markpos bufferpos marginpos lastindex(data)
#
# `markpos` is positive iff there are marked data; otherwise it is set to zero.
# `markpos` ≤ `bufferpos` ≤ `marginpos` must hold whenever possible.
# `markpos` ≤ `marginpos` and `bufferpos` ≤ `marginpos` must hold.

mutable struct Buffer
# data and positions (see above)
Expand All @@ -23,8 +23,9 @@
bufferpos::Int
marginpos::Int

# the total number of transcoded bytes
transcoded::Int64
# total number of bytes shifted by makemargin!
# used to keep track of stream position
shifted::Int64

function Buffer(data::Vector{UInt8}, marginpos::Integer=length(data)+1)
@assert 1 <= marginpos <= length(data)+1
Expand Down Expand Up @@ -93,41 +94,30 @@
end

# Notify that `n` bytes are consumed from `buf`.
function consumed!(buf::Buffer, n::Integer; transcode::Bool = false)
function consumed!(buf::Buffer, n::Integer)
buf.bufferpos += n
if transcode
buf.transcoded += n
end
return buf
end

# Notify that `n` bytes are supplied to `buf`.
function supplied!(buf::Buffer, n::Integer; transcode::Bool = false)
function supplied!(buf::Buffer, n::Integer)
buf.marginpos += n
if transcode
buf.transcoded += n
end
return buf
end

# Discard buffered data and initialize positions.
function initbuffer!(buf::Buffer)
buf.markpos = buf.transcoded = 0
buf.markpos = buf.shifted = 0
buf.bufferpos = buf.marginpos = 1
return buf
end

# Remove all buffered data.
function emptybuffer!(buf::Buffer)
buf.marginpos = buf.bufferpos
return buf
end

# Make margin with ≥`minsize` and return the size of it.
# If eager is true, it tries to move data even when the buffer has enough margin.
function makemargin!(buf::Buffer, minsize::Integer; eager::Bool = false)
@assert minsize ≥ 0
if buffersize(buf) == 0 && buf.markpos == 0
buf.shifted += buf.bufferpos - 1
buf.bufferpos = buf.marginpos = 1
end
if marginsize(buf) < minsize || eager
Expand All @@ -138,10 +128,9 @@
datapos = buf.bufferpos
datasize = buffersize(buf)
else
# Else, we must not consume marked data
# (Since markpos ≤ bufferpos, we do not consume buffered data either)
datapos = buf.markpos
datasize = buf.marginpos - buf.markpos
# Else, we must not consume marked data or buffered data
datapos = min(buf.markpos, buf.bufferpos)
datasize = buf.marginpos - datapos
end
# Shift data left in buffer to make space for new data
copyto!(buf.data, 1, buf.data, datapos, datasize)
Expand All @@ -151,6 +140,7 @@
end
buf.bufferpos -= shift
buf.marginpos -= shift
buf.shifted += shift
end
# If there is still not enough margin, we expand buffer.
# At least enough for minsize, but otherwise 1.5 times
Expand Down Expand Up @@ -203,10 +193,29 @@

# Insert data to the current buffer.
function insertdata!(buf::Buffer, data::Ptr{UInt8}, nbytes::Integer)
makemargin!(buf, nbytes)
copyto!(buf.data, buf.bufferpos + nbytes, buf.data, buf.bufferpos, buffersize(buf))
front_space_needed = nbytes - buf.bufferpos + 1
if front_space_needed > 0
if front_space_needed > marginsize(buf)
# make more space
resize!(buf.data, buf.marginpos + front_space_needed - 1)

Check warning on line 200 in src/buffer.jl

View check run for this annotation

Codecov / codecov/patch

src/buffer.jl#L200

Added line #L200 was not covered by tests
end
@assert front_space_needed ≤ marginsize(buf)
# make space in front by shifting data to margin
for i in (buf.marginpos-1 + front_space_needed):-1:(1+nbytes)
buf.data[i] = buf.data[i-front_space_needed]
end

Check warning on line 206 in src/buffer.jl

View check run for this annotation

Codecov / codecov/patch

src/buffer.jl#L205-L206

Added lines #L205 - L206 were not covered by tests
if buf.markpos > 0
buf.markpos += front_space_needed

Check warning on line 208 in src/buffer.jl

View check run for this annotation

Codecov / codecov/patch

src/buffer.jl#L208

Added line #L208 was not covered by tests
end
buf.bufferpos += front_space_needed
buf.marginpos += front_space_needed
buf.shifted -= front_space_needed
end
# here is the really spooky part where sometimes `markpos` > `bufferpos`
buf.bufferpos -= nbytes
@assert buf.bufferpos > 0
@assert buffersize(buf) ≥ nbytes
GC.@preserve buf unsafe_copyto!(bufferptr(buf), data, nbytes)
supplied!(buf, nbytes)
return buf
end

Expand Down
152 changes: 87 additions & 65 deletions src/noop.jl
Original file line number Diff line number Diff line change
Expand Up @@ -44,69 +44,91 @@
return TranscodingStream(codec, stream, state)
end

"""
position(stream::NoopStream)

Get the current position of `stream`.

Note that this method may return a wrong position when
- some data have been inserted by `TranscodingStreams.unread`, or
- the position of the wrapped stream has been changed outside of this package.
"""
function Base.position(stream::NoopStream)
function Base.position(stream::NoopStream)::Int64
mode = stream.state.mode
if mode === :idle
return Int64(0)
elseif mode === :write
return position(stream.stream) + buffersize(stream.buffer1)
elseif mode === :read
return position(stream.stream) - buffersize(stream.buffer1)
if has_sharedbuf(stream)
if mode === :idle || mode === :read || mode === :write
return position(stream.stream) - something(stream.state.offset)
else
throw_invalid_mode(mode)

Check warning on line 53 in src/noop.jl

View check run for this annotation

Codecov / codecov/patch

src/noop.jl#L53

Added line #L53 was not covered by tests
end
else
throw_invalid_mode(mode)
buffer1 = stream.buffer1
if mode === :idle
return Int64(0)
elseif mode === :write
return buffer1.shifted + buffer1.marginpos - 1
elseif mode === :read
return buffer1.shifted + buffer1.bufferpos - 1
else
throw_invalid_mode(mode)
end
end
@assert false "unreachable"
end

function Base.seek(stream::NoopStream, pos::Integer)
mode = stream.state.mode
if mode === :write
flushbuffer(stream)
if has_sharedbuf(stream)
seek(stream.stream, pos)

Check warning on line 71 in src/noop.jl

View check run for this annotation

Codecov / codecov/patch

src/noop.jl#L71

Added line #L71 was not covered by tests
else
mode = stream.state.mode
if mode === :write
flushbuffer(stream)
end
seek(stream.stream, pos)
initbuffer!(stream.buffer1)
stream.buffer1.shifted = pos
end
seek(stream.stream, pos)
initbuffer!(stream.buffer1)
stream.state.offset = 0
return stream
end

function Base.seekstart(stream::NoopStream)
mode = stream.state.mode
if mode === :write
flushbuffer(stream)
if has_sharedbuf(stream)
seekstart(stream.stream)
else
mode = stream.state.mode
if mode === :write
flushbuffer(stream)
end
seekstart(stream.stream)
initbuffer!(stream.buffer1)
end
seekstart(stream.stream)
initbuffer!(stream.buffer1)
stream.state.offset = 0
return stream
end

function Base.seekend(stream::NoopStream)
mode = stream.state.mode
if mode === :write
flushbuffer(stream)
isnothing(stream.state.offset) && throw_invalid_offset(stream.stream)
if has_sharedbuf(stream)
seekend(stream.stream)
else
mode = stream.state.mode
if mode === :write
flushbuffer(stream)
end
seekend(stream.stream)
initbuffer!(stream.buffer1)
stream.buffer1.shifted = position(stream.stream) - something(stream.state.offset)
end
seekend(stream.stream)
initbuffer!(stream.buffer1)
return stream
end

function Base.unsafe_write(stream::NoopStream, input::Ptr{UInt8}, nbytes::UInt)
changemode!(stream, :write)
buffer = stream.buffer1
if marginsize(buffer) ≥ nbytes
copydata!(buffer, input, nbytes)
return Int(nbytes)
else
flushbuffer(stream)
# directly write data to the underlying stream
if has_sharedbuf(stream)
return unsafe_write(stream.stream, input, nbytes)
else
buffer = stream.buffer1
if marginsize(buffer) ≥ nbytes
copydata!(buffer, input, nbytes)
return Int(nbytes)
else
flushbuffer(stream)
# directly write data to the underlying stream
n = unsafe_write(stream.stream, input, nbytes)
buffer.shifted += n
return n
end
end
end

Expand All @@ -125,20 +147,20 @@
function stats(stream::NoopStream)
state = stream.state
mode = state.mode
buffer = stream.buffer1
@assert buffer === stream.buffer2
if mode === :idle
consumed = supplied = 0
elseif mode === :read
supplied = buffer.transcoded
consumed = supplied - buffersize(buffer)
isnothing(stream.state.offset) && throw_invalid_offset(stream.stream)
supplied = position(stream.stream) - something(stream.state.offset)
consumed = position(stream)
elseif mode === :write
supplied = buffer.transcoded + buffersize(buffer)
consumed = buffer.transcoded
isnothing(stream.state.offset) && throw_invalid_offset(stream.stream)
supplied = position(stream)
consumed = position(stream.stream) - something(stream.state.offset)
else
throw_invalid_mode(mode)
end
return Stats(consumed, supplied, supplied, supplied)
return Stats(supplied, consumed, supplied, supplied)
end


Expand All @@ -148,26 +170,26 @@
# These methods are overloaded for the `Noop` codec because it has only one
# buffer for efficiency.

function fillbuffer(stream::NoopStream; eager::Bool = false)::Int
@noinline function sloweof(stream::NoopStream)::Bool
changemode!(stream, :read)
buffer = stream.buffer1
@assert buffer === stream.buffer2
if stream.stream isa TranscodingStream && buffer === stream.stream.buffer1
# Delegate the operation when buffers are shared.
underlying_mode::Symbol = stream.stream.state.mode
if underlying_mode === :idle || underlying_mode === :read
return fillbuffer(stream.stream, eager = eager)
else
return 0
end
end
nfilled::Int = 0
while ((!eager && buffersize(buffer) == 0) || (eager && makemargin!(buffer, 0, eager = true) > 0)) && !eof(stream.stream)
iszero(buffersize(buffer)) || return false
# fill buffer1
eof(stream.stream) && return true
if !has_sharedbuf(stream)
makemargin!(buffer, 1)
nfilled += readdata!(stream.stream, buffer)
navail = bytesavailable(stream.stream)
if navail == 0
writebyte!(buffer, read(stream.stream, UInt8))
navail = bytesavailable(stream.stream)

Check warning on line 184 in src/noop.jl

View check run for this annotation

Codecov / codecov/patch

src/noop.jl#L183-L184

Added lines #L183 - L184 were not covered by tests
end
n = min(navail, marginsize(buffer))
if !iszero(n)
GC.@preserve buffer Base.unsafe_read(stream.stream, marginptr(buffer), n)
supplied!(buffer, n)
end
end
buffer.transcoded += nfilled
return nfilled
return false
end

function flushbuffer(stream::NoopStream, all::Bool=false)
Expand All @@ -183,11 +205,11 @@
nflushed += writedata!(stream.stream, buffer)
makemargin!(buffer, 0)
end
buffer.transcoded += nflushed
# buffer.transcoded += nflushed
return nflushed
end

function flushuntilend(stream::NoopStream)
stream.buffer1.transcoded += writedata!(stream.stream, stream.buffer1)
writedata!(stream.stream, stream.buffer1)
return
end
9 changes: 6 additions & 3 deletions src/state.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ See Developer's notes for details.
"""
mutable struct State
# current stream mode
mode::Symbol # {:idle, :read, :write, :stop, :close, :panic}
mode::Symbol # {:idle, :read, :write, :close, :panic}

# return code of the last method call
code::Symbol # {:ok, :end, :error}

# flag to go :stop on :end while reading
# flag to go eof on :end while reading
stop_on_end::Bool

# exception thrown while data processing
Expand All @@ -24,8 +24,11 @@ mutable struct State
buffer1::Buffer
buffer2::Buffer

# relative start position in underlying stream
offset::Union{Int64, Nothing}

function State(buffer1::Buffer, buffer2::Buffer)
return new(:idle, :ok, false, Error(), buffer1, buffer2)
return new(:idle, :ok, false, Error(), buffer1, buffer2, nothing)
end
end

Expand Down
Loading
Loading