diff --git a/src/core.jl b/src/core.jl index cec3b90..4b28f0d 100644 --- a/src/core.jl +++ b/src/core.jl @@ -63,13 +63,13 @@ typealias Input Node Base.show(io::IO, n::Node) = write(io, "Node{$(eltype(n))}($(n.value), nactions=$(length(n.actions))$(n.alive ? "" : ", closed"))") - + value(n::Node) = n.value eltype{T}(::Node{T}) = T eltype{T}(::Type{Node{T}}) = T ##### Connections ##### - + function add_action!(f, node, recipient) push!(node.actions, Action(recipient, f)) end @@ -88,7 +88,7 @@ function close(n::Node, warn_nonleaf=true) end end -function send_value!(node, x, timestep) +function send_value!(node::Node, x, timestep) # Dead node? !node.alive && return @@ -98,6 +98,7 @@ function send_value!(node, x, timestep) do_action(action, timestep) end end +send_value!(wr::WeakRef, x, timestep) = send_value!(wr.value, x, timestep) do_action(a::Action, timestep) = isrequired(a) && a.f(a.recipient, timestep) @@ -115,7 +116,9 @@ const CHANNEL_SIZE = 1024 const _messages = Channel{Any}(CHANNEL_SIZE) # queue an update. meta comes back in a ReactiveException if there is an error -function Base.push!(n::Node, x, onerror=print_error) +Base.push!(n::Node, x, onerror=print_error) = _push!(n, x, onerror) + +function _push!(n, x, onerror=print_error) taken = Base.n_avail(_messages) if taken >= CHANNEL_SIZE warn("Message queue is full. Ordering may be incorrect.") diff --git a/src/time.jl b/src/time.jl index 5aa0b40..96e45d3 100644 --- a/src/time.jl +++ b/src/time.jl @@ -23,13 +23,9 @@ function every(dt) n end -function weakrefdo(ref, yes, no=()->nothing) - ref.value != nothing ? yes(ref.value) : no() -end - function every_connect(dt, output) outputref = WeakRef(output) - timer = Timer(x -> weakrefdo(outputref, x->push!(x, time()), ()->close(timer)), dt, dt) + timer = Timer(x -> _push!(outputref, time(), ()->close(timer)), dt, dt) finalizer(output, _->close(timer)) output end @@ -68,4 +64,3 @@ function fpswhen(switch, rate) end fps(rate) = fpswhen(Node(Bool, true, ()), rate) -