Skip to content

Commit

Permalink
Allow cleanup of timer nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
timholy committed Nov 12, 2015
1 parent 2acb72e commit b9a9168
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 10 deletions.
11 changes: 7 additions & 4 deletions src/core.jl
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ typealias Input Node

Base.show(io::IO, n::Node) =
write(io, "Node{$(eltype(n))}($(n.value), nactions=$(length(n.actions))$(n.alive ? "" : ", closed"))")

value(n::Node) = n.value
eltype{T}(::Node{T}) = T
eltype{T}(::Type{Node{T}}) = T

##### Connections #####

function add_action!(f, node, recipient)
push!(node.actions, Action(recipient, f))
end
Expand All @@ -88,7 +88,7 @@ function close(n::Node, warn_nonleaf=true)
end
end

function send_value!(node, x, timestep)
function send_value!(node::Node, x, timestep)
# Dead node?
!node.alive && return

Expand All @@ -98,6 +98,7 @@ function send_value!(node, x, timestep)
do_action(action, timestep)
end
end
send_value!(wr::WeakRef, x, timestep) = send_value!(wr.value, x, timestep)

do_action(a::Action, timestep) =
isrequired(a) && a.f(a.recipient, timestep)
Expand All @@ -115,7 +116,9 @@ const CHANNEL_SIZE = 1024
const _messages = Channel{Any}(CHANNEL_SIZE)

# queue an update. meta comes back in a ReactiveException if there is an error
function Base.push!(n::Node, x, onerror=print_error)
Base.push!(n::Node, x, onerror=print_error) = _push!(n, x, onerror)

function _push!(n, x, onerror=print_error)
taken = Base.n_avail(_messages)
if taken >= CHANNEL_SIZE
warn("Message queue is full. Ordering may be incorrect.")
Expand Down
7 changes: 1 addition & 6 deletions src/time.jl
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,9 @@ function every(dt)
n
end

function weakrefdo(ref, yes, no=()->nothing)
ref.value != nothing ? yes(ref.value) : no()
end

function every_connect(dt, output)
outputref = WeakRef(output)
timer = Timer(x -> weakrefdo(outputref, x->push!(x, time()), ()->close(timer)), dt, dt)
timer = Timer(x -> _push!(outputref, time(), ()->close(timer)), dt, dt)
finalizer(output, _->close(timer))
output
end
Expand Down Expand Up @@ -68,4 +64,3 @@ function fpswhen(switch, rate)
end

fps(rate) = fpswhen(Node(Bool, true, ()), rate)

0 comments on commit b9a9168

Please sign in to comment.