Skip to content

Commit

Permalink
Add error handler to endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
vchuravy committed Dec 2, 2021
1 parent 6cb2594 commit 187d809
Showing 1 changed file with 37 additions and 9 deletions.
46 changes: 37 additions & 9 deletions src/UCX.jl
Original file line number Diff line number Diff line change
Expand Up @@ -529,12 +529,25 @@ mutable struct UCXEndpoint
end
Base.unsafe_convert(::Type{API.ucp_ep_h}, ep::UCXEndpoint) = ep.handle

function ucp_err_handler(arg::Ptr{Cvoid}, ep::API.ucp_ep_h, status::API.ucp_status_t)
@error "Endpoint error" exception=UCXException(status)
# TODO should we throw here and close the endpoint?
return nothing
end

function UCXEndpoint(worker::UCXWorker, ip::IPv4, port)
field_mask = API.UCP_EP_PARAM_FIELD_FLAGS |
API.UCP_EP_PARAM_FIELD_SOCK_ADDR
API.UCP_EP_PARAM_FIELD_SOCK_ADDR |
API.UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE |
API.UCP_EP_PARAM_FIELD_ERR_HANDLER
flags = API.UCP_EP_PARAMS_FLAGS_CLIENT_SERVER
sockaddr = Ref(IP.sockaddr_in(InetAddr(ip, port)))

err_handler = API.ucp_err_handler(
@cfunction(ucp_err_handler, Cvoid, (Ptr{Cvoid}, API.ucp_ep_h, API.ucp_status_t)),
C_NULL
)

r_handle = Ref{API.ucp_ep_h}()
GC.@preserve sockaddr begin
ptr = Base.unsafe_convert(Ptr{IP.sockaddr_in}, sockaddr)
Expand All @@ -546,8 +559,8 @@ function UCXEndpoint(worker::UCXWorker, ip::IPv4, port)
set!(params, :field_mask, field_mask)
set!(params, :sockaddr, ucs_sockaddr)
set!(params, :flags, flags)

# TODO: Error callback
set!(params, :err_mode, API.UCP_ERR_HANDLING_MODE_PEER)
set!(params, :err_handler, err_handler)

@check API.ucp_ep_create(worker, params, r_handle)
end
Expand All @@ -557,16 +570,24 @@ end

function UCXEndpoint(worker::UCXWorker, conn_request::UCXConnectionRequest)
field_mask = API.UCP_EP_PARAM_FIELD_FLAGS |
API.UCP_EP_PARAM_FIELD_CONN_REQUEST
API.UCP_EP_PARAM_FIELD_CONN_REQUEST |
API.UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE |
API.UCP_EP_PARAM_FIELD_ERR_HANDLER

flags = API.UCP_EP_PARAMS_FLAGS_NO_LOOPBACK

err_handler = API.ucp_err_handler(
@cfunction(ucp_err_handler, Cvoid, (Ptr{Cvoid}, API.ucp_ep_h, API.ucp_status_t)),
C_NULL
)

params = Ref{API.ucp_ep_params}()
memzero!(params)
set!(params, :field_mask, field_mask)
set!(params, :conn_request, conn_request.handle)
set!(params, :flags, flags)

# TODO: Error callback
set!(params, :err_mode, API.UCP_ERR_HANDLING_MODE_PEER)
set!(params, :err_handler, err_handler)

r_handle = Ref{API.ucp_ep_h}()
@check API.ucp_ep_create(worker, params, r_handle)
Expand All @@ -588,15 +609,22 @@ function UCXEndpoint(worker::UCXWorker, addr_buf::Vector{UInt8})
end

function _UCXEndpoint(worker::UCXWorker, addr::Ptr{API.ucp_address_t})
field_mask = API.UCP_EP_PARAM_FIELD_REMOTE_ADDRESS
field_mask = API.UCP_EP_PARAM_FIELD_REMOTE_ADDRESS |
API.UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE |
API.UCP_EP_PARAM_FIELD_ERR_HANDLER

err_handler = API.ucp_err_handler(
@cfunction(ucp_err_handler, Cvoid, (Ptr{Cvoid}, API.ucp_ep_h, API.ucp_status_t)),
C_NULL
)

r_handle = Ref{API.ucp_ep_h}()
params = Ref{API.ucp_ep_params}()
memzero!(params)
set!(params, :field_mask, field_mask)
set!(params, :address, addr)

# TODO: Error callback
set!(params, :err_mode, API.UCP_ERR_HANDLING_MODE_PEER)
set!(params, :err_handler, err_handler)

@check API.ucp_ep_create(worker, params, r_handle)

Expand Down

0 comments on commit 187d809

Please sign in to comment.