-
Notifications
You must be signed in to change notification settings - Fork 6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix direct actor transport not treating some tasks as failed #5464
Conversation
@@ -153,7 +154,10 @@ Status CoreWorkerDirectActorTaskSubmitter::PushTask(rpc::DirectActorClient &clie | |||
store_provider_->Put(RayObject(data_buffer, metadata_buffer), object_id)); | |||
} | |||
}); | |||
return status; | |||
if (!status.ok()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this scenario where the status is not ok, does that always mean that the actor has died? Could it mean that the actor is overloaded and some buffer for sending messages is full or something like that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If buffer is full, the request should be blocked. But I guess it's possible that network in temporarily disconnected. However, no matter what case it is, we should treat the task as failed and let the app to decide what to do (retry, ignore, or error). I'll add a TODO here about making the error message more accurate, instead of just actor died
. Does that sound good to you?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok sounds good.
@@ -117,6 +117,7 @@ void CoreWorkerDirectActorTaskSubmitter::ConnectAndSendPendingTasks( | |||
auto status = | |||
PushTask(*client, request, TaskID::FromBinary(request.task_spec().task_id()), | |||
request.task_spec().num_returns()); | |||
RAY_CHECK_OK(status); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this mean that the first time a driver tries to connect to a direct call actor, if the actor is already dead, then the whole driver will fail? That doesn't seem like the right behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. PushTask
now always return ok. If the actor is dead, we'll treat the task as failed, and use an app-level exception to inform callers.
@raulchen, do we have tests that test connecting to and pushing tasks to dead actors? |
Test PASSed. |
Test PASSed. |
Yeah, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. Overall looks good to me. Left a few comments.
// TODO(hchen): Should we propagate this error out of `ObjectInterface::put`? | ||
RAY_LOG(WARNING) << "Trying to put an object that already existed in plasma: " | ||
<< object_id << "."; | ||
return Status::OK(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the reason to return OK() in this case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because this could happen in normal cases. For example, 1) during reconstructing a task, an existing object could be put again. 2) when treating a task as failed, the task could have been succeeded but we don't know and put a duplicate object.
We are already using this behavior in python/java worker and raylet.
I found and fixed another issue, see updated PR message. Do you have other comments? |
Test FAILed. |
// already exists. | ||
RAY_LOG(WARNING) << "Task " << task_spec.TaskId() << " failed to put object " << id | ||
<< " in store: " << status.message(); | ||
RAY_LOG(FATAL) << "Task " << task_spec.TaskId() << " failed to put object " << id |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be good to add a comment here that we use log FATAL for put errors except when object exists.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for fixing this! LGTM. Just a few nits.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM a few small comments
@@ -92,6 +92,19 @@ Status CoreWorkerDirectActorTaskSubmitter::SubscribeActorUpdates() { | |||
} else { | |||
// Remove rpc client if it's dead or being reconstructed. | |||
rpc_clients_.erase(actor_id); | |||
// If this actor is permanantly dead and there're pending requests, treat |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// If this actor is permanantly dead and there're pending requests, treat | |
// If this actor is permanently dead and there are pending requests, treat |
// the pending tasks as failed. | ||
if (actor_data.state() == ActorTableData::DEAD && | ||
pending_requests_.count(actor_id) > 0) { | ||
auto &requests = pending_requests_[actor_id]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would prefer range-based for loop and then a call to pending_requests_.clear()
, more concise. Also don't need the pending_requests.count(actor_id)
check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
without pending_requests.count(actor_id)
, pending_requests_[actor_id]
will construct an empty list.
if (!status.ok()) { | ||
TreatTaskAsFailed(task_id, num_returns, rpc::ErrorType::ACTOR_DIED); | ||
} | ||
return Status::OK(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we be returning OK()
here? If so, make sure it's clearly documented - not the behavior I would expect without context.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll change this to return void and update comment.
Test PASSed. |
Test PASSed. |
Test PASSed. |
Why are these changes needed?
CoreWorkerTest::TestActorFailure
sometimes hang in CI. There're 2 reasons:client.PushTask
will directly return an error status, instead of triggering the callback. So, we need to treat the tasks as failed in this case.What do these changes do?
Related issue number
Linter
scripts/format.sh
to lint the changes in this PR.