Skip to content

Commit

Permalink
Make third argument to push! error callback. Default error callback p…
Browse files Browse the repository at this point in the history
…rints to stderr
  • Loading branch information
shashi committed Nov 11, 2015
1 parent 3541fa2 commit 2acb72e
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 53 deletions.
60 changes: 39 additions & 21 deletions src/core.jl
Original file line number Diff line number Diff line change
Expand Up @@ -109,57 +109,75 @@ cleanup_actions(node::Node) =

##### Messaging #####

if VERSION < v"0.4.0-dev"
using MessageUtils
queue_size(x) = length(fetch(x.rr).space)
else
channel(;sz=1024) = Channel{Any}(sz)
queue_size = Base.n_avail
end

const CHANNEL_SIZE = 1024

# Global channel for signal updates
const _messages = channel(sz=CHANNEL_SIZE)
const _messages = Channel{Any}(CHANNEL_SIZE)

# queue an update. meta comes back in a ReactiveException if there is an error
function Base.push!(n::Node, x; meta::Any=nothing)
taken = queue_size(_messages)
function Base.push!(n::Node, x, onerror=print_error)
taken = Base.n_avail(_messages)
if taken >= CHANNEL_SIZE
warn("Message queue is full. Ordering may be incorrect.")
@async put!(_messages, (n, x, meta))
@async put!(_messages, (n, x, onerror))
else
put!(_messages, (n, x, meta))
put!(_messages, (n, x, onerror))
end
nothing
end

include("exception.jl")

# remove messages from the channel and propagate them
global runner_task
global run
let timestep = 0
function run(steps=typemax(Int))
runner_task = current_task()::Task
local waiting, node, value, debug_meta, iter = 1
local waiting, node, value, onerror, iter = 1
try
while iter <= steps
timestep += 1
iter += 1

waiting = true
(node, value, debug_meta) = take!(_messages)
(node, value, onerror) = take!(_messages)
waiting = false

send_value!(node, value, timestep)
end
catch err
bt = catch_backtrace()
throw(ReactiveException(waiting, node, value, timestep, debug_meta, CapturedException(err, bt)))
if isa(err, InterruptException)
println("Reactive event loop was inturrupted.")
rethrow()
else
bt = catch_backtrace()
onerror(node, value, CapturedException(err, bt))
end
end
end
end

# Default error handler function
function print_error(node, value, ex)
lock(io_lock)
io = STDERR
println(io, "Failed to push!")
print(io, " ")
show(io, value)
println(io)
println(io, "to node")
print(io, " ")
show(io, node)
println(io)
showerror(io, ex)
println(io)
unlock(io_lock)
end

# Run everything queued up till the instant of calling this function
run_till_now() = run(queue_size(_messages))
run_till_now() = run(Base.n_avail(_messages))

# A decent default runner task
function __init__()
global runner_task = @async begin
Reactive.run()
end
end
32 changes: 0 additions & 32 deletions src/exception.jl

This file was deleted.

0 comments on commit 2acb72e

Please sign in to comment.