Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throw exception for ray.get of an evicted actor object #3490

Merged
merged 12 commits into from
Dec 14, 2018
103 changes: 64 additions & 39 deletions src/ray/object_manager/object_directory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,43 @@ ObjectDirectory::ObjectDirectory(boost::asio::io_service &io_service,

namespace {

std::vector<ClientID> UpdateObjectLocations(
std::unordered_set<ClientID> &client_ids,
const std::vector<ObjectTableDataT> &location_history,
const ray::gcs::ClientTable &client_table) {
/// Process a suffix of the object table log and store the result in
/// client_ids. This assumes that client_ids already contains the result of the
/// object table log up to but not including this suffix. This also stores a
/// bool in has_been_created indicating whether the object has ever been
/// created before.
void UpdateObjectLocations(const std::vector<ObjectTableDataT> &location_history,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we document this function. In particular the fact that we have output arguments.

const ray::gcs::ClientTable &client_table,
std::unordered_set<ClientID> *client_ids,
bool *has_been_created) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This unfortunately doesn't seem to play too nicely with #3499 because sometimes we evict the keys so they will appear to have never been created. cc @ericl

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm that's true...I can't really think of a foolproof way around that except to fail the object after some number of attempts.

// location_history contains the history of locations of the object (it is a log),
// which might look like the following:
// client1.is_eviction = false
// client1.is_eviction = true
// client2.is_eviction = false
// In such a scenario, we want to indicate client2 is the only client that contains
// the object, which the following code achieves.
if (!location_history.empty()) {
// If there are entries, then the object has been created. Once this flag
// is set to true, it should never go back to false.
*has_been_created = true;
}
for (const auto &object_table_data : location_history) {
ClientID client_id = ClientID::from_binary(object_table_data.manager);
if (!object_table_data.is_eviction) {
client_ids.insert(client_id);
client_ids->insert(client_id);
} else {
client_ids.erase(client_id);
client_ids->erase(client_id);
}
}
// Filter out the removed clients from the object locations.
for (auto it = client_ids.begin(); it != client_ids.end();) {
for (auto it = client_ids->begin(); it != client_ids->end();) {
if (client_table.IsRemoved(*it)) {
it = client_ids.erase(it);
it = client_ids->erase(it);
} else {
it++;
}
}
return std::vector<ClientID>(client_ids.begin(), client_ids.end());
}

} // namespace
Expand All @@ -45,26 +54,27 @@ void ObjectDirectory::RegisterBackend() {
gcs::AsyncGcsClient *client, const ObjectID &object_id,
const std::vector<ObjectTableDataT> &location_history) {
// Objects are added to this map in SubscribeObjectLocations.
auto object_id_listener_pair = listeners_.find(object_id);
auto it = listeners_.find(object_id);
// Do nothing for objects we are not listening for.
if (object_id_listener_pair == listeners_.end()) {
if (it == listeners_.end()) {
return;
}
// Update entries for this object.
std::vector<ClientID> client_id_vec =
UpdateObjectLocations(object_id_listener_pair->second.current_object_locations,
location_history, gcs_client_->client_table());
UpdateObjectLocations(location_history, gcs_client_->client_table(),
&it->second.current_object_locations,
&it->second.has_been_created);
// Copy the callbacks so that the callbacks can unsubscribe without interrupting
// looping over the callbacks.
auto callbacks = object_id_listener_pair->second.callbacks;
auto callbacks = it->second.callbacks;
// Call all callbacks associated with the object id locations we have
// received. This notifies the client even if the list of locations is
// empty, since this may indicate that the objects have been evicted from
// all nodes.
for (const auto &callback_pair : callbacks) {
// It is safe to call the callback directly since this is already running
// in the subscription callback stack.
callback_pair.second(client_id_vec, object_id);
callback_pair.second(object_id, it->second.current_object_locations,
it->second.has_been_created);
}
};
RAY_CHECK_OK(gcs_client_->object_table().Subscribe(
Expand Down Expand Up @@ -137,24 +147,26 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i
const ObjectID &object_id,
const OnLocationsFound &callback) {
ray::Status status = ray::Status::OK();
if (listeners_.find(object_id) == listeners_.end()) {
listeners_.emplace(object_id, LocationListenerState());
auto it = listeners_.find(object_id);
if (it == listeners_.end()) {
it = listeners_.emplace(object_id, LocationListenerState()).first;
status = gcs_client_->object_table().RequestNotifications(
JobID::nil(), object_id, gcs_client_->client_table().GetLocalClientId());
}
auto &listener_state = listeners_.find(object_id)->second;
auto &listener_state = it->second;
// TODO(hme): Make this fatal after implementing Pull suppression.
if (listener_state.callbacks.count(callback_id) > 0) {
return ray::Status::OK();
}
listener_state.callbacks.emplace(callback_id, callback);
// Immediately notify of object locations. This notifies the client even if
// the list of locations is empty, since this may indicate that the objects
// have been evicted from all nodes.
std::vector<ClientID> client_id_vec(listener_state.current_object_locations.begin(),
listener_state.current_object_locations.end());
io_service_.post(
[callback, client_id_vec, object_id]() { callback(client_id_vec, object_id); });
// If we previously received some notifications about the object's locations,
// immediately notify the caller of the current known locations.
if (listener_state.has_been_created) {
auto &locations = listener_state.current_object_locations;
io_service_.post([callback, locations, object_id]() {
callback(object_id, locations, /*has_been_created=*/true);
});
}
return status;
}

Expand All @@ -176,19 +188,32 @@ ray::Status ObjectDirectory::UnsubscribeObjectLocations(const UniqueID &callback

ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id,
const OnLocationsFound &callback) {
JobID job_id = JobID::nil();
ray::Status status = gcs_client_->object_table().Lookup(
job_id, object_id,
[this, callback](gcs::AsyncGcsClient *client, const ObjectID &object_id,
const std::vector<ObjectTableDataT> &location_history) {
// Build the set of current locations based on the entries in the log.
std::unordered_set<ClientID> client_ids;
std::vector<ClientID> locations_vector = UpdateObjectLocations(
client_ids, location_history, gcs_client_->client_table());
// It is safe to call the callback directly since this is already running
// in the GCS client's lookup callback stack.
callback(locations_vector, object_id);
});
ray::Status status;
auto it = listeners_.find(object_id);
if (it == listeners_.end()) {
JobID job_id = JobID::nil();
status = gcs_client_->object_table().Lookup(
job_id, object_id,
[this, callback](gcs::AsyncGcsClient *client, const ObjectID &object_id,
const std::vector<ObjectTableDataT> &location_history) {
// Build the set of current locations based on the entries in the log.
std::unordered_set<ClientID> client_ids;
bool has_been_created = false;
UpdateObjectLocations(location_history, gcs_client_->client_table(),
&client_ids, &has_been_created);
// It is safe to call the callback directly since this is already running
// in the GCS client's lookup callback stack.
callback(object_id, client_ids, has_been_created);
});
} else {
// If we have locations cached due to a concurrent SubscribeObjectLocations
// call, call the callback immediately with the cached locations.
auto &locations = it->second.current_object_locations;
bool has_been_created = it->second.has_been_created;
io_service_.post([callback, object_id, locations, has_been_created]() {
callback(object_id, locations, has_been_created);
});
}
return status;
}

Expand Down
11 changes: 9 additions & 2 deletions src/ray/object_manager/object_directory.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ class ObjectDirectoryInterface {
virtual std::vector<RemoteConnectionInfo> LookupAllRemoteConnections() const = 0;

/// Callback for object location notifications.
using OnLocationsFound = std::function<void(const std::vector<ray::ClientID> &,
const ray::ObjectID &object_id)>;
using OnLocationsFound = std::function<void(const ray::ObjectID &object_id,
const std::unordered_set<ray::ClientID> &,
bool has_been_created)>;

/// Lookup object locations. Callback may be invoked with empty list of client ids.
///
Expand Down Expand Up @@ -164,6 +165,12 @@ class ObjectDirectory : public ObjectDirectoryInterface {
std::unordered_map<UniqueID, OnLocationsFound> callbacks;
/// The current set of known locations of this object.
std::unordered_set<ClientID> current_object_locations;
/// This flag will get set to true if the object has ever been created. It
/// should never go back to false once set to true. If this is true, and
/// the current_object_locations is empty, then this means that the object
/// does not exist on any nodes due to eviction (rather than due to the
/// object never getting created, for instance).
bool has_been_created;
};

/// Reference to the event loop.
Expand Down
48 changes: 12 additions & 36 deletions src/ray/object_manager/object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,9 @@ namespace ray {

ObjectManager::ObjectManager(asio::io_service &main_service,
const ObjectManagerConfig &config,
std::shared_ptr<gcs::AsyncGcsClient> gcs_client)
// TODO(hme): Eliminate knowledge of GCS.
: client_id_(gcs_client->client_table().GetLocalClientId()),
config_(config),
object_directory_(new ObjectDirectory(main_service, gcs_client)),
store_notification_(main_service, config_.store_socket_name),
// release_delay of 2 * config_.max_sends is to ensure the pool does not release
// an object prematurely whenever we reach the maximum number of sends.
buffer_pool_(config_.store_socket_name, config_.object_chunk_size,
/*release_delay=*/2 * config_.max_sends),
send_work_(send_service_),
receive_work_(receive_service_),
connection_pool_(),
gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()) {
RAY_CHECK(config_.max_sends > 0);
RAY_CHECK(config_.max_receives > 0);
main_service_ = &main_service;
store_notification_.SubscribeObjAdded(
[this](const object_manager::protocol::ObjectInfoT &object_info) {
HandleObjectAdded(object_info);
});
store_notification_.SubscribeObjDeleted(
[this](const ObjectID &oid) { NotifyDirectoryObjectDeleted(oid); });
StartIOService();
}

ObjectManager::ObjectManager(asio::io_service &main_service,
const ObjectManagerConfig &config,
std::unique_ptr<ObjectDirectoryInterface> od)
std::shared_ptr<ObjectDirectoryInterface> object_directory)
: config_(config),
object_directory_(std::move(od)),
object_directory_(std::move(object_directory)),
store_notification_(main_service, config_.store_socket_name),
// release_delay of 2 * config_.max_sends is to ensure the pool does not release
// an object prematurely whenever we reach the maximum number of sends.
Expand Down Expand Up @@ -156,7 +128,8 @@ ray::Status ObjectManager::Pull(const ObjectID &object_id) {
// no ordering guarantee between notifications.
return object_directory_->SubscribeObjectLocations(
object_directory_pull_callback_id_, object_id,
[this](const std::vector<ClientID> &client_ids, const ObjectID &object_id) {
[this](const ObjectID &object_id, const std::unordered_set<ClientID> &client_ids,
bool created) {
// Exit if the Pull request has already been fulfilled or canceled.
auto it = pull_requests_.find(object_id);
if (it == pull_requests_.end()) {
Expand All @@ -166,7 +139,8 @@ ray::Status ObjectManager::Pull(const ObjectID &object_id) {
// NOTE(swang): Since we are overwriting the previous list of clients,
// we may end up sending a duplicate request to the same client as
// before.
it->second.client_locations = client_ids;
it->second.client_locations =
std::vector<ClientID>(client_ids.begin(), client_ids.end());
if (it->second.client_locations.empty()) {
// The object locations are now empty, so we should wait for the next
// notification about a new object location. Cancel the timer until
Expand Down Expand Up @@ -591,8 +565,9 @@ ray::Status ObjectManager::LookupRemainingWaitObjects(const UniqueID &wait_id) {
// Lookup remaining objects.
wait_state.requested_objects.insert(object_id);
RAY_RETURN_NOT_OK(object_directory_->LookupLocations(
object_id, [this, wait_id](const std::vector<ClientID> &client_ids,
const ObjectID &lookup_object_id) {
object_id,
[this, wait_id](const ObjectID &lookup_object_id,
const std::unordered_set<ClientID> &client_ids, bool created) {
auto &wait_state = active_wait_requests_.find(wait_id)->second;
if (!client_ids.empty()) {
wait_state.remaining.erase(lookup_object_id);
Expand Down Expand Up @@ -624,8 +599,9 @@ void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) {
wait_state.requested_objects.insert(object_id);
// Subscribe to object notifications.
RAY_CHECK_OK(object_directory_->SubscribeObjectLocations(
wait_id, object_id, [this, wait_id](const std::vector<ClientID> &client_ids,
const ObjectID &subscribe_object_id) {
wait_id, object_id,
[this, wait_id](const ObjectID &subscribe_object_id,
const std::unordered_set<ClientID> &client_ids, bool created) {
if (!client_ids.empty()) {
auto object_id_wait_state = active_wait_requests_.find(wait_id);
if (object_id_wait_state == active_wait_requests_.end()) {
Expand Down
15 changes: 3 additions & 12 deletions src/ray/object_manager/object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,25 +70,16 @@ class ObjectManagerInterface {
// TODO(hme): Add success/failure callbacks for push and pull.
class ObjectManager : public ObjectManagerInterface {
public:
/// Implicitly instantiates Ray implementation of ObjectDirectory.
///
/// \param main_service The main asio io_service.
/// \param config ObjectManager configuration.
/// \param gcs_client A client connection to the Ray GCS.
explicit ObjectManager(boost::asio::io_service &main_service,
const ObjectManagerConfig &config,
std::shared_ptr<gcs::AsyncGcsClient> gcs_client);

/// Takes user-defined ObjectDirectoryInterface implementation.
/// When this constructor is used, the ObjectManager assumes ownership of
/// the given ObjectDirectory instance.
///
/// \param main_service The main asio io_service.
/// \param config ObjectManager configuration.
/// \param od An object implementing the object directory interface.
/// \param object_directory An object implementing the object directory interface.
explicit ObjectManager(boost::asio::io_service &main_service,
const ObjectManagerConfig &config,
std::unique_ptr<ObjectDirectoryInterface> od);
std::shared_ptr<ObjectDirectoryInterface> object_directory);

~ObjectManager();

Expand Down Expand Up @@ -363,7 +354,7 @@ class ObjectManager : public ObjectManagerInterface {

ClientID client_id_;
const ObjectManagerConfig config_;
std::unique_ptr<ObjectDirectoryInterface> object_directory_;
std::shared_ptr<ObjectDirectoryInterface> object_directory_;
ObjectStoreNotificationManager store_notification_;
ObjectBufferPool buffer_pool_;

Expand Down
3 changes: 2 additions & 1 deletion src/ray/object_manager/test/object_manager_stress_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class MockServer {
main_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)),
object_manager_socket_(main_service),
gcs_client_(gcs_client),
object_manager_(main_service, object_manager_config, gcs_client) {
object_manager_(main_service, object_manager_config,
std::make_shared<ObjectDirectory>(main_service, gcs_client_)) {
RAY_CHECK_OK(RegisterGcs(main_service));
// Start listening for clients.
DoAcceptObjectManager();
Expand Down
8 changes: 5 additions & 3 deletions src/ray/object_manager/test/object_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ class MockServer {
main_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)),
object_manager_socket_(main_service),
gcs_client_(gcs_client),
object_manager_(main_service, object_manager_config, gcs_client) {
object_manager_(main_service, object_manager_config,
std::make_shared<ObjectDirectory>(main_service, gcs_client_)) {
RAY_CHECK_OK(RegisterGcs(main_service));
// Start listening for clients.
DoAcceptObjectManager();
Expand Down Expand Up @@ -285,8 +286,9 @@ class TestObjectManager : public TestObjectManagerBase {

RAY_CHECK_OK(server1->object_manager_.object_directory_->SubscribeObjectLocations(
sub_id, object_1,
[this, sub_id, object_1, object_2](const std::vector<ray::ClientID> &clients,
const ray::ObjectID &object_id) {
[this, sub_id, object_1, object_2](
const ray::ObjectID &object_id,
const std::unordered_set<ray::ClientID> &clients, bool created) {
if (!clients.empty()) {
TestWaitWhileSubscribed(sub_id, object_1, object_2);
}
Expand Down
Loading