Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
vchuravy committed Dec 13, 2021
1 parent ae7a83b commit 728c64d
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 14 deletions.
30 changes: 16 additions & 14 deletions src/UCX.jl
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,7 @@ mutable struct UCXWorker
end

worker = new(handle, fd, context, IdDict{Any,Nothing}(), Dict{UInt16, Any}(), fill(false, Base.Threads.nthreads()), true, progress_mode)
finalizer(worker) do worker
worker.open = false
@assert isempty(worker.inflight)
API.ucp_worker_destroy(worker)
end
finalizer(destroy, worker)
return worker
end
end
Expand Down Expand Up @@ -313,6 +309,15 @@ function fence(worker::UCXWorker)
@check API.ucp_worker_fence(worker)
end

function destroy(worker::UCXWorker)
if worker.handle != C_NULL
close(worker)
@assert isempty(worker.inflight)
API.ucp_worker_destroy(worker)
worker.handle = C_NULL
end
end

function lock_am(worker::UCXWorker)
tid = Base.Threads.threadid()
if worker.in_amhandler[tid]
Expand Down Expand Up @@ -387,18 +392,16 @@ function Base.notify(worker::UCXWorker)
end

function Base.isopen(worker::UCXWorker)
worker.open
worker.open && worker.handle != C_NULL
end

function Base.close(worker::UCXWorker)
@debug "Close worker"
worker.open = false
notify(worker)
if isopen(worker)
worker.open = false
notify(worker)
end
end




"""
AMHandler(func)
Expand Down Expand Up @@ -533,8 +536,7 @@ end
Base.unsafe_convert(::Type{API.ucp_ep_h}, ep::UCXEndpoint) = ep.handle

function ucp_err_handler(arg::Ptr{Cvoid}, ep::API.ucp_ep_h, status::API.ucs_status_t)
@error "Endpoint error" exception=UCXException(status)
# TODO should we throw here and close the endpoint?
throw(UCXException(status))
return nothing
end

Expand Down
39 changes: 39 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,45 @@ end
@test addr.len > 0
end

@testset "Error handler" begin
ctx = UCX.UCXContext()
server = UCX.UCXWorker(ctx)

UCX.@spawn_showerr begin
while isopen(server)
wait(server)
end
close(server)
end

barrier = Base.Event()

am_called = Ref{Int}(0)
AM_TEST = 1
function am_test(worker, header, header_length, data, length, _param)
am_called[] += 1
notify(barrier)
return UCX.API.UCS_OK
end
UCX.AMHandler(server, am_test, AM_TEST)

server_addr = UCX.UCXAddress(server)
client = UCX.UCXWorker(ctx)
ep = UCX.UCXEndpoint(client, server_addr)

req = UCX.am_send(ep, AM_TEST, Int[])
wait(req) # wait on request to be send before suspending in `take!`
wait(barrier)

@test am_called[] == 1

UCX.destroy(server)
barrier = Base.Event()
req = UCX.am_send(ep, AM_TEST, Int[])
@test_throws UCX.UCXException wait(req) # wait on request to be send before suspending in `take!`
@test am_called[] == 1
end

@testset "Active Messages" begin
cmd = Base.julia_cmd()
if Base.JLOptions().project != C_NULL
Expand Down

0 comments on commit 728c64d

Please sign in to comment.