diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py
index 5ad05e800912e..24c3f0a31bdaf 100644
--- a/python/ray/tests/test_actor.py
+++ b/python/ray/tests/test_actor.py
@@ -1306,7 +1306,7 @@ def get_gpu_id(self):
 
 
 def test_blocking_actor_task(shutdown_only):
-    ray.init(num_cpus=1, num_gpus=1)
+    ray.init(num_cpus=1, num_gpus=1, object_store_memory=int(10**8))
 
     @ray.remote(num_gpus=1)
     def f():
@@ -1999,7 +1999,7 @@ def method(self):
     actor2s = [Actor2.remote() for _ in range(2)]
     results = [a.method.remote() for a in actor2s]
     ready_ids, remaining_ids = ray.wait(
-        results, num_returns=len(results), timeout=1.0)
+        results, num_returns=len(results), timeout=5.0)
     assert len(ready_ids) == 1
diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py
index a560e461f7a21..8bf6f9317e909 100644
--- a/python/ray/tests/test_failure.py
+++ b/python/ray/tests/test_failure.py
@@ -688,7 +688,7 @@ def sleep_to_kill_raylet():
 
     thread = threading.Thread(target=sleep_to_kill_raylet)
     thread.start()
-    with pytest.raises(Exception, match=r".*Connection closed unexpectedly.*"):
+    with pytest.raises(ray.exceptions.UnreconstructableError):
         ray.get(nonexistent_id)
     thread.join()
diff --git a/python/ray/tests/test_stress.py b/python/ray/tests/test_stress.py
index 85105df428dbc..35cdac1c1f75f 100644
--- a/python/ray/tests/test_stress.py
+++ b/python/ray/tests/test_stress.py
@@ -6,6 +6,7 @@
 import numpy as np
 import os
 import pytest
+import sys
 import time
 
 import ray
@@ -479,6 +480,8 @@ def error_check(errors):
 @pytest.mark.skipif(
     os.environ.get("RAY_USE_NEW_GCS") == "on",
     reason="Failing with new GCS API on Linux.")
+@pytest.mark.skipif(
+    sys.version_info < (3, 0), reason="This test requires Python 3.")
 @pytest.mark.parametrize(
     "ray_start_object_store_memory", [10**9], indirect=True)
 def test_driver_put_errors(ray_start_object_store_memory):
@@ -524,6 +527,7 @@ def error_check(errors):
 
     errors = wait_for_errors(error_check)
     assert all(error["type"] == ray_constants.PUT_RECONSTRUCTION_PUSH_ERROR
+               or "ray.exceptions.UnreconstructableError" in error["message"]
               for error in errors)
diff --git a/python/ray/tests/test_unreconstructable_errors.py b/python/ray/tests/test_unreconstructable_errors.py
new file mode 100644
index 0000000000000..5a73e9adc04e7
--- /dev/null
+++ b/python/ray/tests/test_unreconstructable_errors.py
@@ -0,0 +1,40 @@
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import numpy as np
+import unittest
+
+import ray
+
+
+class TestUnreconstructableErrors(unittest.TestCase):
+    def setUp(self):
+        ray.init(object_store_memory=10000000, redis_max_memory=10000000)
+
+    def tearDown(self):
+        ray.shutdown()
+
+    def testDriverPutEvictedCannotReconstruct(self):
+        x_id = ray.put(np.zeros(1 * 1024 * 1024))
+        ray.get(x_id)
+        for _ in range(10):
+            ray.put(np.zeros(1 * 1024 * 1024))
+        self.assertRaises(ray.exceptions.UnreconstructableError,
+                          lambda: ray.get(x_id))
+
+    def testLineageEvictedReconstructionFails(self):
+        @ray.remote
+        def f(data):
+            return 0
+
+        x_id = f.remote(None)
+        ray.get(x_id)
+        for _ in range(400):
+            ray.get([f.remote(np.zeros(10000)) for _ in range(50)])
+        self.assertRaises(ray.exceptions.UnreconstructableError,
+                          lambda: ray.get(x_id))
+
+
+if __name__ == "__main__":
+    unittest.main(verbosity=2)
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index 2286d7fb93717..908ea666c2e7a 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -88,7 +88,9 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
       scheduling_policy_(local_queues_),
       reconstruction_policy_(
           io_service_,
-          [this](const TaskID &task_id) { HandleTaskReconstruction(task_id); },
+          [this](const TaskID &task_id, const ObjectID &required_object_id) {
+            HandleTaskReconstruction(task_id, required_object_id);
+          },
           RayConfig::instance().initial_reconstruction_timeout_milliseconds(),
           gcs_client_->client_table().GetLocalClientId(),
           gcs_client_->task_lease_table(), object_directory_,
           gcs_client_->task_reconstruction_log()),
@@ -1384,9 +1386,27 @@ void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_typ
     // information about the TaskSpecification implementation.
     num_returns -= 1;
   }
-  const std::string meta = std::to_string(static_cast<int>(error_type));
+  // Determine which IDs should be marked as failed.
+  std::vector<plasma::ObjectID> objects_to_fail;
   for (int64_t i = 0; i < num_returns; i++) {
-    const auto object_id = spec.ReturnId(i).ToPlasmaId();
+    objects_to_fail.push_back(spec.ReturnId(i).ToPlasmaId());
+  }
+  const JobID job_id = task.GetTaskSpecification().JobId();
+  MarkObjectsAsFailed(error_type, objects_to_fail, job_id);
+  task_dependency_manager_.TaskCanceled(spec.TaskId());
+  // Notify the task dependency manager that we no longer need this task's
+  // object dependencies. TODO(swang): Ideally, we would check the return value
+  // here. However, we don't know at this point if the task was in the WAITING
+  // or READY queue before, in which case we would not have been subscribed to
+  // its dependencies.
+  task_dependency_manager_.UnsubscribeGetDependencies(spec.TaskId());
+}
+
+void NodeManager::MarkObjectsAsFailed(const ErrorType &error_type,
+                                      const std::vector<plasma::ObjectID> objects_to_fail,
+                                      const JobID &job_id) {
+  const std::string meta = std::to_string(static_cast<int>(error_type));
+  for (const auto &object_id : objects_to_fail) {
     arrow::Status status = store_client_.CreateAndSeal(object_id, "", meta);
     if (!status.ok() && !plasma::IsPlasmaObjectExists(status)) {
       // If we failed to save the error code, log a warning and push an error message
@@ -1398,16 +1418,9 @@ void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_typ
       std::string error_message = stream.str();
       RAY_LOG(WARNING) << error_message;
       RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver(
-          task.GetTaskSpecification().JobId(), "task", error_message, current_time_ms()));
+          job_id, "task", error_message, current_time_ms()));
     }
   }
-  task_dependency_manager_.TaskCanceled(spec.TaskId());
-  // Notify the task dependency manager that we no longer need this task's
-  // object dependencies. TODO(swang): Ideally, we would check the return value
-  // here. However, we don't know at this point if the task was in the WAITING
-  // or READY queue before, in which case we would not have been subscribed to
-  // its dependencies.
-  task_dependency_manager_.UnsubscribeGetDependencies(spec.TaskId());
 }
 
 void NodeManager::TreatTaskAsFailedIfLost(const Task &task) {
@@ -2024,34 +2037,41 @@ void NodeManager::FinishAssignedActorCreationTask(const ActorID &parent_actor_id
   }
 }
 
-void NodeManager::HandleTaskReconstruction(const TaskID &task_id) {
+void NodeManager::HandleTaskReconstruction(const TaskID &task_id,
+                                           const ObjectID &required_object_id) {
   // Retrieve the task spec in order to re-execute the task.
   RAY_CHECK_OK(gcs_client_->raylet_task_table().Lookup(
       JobID::Nil(), task_id,
       /*success_callback=*/
-      [this](ray::gcs::RedisGcsClient *client, const TaskID &task_id,
-             const TaskTableData &task_data) {
+      [this, required_object_id](ray::gcs::RedisGcsClient *client, const TaskID &task_id,
+                                 const TaskTableData &task_data) {
         // The task was in the GCS task table. Use the stored task spec to
         // re-execute the task.
-        ResubmitTask(Task(task_data.task()));
+        ResubmitTask(Task(task_data.task()), required_object_id);
       },
       /*failure_callback=*/
-      [this](ray::gcs::RedisGcsClient *client, const TaskID &task_id) {
+      [this, required_object_id](ray::gcs::RedisGcsClient *client,
+                                 const TaskID &task_id) {
         // The task was not in the GCS task table. It must therefore be in the
         // lineage cache.
-        RAY_CHECK(lineage_cache_.ContainsTask(task_id))
-            << "Metadata of task " << task_id
-            << " not found in either GCS or lineage cache. It may have been evicted "
-            << "by the redis LRU configuration. Consider increasing the memory "
-               "allocation via "
-            << "ray.init(redis_max_memory=<max_memory_bytes>).";
-        // Use a copy of the cached task spec to re-execute the task.
-        const Task task = lineage_cache_.GetTaskOrDie(task_id);
-        ResubmitTask(task);
+        if (lineage_cache_.ContainsTask(task_id)) {
+          // Use a copy of the cached task spec to re-execute the task.
+          const Task task = lineage_cache_.GetTaskOrDie(task_id);
+          ResubmitTask(task, required_object_id);
+        } else {
+          RAY_LOG(WARNING)
+              << "Metadata of task " << task_id
+              << " not found in either GCS or lineage cache. It may have been evicted "
+              << "by the redis LRU configuration. Consider increasing the memory "
+                 "allocation via "
+              << "ray.init(redis_max_memory=<max_memory_bytes>).";
+          MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE,
+                              {required_object_id.ToPlasmaId()}, JobID::Nil());
+        }
       }));
 }
 
-void NodeManager::ResubmitTask(const Task &task) {
+void NodeManager::ResubmitTask(const Task &task, const ObjectID &required_object_id) {
   RAY_LOG(DEBUG) << "Attempting to resubmit task "
                  << task.GetTaskSpecification().TaskId();
@@ -2082,6 +2102,9 @@ void NodeManager::ResubmitTask(const Task &task) {
     RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver(
         task.GetTaskSpecification().JobId(), type, error_message.str(),
         current_time_ms()));
+    MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE,
+                        {required_object_id.ToPlasmaId()},
+                        task.GetTaskSpecification().JobId());
     return;
   }
diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h
index 881461dd061cd..19dc682c3c4da 100644
--- a/src/ray/raylet/node_manager.h
+++ b/src/ray/raylet/node_manager.h
@@ -189,6 +189,14 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
   /// \param error_type The type of the error that caused this task to fail.
   /// \return Void.
   void TreatTaskAsFailed(const Task &task, const ErrorType &error_type);
+  /// Mark the specified objects as failed with the given error type.
+  ///
+  /// \param error_type The type of the error that caused this task to fail.
+  /// \param object_ids The object ids to store error messages into.
+  /// \param job_id The optional job to push errors to if the writes fail.
+  void MarkObjectsAsFailed(const ErrorType &error_type,
+                           const std::vector<plasma::ObjectID> object_ids,
+                           const JobID &job_id);
   /// This is similar to TreatTaskAsFailed, but it will only mark the task as
   /// failed if at least one of the task's return values is lost. A return
   /// value is lost if it has been created before, but no longer exists on any
@@ -257,14 +265,17 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
   /// Handle a task whose return value(s) must be reconstructed.
   ///
   /// \param task_id The relevant task ID.
+  /// \param required_object_id The object id we are reconstructing for.
   /// \return Void.
-  void HandleTaskReconstruction(const TaskID &task_id);
+  void HandleTaskReconstruction(const TaskID &task_id,
+                                const ObjectID &required_object_id);
 
   /// Resubmit a task for execution. This is a task that was previously already
   /// submitted to a raylet but which must now be re-executed.
   ///
   /// \param task The task being resubmitted.
+  /// \param required_object_id The object id that triggered the resubmission.
   /// \return Void.
-  void ResubmitTask(const Task &task);
+  void ResubmitTask(const Task &task, const ObjectID &required_object_id);
 
   /// Attempt to forward a task to a remote different node manager. If this
   /// fails, the task will be resubmit locally.
   ///
diff --git a/src/ray/raylet/reconstruction_policy.cc b/src/ray/raylet/reconstruction_policy.cc
index ed4cc7c9705fc..23551b4295a10 100644
--- a/src/ray/raylet/reconstruction_policy.cc
+++ b/src/ray/raylet/reconstruction_policy.cc
@@ -8,7 +8,7 @@ namespace raylet {
 
 ReconstructionPolicy::ReconstructionPolicy(
     boost::asio::io_service &io_service,
-    std::function<void(const TaskID &)> reconstruction_handler,
+    std::function<void(const TaskID &, const ObjectID &)> reconstruction_handler,
     int64_t initial_reconstruction_timeout_ms, const ClientID &client_id,
     gcs::PubsubInterface<TaskLeaseData> &task_lease_pubsub,
     std::shared_ptr<ObjectDirectoryInterface> object_directory,
@@ -63,8 +63,8 @@ void ReconstructionPolicy::SetTaskTimeout(
   });
 }
 
-void ReconstructionPolicy::HandleReconstructionLogAppend(const TaskID &task_id,
-                                                         bool success) {
+void ReconstructionPolicy::HandleReconstructionLogAppend(
+    const TaskID &task_id, const ObjectID &required_object_id, bool success) {
   auto it = listening_tasks_.find(task_id);
   if (it == listening_tasks_.end()) {
     return;
@@ -76,7 +76,7 @@ void ReconstructionPolicy::HandleReconstructionLogAppend(const TaskID &task_id,
   SetTaskTimeout(it, initial_reconstruction_timeout_ms_);
 
   if (success) {
-    reconstruction_handler_(task_id);
+    reconstruction_handler_(task_id, required_object_id);
   }
 }
 
@@ -112,14 +112,14 @@ void ReconstructionPolicy::AttemptReconstruction(const TaskID &task_id,
   RAY_CHECK_OK(task_reconstruction_log_.AppendAt(
       JobID::Nil(), task_id, reconstruction_entry,
      /*success_callback=*/
-      [this](gcs::RedisGcsClient *client, const TaskID &task_id,
-             const TaskReconstructionData &data) {
-        HandleReconstructionLogAppend(task_id, /*success=*/true);
+      [this, required_object_id](gcs::RedisGcsClient *client, const TaskID &task_id,
+                                 const TaskReconstructionData &data) {
+        HandleReconstructionLogAppend(task_id, required_object_id, /*success=*/true);
       },
       /*failure_callback=*/
-      [this](gcs::RedisGcsClient *client, const TaskID &task_id,
-             const TaskReconstructionData &data) {
-        HandleReconstructionLogAppend(task_id, /*success=*/false);
+      [this, required_object_id](gcs::RedisGcsClient *client, const TaskID &task_id,
+                                 const TaskReconstructionData &data) {
+        HandleReconstructionLogAppend(task_id, required_object_id, /*success=*/false);
       },
       reconstruction_attempt));
diff --git a/src/ray/raylet/reconstruction_policy.h b/src/ray/raylet/reconstruction_policy.h
index a194443e14258..a44819dca5bc1 100644
--- a/src/ray/raylet/reconstruction_policy.h
+++ b/src/ray/raylet/reconstruction_policy.h
@@ -42,7 +42,7 @@ class ReconstructionPolicy : public ReconstructionPolicyInterface {
   /// lease notifications from.
   ReconstructionPolicy(
       boost::asio::io_service &io_service,
-      std::function<void(const TaskID &)> reconstruction_handler,
+      std::function<void(const TaskID &, const ObjectID &)> reconstruction_handler,
       int64_t initial_reconstruction_timeout_ms, const ClientID &client_id,
       gcs::PubsubInterface<TaskLeaseData> &task_lease_pubsub,
       std::shared_ptr<ObjectDirectoryInterface> object_directory,
@@ -127,12 +127,13 @@ class ReconstructionPolicy : public ReconstructionPolicyInterface {
   /// Handle the response for an attempt at adding an entry to the task
   /// reconstruction log.
-  void HandleReconstructionLogAppend(const TaskID &task_id, bool success);
+  void HandleReconstructionLogAppend(const TaskID &task_id, const ObjectID &object_id,
+                                     bool success);
 
   /// The event loop.
   boost::asio::io_service &io_service_;
   /// The handler to call for tasks that require reconstruction.
-  const std::function<void(const TaskID &)> reconstruction_handler_;
+  const std::function<void(const TaskID &, const ObjectID &)> reconstruction_handler_;
   /// The initial timeout within which a task lease notification must be
   /// received. Otherwise, reconstruction will be triggered.
   const int64_t initial_reconstruction_timeout_ms_;
diff --git a/src/ray/raylet/reconstruction_policy_test.cc b/src/ray/raylet/reconstruction_policy_test.cc
index 02554399e7c6b..397eb81aa65e1 100644
--- a/src/ray/raylet/reconstruction_policy_test.cc
+++ b/src/ray/raylet/reconstruction_policy_test.cc
@@ -155,7 +155,9 @@ class ReconstructionPolicyTest : public ::testing::Test {
         reconstruction_timeout_ms_(50),
         reconstruction_policy_(std::make_shared<ReconstructionPolicy>(
             io_service_,
-            [this](const TaskID &task_id) { TriggerReconstruction(task_id); },
+            [this](const TaskID &task_id, const ObjectID &obj) {
+              TriggerReconstruction(task_id);
+            },
             reconstruction_timeout_ms_, ClientID::FromRandom(), mock_gcs_,
             mock_object_directory_, mock_gcs_)),
         timer_canceled_(false) {
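
A minimal sketch of the user-facing behavior this patch introduces, mirroring the new
testDriverPutEvictedCannotReconstruct test above (the small object_store_memory value is
only an assumption to force eviction quickly on a laptop-sized machine):

    import numpy as np
    import ray

    ray.init(object_store_memory=10**7)

    # Put an object, then evict it by filling the store with other objects.
    x_id = ray.put(np.zeros(1024 * 1024))
    ray.get(x_id)
    for _ in range(10):
        ray.put(np.zeros(1024 * 1024))

    # Objects created with ray.put() cannot be reconstructed, so instead of a
    # generic raylet connection error, the driver now sees a typed exception.
    try:
        ray.get(x_id)
    except ray.exceptions.UnreconstructableError as err:
        print("Object was evicted and cannot be reconstructed:", err)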