Skip to content

Commit

Permalink
Throw exception for ray.get of an evicted actor object (#3490)
Browse files Browse the repository at this point in the history
* Add a flag for whether an object has been created before

* Add regression test

* doc

* Share object directory between object and node managers

* Treat evicted actor tasks as failed

* minor

* Check return value

* Fix bug where object locations weren't getting updated on client death

* Fix mac build

* Use RayTaskError
  • Loading branch information
stephanie-wang authored Dec 14, 2018
1 parent 7fd24e3 commit fcc3702
Show file tree
Hide file tree
Showing 15 changed files with 313 additions and 124 deletions.
5 changes: 4 additions & 1 deletion src/ray/gcs/tables.cc
Original file line number Diff line number Diff line change
Expand Up @@ -368,10 +368,13 @@ void ClientTable::HandleNotification(AsyncGcsClient *client,
}
RAY_CHECK(removed_clients_.find(client_id) == removed_clients_.end());
} else {
// NOTE(swang): The client should be added to this data structure before
// the callback gets called, in case the callback depends on the data
// structure getting updated.
removed_clients_.insert(client_id);
if (client_removed_callback_ != nullptr) {
client_removed_callback_(client, client_id, data);
}
removed_clients_.insert(client_id);
}
}
}
Expand Down
124 changes: 85 additions & 39 deletions src/ray/object_manager/object_directory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,43 @@ ObjectDirectory::ObjectDirectory(boost::asio::io_service &io_service,

namespace {

std::vector<ClientID> UpdateObjectLocations(
std::unordered_set<ClientID> &client_ids,
const std::vector<ObjectTableDataT> &location_history,
const ray::gcs::ClientTable &client_table) {
/// Process a suffix of the object table log and store the result in
/// client_ids. This assumes that client_ids already contains the result of the
/// object table log up to but not including this suffix. This also stores a
/// bool in has_been_created indicating whether the object has ever been
/// created before.
void UpdateObjectLocations(const std::vector<ObjectTableDataT> &location_history,
const ray::gcs::ClientTable &client_table,
std::unordered_set<ClientID> *client_ids,
bool *has_been_created) {
// location_history contains the history of locations of the object (it is a log),
// which might look like the following:
// client1.is_eviction = false
// client1.is_eviction = true
// client2.is_eviction = false
// In such a scenario, we want to indicate client2 is the only client that contains
// the object, which the following code achieves.
if (!location_history.empty()) {
// If there are entries, then the object has been created. Once this flag
// is set to true, it should never go back to false.
*has_been_created = true;
}
for (const auto &object_table_data : location_history) {
ClientID client_id = ClientID::from_binary(object_table_data.manager);
if (!object_table_data.is_eviction) {
client_ids.insert(client_id);
client_ids->insert(client_id);
} else {
client_ids.erase(client_id);
client_ids->erase(client_id);
}
}
// Filter out the removed clients from the object locations.
for (auto it = client_ids.begin(); it != client_ids.end();) {
for (auto it = client_ids->begin(); it != client_ids->end();) {
if (client_table.IsRemoved(*it)) {
it = client_ids.erase(it);
it = client_ids->erase(it);
} else {
it++;
}
}
return std::vector<ClientID>(client_ids.begin(), client_ids.end());
}

} // namespace
Expand All @@ -45,26 +54,27 @@ void ObjectDirectory::RegisterBackend() {
gcs::AsyncGcsClient *client, const ObjectID &object_id,
const std::vector<ObjectTableDataT> &location_history) {
// Objects are added to this map in SubscribeObjectLocations.
auto object_id_listener_pair = listeners_.find(object_id);
auto it = listeners_.find(object_id);
// Do nothing for objects we are not listening for.
if (object_id_listener_pair == listeners_.end()) {
if (it == listeners_.end()) {
return;
}
// Update entries for this object.
std::vector<ClientID> client_id_vec =
UpdateObjectLocations(object_id_listener_pair->second.current_object_locations,
location_history, gcs_client_->client_table());
UpdateObjectLocations(location_history, gcs_client_->client_table(),
&it->second.current_object_locations,
&it->second.has_been_created);
// Copy the callbacks so that the callbacks can unsubscribe without interrupting
// looping over the callbacks.
auto callbacks = object_id_listener_pair->second.callbacks;
auto callbacks = it->second.callbacks;
// Call all callbacks associated with the object id locations we have
// received. This notifies the client even if the list of locations is
// empty, since this may indicate that the objects have been evicted from
// all nodes.
for (const auto &callback_pair : callbacks) {
// It is safe to call the callback directly since this is already running
// in the subscription callback stack.
callback_pair.second(client_id_vec, object_id);
callback_pair.second(object_id, it->second.current_object_locations,
it->second.has_been_created);
}
};
RAY_CHECK_OK(gcs_client_->object_table().Subscribe(
Expand Down Expand Up @@ -133,28 +143,51 @@ std::vector<RemoteConnectionInfo> ObjectDirectory::LookupAllRemoteConnections()
return remote_connections;
}

void ObjectDirectory::HandleClientRemoved(const ClientID &client_id) {
for (auto &listener : listeners_) {
const ObjectID &object_id = listener.first;
if (listener.second.current_object_locations.count(client_id) > 0) {
// If the subscribed object has the removed client as a location, update
// its locations with an empty log so that the location will be removed.
UpdateObjectLocations({}, gcs_client_->client_table(),
&listener.second.current_object_locations,
&listener.second.has_been_created);
// Re-call all the subscribed callbacks for the object, since its
// locations have changed.
for (const auto &callback_pair : listener.second.callbacks) {
// It is safe to call the callback directly since this is already running
// in the subscription callback stack.
callback_pair.second(object_id, listener.second.current_object_locations,
listener.second.has_been_created);
}
}
}
}

ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_id,
const ObjectID &object_id,
const OnLocationsFound &callback) {
ray::Status status = ray::Status::OK();
if (listeners_.find(object_id) == listeners_.end()) {
listeners_.emplace(object_id, LocationListenerState());
auto it = listeners_.find(object_id);
if (it == listeners_.end()) {
it = listeners_.emplace(object_id, LocationListenerState()).first;
status = gcs_client_->object_table().RequestNotifications(
JobID::nil(), object_id, gcs_client_->client_table().GetLocalClientId());
}
auto &listener_state = listeners_.find(object_id)->second;
auto &listener_state = it->second;
// TODO(hme): Make this fatal after implementing Pull suppression.
if (listener_state.callbacks.count(callback_id) > 0) {
return ray::Status::OK();
}
listener_state.callbacks.emplace(callback_id, callback);
// Immediately notify of object locations. This notifies the client even if
// the list of locations is empty, since this may indicate that the objects
// have been evicted from all nodes.
std::vector<ClientID> client_id_vec(listener_state.current_object_locations.begin(),
listener_state.current_object_locations.end());
io_service_.post(
[callback, client_id_vec, object_id]() { callback(client_id_vec, object_id); });
// If we previously received some notifications about the object's locations,
// immediately notify the caller of the current known locations.
if (listener_state.has_been_created) {
auto &locations = listener_state.current_object_locations;
io_service_.post([callback, locations, object_id]() {
callback(object_id, locations, /*has_been_created=*/true);
});
}
return status;
}

Expand All @@ -176,19 +209,32 @@ ray::Status ObjectDirectory::UnsubscribeObjectLocations(const UniqueID &callback

ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id,
const OnLocationsFound &callback) {
JobID job_id = JobID::nil();
ray::Status status = gcs_client_->object_table().Lookup(
job_id, object_id,
[this, callback](gcs::AsyncGcsClient *client, const ObjectID &object_id,
const std::vector<ObjectTableDataT> &location_history) {
// Build the set of current locations based on the entries in the log.
std::unordered_set<ClientID> client_ids;
std::vector<ClientID> locations_vector = UpdateObjectLocations(
client_ids, location_history, gcs_client_->client_table());
// It is safe to call the callback directly since this is already running
// in the GCS client's lookup callback stack.
callback(locations_vector, object_id);
});
ray::Status status;
auto it = listeners_.find(object_id);
if (it == listeners_.end()) {
JobID job_id = JobID::nil();
status = gcs_client_->object_table().Lookup(
job_id, object_id,
[this, callback](gcs::AsyncGcsClient *client, const ObjectID &object_id,
const std::vector<ObjectTableDataT> &location_history) {
// Build the set of current locations based on the entries in the log.
std::unordered_set<ClientID> client_ids;
bool has_been_created = false;
UpdateObjectLocations(location_history, gcs_client_->client_table(),
&client_ids, &has_been_created);
// It is safe to call the callback directly since this is already running
// in the GCS client's lookup callback stack.
callback(object_id, client_ids, has_been_created);
});
} else {
// If we have locations cached due to a concurrent SubscribeObjectLocations
// call, call the callback immediately with the cached locations.
auto &locations = it->second.current_object_locations;
bool has_been_created = it->second.has_been_created;
io_service_.post([callback, object_id, locations, has_been_created]() {
callback(object_id, locations, has_been_created);
});
}
return status;
}

Expand Down
20 changes: 18 additions & 2 deletions src/ray/object_manager/object_directory.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ class ObjectDirectoryInterface {
virtual std::vector<RemoteConnectionInfo> LookupAllRemoteConnections() const = 0;

/// Callback for object location notifications.
using OnLocationsFound = std::function<void(const std::vector<ray::ClientID> &,
const ray::ObjectID &object_id)>;
using OnLocationsFound = std::function<void(const ray::ObjectID &object_id,
const std::unordered_set<ray::ClientID> &,
bool has_been_created)>;

/// Lookup object locations. Callback may be invoked with empty list of client ids.
///
Expand All @@ -59,6 +60,13 @@ class ObjectDirectoryInterface {
virtual ray::Status LookupLocations(const ObjectID &object_id,
const OnLocationsFound &callback) = 0;

/// Handle the removal of an object manager client. This updates the
/// locations of all subscribed objects that have the removed client as a
/// location, and fires the subscribed callbacks for those objects.
///
/// \param client_id The object manager client that was removed.
virtual void HandleClientRemoved(const ClientID &client_id) = 0;

/// Subscribe to be notified of locations (ClientID) of the given object.
/// The callback will be invoked with the complete list of known locations
/// whenever the set of locations changes. The callback will also be fired if
Expand Down Expand Up @@ -138,6 +146,8 @@ class ObjectDirectory : public ObjectDirectoryInterface {
ray::Status LookupLocations(const ObjectID &object_id,
const OnLocationsFound &callback) override;

void HandleClientRemoved(const ClientID &client_id) override;

ray::Status SubscribeObjectLocations(const UniqueID &callback_id,
const ObjectID &object_id,
const OnLocationsFound &callback) override;
Expand All @@ -164,6 +174,12 @@ class ObjectDirectory : public ObjectDirectoryInterface {
std::unordered_map<UniqueID, OnLocationsFound> callbacks;
/// The current set of known locations of this object.
std::unordered_set<ClientID> current_object_locations;
/// This flag will get set to true if the object has ever been created. It
/// should never go back to false once set to true. If this is true, and
/// the current_object_locations is empty, then this means that the object
/// does not exist on any nodes due to eviction (rather than due to the
/// object never getting created, for instance).
bool has_been_created;
};

/// Reference to the event loop.
Expand Down
48 changes: 12 additions & 36 deletions src/ray/object_manager/object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,9 @@ namespace ray {

ObjectManager::ObjectManager(asio::io_service &main_service,
const ObjectManagerConfig &config,
std::shared_ptr<gcs::AsyncGcsClient> gcs_client)
// TODO(hme): Eliminate knowledge of GCS.
: client_id_(gcs_client->client_table().GetLocalClientId()),
config_(config),
object_directory_(new ObjectDirectory(main_service, gcs_client)),
store_notification_(main_service, config_.store_socket_name),
// release_delay of 2 * config_.max_sends is to ensure the pool does not release
// an object prematurely whenever we reach the maximum number of sends.
buffer_pool_(config_.store_socket_name, config_.object_chunk_size,
/*release_delay=*/2 * config_.max_sends),
send_work_(send_service_),
receive_work_(receive_service_),
connection_pool_(),
gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()) {
RAY_CHECK(config_.max_sends > 0);
RAY_CHECK(config_.max_receives > 0);
main_service_ = &main_service;
store_notification_.SubscribeObjAdded(
[this](const object_manager::protocol::ObjectInfoT &object_info) {
HandleObjectAdded(object_info);
});
store_notification_.SubscribeObjDeleted(
[this](const ObjectID &oid) { NotifyDirectoryObjectDeleted(oid); });
StartIOService();
}

ObjectManager::ObjectManager(asio::io_service &main_service,
const ObjectManagerConfig &config,
std::unique_ptr<ObjectDirectoryInterface> od)
std::shared_ptr<ObjectDirectoryInterface> object_directory)
: config_(config),
object_directory_(std::move(od)),
object_directory_(std::move(object_directory)),
store_notification_(main_service, config_.store_socket_name),
// release_delay of 2 * config_.max_sends is to ensure the pool does not release
// an object prematurely whenever we reach the maximum number of sends.
Expand Down Expand Up @@ -156,7 +128,8 @@ ray::Status ObjectManager::Pull(const ObjectID &object_id) {
// no ordering guarantee between notifications.
return object_directory_->SubscribeObjectLocations(
object_directory_pull_callback_id_, object_id,
[this](const std::vector<ClientID> &client_ids, const ObjectID &object_id) {
[this](const ObjectID &object_id, const std::unordered_set<ClientID> &client_ids,
bool created) {
// Exit if the Pull request has already been fulfilled or canceled.
auto it = pull_requests_.find(object_id);
if (it == pull_requests_.end()) {
Expand All @@ -166,7 +139,8 @@ ray::Status ObjectManager::Pull(const ObjectID &object_id) {
// NOTE(swang): Since we are overwriting the previous list of clients,
// we may end up sending a duplicate request to the same client as
// before.
it->second.client_locations = client_ids;
it->second.client_locations =
std::vector<ClientID>(client_ids.begin(), client_ids.end());
if (it->second.client_locations.empty()) {
// The object locations are now empty, so we should wait for the next
// notification about a new object location. Cancel the timer until
Expand Down Expand Up @@ -591,8 +565,9 @@ ray::Status ObjectManager::LookupRemainingWaitObjects(const UniqueID &wait_id) {
// Lookup remaining objects.
wait_state.requested_objects.insert(object_id);
RAY_RETURN_NOT_OK(object_directory_->LookupLocations(
object_id, [this, wait_id](const std::vector<ClientID> &client_ids,
const ObjectID &lookup_object_id) {
object_id,
[this, wait_id](const ObjectID &lookup_object_id,
const std::unordered_set<ClientID> &client_ids, bool created) {
auto &wait_state = active_wait_requests_.find(wait_id)->second;
if (!client_ids.empty()) {
wait_state.remaining.erase(lookup_object_id);
Expand Down Expand Up @@ -624,8 +599,9 @@ void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) {
wait_state.requested_objects.insert(object_id);
// Subscribe to object notifications.
RAY_CHECK_OK(object_directory_->SubscribeObjectLocations(
wait_id, object_id, [this, wait_id](const std::vector<ClientID> &client_ids,
const ObjectID &subscribe_object_id) {
wait_id, object_id,
[this, wait_id](const ObjectID &subscribe_object_id,
const std::unordered_set<ClientID> &client_ids, bool created) {
if (!client_ids.empty()) {
auto object_id_wait_state = active_wait_requests_.find(wait_id);
if (object_id_wait_state == active_wait_requests_.end()) {
Expand Down
15 changes: 3 additions & 12 deletions src/ray/object_manager/object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,25 +70,16 @@ class ObjectManagerInterface {
// TODO(hme): Add success/failure callbacks for push and pull.
class ObjectManager : public ObjectManagerInterface {
public:
/// Implicitly instantiates Ray implementation of ObjectDirectory.
///
/// \param main_service The main asio io_service.
/// \param config ObjectManager configuration.
/// \param gcs_client A client connection to the Ray GCS.
explicit ObjectManager(boost::asio::io_service &main_service,
const ObjectManagerConfig &config,
std::shared_ptr<gcs::AsyncGcsClient> gcs_client);

/// Takes user-defined ObjectDirectoryInterface implementation.
/// When this constructor is used, the ObjectManager shares ownership of
/// the given ObjectDirectory instance with the caller.
///
/// \param main_service The main asio io_service.
/// \param config ObjectManager configuration.
/// \param od An object implementing the object directory interface.
/// \param object_directory An object implementing the object directory interface.
explicit ObjectManager(boost::asio::io_service &main_service,
const ObjectManagerConfig &config,
std::unique_ptr<ObjectDirectoryInterface> od);
std::shared_ptr<ObjectDirectoryInterface> object_directory);

~ObjectManager();

Expand Down Expand Up @@ -363,7 +354,7 @@ class ObjectManager : public ObjectManagerInterface {

ClientID client_id_;
const ObjectManagerConfig config_;
std::unique_ptr<ObjectDirectoryInterface> object_directory_;
std::shared_ptr<ObjectDirectoryInterface> object_directory_;
ObjectStoreNotificationManager store_notification_;
ObjectBufferPool buffer_pool_;

Expand Down
3 changes: 2 additions & 1 deletion src/ray/object_manager/test/object_manager_stress_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class MockServer {
main_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)),
object_manager_socket_(main_service),
gcs_client_(gcs_client),
object_manager_(main_service, object_manager_config, gcs_client) {
object_manager_(main_service, object_manager_config,
std::make_shared<ObjectDirectory>(main_service, gcs_client_)) {
RAY_CHECK_OK(RegisterGcs(main_service));
// Start listening for clients.
DoAcceptObjectManager();
Expand Down
Loading

0 comments on commit fcc3702

Please sign in to comment.