
Fix two types of eviction hangs #5225

Merged: 18 commits, Jul 24, 2019
42 changes: 42 additions & 0 deletions python/ray/tests/test_unreconstructable_errors.py
@@ -0,0 +1,42 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import unittest

import ray


class TestUnreconstructableErrors(unittest.TestCase):
Contributor:
Does it make sense to put these in a new file? Maybe they should go in test_failure.py instead.

Contributor Author:
Isn't that test file too large already?

Contributor:
Hmm, I'd prefer we just put it there now and reorganize that whole file later, but up to you.

    def setUp(self):
        ray.init(object_store_memory=10000000, redis_max_memory=10000000)

    def tearDown(self):
        ray.shutdown()

    def testDriverPutEvictedCannotReconstruct(self):
        x_id = ray.put(np.zeros(1 * 1024 * 1024))
        ray.get(x_id)
        for _ in range(10):
            ray.put(np.zeros(1 * 1024 * 1024))
        self.assertRaises(ray.exceptions.UnreconstructableError,
                          lambda: ray.get(x_id))

    def testLineageEvictedReconstructionFails(self):
        @ray.remote
        def f(data):
            return 0

        x_id = f.remote(None)
        ray.get(x_id)
        for i in range(200):
            if i % 100 == 0:
                print("launching round of tasks to evict lineage", i)
            ray.get([f.remote(np.zeros(10000)) for _ in range(100)])
        self.assertRaises(ray.exceptions.UnreconstructableError,
                          lambda: ray.get(x_id))


if __name__ == "__main__":
    unittest.main(verbosity=2)
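For context, a minimal standalone sketch of the user-visible behavior the first test above locks in, using the same store sizes. Before this PR, the final ray.get could hang forever; with the fix it raises. This assumes a Ray build from this PR's era (the exception class may differ in later releases):

import numpy as np
import ray

# Same 10 MB object store as the tests above, so a handful of ~8 MB arrays
# (1M float64 zeros) force eviction.
ray.init(object_store_memory=10000000, redis_max_memory=10000000)

x_id = ray.put(np.zeros(1 * 1024 * 1024))
ray.get(x_id)  # Fine: the object is still in the store.

for _ in range(10):  # Evict x_id by filling the store.
    ray.put(np.zeros(1 * 1024 * 1024))

try:
    ray.get(x_id)  # Driver-put objects have no task to re-execute.
except ray.exceptions.UnreconstructableError as e:
    print("evicted and unreconstructable:", e)

ray.shutdown()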
84 changes: 56 additions & 28 deletions src/ray/raylet/node_manager.cc
@@ -88,7 +88,9 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
scheduling_policy_(local_queues_),
reconstruction_policy_(
io_service_,
- [this](const TaskID &task_id) { HandleTaskReconstruction(task_id); },
+ [this](const TaskID &task_id, const ObjectID &required_object_id) {
+ HandleTaskReconstruction(task_id, required_object_id);
+ },
RayConfig::instance().initial_reconstruction_timeout_milliseconds(),
gcs_client_->client_table().GetLocalClientId(), gcs_client_->task_lease_table(),
object_directory_, gcs_client_->task_reconstruction_log()),
@@ -1403,7 +1405,8 @@ bool NodeManager::CheckDependencyManagerInvariant() const {
return true;
}

- void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_type) {
+ void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_type,
+ const ObjectID *required_object_id) {
const TaskSpecification &spec = task.GetTaskSpecification();
RAY_LOG(DEBUG) << "Treating task " << spec.TaskId() << " as failed because of error "
<< ErrorType_Name(error_type) << ".";
@@ -1421,9 +1424,31 @@ void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_typ
// information about the TaskSpecification implementation.
num_returns -= 1;
}
+ // Determine which IDs should be marked as failed.
+ std::vector<plasma::ObjectID> objects_to_fail;
+ if (num_returns == 0 && required_object_id != nullptr) {
+ objects_to_fail.push_back(required_object_id->ToPlasmaId());
+ } else {
+ for (int64_t i = 0; i < num_returns; i++) {
+ objects_to_fail.push_back(spec.ReturnId(i).ToPlasmaId());
+ }
+ }
+ const JobID job_id = task.GetTaskSpecification().JobId();
+ MarkObjectsAsFailed(error_type, objects_to_fail, &job_id);
+ task_dependency_manager_.TaskCanceled(spec.TaskId());
+ // Notify the task dependency manager that we no longer need this task's
+ // object dependencies. TODO(swang): Ideally, we would check the return value
+ // here. However, we don't know at this point if the task was in the WAITING
+ // or READY queue before, in which case we would not have been subscribed to
+ // its dependencies.
+ task_dependency_manager_.UnsubscribeDependencies(spec.TaskId());
+ }
+
+ void NodeManager::MarkObjectsAsFailed(const ErrorType &error_type,
+ const std::vector<plasma::ObjectID> objects_to_fail,
+ const JobID *job_id) {
const std::string meta = std::to_string(static_cast<int>(error_type));
- for (int64_t i = 0; i < num_returns; i++) {
- const auto object_id = spec.ReturnId(i).ToPlasmaId();
+ for (const auto &object_id : objects_to_fail) {
arrow::Status status = store_client_.CreateAndSeal(object_id, "", meta);
if (!status.ok() && !plasma::IsPlasmaObjectExists(status)) {
// If we failed to save the error code, log a warning and push an error message
@@ -1434,17 +1459,12 @@ void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_typ
<< " object may hang forever.";
std::string error_message = stream.str();
RAY_LOG(WARNING) << error_message;
- RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver(
- task.GetTaskSpecification().JobId(), "task", error_message, current_time_ms()));
+ if (job_id != nullptr) {
+ RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver(
+ *job_id, "task", error_message, current_time_ms()));
+ }
}
}
- task_dependency_manager_.TaskCanceled(spec.TaskId());
- // Notify the task dependency manager that we no longer need this task's
- // object dependencies. TODO(swang): Ideally, we would check the return value
- // here. However, we don't know at this point if the task was in the WAITING
- // or READY queue before, in which case we would not have been subscribed to
- // its dependencies.
- task_dependency_manager_.UnsubscribeDependencies(spec.TaskId());
}

void NodeManager::TreatTaskAsFailedIfLost(const Task &task) {
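The mechanism behind the new MarkObjectsAsFailed is plasma's CreateAndSeal: a small marker object whose metadata encodes the error type is sealed under the failed object's ID, so any client blocked waiting on that ID wakes up and raises instead of hanging. A toy in-memory model of the idea (not the real plasma API; the class and method names here are illustrative only):

class ToyObjectStore:
    """Toy stand-in for the plasma store used by MarkObjectsAsFailed above."""

    def __init__(self):
        self._objects = {}

    def create_and_seal(self, object_id, data, metadata):
        # Like CreateAndSeal, sealing an ID that already exists is a no-op,
        # which is why the C++ above tolerates IsPlasmaObjectExists.
        self._objects.setdefault(object_id, (data, metadata))

    def get(self, object_id):
        # The real client blocks until the ID is sealed; sealing an error
        # marker is what un-blocks a hung ray.get.
        data, metadata = self._objects[object_id]
        if metadata:  # The error type was stored in the metadata.
            raise RuntimeError(f"object {object_id} failed: {metadata}")
        return data

store = ToyObjectStore()
store.create_and_seal("x_id", "", "OBJECT_UNRECONSTRUCTABLE")
try:
    store.get("x_id")
except RuntimeError as e:
    print(e)  # Raises immediately instead of blocking forever.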
@@ -2044,34 +2064,41 @@ void NodeManager::FinishAssignedActorCreationTask(const ActorID &parent_actor_id
}
}

- void NodeManager::HandleTaskReconstruction(const TaskID &task_id) {
+ void NodeManager::HandleTaskReconstruction(const TaskID &task_id,
+ const ObjectID &required_object_id) {
// Retrieve the task spec in order to re-execute the task.
RAY_CHECK_OK(gcs_client_->raylet_task_table().Lookup(
JobID::Nil(), task_id,
/*success_callback=*/
- [this](ray::gcs::AsyncGcsClient *client, const TaskID &task_id,
- const TaskTableData &task_data) {
+ [this, required_object_id](ray::gcs::AsyncGcsClient *client, const TaskID &task_id,
+ const TaskTableData &task_data) {
// The task was in the GCS task table. Use the stored task spec to
// re-execute the task.
- ResubmitTask(Task(task_data.task()));
+ ResubmitTask(Task(task_data.task()), required_object_id);
},
/*failure_callback=*/
- [this](ray::gcs::AsyncGcsClient *client, const TaskID &task_id) {
+ [this, required_object_id](ray::gcs::AsyncGcsClient *client,
+ const TaskID &task_id) {
// The task was not in the GCS task table. It must therefore be in the
// lineage cache.
- RAY_CHECK(lineage_cache_.ContainsTask(task_id))
- << "Metadata of task " << task_id
- << " not found in either GCS or lineage cache. It may have been evicted "
- << "by the redis LRU configuration. Consider increasing the memory "
- "allocation via "
- << "ray.init(redis_max_memory=<max_memory_bytes>).";
- // Use a copy of the cached task spec to re-execute the task.
- const Task task = lineage_cache_.GetTaskOrDie(task_id);
- ResubmitTask(task);
+ if (lineage_cache_.ContainsTask(task_id)) {
+ // Use a copy of the cached task spec to re-execute the task.
+ const Task task = lineage_cache_.GetTaskOrDie(task_id);
+ ResubmitTask(task, required_object_id);
+ } else {
+ RAY_LOG(WARNING)
+ << "Metadata of task " << task_id
+ << " not found in either GCS or lineage cache. It may have been evicted "
+ << "by the redis LRU configuration. Consider increasing the memory "
+ "allocation via "
+ << "ray.init(redis_max_memory=<max_memory_bytes>).";
+ MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE,
+ {required_object_id.ToPlasmaId()});
+ }
}));
}

- void NodeManager::ResubmitTask(const Task &task) {
+ void NodeManager::ResubmitTask(const Task &task, const ObjectID &required_object_id) {
RAY_LOG(DEBUG) << "Attempting to resubmit task "
<< task.GetTaskSpecification().TaskId();

@@ -2102,6 +2129,7 @@ void NodeManager::ResubmitTask(const Task &task) {
RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver(
task.GetTaskSpecification().JobId(), type, error_message.str(),
current_time_ms()));
+ TreatTaskAsFailed(task, ErrorType::OBJECT_UNRECONSTRUCTABLE, &required_object_id);
return;
}
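Putting the node_manager.cc changes together: reconstruction now looks the task up in the GCS, falls back to the lineage cache, and, if the metadata has been LRU-evicted from both (previously a fatal RAY_CHECK), marks the requested object failed; likewise, a task that cannot be resubmitted now fails the object instead of leaving it forever missing. A toy model of that control flow, with plain dicts standing in for the GCS task table and lineage cache (all names here are illustrative):

def resubmit_task(task, required_object_id, failed_objects):
    # Mirrors ResubmitTask: a task that cannot be re-executed (e.g., an actor
    # task) now fails the requested object rather than silently returning.
    if not task.get("resubmittable", True):
        failed_objects.add(required_object_id)

def handle_task_reconstruction(task_id, required_object_id,
                               gcs_tasks, lineage_cache, failed_objects):
    if task_id in gcs_tasks:
        resubmit_task(gcs_tasks[task_id], required_object_id, failed_objects)
    elif task_id in lineage_cache:
        resubmit_task(lineage_cache[task_id], required_object_id, failed_objects)
    else:
        # Metadata evicted from both stores: previously a fatal RAY_CHECK,
        # now a warning plus MarkObjectsAsFailed so the blocked get raises.
        failed_objects.add(required_object_id)

failed = set()
handle_task_reconstruction("t1", "o1", {}, {}, failed)
assert "o1" in failed  # Lineage gone: object failed, raylet keeps running.

handle_task_reconstruction("t2", "o2", {"t2": {"resubmittable": False}}, {}, failed)
assert "o2" in failed  # Unresubmittable task: object failed, no hang.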

21 changes: 18 additions & 3 deletions src/ray/raylet/node_manager.h
@@ -187,8 +187,20 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
///
/// \param task The task to fail.
/// \param error_type The type of the error that caused this task to fail.
+ /// \param required_object_id The object id that we should write the error
+ /// message into. This is used in case the task has no outputs (e.g.,
+ /// it is the driver task).
/// \return Void.
- void TreatTaskAsFailed(const Task &task, const ErrorType &error_type);
+ void TreatTaskAsFailed(const Task &task, const ErrorType &error_type,
+ const ObjectID *required_object_id = nullptr);
+ /// Mark the specified objects as failed with the given error type.
+ ///
+ /// \param error_type The type of the error that caused these objects to fail.
+ /// \param object_ids The object ids to store error messages into.
+ /// \param job_id The optional job to push errors to if the writes fail.
+ void MarkObjectsAsFailed(const ErrorType &error_type,
+ const std::vector<plasma::ObjectID> object_ids,
+ const JobID *job_id = nullptr);
/// This is similar to TreatTaskAsFailed, but it will only mark the task as
/// failed if at least one of the task's return values is lost. A return
/// value is lost if it has been created before, but no longer exists on any
@@ -257,14 +269,17 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// Handle a task whose return value(s) must be reconstructed.
///
/// \param task_id The relevant task ID.
+ /// \param required_object_id The object id we are reconstructing for.
/// \return Void.
- void HandleTaskReconstruction(const TaskID &task_id);
+ void HandleTaskReconstruction(const TaskID &task_id,
+ const ObjectID &required_object_id);
/// Resubmit a task for execution. This is a task that was previously already
/// submitted to a raylet but which must now be re-executed.
///
/// \param task The task being resubmitted.
+ /// \param required_object_id The object id that triggered the resubmission.
/// \return Void.
- void ResubmitTask(const Task &task);
+ void ResubmitTask(const Task &task, const ObjectID &required_object_id);
/// Attempt to forward a task to a different node manager. If this
/// fails, the task will be resubmitted locally.
///
20 changes: 10 additions & 10 deletions src/ray/raylet/reconstruction_policy.cc
@@ -8,7 +8,7 @@ namespace raylet {

ReconstructionPolicy::ReconstructionPolicy(
boost::asio::io_service &io_service,
- std::function<void(const TaskID &)> reconstruction_handler,
+ std::function<void(const TaskID &, const ObjectID &)> reconstruction_handler,
int64_t initial_reconstruction_timeout_ms, const ClientID &client_id,
gcs::PubsubInterface<TaskID> &task_lease_pubsub,
std::shared_ptr<ObjectDirectoryInterface> object_directory,
@@ -63,8 +63,8 @@ void ReconstructionPolicy::SetTaskTimeout(
});
}

- void ReconstructionPolicy::HandleReconstructionLogAppend(const TaskID &task_id,
- bool success) {
+ void ReconstructionPolicy::HandleReconstructionLogAppend(
+ const TaskID &task_id, const ObjectID &required_object_id, bool success) {
auto it = listening_tasks_.find(task_id);
if (it == listening_tasks_.end()) {
return;
@@ -76,7 +76,7 @@ void ReconstructionPolicy::HandleReconstructionLogAppend(const TaskID &task_id,
SetTaskTimeout(it, initial_reconstruction_timeout_ms_);

if (success) {
- reconstruction_handler_(task_id);
+ reconstruction_handler_(task_id, required_object_id);
}
}

@@ -112,14 +112,14 @@ void ReconstructionPolicy::AttemptReconstruction(const TaskID &task_id,
RAY_CHECK_OK(task_reconstruction_log_.AppendAt(
JobID::Nil(), task_id, reconstruction_entry,
/*success_callback=*/
- [this](gcs::AsyncGcsClient *client, const TaskID &task_id,
- const TaskReconstructionData &data) {
- HandleReconstructionLogAppend(task_id, /*success=*/true);
+ [this, required_object_id](gcs::AsyncGcsClient *client, const TaskID &task_id,
+ const TaskReconstructionData &data) {
+ HandleReconstructionLogAppend(task_id, required_object_id, /*success=*/true);
},
/*failure_callback=*/
- [this](gcs::AsyncGcsClient *client, const TaskID &task_id,
- const TaskReconstructionData &data) {
- HandleReconstructionLogAppend(task_id, /*success=*/false);
+ [this, required_object_id](gcs::AsyncGcsClient *client, const TaskID &task_id,
+ const TaskReconstructionData &data) {
+ HandleReconstructionLogAppend(task_id, required_object_id, /*success=*/false);
},
reconstruction_attempt));
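The required_object_id must survive the asynchronous round trip through the task reconstruction log, so both GCS callbacks capture it, mirroring the [this, required_object_id] lambda captures above. A minimal Python analogue of that capture pattern (a plain list stands in for the async GCS client; names are illustrative):

pending = []  # Stand-in for the async GCS log; callbacks fire later.

def append_reconstruction_entry(task_id, on_append):
    pending.append(on_append)  # The GCS reports success or failure later.

def attempt_reconstruction(task_id, required_object_id, handler):
    def on_append(success):
        # required_object_id is captured by the closure, so the handler knows
        # which object triggered reconstruction even though the log callback
        # itself only carries the task id.
        if success:
            handler(task_id, required_object_id)
    append_reconstruction_entry(task_id, on_append)

triggered = []
attempt_reconstruction("t1", "o1", lambda tid, oid: triggered.append((tid, oid)))
pending.pop()(True)  # The GCS append succeeded; handler fires with both ids.
assert triggered == [("t1", "o1")]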

7 changes: 4 additions & 3 deletions src/ray/raylet/reconstruction_policy.h
@@ -42,7 +42,7 @@ class ReconstructionPolicy : public ReconstructionPolicyInterface {
/// lease notifications from.
ReconstructionPolicy(
boost::asio::io_service &io_service,
- std::function<void(const TaskID &)> reconstruction_handler,
+ std::function<void(const TaskID &, const ObjectID &)> reconstruction_handler,
int64_t initial_reconstruction_timeout_ms, const ClientID &client_id,
gcs::PubsubInterface<TaskID> &task_lease_pubsub,
std::shared_ptr<ObjectDirectoryInterface> object_directory,
@@ -127,12 +127,13 @@ class ReconstructionPolicy : public ReconstructionPolicyInterface {

/// Handle the response for an attempt at adding an entry to the task
/// reconstruction log.
- void HandleReconstructionLogAppend(const TaskID &task_id, bool success);
+ void HandleReconstructionLogAppend(const TaskID &task_id, const ObjectID &object_id,
+ bool success);

/// The event loop.
boost::asio::io_service &io_service_;
/// The handler to call for tasks that require reconstruction.
- const std::function<void(const TaskID &)> reconstruction_handler_;
+ const std::function<void(const TaskID &, const ObjectID &)> reconstruction_handler_;
/// The initial timeout within which a task lease notification must be
/// received. Otherwise, reconstruction will be triggered.
const int64_t initial_reconstruction_timeout_ms_;
2 changes: 1 addition & 1 deletion src/ray/raylet/task_dependency_manager_test.cc
@@ -71,7 +71,7 @@ static inline Task ExampleTask(const std::vector<ObjectID> &arguments,
uint64_t num_returns) {
TaskSpecBuilder builder;
builder.SetCommonTaskSpec(Language::PYTHON, {"", "", ""}, JobID::Nil(),
- TaskID::FromRandom(), 0, num_returns, {}, {});
+ TaskID::FromRandom(), 0, num_returns, {}, {}, {});
for (const auto &arg : arguments) {
builder.AddByRefArg(arg);
}