From 80a27f7b9851dd67689b8cd19a6669eb9bdc08fe Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 18 Jul 2019 14:05:27 -0700 Subject: [PATCH 01/15] fix reconstruction hangs --- src/ray/raylet/node_manager.cc | 37 ++++++++++++++----- src/ray/raylet/node_manager.h | 6 +-- src/ray/raylet/reconstruction_policy.cc | 13 ++++--- src/ray/raylet/reconstruction_policy.h | 6 +-- .../raylet/task_dependency_manager_test.cc | 2 +- 5 files changed, 42 insertions(+), 22 deletions(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index b56029d28bdf6..17d5709f7ea93 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -88,7 +88,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, scheduling_policy_(local_queues_), reconstruction_policy_( io_service_, - [this](const TaskID &task_id) { HandleTaskReconstruction(task_id); }, + [this](const TaskID &task_id, const ObjectID &required_object_id) { + HandleTaskReconstruction(task_id, required_object_id); }, RayConfig::instance().initial_reconstruction_timeout_milliseconds(), gcs_client_->client_table().GetLocalClientId(), gcs_client_->task_lease_table(), object_directory_, gcs_client_->task_reconstruction_log()), @@ -1403,7 +1404,7 @@ bool NodeManager::CheckDependencyManagerInvariant() const { return true; } -void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_type) { +void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_type, const ObjectID* required_object_id) { const TaskSpecification &spec = task.GetTaskSpecification(); RAY_LOG(DEBUG) << "Treating task " << spec.TaskId() << " as failed because of error " << ErrorType_Name(error_type) << "."; @@ -1422,6 +1423,22 @@ void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_typ num_returns -= 1; } const std::string meta = std::to_string(static_cast(error_type)); + if (num_returns == 0 && required_object_id != nullptr) { + const auto object_id = required_object_id->ToPlasmaId(); + arrow::Status status = store_client_.CreateAndSeal(object_id, "", meta); + if (!status.ok() && !plasma::IsPlasmaObjectExists(status)) { + // If we failed to save the error code, log a warning and push an error message + // to the driver. + std::ostringstream stream; + stream << "An plasma error (" << status.ToString() << ") occurred while saving" + << " error code to object " << object_id << ". Anyone who's getting this" + << " object may hang forever."; + std::string error_message = stream.str(); + RAY_LOG(WARNING) << error_message; + RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver( + task.GetTaskSpecification().JobId(), "task", error_message, current_time_ms())); + } + } for (int64_t i = 0; i < num_returns; i++) { const auto object_id = spec.ReturnId(i).ToPlasmaId(); arrow::Status status = store_client_.CreateAndSeal(object_id, "", meta); @@ -2044,21 +2061,22 @@ void NodeManager::FinishAssignedActorCreationTask(const ActorID &parent_actor_id } } -void NodeManager::HandleTaskReconstruction(const TaskID &task_id) { +void NodeManager::HandleTaskReconstruction(const TaskID &task_id, const ObjectID &required_object_id) { // Retrieve the task spec in order to re-execute the task. 
RAY_CHECK_OK(gcs_client_->raylet_task_table().Lookup( JobID::Nil(), task_id, /*success_callback=*/ - [this](ray::gcs::AsyncGcsClient *client, const TaskID &task_id, + [this, required_object_id](ray::gcs::AsyncGcsClient *client, const TaskID &task_id, const TaskTableData &task_data) { // The task was in the GCS task table. Use the stored task spec to // re-execute the task. - ResubmitTask(Task(task_data.task())); + ResubmitTask(Task(task_data.task()), required_object_id); }, /*failure_callback=*/ - [this](ray::gcs::AsyncGcsClient *client, const TaskID &task_id) { + [this, required_object_id](ray::gcs::AsyncGcsClient *client, const TaskID &task_id) { // The task was not in the GCS task table. It must therefore be in the // lineage cache. + // TODO(ekl) fail the task there too RAY_CHECK(lineage_cache_.ContainsTask(task_id)) << "Metadata of task " << task_id << " not found in either GCS or lineage cache. It may have been evicted " @@ -2067,11 +2085,11 @@ void NodeManager::HandleTaskReconstruction(const TaskID &task_id) { << "ray.init(redis_max_memory=)."; // Use a copy of the cached task spec to re-execute the task. const Task task = lineage_cache_.GetTaskOrDie(task_id); - ResubmitTask(task); + ResubmitTask(task, required_object_id); })); } -void NodeManager::ResubmitTask(const Task &task) { +void NodeManager::ResubmitTask(const Task &task, const ObjectID &required_object_id) { RAY_LOG(DEBUG) << "Attempting to resubmit task " << task.GetTaskSpecification().TaskId(); @@ -2098,10 +2116,11 @@ void NodeManager::ResubmitTask(const Task &task) { std::ostringstream error_message; error_message << "The task with ID " << task.GetTaskSpecification().TaskId() << " is a driver task and so the object created by ray.put " - << "could not be reconstructed."; + << "could not be reconstructed!!!"; RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver( task.GetTaskSpecification().JobId(), type, error_message.str(), current_time_ms())); + TreatTaskAsFailed(task, ErrorType::OBJECT_UNRECONSTRUCTABLE, &required_object_id); return; } diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 9fbb4a9bc98f6..cf723494a8323 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -188,7 +188,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// \param task The task to fail. /// \param error_type The type of the error that caused this task to fail. /// \return Void. - void TreatTaskAsFailed(const Task &task, const ErrorType &error_type); + void TreatTaskAsFailed(const Task &task, const ErrorType &error_type, const ObjectID* required_object_id = nullptr); /// This is similar to TreatTaskAsFailed, but it will only mark the task as /// failed if at least one of the task's return values is lost. A return /// value is lost if it has been created before, but no longer exists on any @@ -258,13 +258,13 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// /// \param task_id The relevant task ID. /// \return Void. - void HandleTaskReconstruction(const TaskID &task_id); + void HandleTaskReconstruction(const TaskID &task_id, const ObjectID& required_object_id); /// Resubmit a task for execution. This is a task that was previously already /// submitted to a raylet but which must now be re-executed. /// /// \param task The task being resubmitted. /// \return Void. - void ResubmitTask(const Task &task); + void ResubmitTask(const Task &task, const ObjectID& required_object_id); /// Attempt to forward a task to a remote different node manager. 
If this /// fails, the task will be resubmit locally. /// diff --git a/src/ray/raylet/reconstruction_policy.cc b/src/ray/raylet/reconstruction_policy.cc index f522c898649b7..6d534402384d6 100644 --- a/src/ray/raylet/reconstruction_policy.cc +++ b/src/ray/raylet/reconstruction_policy.cc @@ -8,7 +8,7 @@ namespace raylet { ReconstructionPolicy::ReconstructionPolicy( boost::asio::io_service &io_service, - std::function reconstruction_handler, + std::function reconstruction_handler, int64_t initial_reconstruction_timeout_ms, const ClientID &client_id, gcs::PubsubInterface &task_lease_pubsub, std::shared_ptr object_directory, @@ -64,6 +64,7 @@ void ReconstructionPolicy::SetTaskTimeout( } void ReconstructionPolicy::HandleReconstructionLogAppend(const TaskID &task_id, + const ObjectID &required_object_id, bool success) { auto it = listening_tasks_.find(task_id); if (it == listening_tasks_.end()) { @@ -76,7 +77,7 @@ void ReconstructionPolicy::HandleReconstructionLogAppend(const TaskID &task_id, SetTaskTimeout(it, initial_reconstruction_timeout_ms_); if (success) { - reconstruction_handler_(task_id); + reconstruction_handler_(task_id, required_object_id); } } @@ -112,14 +113,14 @@ void ReconstructionPolicy::AttemptReconstruction(const TaskID &task_id, RAY_CHECK_OK(task_reconstruction_log_.AppendAt( JobID::Nil(), task_id, reconstruction_entry, /*success_callback=*/ - [this](gcs::AsyncGcsClient *client, const TaskID &task_id, + [this, required_object_id](gcs::AsyncGcsClient *client, const TaskID &task_id, const TaskReconstructionData &data) { - HandleReconstructionLogAppend(task_id, /*success=*/true); + HandleReconstructionLogAppend(task_id, required_object_id, /*success=*/true); }, /*failure_callback=*/ - [this](gcs::AsyncGcsClient *client, const TaskID &task_id, + [this, required_object_id](gcs::AsyncGcsClient *client, const TaskID &task_id, const TaskReconstructionData &data) { - HandleReconstructionLogAppend(task_id, /*success=*/false); + HandleReconstructionLogAppend(task_id, required_object_id, /*success=*/false); }, reconstruction_attempt)); diff --git a/src/ray/raylet/reconstruction_policy.h b/src/ray/raylet/reconstruction_policy.h index a194443e14258..a1d6dca260301 100644 --- a/src/ray/raylet/reconstruction_policy.h +++ b/src/ray/raylet/reconstruction_policy.h @@ -42,7 +42,7 @@ class ReconstructionPolicy : public ReconstructionPolicyInterface { /// lease notifications from. ReconstructionPolicy( boost::asio::io_service &io_service, - std::function reconstruction_handler, + std::function reconstruction_handler, int64_t initial_reconstruction_timeout_ms, const ClientID &client_id, gcs::PubsubInterface &task_lease_pubsub, std::shared_ptr object_directory, @@ -127,12 +127,12 @@ class ReconstructionPolicy : public ReconstructionPolicyInterface { /// Handle the response for an attempt at adding an entry to the task /// reconstruction log. - void HandleReconstructionLogAppend(const TaskID &task_id, bool success); + void HandleReconstructionLogAppend(const TaskID &task_id, const ObjectID &object_id, bool success); /// The event loop. boost::asio::io_service &io_service_; /// The handler to call for tasks that require reconstruction. - const std::function reconstruction_handler_; + const std::function reconstruction_handler_; /// The initial timeout within which a task lease notification must be /// received. Otherwise, reconstruction will be triggered. 
const int64_t initial_reconstruction_timeout_ms_; diff --git a/src/ray/raylet/task_dependency_manager_test.cc b/src/ray/raylet/task_dependency_manager_test.cc index 86136e201cdd6..d6a578eea8f2b 100644 --- a/src/ray/raylet/task_dependency_manager_test.cc +++ b/src/ray/raylet/task_dependency_manager_test.cc @@ -71,7 +71,7 @@ static inline Task ExampleTask(const std::vector &arguments, uint64_t num_returns) { TaskSpecBuilder builder; builder.SetCommonTaskSpec(Language::PYTHON, {"", "", ""}, JobID::Nil(), - TaskID::FromRandom(), 0, num_returns, {}, {}); + TaskID::FromRandom(), 0, num_returns, {}, {}, {}); for (const auto &arg : arguments) { builder.AddByRefArg(arg); } From b778a7c07055efb3d34282ff4f59cc36b1aca9b7 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 18 Jul 2019 14:40:53 -0700 Subject: [PATCH 02/15] reformat --- build.sh | 12 ++-- src/ray/raylet/node_manager.cc | 91 ++++++++++++++----------- src/ray/raylet/node_manager.h | 18 +++-- src/ray/raylet/reconstruction_policy.cc | 9 ++- src/ray/raylet/reconstruction_policy.h | 3 +- 5 files changed, 76 insertions(+), 57 deletions(-) diff --git a/build.sh b/build.sh index b7d97aeb59247..5a4ac12608d00 100755 --- a/build.sh +++ b/build.sh @@ -97,12 +97,12 @@ fi pushd "$BUILD_DIR" -# The following line installs pyarrow from S3, these wheels have been -# generated from https://github.com/ray-project/arrow-build from -# the commit listed in the command. -$PYTHON_EXECUTABLE -m pip install \ - --target="$ROOT_DIR/python/ray/pyarrow_files" pyarrow==0.14.0.RAY \ - --find-links https://s3-us-west-2.amazonaws.com/arrow-wheels/f86340a3b597502bacc801b17ab03c89d31aa561/index.html +## The following line installs pyarrow from S3, these wheels have been +## generated from https://github.com/ray-project/arrow-build from +## the commit listed in the command. +#$PYTHON_EXECUTABLE -m pip install \ +# --target="$ROOT_DIR/python/ray/pyarrow_files" pyarrow==0.14.0.RAY \ +# --find-links https://s3-us-west-2.amazonaws.com/arrow-wheels/f86340a3b597502bacc801b17ab03c89d31aa561/index.html export PYTHON_BIN_PATH="$PYTHON_EXECUTABLE" if [ "$RAY_BUILD_JAVA" == "YES" ]; then diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 17d5709f7ea93..faf43fa7ccd9e 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -89,7 +89,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, reconstruction_policy_( io_service_, [this](const TaskID &task_id, const ObjectID &required_object_id) { - HandleTaskReconstruction(task_id, required_object_id); }, + HandleTaskReconstruction(task_id, required_object_id); + }, RayConfig::instance().initial_reconstruction_timeout_milliseconds(), gcs_client_->client_table().GetLocalClientId(), gcs_client_->task_lease_table(), object_directory_, gcs_client_->task_reconstruction_log()), @@ -1404,7 +1405,8 @@ bool NodeManager::CheckDependencyManagerInvariant() const { return true; } -void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_type, const ObjectID* required_object_id) { +void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_type, + const ObjectID *required_object_id) { const TaskSpecification &spec = task.GetTaskSpecification(); RAY_LOG(DEBUG) << "Treating task " << spec.TaskId() << " as failed because of error " << ErrorType_Name(error_type) << "."; @@ -1422,25 +1424,31 @@ void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_typ // information about the TaskSpecification implementation. 
num_returns -= 1; } - const std::string meta = std::to_string(static_cast(error_type)); + // Determine which IDs should be marked as failed. + std::vector objects_to_fail; if (num_returns == 0 && required_object_id != nullptr) { - const auto object_id = required_object_id->ToPlasmaId(); - arrow::Status status = store_client_.CreateAndSeal(object_id, "", meta); - if (!status.ok() && !plasma::IsPlasmaObjectExists(status)) { - // If we failed to save the error code, log a warning and push an error message - // to the driver. - std::ostringstream stream; - stream << "An plasma error (" << status.ToString() << ") occurred while saving" - << " error code to object " << object_id << ". Anyone who's getting this" - << " object may hang forever."; - std::string error_message = stream.str(); - RAY_LOG(WARNING) << error_message; - RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver( - task.GetTaskSpecification().JobId(), "task", error_message, current_time_ms())); + objects_to_fail.push_back(required_object_id->ToPlasmaId()); + } else { + for (int64_t i = 0; i < num_returns; i++) { + objects_to_fail.push_back(spec.ReturnId(i).ToPlasmaId()); } } - for (int64_t i = 0; i < num_returns; i++) { - const auto object_id = spec.ReturnId(i).ToPlasmaId(); + const JobID job_id = task.GetTaskSpecification().JobId(); + MarkObjectsAsFailed(error_type, objects_to_fail, &job_id); + task_dependency_manager_.TaskCanceled(spec.TaskId()); + // Notify the task dependency manager that we no longer need this task's + // object dependencies. TODO(swang): Ideally, we would check the return value + // here. However, we don't know at this point if the task was in the WAITING + // or READY queue before, in which case we would not have been subscribed to + // its dependencies. + task_dependency_manager_.UnsubscribeDependencies(spec.TaskId()); +} + +void NodeManager::MarkObjectsAsFailed(const ErrorType &error_type, + const std::vector objects_to_fail, + const JobID *job_id) { + const std::string meta = std::to_string(static_cast(error_type)); + for (const auto &object_id : objects_to_fail) { arrow::Status status = store_client_.CreateAndSeal(object_id, "", meta); if (!status.ok() && !plasma::IsPlasmaObjectExists(status)) { // If we failed to save the error code, log a warning and push an error message @@ -1451,17 +1459,12 @@ void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_typ << " object may hang forever."; std::string error_message = stream.str(); RAY_LOG(WARNING) << error_message; - RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver( - task.GetTaskSpecification().JobId(), "task", error_message, current_time_ms())); + if (job_id != nullptr) { + RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver( + *job_id, "task", error_message, current_time_ms())); + } } } - task_dependency_manager_.TaskCanceled(spec.TaskId()); - // Notify the task dependency manager that we no longer need this task's - // object dependencies. TODO(swang): Ideally, we would check the return value - // here. However, we don't know at this point if the task was in the WAITING - // or READY queue before, in which case we would not have been subscribed to - // its dependencies. 
- task_dependency_manager_.UnsubscribeDependencies(spec.TaskId()); } void NodeManager::TreatTaskAsFailedIfLost(const Task &task) { @@ -2061,31 +2064,37 @@ void NodeManager::FinishAssignedActorCreationTask(const ActorID &parent_actor_id } } -void NodeManager::HandleTaskReconstruction(const TaskID &task_id, const ObjectID &required_object_id) { +void NodeManager::HandleTaskReconstruction(const TaskID &task_id, + const ObjectID &required_object_id) { // Retrieve the task spec in order to re-execute the task. RAY_CHECK_OK(gcs_client_->raylet_task_table().Lookup( JobID::Nil(), task_id, /*success_callback=*/ [this, required_object_id](ray::gcs::AsyncGcsClient *client, const TaskID &task_id, - const TaskTableData &task_data) { + const TaskTableData &task_data) { // The task was in the GCS task table. Use the stored task spec to // re-execute the task. ResubmitTask(Task(task_data.task()), required_object_id); }, /*failure_callback=*/ - [this, required_object_id](ray::gcs::AsyncGcsClient *client, const TaskID &task_id) { + [this, required_object_id](ray::gcs::AsyncGcsClient *client, + const TaskID &task_id) { // The task was not in the GCS task table. It must therefore be in the // lineage cache. - // TODO(ekl) fail the task there too - RAY_CHECK(lineage_cache_.ContainsTask(task_id)) - << "Metadata of task " << task_id - << " not found in either GCS or lineage cache. It may have been evicted " - << "by the redis LRU configuration. Consider increasing the memory " - "allocation via " - << "ray.init(redis_max_memory=)."; - // Use a copy of the cached task spec to re-execute the task. - const Task task = lineage_cache_.GetTaskOrDie(task_id); - ResubmitTask(task, required_object_id); + if (lineage_cache_.ContainsTask(task_id)) { + // Use a copy of the cached task spec to re-execute the task. + const Task task = lineage_cache_.GetTaskOrDie(task_id); + ResubmitTask(task, required_object_id); + } else { + RAY_LOG(WARNING) + << "Metadata of task " << task_id + << " not found in either GCS or lineage cache. It may have been evicted " + << "by the redis LRU configuration. Consider increasing the memory " + "allocation via " + << "ray.init(redis_max_memory=)."; + MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE, + {required_object_id.ToPlasmaId()}); + } })); } @@ -2116,7 +2125,7 @@ void NodeManager::ResubmitTask(const Task &task, const ObjectID &required_object std::ostringstream error_message; error_message << "The task with ID " << task.GetTaskSpecification().TaskId() << " is a driver task and so the object created by ray.put " - << "could not be reconstructed!!!"; + << "could not be reconstructed."; RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver( task.GetTaskSpecification().JobId(), type, error_message.str(), current_time_ms())); diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index cf723494a8323..101498385829e 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -187,8 +187,15 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// /// \param task The task to fail. /// \param error_type The type of the error that caused this task to fail. - /// \return Void. - void TreatTaskAsFailed(const Task &task, const ErrorType &error_type, const ObjectID* required_object_id = nullptr); + /// \param required_object_id The object id that we should write the error + /// message into. This is used in case the task has no outputs (e.g., + /// it is the driver task). + /// \return Void. 
+ void TreatTaskAsFailed(const Task &task, const ErrorType &error_type, + const ObjectID *required_object_id = nullptr); + void MarkObjectsAsFailed(const ErrorType &error_type, + const std::vector object_ids, + const JobID *job_id = nullptr); /// This is similar to TreatTaskAsFailed, but it will only mark the task as /// failed if at least one of the task's return values is lost. A return /// value is lost if it has been created before, but no longer exists on any @@ -257,14 +264,17 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// Handle a task whose return value(s) must be reconstructed. /// /// \param task_id The relevant task ID. + /// \param required_object_id The object id we are reconstructing for. /// \return Void. - void HandleTaskReconstruction(const TaskID &task_id, const ObjectID& required_object_id); + void HandleTaskReconstruction(const TaskID &task_id, + const ObjectID &required_object_id); /// Resubmit a task for execution. This is a task that was previously already /// submitted to a raylet but which must now be re-executed. /// /// \param task The task being resubmitted. + /// \param required_object_id The object id that triggered the resubmission. /// \return Void. - void ResubmitTask(const Task &task, const ObjectID& required_object_id); + void ResubmitTask(const Task &task, const ObjectID &required_object_id); /// Attempt to forward a task to a remote different node manager. If this /// fails, the task will be resubmit locally. /// diff --git a/src/ray/raylet/reconstruction_policy.cc b/src/ray/raylet/reconstruction_policy.cc index 6d534402384d6..cb37c71509721 100644 --- a/src/ray/raylet/reconstruction_policy.cc +++ b/src/ray/raylet/reconstruction_policy.cc @@ -63,9 +63,8 @@ void ReconstructionPolicy::SetTaskTimeout( }); } -void ReconstructionPolicy::HandleReconstructionLogAppend(const TaskID &task_id, - const ObjectID &required_object_id, - bool success) { +void ReconstructionPolicy::HandleReconstructionLogAppend( + const TaskID &task_id, const ObjectID &required_object_id, bool success) { auto it = listening_tasks_.find(task_id); if (it == listening_tasks_.end()) { return; @@ -114,12 +113,12 @@ void ReconstructionPolicy::AttemptReconstruction(const TaskID &task_id, JobID::Nil(), task_id, reconstruction_entry, /*success_callback=*/ [this, required_object_id](gcs::AsyncGcsClient *client, const TaskID &task_id, - const TaskReconstructionData &data) { + const TaskReconstructionData &data) { HandleReconstructionLogAppend(task_id, required_object_id, /*success=*/true); }, /*failure_callback=*/ [this, required_object_id](gcs::AsyncGcsClient *client, const TaskID &task_id, - const TaskReconstructionData &data) { + const TaskReconstructionData &data) { HandleReconstructionLogAppend(task_id, required_object_id, /*success=*/false); }, reconstruction_attempt)); diff --git a/src/ray/raylet/reconstruction_policy.h b/src/ray/raylet/reconstruction_policy.h index a1d6dca260301..a44819dca5bc1 100644 --- a/src/ray/raylet/reconstruction_policy.h +++ b/src/ray/raylet/reconstruction_policy.h @@ -127,7 +127,8 @@ class ReconstructionPolicy : public ReconstructionPolicyInterface { /// Handle the response for an attempt at adding an entry to the task /// reconstruction log. - void HandleReconstructionLogAppend(const TaskID &task_id, const ObjectID &object_id, bool success); + void HandleReconstructionLogAppend(const TaskID &task_id, const ObjectID &object_id, + bool success); /// The event loop. 
boost::asio::io_service &io_service_; From 81e108e2d87da35cb21b044ecbd72ceb5cd69504 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 18 Jul 2019 14:41:57 -0700 Subject: [PATCH 03/15] doc --- src/ray/raylet/node_manager.h | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 101498385829e..8bb72860187cc 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -193,6 +193,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// \return Void. void TreatTaskAsFailed(const Task &task, const ErrorType &error_type, const ObjectID *required_object_id = nullptr); + /// Mark the specified objects as failed with the given error type. + /// + /// \param error_type The type of the error that caused this task to fail. + /// \param object_ids The object ids to store error messages into. + /// \param job_id The optional job to push errors to if the writes fail. void MarkObjectsAsFailed(const ErrorType &error_type, const std::vector object_ids, const JobID *job_id = nullptr); From 7eead6762c5f0f8e5456b2acb072331701b7cea3 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 18 Jul 2019 14:57:44 -0700 Subject: [PATCH 04/15] tests --- .../tests/test_unreconstructable_errors.py | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 python/ray/tests/test_unreconstructable_errors.py diff --git a/python/ray/tests/test_unreconstructable_errors.py b/python/ray/tests/test_unreconstructable_errors.py new file mode 100644 index 0000000000000..8778588748347 --- /dev/null +++ b/python/ray/tests/test_unreconstructable_errors.py @@ -0,0 +1,42 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import numpy as np +import unittest + +import ray + + +class TestUnreconstructableErrors(unittest.TestCase): + def setUp(self): + ray.init(object_store_memory=10000000, redis_max_memory=10000000) + + def tearDown(self): + ray.shutdown() + + def testDriverPutEvictedCannotReconstruct(self): + x_id = ray.put(np.zeros(1 * 1024 * 1024)) + ray.get(x_id) + for _ in range(10): + ray.put(np.zeros(1 * 1024 * 1024)) + self.assertRaises(ray.exceptions.UnreconstructableError, + lambda: ray.get(x_id)) + + def testLineageEvictedReconstructionFails(self): + @ray.remote + def f(data): + return 0 + + x_id = f.remote(None) + ray.get(x_id) + for i in range(200): + if i % 100 == 0: + print("launching round of tasks to evict lineage", i) + ray.get([f.remote(np.zeros(10000)) for _ in range(100)]) + self.assertRaises(ray.exceptions.UnreconstructableError, + lambda: ray.get(x_id)) + + +if __name__ == "__main__": + unittest.main(verbosity=2) From 3e25ccc084238e5ef5d26e42689a67090ea4e3b0 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 18 Jul 2019 14:58:11 -0700 Subject: [PATCH 05/15] revert build --- build.sh | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/build.sh b/build.sh index 5a4ac12608d00..b7d97aeb59247 100755 --- a/build.sh +++ b/build.sh @@ -97,12 +97,12 @@ fi pushd "$BUILD_DIR" -## The following line installs pyarrow from S3, these wheels have been -## generated from https://github.com/ray-project/arrow-build from -## the commit listed in the command. 
-#$PYTHON_EXECUTABLE -m pip install \ -# --target="$ROOT_DIR/python/ray/pyarrow_files" pyarrow==0.14.0.RAY \ -# --find-links https://s3-us-west-2.amazonaws.com/arrow-wheels/f86340a3b597502bacc801b17ab03c89d31aa561/index.html +# The following line installs pyarrow from S3, these wheels have been +# generated from https://github.com/ray-project/arrow-build from +# the commit listed in the command. +$PYTHON_EXECUTABLE -m pip install \ + --target="$ROOT_DIR/python/ray/pyarrow_files" pyarrow==0.14.0.RAY \ + --find-links https://s3-us-west-2.amazonaws.com/arrow-wheels/f86340a3b597502bacc801b17ab03c89d31aa561/index.html export PYTHON_BIN_PATH="$PYTHON_EXECUTABLE" if [ "$RAY_BUILD_JAVA" == "YES" ]; then From ca0e112c98abacbe5881442dce4954509e98ceb8 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 18 Jul 2019 17:36:10 -0700 Subject: [PATCH 06/15] fix test --- src/ray/raylet/reconstruction_policy_test.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/ray/raylet/reconstruction_policy_test.cc b/src/ray/raylet/reconstruction_policy_test.cc index 94155b442c713..e9154691c5fbe 100644 --- a/src/ray/raylet/reconstruction_policy_test.cc +++ b/src/ray/raylet/reconstruction_policy_test.cc @@ -155,7 +155,9 @@ class ReconstructionPolicyTest : public ::testing::Test { reconstruction_timeout_ms_(50), reconstruction_policy_(std::make_shared( io_service_, - [this](const TaskID &task_id) { TriggerReconstruction(task_id); }, + [this](const TaskID &task_id, const ObjectID &obj) { + TriggerReconstruction(task_id); + }, reconstruction_timeout_ms_, ClientID::FromRandom(), mock_gcs_, mock_object_directory_, mock_gcs_)), timer_canceled_(false) { From 2755bddc70dfac6152b5db12872419cee6e5ff31 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 18 Jul 2019 17:45:11 -0700 Subject: [PATCH 07/15] comments --- python/ray/tests/test_unreconstructable_errors.py | 4 +--- src/ray/raylet/node_manager.cc | 13 ++++++------- src/ray/raylet/node_manager.h | 2 +- 3 files changed, 8 insertions(+), 11 deletions(-) diff --git a/python/ray/tests/test_unreconstructable_errors.py b/python/ray/tests/test_unreconstructable_errors.py index 8778588748347..bfc05a914d591 100644 --- a/python/ray/tests/test_unreconstructable_errors.py +++ b/python/ray/tests/test_unreconstructable_errors.py @@ -30,9 +30,7 @@ def f(data): x_id = f.remote(None) ray.get(x_id) - for i in range(200): - if i % 100 == 0: - print("launching round of tasks to evict lineage", i) + for _ in range(200): ray.get([f.remote(np.zeros(10000)) for _ in range(100)]) self.assertRaises(ray.exceptions.UnreconstructableError, lambda: ray.get(x_id)) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index faf43fa7ccd9e..7cb22645d0124 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1427,6 +1427,7 @@ void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_typ // Determine which IDs should be marked as failed. 
std::vector objects_to_fail; if (num_returns == 0 && required_object_id != nullptr) { + // This may be a driver task with no returns objects_to_fail.push_back(required_object_id->ToPlasmaId()); } else { for (int64_t i = 0; i < num_returns; i++) { @@ -1434,7 +1435,7 @@ void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_typ } } const JobID job_id = task.GetTaskSpecification().JobId(); - MarkObjectsAsFailed(error_type, objects_to_fail, &job_id); + MarkObjectsAsFailed(error_type, objects_to_fail, job_id); task_dependency_manager_.TaskCanceled(spec.TaskId()); // Notify the task dependency manager that we no longer need this task's // object dependencies. TODO(swang): Ideally, we would check the return value @@ -1446,7 +1447,7 @@ void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_typ void NodeManager::MarkObjectsAsFailed(const ErrorType &error_type, const std::vector objects_to_fail, - const JobID *job_id) { + const JobID &job_id) { const std::string meta = std::to_string(static_cast(error_type)); for (const auto &object_id : objects_to_fail) { arrow::Status status = store_client_.CreateAndSeal(object_id, "", meta); @@ -1459,10 +1460,8 @@ void NodeManager::MarkObjectsAsFailed(const ErrorType &error_type, << " object may hang forever."; std::string error_message = stream.str(); RAY_LOG(WARNING) << error_message; - if (job_id != nullptr) { - RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver( - *job_id, "task", error_message, current_time_ms())); - } + RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver( + job_id, "task", error_message, current_time_ms())); } } } @@ -2093,7 +2092,7 @@ void NodeManager::HandleTaskReconstruction(const TaskID &task_id, "allocation via " << "ray.init(redis_max_memory=)."; MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE, - {required_object_id.ToPlasmaId()}); + {required_object_id.ToPlasmaId()}, JobID::Nil()); } })); } diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 8bb72860187cc..fd84b55c529d8 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -200,7 +200,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// \param job_id The optional job to push errors to if the writes fail. void MarkObjectsAsFailed(const ErrorType &error_type, const std::vector object_ids, - const JobID *job_id = nullptr); + const JobID &job_id); /// This is similar to TreatTaskAsFailed, but it will only mark the task as /// failed if at least one of the task's return values is lost. 
A return /// value is lost if it has been created before, but no longer exists on any From efd36795c8f41e87c1fb690362563829e4d7b31c Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 18 Jul 2019 18:00:03 -0700 Subject: [PATCH 08/15] comment 2 --- src/ray/raylet/node_manager.cc | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 7cb22645d0124..da2872b338648 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1405,8 +1405,7 @@ bool NodeManager::CheckDependencyManagerInvariant() const { return true; } -void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_type, - const ObjectID *required_object_id) { +void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_type) { const TaskSpecification &spec = task.GetTaskSpecification(); RAY_LOG(DEBUG) << "Treating task " << spec.TaskId() << " as failed because of error " << ErrorType_Name(error_type) << "."; @@ -1426,13 +1425,8 @@ void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_typ } // Determine which IDs should be marked as failed. std::vector objects_to_fail; - if (num_returns == 0 && required_object_id != nullptr) { - // This may be a driver task with no returns - objects_to_fail.push_back(required_object_id->ToPlasmaId()); - } else { - for (int64_t i = 0; i < num_returns; i++) { - objects_to_fail.push_back(spec.ReturnId(i).ToPlasmaId()); - } + for (int64_t i = 0; i < num_returns; i++) { + objects_to_fail.push_back(spec.ReturnId(i).ToPlasmaId()); } const JobID job_id = task.GetTaskSpecification().JobId(); MarkObjectsAsFailed(error_type, objects_to_fail, job_id); @@ -2128,7 +2122,9 @@ void NodeManager::ResubmitTask(const Task &task, const ObjectID &required_object RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver( task.GetTaskSpecification().JobId(), type, error_message.str(), current_time_ms())); - TreatTaskAsFailed(task, ErrorType::OBJECT_UNRECONSTRUCTABLE, &required_object_id); + MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE, + {required_object_id.ToPlasmaId()}, + task.GetTaskSpecification().JobId()); return; } From 8e5e6d6e1e7e2ee224905c4fbc10af6b9b47fe80 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 18 Jul 2019 18:01:04 -0700 Subject: [PATCH 09/15] h file --- src/ray/raylet/node_manager.h | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index fd84b55c529d8..27049100aebe5 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -187,12 +187,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// /// \param task The task to fail. /// \param error_type The type of the error that caused this task to fail. - /// \param required_object_id The object id that we should write the error - /// message into. This is used in case the task has no outputs (e.g., - /// it is the driver task). /// \return Void. - void TreatTaskAsFailed(const Task &task, const ErrorType &error_type, - const ObjectID *required_object_id = nullptr); + void TreatTaskAsFailed(const Task &task, const ErrorType &error_type); /// Mark the specified objects as failed with the given error type. /// /// \param error_type The type of the error that caused this task to fail. 
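
The user-visible behavior the series is driving at can be sketched briefly. This is an illustrative example only; it mirrors the test added in python/ray/tests/test_unreconstructable_errors.py and assumes the small object_store_memory and redis_max_memory limits used there. Once the object written by ray.put() has been evicted, ray.get() should raise ray.exceptions.UnreconstructableError instead of hanging:

    import numpy as np
    import ray

    # Small object store so that a handful of puts forces eviction.
    ray.init(object_store_memory=10000000, redis_max_memory=10000000)

    x_id = ray.put(np.zeros(1 * 1024 * 1024))
    ray.get(x_id)  # Still present in the object store at this point.
    for _ in range(10):
        ray.put(np.zeros(1 * 1024 * 1024))  # Evict the original object.

    try:
        ray.get(x_id)  # Driver-put objects cannot be reconstructed.
    except ray.exceptions.UnreconstructableError as e:
        print("object was marked as failed instead of hanging:", e)

    ray.shutdown()

Under the hood, MarkObjectsAsFailed seals each affected object in plasma with the error type as metadata, so a subsequent get on that object raises immediately rather than blocking.
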
From a25f7b708fcb6ccb88b83edd6ca2f79f6c7d6b8c Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 18 Jul 2019 18:08:35 -0700 Subject: [PATCH 10/15] revert --- src/ray/raylet/task_dependency_manager_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/raylet/task_dependency_manager_test.cc b/src/ray/raylet/task_dependency_manager_test.cc index d6a578eea8f2b..86136e201cdd6 100644 --- a/src/ray/raylet/task_dependency_manager_test.cc +++ b/src/ray/raylet/task_dependency_manager_test.cc @@ -71,7 +71,7 @@ static inline Task ExampleTask(const std::vector &arguments, uint64_t num_returns) { TaskSpecBuilder builder; builder.SetCommonTaskSpec(Language::PYTHON, {"", "", ""}, JobID::Nil(), - TaskID::FromRandom(), 0, num_returns, {}, {}, {}); + TaskID::FromRandom(), 0, num_returns, {}, {}); for (const auto &arg : arguments) { builder.AddByRefArg(arg); } From e4d98162285da2dd60fada35235bfa6349529668 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 22 Jul 2019 19:18:17 -0700 Subject: [PATCH 11/15] fix test --- python/ray/tests/test_actor.py | 2 +- python/ray/tests/test_failure.py | 2 +- python/ray/tests/test_stress.py | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index 0babe563d3d92..a66472bc4c93e 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -1995,7 +1995,7 @@ def method(self): actor2s = [Actor2.remote() for _ in range(2)] results = [a.method.remote() for a in actor2s] ready_ids, remaining_ids = ray.wait( - results, num_returns=len(results), timeout=1.0) + results, num_returns=len(results), timeout=5.0) assert len(ready_ids) == 1 diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index a560e461f7a21..8bf6f9317e909 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -688,7 +688,7 @@ def sleep_to_kill_raylet(): thread = threading.Thread(target=sleep_to_kill_raylet) thread.start() - with pytest.raises(Exception, match=r".*Connection closed unexpectedly.*"): + with pytest.raises(ray.exceptions.UnreconstructableError): ray.get(nonexistent_id) thread.join() diff --git a/python/ray/tests/test_stress.py b/python/ray/tests/test_stress.py index 85105df428dbc..bec97feccc1a6 100644 --- a/python/ray/tests/test_stress.py +++ b/python/ray/tests/test_stress.py @@ -524,6 +524,7 @@ def error_check(errors): errors = wait_for_errors(error_check) assert all(error["type"] == ray_constants.PUT_RECONSTRUCTION_PUSH_ERROR + or "ray.exceptions.UnreconstructableError" in error["message"] for error in errors) From 25c862c077fc57148154051454d4d25d81ceaacb Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 23 Jul 2019 12:36:07 -0700 Subject: [PATCH 12/15] fix --- src/ray/raylet/node_manager.cc | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index afebcbe99392e..ea289c90f6455 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1393,13 +1393,6 @@ void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_typ } const JobID job_id = task.GetTaskSpecification().JobId(); MarkObjectsAsFailed(error_type, objects_to_fail, job_id); - task_dependency_manager_.TaskCanceled(spec.TaskId()); - // Notify the task dependency manager that we no longer need this task's - // object dependencies. TODO(swang): Ideally, we would check the return value - // here. 
However, we don't know at this point if the task was in the WAITING - // or READY queue before, in which case we would not have been subscribed to - // its dependencies. - task_dependency_manager_.UnsubscribeDependencies(spec.TaskId()); } void NodeManager::MarkObjectsAsFailed(const ErrorType &error_type, From 9c61b41ed0fb14d4346c4fa3fc0bfbf52c0cb282 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 23 Jul 2019 14:38:26 -0700 Subject: [PATCH 13/15] fix merge --- src/ray/raylet/node_manager.cc | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index ea289c90f6455..908ea666c2e7a 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1393,6 +1393,13 @@ void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_typ } const JobID job_id = task.GetTaskSpecification().JobId(); MarkObjectsAsFailed(error_type, objects_to_fail, job_id); + task_dependency_manager_.TaskCanceled(spec.TaskId()); + // Notify the task dependency manager that we no longer need this task's + // object dependencies. TODO(swang): Ideally, we would check the return value + // here. However, we don't know at this point if the task was in the WAITING + // or READY queue before, in which case we would not have been subscribed to + // its dependencies. + task_dependency_manager_.UnsubscribeGetDependencies(spec.TaskId()); } void NodeManager::MarkObjectsAsFailed(const ErrorType &error_type, @@ -1414,13 +1421,6 @@ void NodeManager::MarkObjectsAsFailed(const ErrorType &error_type, job_id, "task", error_message, current_time_ms())); } } - task_dependency_manager_.TaskCanceled(spec.TaskId()); - // Notify the task dependency manager that we no longer need this task's - // object dependencies. TODO(swang): Ideally, we would check the return value - // here. However, we don't know at this point if the task was in the WAITING - // or READY queue before, in which case we would not have been subscribed to - // its dependencies. 
- task_dependency_manager_.UnsubscribeGetDependencies(spec.TaskId()); } void NodeManager::TreatTaskAsFailedIfLost(const Task &task) { From 75d341d4639fed4dcf7d6d10faa8e79ca808f741 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 23 Jul 2019 14:54:08 -0700 Subject: [PATCH 14/15] py3 --- python/ray/tests/test_stress.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/ray/tests/test_stress.py b/python/ray/tests/test_stress.py index bec97feccc1a6..35cdac1c1f75f 100644 --- a/python/ray/tests/test_stress.py +++ b/python/ray/tests/test_stress.py @@ -6,6 +6,7 @@ import numpy as np import os import pytest +import sys import time import ray @@ -479,6 +480,8 @@ def error_check(errors): @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Failing with new GCS API on Linux.") +@pytest.mark.skipif( + sys.version_info < (3, 0), reason="This test requires Python 3.") @pytest.mark.parametrize( "ray_start_object_store_memory", [10**9], indirect=True) def test_driver_put_errors(ray_start_object_store_memory): From 5cdc8ab2a5ceb65e844326f5f8f81c77724c38cd Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 23 Jul 2019 16:55:27 -0700 Subject: [PATCH 15/15] fix flaky tests --- python/ray/tests/test_actor.py | 2 +- python/ray/tests/test_unreconstructable_errors.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index 26f269578b84b..24c3f0a31bdaf 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -1306,7 +1306,7 @@ def get_gpu_id(self): def test_blocking_actor_task(shutdown_only): - ray.init(num_cpus=1, num_gpus=1) + ray.init(num_cpus=1, num_gpus=1, object_store_memory=int(10**8)) @ray.remote(num_gpus=1) def f(): diff --git a/python/ray/tests/test_unreconstructable_errors.py b/python/ray/tests/test_unreconstructable_errors.py index bfc05a914d591..5a73e9adc04e7 100644 --- a/python/ray/tests/test_unreconstructable_errors.py +++ b/python/ray/tests/test_unreconstructable_errors.py @@ -30,8 +30,8 @@ def f(data): x_id = f.remote(None) ray.get(x_id) - for _ in range(200): - ray.get([f.remote(np.zeros(10000)) for _ in range(100)]) + for _ in range(400): + ray.get([f.remote(np.zeros(10000)) for _ in range(50)]) self.assertRaises(ray.exceptions.UnreconstructableError, lambda: ray.get(x_id))
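
The second failure mode covered by the new test, a task return value whose task lineage has been evicted from the GCS, can be sketched the same way. Again this is illustrative only; the loop counts are taken from the final version of the test and simply generate enough churn to evict both the object and its lineage under the small memory limits:

    import numpy as np
    import ray

    ray.init(object_store_memory=10000000, redis_max_memory=10000000)

    @ray.remote
    def f(data):
        return 0

    x_id = f.remote(None)
    ray.get(x_id)

    # Churn through enough tasks to evict x_id and its task lineage.
    for _ in range(400):
        ray.get([f.remote(np.zeros(10000)) for _ in range(50)])

    try:
        ray.get(x_id)
    except ray.exceptions.UnreconstructableError:
        print("lineage evicted; object marked as failed instead of hanging")

    ray.shutdown()

As the warning added in node_manager.cc suggests, raising redis_max_memory in ray.init() keeps task lineage around longer and avoids hitting this error in the first place.
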