Skip to content

Commit

Permalink
reactive() from_msg keyword
Browse files Browse the repository at this point in the history
  • Loading branch information
attdona committed Oct 7, 2024
1 parent ef915ef commit 52ef404
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 10 deletions.
20 changes: 17 additions & 3 deletions src/Rembus.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2057,15 +2057,29 @@ function unreactive(rb::RBHandle; exceptionerror=true)
end

"""
reactive(rb::RBHandle, timeout=5; exceptionerror=true)
reactive(
rb::RBHandle;
msg_from::Union{Real,Period,Dates.CompoundPeriod}=Day(1),
exceptionerror=true
)
Start the delivery of published messages for which there was declared
an interest with [`subscribe`](@ref).
"""
function reactive(rb::RBHandle; exceptionerror=true)
function reactive(
rb::RBHandle;
msg_from::Union{Real,Period,Dates.CompoundPeriod}=Day(1),
exceptionerror=true
)
response = rpcreq(
rb,
AdminReqMsg(BROKER_CONFIG, Dict(COMMAND => REACTIVE_CMD, STATUS => true)),
AdminReqMsg(
BROKER_CONFIG,
Dict(
COMMAND => REACTIVE_CMD,
STATUS => true,
MSG_FROM => to_microseconds(msg_from))
),
exceptionerror=exceptionerror
)
rb.reactive = true
Expand Down
7 changes: 6 additions & 1 deletion src/admin.jl
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,13 @@ function admin_command(router, twin, msg::AdminReqMsg)
outcome = callback_and(Symbol(REACTIVE_HANDLER), router, twin, msg) do
enabled = get(msg.data, STATUS, false)
if enabled
return EnableReactiveMsg(msg.id)
return EnableReactiveMsg(msg.id, get(msg.data, MSG_FROM, 0.0))
else
if twin.reactive
# forward the message counter to the last message received when online
# because these messages get already a chance to be delivered.
twin.mark = router.mcounter
end
twin.reactive = false
return nothing
end
Expand Down
30 changes: 26 additions & 4 deletions src/broker.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Copyright (C) 2024 Claudio Carraro [email protected]

struct EnableReactiveMsg <: RembusMsg
id::UInt128
msg_from::Float64
end

#=
Expand Down Expand Up @@ -241,10 +242,26 @@ function persist_messages(router)
close(db)
end

function start_reactive(twin::Twin)
function start_reactive(twin::Twin, from_msg::Float64)
# get the files with never sent messages
allfiles = msg_files(twin.router)
files = filter(t -> parse(Int, t) > twin.mark, allfiles)
nowts = time()
mdir = Rembus.messages_dir(twin.router)
files = filter(allfiles) do fn
if parse(Int, fn) <= twin.mark
# the mesage was already delivered in a previous online session
# of the component
return false
else
ftime = mtime(joinpath(mdir, fn))
delta = nowts - ftime
if delta * 1_000_000 > from_msg
@info "skipping msg $fn: mtime: $(unix2datetime(ftime)) ($delta secs from now)"
return false
end
end
return true
end
twin.reactive = true
if twin.hasname
for fn in files
Expand Down Expand Up @@ -1018,11 +1035,15 @@ function twin_task(self, twin)
if isa(twin.socket, WebSockets.WebSocket)
close(twin.socket, WebSockets.CloseFrameBody(1008, "unexpected twin close"))
end
# forward the message counter to the last message received when online
# because these messages get already a chance to be delivered.
if twin.reactive
twin.mark = twin.router.mcounter
end
end
@debug "[$twin] task done"
end


#=
handle_ack_timeout(tim, twin, msg, msgid)
Expand Down Expand Up @@ -2214,7 +2235,7 @@ function broker(self, router)
#TBD: manage errors
response = ResMsg(msg.content.id, STS_SUCCESS, nothing)
put!(msg.twchannel.process.inbox, response)
start_reactive(msg.twchannel)
start_reactive(msg.twchannel, msg.content.msg_from)
end
else
@warn "unknown message: $msg"
Expand All @@ -2231,6 +2252,7 @@ function broker(self, router)
end
filter!(router.id_twin) do (id, tw)
cleanup(tw, router)
return true
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion test/api/test_publish.jl
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,4 @@ execute(run, "test_publish")
# expect 68 messages published (received and stored by broker)
# the mark of sub1 and sub3 is 67 because they not subscribed to
# noarg_topic.
verify_counters(total=68, components=Dict("sub2" => 68, "sub1" => 67, "sub3" => 67))
verify_counters(total=68, components=Dict("sub2" => 68, "sub1" => 68, "sub3" => 68))
2 changes: 1 addition & 1 deletion test/api/test_publish_macros.jl
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,4 @@ execute(run, "test_publish_macros")
# expect 64 messages published (received and stored by broker)
# the mark of sub1 and sub3 is 63 because they not subscribed to
# noarg_topic.
verify_counters(total=64, components=Dict("sub2" => 64, "sub1" => 63, "sub3" => 63))
verify_counters(total=64, components=Dict("sub2" => 64, "sub1" => 64, "sub3" => 64))
45 changes: 45 additions & 0 deletions test/api/test_reactive.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
include("../utils.jl")

using Dates

mutable struct Ctx
topic_1::Int
end

topic_1(ctx) = ctx.topic_1 += 1

function publish_msg()
rb = connect()
publish(rb, "topic_1")
sleep(1)
close(rb)
end

function run()
ctx = Ctx(0)

#rb = connect()
#publish(rb, "topic_1")
#sleep(1)
#close(rb)

myc = connect("myc")
shared(myc, ctx)
subscribe(myc, topic_1, msg_from=Second(1))
reactive(myc, msg_from=Now())
sleep(1)
close(myc)

# # Only named component may receive message from past ...
# @component "myc"
# @shared ctx
# @subscribe topic_1 from = Second(1)
# @reactive msg_from = Now()
# sleep(1)
# @terminate

@test ctx.topic_1 == 0
end

execute(() -> publish_msg(), "test_reactive::1")
execute(() -> run(), "test_reactive::2", reset=false)
3 changes: 3 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ const GROUP = get(ENV, "GROUP", "all")
@time @safetestset "subscribe_glob" begin
include("api/test_subscribe_glob.jl")
end
@time @safetestset "reactive" begin
include("api/test_reactive.jl")
end
end
if GROUP == "all" || GROUP == "auth"
@time @safetestset "register" begin
Expand Down

0 comments on commit 52ef404

Please sign in to comment.