
Throw exception for ray.get of an evicted actor object #3490

Merged 12 commits into ray-project:master from stephanie-wang:fix-actor-eviction on Dec 14, 2018

Conversation

stephanie-wang
Contributor

What do these changes do?

Since actors have state, objects created by an earlier actor method that have been evicted cannot be reconstructed without rolling back the actor. This PR treats such tasks as failed so that the frontend can catch the error, instead of hanging.

Related issue number

#3452 and potentially others that involve actors and a limited amount of object store memory.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/9831/

if (task.GetTaskSpecification().IsActorTask()) {
  // Actor reconstruction is turned off by default right now.
  const ActorID actor_id = task.GetTaskSpecification().ActorId();
  auto it = actor_registry_.find(actor_id);
  RAY_CHECK(it != actor_registry_.end());
  if (it->second.IsAlive()) {
    // Only treat the task as failed if its output has been evicted.
    // Otherwise, this must be a spurious reconstruction.
    if (return_values_lost) {
Contributor

What would happen if we unconditionally treated the task as failed? It seems like it would simplify the code a lot to not have to track whether the return values were lost.

Contributor Author

The code used to do what you're saying, but it was causing spurious RayGetErrors that we fixed in #3359.

One option that I was thinking we could do is to check whether the task has already been executed using the task counters for the actor, then treat the task as failed if that check passes. To be safe, we could also check whether the return values exist on any nodes, but we wouldn't need the extra logic in this PR to check whether the return values were evicted. What do you think?

Contributor

That sounds reasonable, so it would just involve checking the locations table after the task count check?

Contributor Author

Yup.

Contributor Author

Hmm actually, it may still make sense to have the check that the object is evicted. Since GCS writes are asynchronous, the locations entry might be empty even when some node does have the object.

                           std::unordered_set<ClientID> &client_ids,
                           const std::vector<ObjectTableDataT> &location_history,
                           const ray::gcs::ClientTable &client_table) {
void UpdateObjectLocations(const std::vector<ObjectTableDataT> &location_history,
Collaborator

Can we document this function? In particular, the fact that we have output arguments.

                           const ray::gcs::ClientTable &client_table) {
void UpdateObjectLocations(const std::vector<ObjectTableDataT> &location_history,
                           const ray::gcs::ClientTable &client_table,
                           std::unordered_set<ClientID> *client_ids, bool *created) {
Collaborator

Slight preference for has_been_created over created.

UpdateObjectLocations(object_id_listener_pair->second.current_object_locations,
                      location_history, gcs_client_->client_table());
UpdateObjectLocations(location_history, gcs_client_->client_table(),
                      &it->second.current_object_locations, &it->second.created);
Collaborator

I think it would be clearer to do

std::unordered_set<ClientID> current_object_locations;
UpdateObjectLocations(location_history, gcs_client_->client_table(),
                      &current_object_locations,
                      &it->second.created);
it->second.current_object_locations = current_object_locations;

The reason is that the current implementation makes it seem like the past object locations are relevant, but the prior value of it->second.current_object_locations is completely irrelevant (even though this leads to the same output).

Alternatively, it'd be ok to call

it->second.current_object_locations.clear();

first.

Collaborator

Same with the other place where we call UpdateObjectLocations

Contributor Author

I think I'm going to move to what Eric and I discussed above about just checking if the object is lost if it's a duplicate, but just to be clear, the past object locations are relevant since UpdateObjectLocations processes a subset of the log, not necessarily the whole log.

Collaborator

Hm, ok, just for my own clarification, the location_history argument in the object notification callback to the object table subscribe function contains the full history of all object table updates for that object ID, right?

If that's true, then it looks to me like UpdateObjectLocations processes the full log; am I missing something?

Contributor Author

No, it's only the new object table updates. So the first notification will contain the full history, but subsequent notifications will only contain a subset.
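These incremental-log semantics can be sketched with a toy Python model (this is an illustration, not Ray's actual C++ implementation; the entry shape `(client_id, is_eviction)` and the function name are made up for the sketch):

```python
# Toy model of UpdateObjectLocations: each notification carries only the
# *new* object table entries, so the caller's running state (client_ids,
# has_been_created) must persist across calls.

def update_object_locations(location_history, client_ids, has_been_created):
    """Apply a batch of (client_id, is_eviction) entries to the running state.

    Returns the updated (client_ids, has_been_created) pair.
    """
    for client_id, is_eviction in location_history:
        if is_eviction:
            client_ids.discard(client_id)
        else:
            client_ids.add(client_id)
            # Seeing any addition proves the object was created at some point.
            has_been_created = True
    return client_ids, has_been_created

# First notification contains the full history so far; later notifications
# contain only the new entries, so prior state is not irrelevant.
clients, created = update_object_locations([("n1", False)], set(), False)
clients, created = update_object_locations([("n1", True)], clients, created)
# The object is "lost" only if it was created before and no node has it now.
lost = created and not clients
```

This also shows why the prior value of the client set matters: a later batch may contain only an eviction entry, which is meaningless without the additions processed earlier.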

@ericl
Contributor

ericl commented Dec 11, 2018

Isn't it also possible that the eviction message was lost? Perhaps we can look for the object for a timeout before giving up?

I would also be fine raising an error on that race condition. An object being evicted locally and available on a remote node seems unlikely.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/9964/

@stephanie-wang
Contributor Author

Yeah, but eventually either the eviction notice will go through or the node will be marked as dead. Either way, reconstruction will get triggered again on a timeout and eventually the task will be marked as failed.

return np.random.rand(size)

object_store_memory = 10**8
ray.worker._init(
Collaborator

This can actually be ray.init.

Collaborator

if you remove the start_ray_local arg

// Use a shared flag to make sure that we only treat the task as failed at
// most once. This flag will get deallocated once all of the object table
// lookup callbacks are fired.
auto mark_task_failed = std::make_shared<bool>(false);
Collaborator

clearer to call it task_marked_failed

void UpdateObjectLocations(const std::vector<ObjectTableDataT> &location_history,
                           const ray::gcs::ClientTable &client_table,
                           std::unordered_set<ClientID> *client_ids,
                           bool *has_been_created) {
Collaborator

This unfortunately doesn't seem to play too nicely with #3499 because sometimes we evict the keys so they will appear to have never been created. cc @ericl

Contributor Author

Hmm, that's true... I can't really think of a foolproof way around that, except to fail the object after some number of attempts.

///
/// \param task The task to potentially fail.
/// \return Void.
void TreatLostTaskAsFailed(const Task &task);
Collaborator

Maybe TreatTaskAsFailedIfLost?

ObjectManager::ObjectManager(asio::io_service &main_service,
                             const ObjectManagerConfig &config,
                             std::unique_ptr<ObjectDirectoryInterface> od)
                             std::shared_ptr<ObjectDirectoryInterface> od)
Collaborator

I think od -> object_directory

@ericl
Contributor

ericl commented Dec 11, 2018

Ok, that makes sense then!

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/9968/

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/9992/

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/9997/

@@ -2142,3 +2142,45 @@ def method(self):
ray.wait([object_id])

ray.get(results)


def test_actor_eviction(shutdown_only):
Contributor

This test can be simplified as follows:

1. submit a task to an actor.
2. use `ray.internal.free` to evict the object.
3. call `ray.get` on the object and assert that an error is raised. 

Contributor Author

Thanks! I'll try that.

Contributor Author

Hmm, this actually doesn't seem to work; I think it's due to asynchrony between the raylet and the object store.

Contributor

Sorry, I forgot to mention that before step 3, we need to wait until the object is really removed from the object store. You can do that by repeatedly reading the ID from object_store_client until you get an error saying the object doesn't exist.
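The polling step can be sketched with a toy in-memory store standing in for the plasma client (a self-contained illustration; in the real test the eviction would come from ray.internal.free and the polling would go through the object store client, and ToyStore/ObjectLost are made-up names):

```python
import time

class ObjectLost(Exception):
    pass

# Toy stand-in for the object store client; the real test polls plasma.
class ToyStore:
    def __init__(self):
        self.objects = {}
    def put(self, oid, value):
        self.objects[oid] = value
    def get(self, oid):
        if oid not in self.objects:
            raise ObjectLost(oid)
        return self.objects[oid]
    def free(self, oid):
        self.objects.pop(oid, None)

def wait_until_evicted(store, oid, timeout_s=1.0, poll_s=0.01):
    """Poll until a get on oid fails, i.e. the eviction has really landed."""
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        try:
            store.get(oid)
        except ObjectLost:
            return True
        time.sleep(poll_s)
    return False

store = ToyStore()
store.put("oid1", b"payload")             # 1. the actor task's result
store.free("oid1")                        # 2. evict it
assert wait_until_evicted(store, "oid1")  # wait before step 3
# 3. a subsequent get should now raise instead of hanging.
```

The point of the wait is exactly the asynchrony mentioned above: the free call returns before the store has actually dropped the object.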

@raulchen
Contributor

I've also implemented this feature in our internal code base. A few comments and questions:

  1. I use a new exception type UnreconstructableException to let users know that this task finished successfully before, but its results were lost and cannot be reconstructed now. It'd be useful to distinguish this case from the case where the task failed or the case where the actor died. (In all of these cases, users currently get RayGetError.) Users may want to make different decisions (ignore/retry/fail) based on the case. (BTW, I implemented this exception by writing a special value in the object's metadata.)

  2. When I mark the objects as unreconstructable, I only check that the object doesn't exist on any node now, but don't check that the object was never created before. It seems to me that the has_been_created check isn't necessary, because when CheckDuplicateActorTask fails, the object must have been created before, right?
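The metadata-sentinel idea in point 1 can be sketched like this (a toy illustration: the sentinel value, the helper name, and deserialize_result are all made up, not Ray's API):

```python
# Hypothetical sentinel written into an object's metadata when its value
# was lost and cannot be reconstructed without rolling back the actor.
UNRECONSTRUCTABLE_SENTINEL = b"UNRECONSTRUCTABLE"

class UnreconstructableException(Exception):
    """The task ran successfully before, but its result was evicted."""

def deserialize_result(metadata, data):
    # Check the metadata before touching the payload, so a lost object
    # surfaces as a distinct, catchable error rather than a generic one.
    if metadata == UNRECONSTRUCTABLE_SENTINEL:
        raise UnreconstructableException(
            "object was evicted and cannot be reconstructed")
    return data

# A caller can then choose to ignore, retry, or fail:
try:
    deserialize_result(UNRECONSTRUCTABLE_SENTINEL, None)
except UnreconstructableException:
    recovered = "resubmit the task or recompute"
```

Writing the sentinel into metadata keeps the error in-band: the object "exists" in the store, so a get returns immediately with a typed error instead of hanging.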

@stephanie-wang
Contributor Author

Thanks for the comments @raulchen.

  1. This would be super useful to do and we've actually been meaning to do that for a while now. Would you be able to open a PR for that?
  2. Because GCS writes are asynchronous, there is actually a chance that the locations of the task's output values haven't been written to the object table yet even though the task has been executed. Although now that I think about it, there is probably a way to rely on the ordering of commands per GCS connection to make sure this doesn't happen. We may have to revisit this once GCS flushing is more stable, since evicted objects will appear to have never been created.

@raulchen
Contributor

> Thanks for the comments @raulchen.
>
>   1. This would be super useful to do and we've actually been meaning to do that for a while now. Would you be able to open a PR for that?
>   2. Because GCS writes are asynchronous, there is actually a chance that the locations of the task's output values haven't been written to the object table yet even though the task has been executed. Although now that I think about it, there is probably a way to rely on the ordering of commands per GCS connection to make sure this doesn't happen. We may have to revisit this once GCS flushing is more stable, since evicted objects will appear to have never been created.

For 1: sure, I can do this after this PR is merged.
For 2: Is it possible to make sure that a node doesn't release the task lease until the object locations are written to the GCS?

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10045/

@stephanie-wang stephanie-wang merged commit fcc3702 into ray-project:master Dec 14, 2018
@stephanie-wang stephanie-wang deleted the fix-actor-eviction branch December 14, 2018 19:41
@stephanie-wang
Contributor Author

@raulchen, for 2, that is ideally how we would do it! Technically it is possible, but it's a little involved. One way to do it would require:

  1. Remembering which tasks have completed but whose objects haven't been added to the GCS yet.
  2. Adding a callback to the GCS object table write to clear the above data structure and cancel the task lease.

Do you want to open an issue for it? We might not get around to it soon but I think it's important to do, especially if GCS latency is high.
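The two steps above could look roughly like this toy sketch (made-up names throughout; the real change would live in the raylet's lease logic, and the GCS ack would arrive as an asynchronous callback):

```python
# Toy sketch of deferring lease release until the GCS write is acknowledged.

class LeaseManager:
    def __init__(self):
        # 1. Tasks that finished but whose output locations aren't in the GCS yet.
        self.pending_gcs_writes = set()
        self.active_leases = set()

    def grant_lease(self, task_id):
        self.active_leases.add(task_id)

    def on_task_complete(self, task_id):
        # Don't cancel the lease yet: the object table write is still in flight.
        self.pending_gcs_writes.add(task_id)

    def on_gcs_write_ack(self, task_id):
        # 2. GCS write callback: now it's safe to release the lease, so no
        # other node will try to reconstruct outputs it can't yet locate.
        self.pending_gcs_writes.discard(task_id)
        self.active_leases.discard(task_id)

mgr = LeaseManager()
mgr.grant_lease("t1")
mgr.on_task_complete("t1")
assert "t1" in mgr.active_leases      # lease still held: write not acked
mgr.on_gcs_write_ack("t1")
assert "t1" not in mgr.active_leases  # released only after the ack
```

The cost is that every lease now lives at least as long as one GCS round trip, which is why high GCS latency makes this change more pressing.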
