Skip to content

Commit

Permalink
Properly short circuit core worker Get() on exception (#5672)
Browse files Browse the repository at this point in the history
  • Loading branch information
edoakes authored and pcmoritz committed Sep 12, 2019
1 parent 946ebfa commit 0bf79cf
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 43 deletions.
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ cc_library(
copts = COPTS,
deps = [
":common_cc_proto",
":gcs_cc_proto",
":node_manager_fbs",
":ray_util",
"@boost//:asio",
Expand Down
22 changes: 22 additions & 0 deletions src/ray/common/ray_object.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#include "ray/common/ray_object.h"

namespace ray {

bool RayObject::IsException() {
if (metadata_ == nullptr) {
return false;
}
// TODO (kfstorm): metadata should be structured.
const std::string metadata(reinterpret_cast<const char *>(metadata_->Data()),
metadata_->Size());
const auto error_type_descriptor = ray::rpc::ErrorType_descriptor();
for (int i = 0; i < error_type_descriptor->value_count(); i++) {
const auto error_type_number = error_type_descriptor->value(i)->number();
if (metadata == std::to_string(error_type_number)) {
return true;
}
}
return false;
}

} // namespace ray
6 changes: 5 additions & 1 deletion src/ray/common/ray_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define RAY_COMMON_RAY_OBJECT_H

#include "ray/common/buffer.h"
#include "ray/protobuf/gcs.pb.h"
#include "ray/util/logging.h"

namespace ray {
Expand Down Expand Up @@ -65,6 +66,9 @@ class RayObject {
/// Whether this object has metadata.
bool HasMetadata() const { return metadata_ != nullptr; }

/// Whether the object represents an exception.
bool IsException();

private:
std::shared_ptr<Buffer> data_;
std::shared_ptr<Buffer> metadata_;
Expand All @@ -74,4 +78,4 @@ class RayObject {

} // namespace ray

#endif // RAY_COMMON_BUFFER_H
#endif // RAY_COMMON_RAY_OBJECT_H
6 changes: 5 additions & 1 deletion src/ray/core_worker/object_interface.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ Status CoreWorkerObjectInterface::Get(const std::vector<ObjectID> &ids,

std::unordered_map<ObjectID, std::shared_ptr<RayObject>> result_map;
auto remaining_timeout_ms = timeout_ms;
bool got_exception = false;

// Re-order the list so that we always get from plasma store provider first,
// since it uses a loop of `FetchOrReconstruct` and plasma `Get`, it's not
Expand All @@ -99,7 +100,10 @@ Status CoreWorkerObjectInterface::Get(const std::vector<ObjectID> &ids,
auto start_time = current_time_ms();
RAY_RETURN_NOT_OK(store_providers_[entry.first]->Get(
entry.second, remaining_timeout_ms, worker_context_.GetCurrentTaskID(),
&result_map));
&result_map, &got_exception));
if (got_exception) {
break;
}
if (remaining_timeout_ms > 0) {
int64_t duration = current_time_ms() - start_time;
remaining_timeout_ms =
Expand Down
6 changes: 5 additions & 1 deletion src/ray/core_worker/store_provider/memory_store_provider.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ Status CoreWorkerMemoryStoreProvider::Put(const RayObject &object,
Status CoreWorkerMemoryStoreProvider::Get(
const std::unordered_set<ObjectID> &object_ids, int64_t timeout_ms,
const TaskID &task_id,
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> *results) {
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> *results,
bool *got_exception) {
const std::vector<ObjectID> id_vector(object_ids.begin(), object_ids.end());
std::vector<std::shared_ptr<RayObject>> result_objects;
RAY_RETURN_NOT_OK(
Expand All @@ -39,6 +40,9 @@ Status CoreWorkerMemoryStoreProvider::Get(
for (size_t i = 0; i < id_vector.size(); i++) {
if (result_objects[i] != nullptr) {
(*results)[id_vector[i]] = result_objects[i];
if (result_objects[i]->IsException()) {
*got_exception = true;
}
}
}
return Status::OK();
Expand Down
3 changes: 2 additions & 1 deletion src/ray/core_worker/store_provider/memory_store_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ class CoreWorkerMemoryStoreProvider : public CoreWorkerStoreProvider {
/// See `CoreWorkerStoreProvider::Get` for semantics.
Status Get(const std::unordered_set<ObjectID> &object_ids, int64_t timeout_ms,
const TaskID &task_id,
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> *results) override;
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> *results,
bool *got_exception) override;

/// See `CoreWorkerStoreProvider::Wait` for semantics.
/// Note that `num_objects` must equal to number of items in `object_ids`.
Expand Down
31 changes: 7 additions & 24 deletions src/ray/core_worker/store_provider/plasma_store_provider.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ Status CoreWorkerPlasmaStoreProvider::FetchAndGetFromPlasmaStore(
const auto result_object = std::make_shared<RayObject>(data, metadata);
(*results)[object_id] = result_object;
remaining.erase(object_id);
if (IsException(*result_object)) {
if (result_object->IsException()) {
*got_exception = true;
}
}
Expand All @@ -92,9 +92,9 @@ Status CoreWorkerPlasmaStoreProvider::FetchAndGetFromPlasmaStore(
Status CoreWorkerPlasmaStoreProvider::Get(
const std::unordered_set<ObjectID> &object_ids, int64_t timeout_ms,
const TaskID &task_id,
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> *results) {
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> *results,
bool *got_exception) {
int64_t batch_size = RayConfig::instance().worker_fetch_request_size();
bool got_exception = false;
std::vector<ObjectID> batch_ids;
std::unordered_set<ObjectID> remaining(object_ids.begin(), object_ids.end());

Expand All @@ -108,11 +108,11 @@ Status CoreWorkerPlasmaStoreProvider::Get(
}
RAY_RETURN_NOT_OK(FetchAndGetFromPlasmaStore(remaining, batch_ids, /*timeout_ms=*/0,
/*fetch_only=*/true, task_id, results,
&got_exception));
got_exception));
}

// If all objects were fetched already, return.
if (remaining.empty() || got_exception) {
if (remaining.empty() || *got_exception) {
return Status::OK();
}

Expand Down Expand Up @@ -142,8 +142,8 @@ Status CoreWorkerPlasmaStoreProvider::Get(
size_t previous_size = remaining.size();
RAY_RETURN_NOT_OK(FetchAndGetFromPlasmaStore(remaining, batch_ids, batch_timeout,
/*fetch_only=*/false, task_id, results,
&got_exception));
should_break = should_break || got_exception;
got_exception));
should_break = should_break || *got_exception;

if ((previous_size - remaining.size()) < batch_ids.size()) {
unsuccessful_attempts++;
Expand Down Expand Up @@ -178,23 +178,6 @@ Status CoreWorkerPlasmaStoreProvider::Delete(const std::vector<ObjectID> &object
return raylet_client_->FreeObjects(object_ids, local_only, delete_creating_tasks);
}

bool CoreWorkerPlasmaStoreProvider::IsException(const RayObject &object) {
// TODO (kfstorm): metadata should be structured.
if (!object.HasMetadata()) {
return false;
}
const std::string metadata(reinterpret_cast<const char *>(object.GetMetadata()->Data()),
object.GetMetadata()->Size());
const auto error_type_descriptor = ray::rpc::ErrorType_descriptor();
for (int i = 0; i < error_type_descriptor->value_count(); i++) {
const auto error_type_number = error_type_descriptor->value(i)->number();
if (metadata == std::to_string(error_type_number)) {
return true;
}
}
return false;
}

void CoreWorkerPlasmaStoreProvider::WarnIfAttemptedTooManyTimes(
int num_attempts, const std::unordered_set<ObjectID> &remaining) {
if (num_attempts % RayConfig::instance().object_store_get_warn_per_num_attempts() ==
Expand Down
11 changes: 3 additions & 8 deletions src/ray/core_worker/store_provider/plasma_store_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider {
/// See `CoreWorkerStoreProvider::Get` for semantics.
Status Get(const std::unordered_set<ObjectID> &object_ids, int64_t timeout_ms,
const TaskID &task_id,
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> *results) override;
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> *results,
bool *got_exception) override;

/// See `CoreWorkerStoreProvider::Wait` for semantics.
Status Wait(const std::unordered_set<ObjectID> &object_ids, int num_objects,
Expand All @@ -51,7 +52,7 @@ class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider {
/// \param[out] results Map of objects to write results into. This method will only
/// add to this map, not clear or remove from it, so the caller can pass in a non-empty
/// map.
/// \param[out] got_exception Whether any of the fetched objects contained an
/// \param[out] got_exception Set to true if any of the fetched objects contained an
/// exception.
/// \return Status.
Status FetchAndGetFromPlasmaStore(
Expand All @@ -60,12 +61,6 @@ class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider {
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> *results,
bool *got_exception);

/// Whether the buffer represents an exception object.
///
/// \param[in] object Object data.
/// \return Whether it represents an exception object.
static bool IsException(const RayObject &object);

/// Print a warning if we've attempted too many times, but some objects are still
/// unavailable.
///
Expand Down
11 changes: 6 additions & 5 deletions src/ray/core_worker/store_provider/store_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ class CoreWorkerStoreProvider {
/// \param[in] task_id ID for the current task.
/// \param[out] results Map of objects to write results into. Get will only add to this
/// map, not clear or remove from it, so the caller can pass in a non-empty map.
/// \return Status.
virtual Status Get(
const std::unordered_set<ObjectID> &object_ids, int64_t timeout_ms,
const TaskID &task_id,
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> *results) = 0;
/// \param[out] got_exception Set to true if any of the fetched results were an
/// exception. \return Status.
virtual Status Get(const std::unordered_set<ObjectID> &object_ids, int64_t timeout_ms,
const TaskID &task_id,
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> *results,
bool *got_exception) = 0;

/// Wait for a list of objects to appear in the object store. Objects that appear will
/// be added to the ready set.
Expand Down
23 changes: 21 additions & 2 deletions src/ray/core_worker/test/core_worker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "ray/raylet/raylet_client.h"
#include "src/ray/protobuf/direct_actor.grpc.pb.h"
#include "src/ray/protobuf/direct_actor.pb.h"
#include "src/ray/protobuf/gcs.pb.h"
#include "src/ray/util/test_util.h"

#include <boost/asio.hpp>
Expand Down Expand Up @@ -518,10 +519,12 @@ void CoreWorkerTest::TestStoreProvider(StoreProviderType type) {
ASSERT_TRUE(wait_results.count(nonexistent_id) == 0);

// Test Get().
bool got_exception = false;
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> results;
std::unordered_set<ObjectID> ids_set(ids.begin(), ids.end());
RAY_CHECK_OK(provider.Get(ids_set, -1, RandomTaskId(), &results));
RAY_CHECK_OK(provider.Get(ids_set, -1, RandomTaskId(), &results, &got_exception));

ASSERT_TRUE(!got_exception);
ASSERT_EQ(results.size(), ids.size());
for (size_t i = 0; i < ids.size(); i++) {
const auto &expected = buffers[i];
Expand All @@ -542,7 +545,8 @@ void CoreWorkerTest::TestStoreProvider(StoreProviderType type) {
RAY_CHECK_OK(provider.Delete(ids, true, false));

usleep(200 * 1000);
RAY_CHECK_OK(provider.Get(ids_set, 0, RandomTaskId(), &results));
RAY_CHECK_OK(provider.Get(ids_set, 0, RandomTaskId(), &results, &got_exception));
ASSERT_TRUE(!got_exception);
ASSERT_EQ(results.size(), 0);

// Test Wait() with objects which will become ready later.
Expand Down Expand Up @@ -823,6 +827,21 @@ TEST_F(SingleNodeTest, TestObjectInterface) {
ASSERT_EQ(*results[i]->GetMetadata(), *buffers[i].GetMetadata());
}

// Test Get() returns early when it encounters an error.
std::vector<ObjectID> ids_with_exception(ids.begin(), ids.end());
ids_with_exception.push_back(ObjectID::FromRandom());
std::vector<RayObject> buffers_with_exception(buffers.begin(), buffers.end());
std::string error_string = std::to_string(ray::rpc::TASK_EXECUTION_EXCEPTION);
char error_buffer[error_string.size()];
size_t len = error_string.copy(error_buffer, error_string.size(), 0);
buffers_with_exception.emplace_back(
nullptr, std::make_shared<LocalMemoryBuffer>(
reinterpret_cast<uint8_t *>(error_buffer), len));

RAY_CHECK_OK(core_worker.Objects().Put(buffers_with_exception.back(),
ids_with_exception.back()));
RAY_CHECK_OK(core_worker.Objects().Get(ids_with_exception, -1, &results));

// Test Wait().
ObjectID non_existent_id = ObjectID::FromRandom();
std::vector<ObjectID> all_ids(ids);
Expand Down

0 comments on commit 0bf79cf

Please sign in to comment.