Skip to content

Commit

Permalink
Fix invalid parameter issue when force to close endpoint #25
Browse files Browse the repository at this point in the history
Signed-off-by: Brian Sheng <[email protected]>
  • Loading branch information
JeynmannZ committed Mar 17, 2023
1 parent 493d6b0 commit ab39e5c
Showing 1 changed file with 8 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,14 @@ case class UcxWorkerWrapper(worker: UcpWorker, transport: UcxShuffleTransport, i
def connectByWorkerAddress(executorId: transport.ExecutorId, workerAddress: ByteBuffer): Unit = {
logDebug(s"Worker $this connecting back to $executorId by worker address")
val ep = worker.newEndpoint(new UcpEndpointParams().setName(s"Server connection to $executorId")
.setUcpAddress(workerAddress))
.setUcpAddress(workerAddress)
.setPeerErrorHandlingMode()
.setErrorHandler(new UcpEndpointErrorHandler() {
override def onError(ep: UcpEndpoint, status: Int, errorMsg: String): Unit = {
logError(s"Endpoint to $executorId got an error: $errorMsg")
connections.remove(executorId)
}
}))
connections.put(executorId, ep)
}

Expand Down

0 comments on commit ab39e5c

Please sign in to comment.