Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix direct actor transport not treating some tasks as failed #5464

Merged
merged 9 commits into from
Aug 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,6 @@ Java_org_ray_runtime_object_NativeObjectStore_nativePut__J_3BLorg_ray_runtime_ob
RAY_CHECK(ray_object != nullptr);
auto status =
GetObjectInterfaceFromPointer(nativeCoreWorkerPointer).Put(*ray_object, object_id);
if (status.IsIOError() &&
status.message() == "object already exists in the plasma store") {
// Ignore duplicated put on the same object ID.
return;
}
THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, (void)0);
}

Expand Down
11 changes: 9 additions & 2 deletions src/ray/core_worker/store_provider/local_plasma_provider.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,16 @@ Status CoreWorkerLocalPlasmaStoreProvider::Put(const RayObject &object,
std::shared_ptr<arrow::Buffer> out_buffer;
{
std::unique_lock<std::mutex> guard(store_client_mutex_);
RAY_ARROW_RETURN_NOT_OK(store_client_.Create(
arrow::Status status = store_client_.Create(
plasma_id, data ? data->Size() : 0, metadata ? metadata->Data() : nullptr,
metadata ? metadata->Size() : 0, &out_buffer));
metadata ? metadata->Size() : 0, &out_buffer);
if (plasma::IsPlasmaObjectExists(status)) {
// TODO(hchen): Should we propagate this error out of `ObjectInterface::put`?
RAY_LOG(WARNING) << "Trying to put an object that already existed in plasma: "
<< object_id << ".";
return Status::OK();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the reason to return OK() in this case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because this could happen in normal cases. For example, 1) during reconstructing a task, an existing object could be put again. 2) when treating a task as failed, the task could have been succeeded but we don't know and put a duplicate object.
We are already using this behavior in python/java worker and raylet.

}
RAY_ARROW_RETURN_NOT_OK(status);
}

if (data != nullptr) {
Expand Down
31 changes: 22 additions & 9 deletions src/ray/core_worker/transport/direct_actor_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(

// Submit request.
auto &client = rpc_clients_[actor_id];
return PushTask(*client, *request, task_id, num_returns);
PushTask(*client, *request, task_id, num_returns);
return Status::OK();
} else {
// Actor is dead, treat the task as failure.
RAY_CHECK(iter->second.state_ == ActorTableData::DEAD);
Expand Down Expand Up @@ -92,6 +93,17 @@ Status CoreWorkerDirectActorTaskSubmitter::SubscribeActorUpdates() {
} else {
// Remove rpc client if it's dead or being reconstructed.
rpc_clients_.erase(actor_id);
// If this actor is permanently dead and there are pending requests, treat
// the pending tasks as failed.
if (actor_data.state() == ActorTableData::DEAD &&
pending_requests_.count(actor_id) > 0) {
for (const auto &request : pending_requests_[actor_id]) {
TreatTaskAsFailed(TaskID::FromBinary(request->task_spec().task_id()),
request->task_spec().num_returns(),
rpc::ErrorType::ACTOR_DIED);
}
pending_requests_.erase(actor_id);
}
}

RAY_LOG(INFO) << "received notification on actor, state="
Expand All @@ -114,17 +126,16 @@ void CoreWorkerDirectActorTaskSubmitter::ConnectAndSendPendingTasks(
auto &requests = pending_requests_[actor_id];
while (!requests.empty()) {
const auto &request = *requests.front();
auto status =
PushTask(*client, request, TaskID::FromBinary(request.task_spec().task_id()),
request.task_spec().num_returns());
PushTask(*client, request, TaskID::FromBinary(request.task_spec().task_id()),
request.task_spec().num_returns());
requests.pop_front();
}
}

Status CoreWorkerDirectActorTaskSubmitter::PushTask(rpc::DirectActorClient &client,
const rpc::PushTaskRequest &request,
const TaskID &task_id,
int num_returns) {
void CoreWorkerDirectActorTaskSubmitter::PushTask(rpc::DirectActorClient &client,
const rpc::PushTaskRequest &request,
const TaskID &task_id,
int num_returns) {
auto status = client.PushTask(
request,
[this, task_id, num_returns](Status status, const rpc::PushTaskReply &reply) {
Expand Down Expand Up @@ -153,7 +164,9 @@ Status CoreWorkerDirectActorTaskSubmitter::PushTask(rpc::DirectActorClient &clie
store_provider_->Put(RayObject(data_buffer, metadata_buffer), object_id));
}
});
return status;
if (!status.ok()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this scenario where the status is not ok, does that always mean that the actor has died? Could it mean that the actor is overloaded and some buffer for sending messages is full or something like that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If buffer is full, the request should be blocked. But I guess it's possible that network in temporarily disconnected. However, no matter what case it is, we should treat the task as failed and let the app to decide what to do (retry, ignore, or error). I'll add a TODO here about making the error message more accurate, instead of just actor died. Does that sound good to you?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok sounds good.

TreatTaskAsFailed(task_id, num_returns, rpc::ErrorType::ACTOR_DIED);
}
}

void CoreWorkerDirectActorTaskSubmitter::TreatTaskAsFailed(
Expand Down
10 changes: 6 additions & 4 deletions src/ray/core_worker/transport/direct_actor_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,17 @@ class CoreWorkerDirectActorTaskSubmitter : public CoreWorkerTaskSubmitter {
/// Subscribe to all actor updates.
Status SubscribeActorUpdates();

/// Helper function to push a task to an actor.
/// Push a task to a remote actor via the given client.
/// Note, this function doesn't return any error status code. If an error occurs while
/// sending the request, this task will be treated as failed.
///
/// \param[in] client The RPC client to send tasks to an actor.
/// \param[in] request The request to send.
/// \param[in] task_id The ID of a task.
/// \param[in] num_returns Number of return objects.
/// \return Status.
Status PushTask(rpc::DirectActorClient &client, const rpc::PushTaskRequest &request,
const TaskID &task_id, int num_returns);
/// \return Void.
void PushTask(rpc::DirectActorClient &client, const rpc::PushTaskRequest &request,
const TaskID &task_id, int num_returns);

/// Treat a task as failed.
///
Expand Down
8 changes: 4 additions & 4 deletions src/ray/core_worker/transport/raylet_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ void CoreWorkerRayletTaskReceiver::HandleAssignTask(
/*transport_type=*/static_cast<int>(TaskTransportType::RAYLET));
Status status = object_interface_.Put(*results[i], id);
if (!status.ok()) {
// TODO (kfstorm): RAY_LOG(FATAL) except the error is about the object to put
// already exists.
RAY_LOG(WARNING) << "Task " << task_spec.TaskId() << " failed to put object " << id
<< " in store: " << status.message();
// NOTE(hchen): `PlasmaObjectExists` error is already ignored inside
// `ObjectInterface::Put`, we treat other error types as fatal here.
RAY_LOG(FATAL) << "Task " << task_spec.TaskId() << " failed to put object " << id
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be good to add a comment here that we use log FATAL for put errors except when object exists.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point.

<< " in store: " << status.message();
} else {
RAY_LOG(DEBUG) << "Task " << task_spec.TaskId() << " put object " << id
<< " in store.";
Expand Down