Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move task to common module and add checks in getter methods #5147

Merged
merged 29 commits into from
Jul 11, 2019
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 38 additions & 33 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,42 @@ cc_library(

# === End of rpc definitions ===

cc_library(
name = "ray_common",
srcs = glob(
[
"src/ray/common/**/*.cc",
],
exclude = [
"src/ray/common/**/*_test.cc",
],
),
hdrs = glob(
[
"src/ray/common/**/*.h",
],
),
copts = COPTS,
deps = [
":common_cc_proto",
":node_manager_fbs",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this be removed later?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also add a comment on the core_worker section that the dependency to raylet lib will be removed later?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it'll be removed.

":ray_util",
"@boost//:asio",
"@com_github_grpc_grpc//:grpc++",
"@plasma//:plasma_client",
],
)

cc_test(
name = "common_test",
srcs = glob(["src/ray/common/**/*_test.cc"]),
copts = COPTS,
deps = [
":ray_common",
"@com_google_googletest//:gtest_main",
],
)

cc_binary(
name = "raylet",
srcs = ["src/ray/raylet/main.cc"],
Expand Down Expand Up @@ -307,6 +343,8 @@ cc_library(
":core_worker_cc_proto",
":ray_common",
":ray_util",
# TODO(hchen): After `raylet_client` is migrated to gRPC, `core_worker_lib`
# should only depend on `raylet_client`, instead of the whole `raylet_lib`.
":raylet_lib",
":worker_rpc",
],
Expand Down Expand Up @@ -387,16 +425,6 @@ cc_test(
],
)

cc_test(
name = "task_test",
srcs = ["src/ray/raylet/task_test.cc"],
copts = COPTS,
deps = [
":raylet_lib",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "client_connection_test",
srcs = ["src/ray/raylet/client_connection_test.cc"],
Expand Down Expand Up @@ -495,29 +523,6 @@ cc_library(
],
)

cc_library(
name = "ray_common",
srcs = glob(
[
"src/ray/common/*.cc",
],
exclude = [
"src/ray/common/*_test.cc",
],
),
hdrs = glob(
[
"src/ray/common/*.h",
],
),
copts = COPTS,
deps = [
":ray_util",
"@boost//:asio",
"@plasma//:plasma_client",
],
)

cc_library(
name = "sha256",
srcs = [
Expand Down
4 changes: 2 additions & 2 deletions python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ cdef extern from "ray/protobuf/common.pb.h" namespace "Language" nogil:
cdef CLanguage LANGUAGE_JAVA "Language::JAVA"


cdef extern from "ray/raylet/scheduling_resources.h" \
namespace "ray::raylet" nogil:
cdef extern from "ray/common/task/scheduling_resources.h" \
namespace "ray" nogil:
cdef cppclass ResourceSet "ResourceSet":
ResourceSet()
ResourceSet(const unordered_map[c_string, double] &resource_map)
Expand Down
30 changes: 15 additions & 15 deletions python/ray/includes/task.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ cdef extern from "ray/protobuf/gcs.pb.h" namespace "ray::rpc" nogil:
const c_string &SerializeAsString()


cdef extern from "ray/raylet/task_spec.h" namespace "ray::raylet" nogil:
cdef cppclass CTaskSpec "ray::raylet::TaskSpecification":
cdef extern from "ray/common/task/task_spec.h" namespace "ray" nogil:
cdef cppclass CTaskSpec "ray::TaskSpecification":
CTaskSpec(const RpcTaskSpec message)
CTaskSpec(const c_string &serialized_binary)
const RpcTaskSpec &GetMessage()
Expand All @@ -61,7 +61,7 @@ cdef extern from "ray/raylet/task_spec.h" namespace "ray::raylet" nogil:
const ResourceSet GetRequiredPlacementResources() const
c_bool IsDriverTask() const
CLanguage GetLanguage() const

c_bool IsNormalTask() const
c_bool IsActorCreationTask() const
c_bool IsActorTask() const
CActorID ActorCreationId() const
Expand All @@ -74,38 +74,38 @@ cdef extern from "ray/raylet/task_spec.h" namespace "ray::raylet" nogil:
c_vector[CActorHandleID] NewActorHandles() const


cdef extern from "ray/raylet/task_util.h" namespace "ray::raylet" nogil:
cdef cppclass TaskSpecBuilder "ray::raylet::TaskSpecBuilder":
cdef extern from "ray/common/task/task_util.h" namespace "ray" nogil:
cdef cppclass TaskSpecBuilder "ray::TaskSpecBuilder":
TaskSpecBuilder &SetCommonTaskSpec(
const CLanguage &language, const c_vector[c_string] &function_descriptor,
const CJobID &job_id, const CTaskID &parent_task_id, uint64_t parent_counter,
uint64_t num_returns, const unordered_map[c_string, double] &required_resources,
const unordered_map[c_string, double] &required_placement_resources);
const unordered_map[c_string, double] &required_placement_resources)

TaskSpecBuilder &AddByRefArg(const CObjectID &arg_id);
TaskSpecBuilder &AddByRefArg(const CObjectID &arg_id)

TaskSpecBuilder &AddByValueArg(const c_string &data);
TaskSpecBuilder &AddByValueArg(const c_string &data)

TaskSpecBuilder &SetActorCreationTaskSpec(
const CActorID &actor_id, uint64_t max_reconstructions,
const c_vector[c_string] &dynamic_worker_options);
const c_vector[c_string] &dynamic_worker_options)

TaskSpecBuilder &SetActorTaskSpec(
const CActorID &actor_id, const CActorHandleID &actor_handle_id,
const CObjectID &actor_creation_dummy_object_id, uint64_t actor_counter,
const c_vector[CActorHandleID] &new_handle_ids);
const c_vector[CActorHandleID] &new_handle_ids)

RpcTaskSpec GetMessage();
RpcTaskSpec GetMessage()


cdef extern from "ray/raylet/task_execution_spec.h" namespace "ray::raylet" nogil:
cdef cppclass CTaskExecutionSpec "ray::raylet::TaskExecutionSpecification":
cdef extern from "ray/common/task/task_execution_spec.h" namespace "ray" nogil:
cdef cppclass CTaskExecutionSpec "ray::TaskExecutionSpecification":
CTaskExecutionSpec(RpcTaskExecutionSpec message)
CTaskExecutionSpec(const c_string &serialized_binary)
const RpcTaskExecutionSpec &GetMessage()
c_vector[CObjectID] ExecutionDependencies()
uint64_t NumForwards()

cdef extern from "ray/raylet/task.h" namespace "ray::raylet" nogil:
cdef cppclass CTask "ray::raylet::Task":
cdef extern from "ray/common/task/task.h" namespace "ray" nogil:
cdef cppclass CTask "ray::Task":
CTask(CTaskSpec task_spec, CTaskExecutionSpec task_execution_spec)
18 changes: 15 additions & 3 deletions python/ray/includes/task.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ from ray.includes.task cimport (


cdef class TaskSpec:
"""Cython wrapper class of C++ `ray::raylet::TaskSpecification`."""
"""Cython wrapper class of C++ `ray::TaskSpecification`."""
cdef:
unique_ptr[CTaskSpec] task_spec

Expand Down Expand Up @@ -121,6 +121,18 @@ cdef class TaskSpec:
"""
return self.task_spec.get().Serialize()

def is_normal_task(self):
"""Whether this task is a normal task."""
return self.task_spec.get().IsNormalTask()

def is_actor_task(self):
"""Whether this task is an actor task."""
return self.task_spec.get().IsActorTask()

def is_actor_creation_task(self):
"""Whether this task is an actor creation task."""
return self.task_spec.get().IsActorCreationTask()

def job_id(self):
"""Return the job ID for this task."""
return JobID(self.task_spec.get().JobId().Binary())
Expand Down Expand Up @@ -223,7 +235,7 @@ cdef class TaskSpec:


cdef class TaskExecutionSpec:
"""Cython wrapper class of C++ `ray::raylet::TaskExecutionSpecification`."""
"""Cython wrapper class of C++ `ray::TaskExecutionSpecification`."""
cdef:
unique_ptr[CTaskExecutionSpec] c_spec

Expand Down Expand Up @@ -259,7 +271,7 @@ cdef class TaskExecutionSpec:


cdef class Task:
"""Cython wrapper class of C++ `ray::raylet::Task`."""
"""Cython wrapper class of C++ `ray::Task`."""
cdef:
unique_ptr[CTask] c_task

Expand Down
20 changes: 9 additions & 11 deletions python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ def _process_task(self, task, function_execution_info):
assert self.current_task_id.is_nil()
assert self.task_context.task_index == 0
assert self.task_context.put_index == 1
if task.actor_id().is_nil():
if not task.is_actor_task():
# If this worker is not an actor, check that `current_job_id`
# was reset when the worker finished the previous task.
assert self.current_job_id.is_nil()
Expand All @@ -879,8 +879,8 @@ def _process_task(self, task, function_execution_info):
task.function_descriptor_list())
args = task.arguments()
return_object_ids = task.returns()
if (not task.actor_id().is_nil()
or not task.actor_creation_id().is_nil()):
if (task.is_actor_task()
or task.is_actor_creation_task()):
dummy_return_id = return_object_ids.pop()
function_executor = function_execution_info.function
function_name = function_execution_info.function_name
Expand All @@ -903,11 +903,10 @@ def _process_task(self, task, function_execution_info):
try:
self._current_task = task
with profiling.profile("task:execute"):
if (task.actor_id().is_nil()
and task.actor_creation_id().is_nil()):
if task.is_normal_task():
outputs = function_executor(*arguments)
else:
if not task.actor_id().is_nil():
if task.is_actor_task():
key = task.actor_id()
else:
key = task.actor_creation_id()
Expand All @@ -916,7 +915,7 @@ def _process_task(self, task, function_execution_info):
except Exception as e:
# Determine whether the exception occured during a task, not an
# actor method.
task_exception = task.actor_id().is_nil()
task_exception = not task.is_actor_task()
traceback_str = ray.utils.format_error_message(
traceback.format_exc(), task_exception=task_exception)
self._handle_process_task_failure(
Expand Down Expand Up @@ -972,8 +971,7 @@ def _wait_for_and_process_task(self, task):

# TODO(rkn): It would be preferable for actor creation tasks to share
# more of the code path with regular task execution.
if not task.actor_creation_id().is_nil():
assert self.actor_id.is_nil()
if task.is_actor_creation_task():
self.actor_id = task.actor_creation_id()
self.actor_creation_task_id = task.task_id()
actor_class = self.function_actor_manager.load_actor_class(
Expand All @@ -991,8 +989,8 @@ def _wait_for_and_process_task(self, task):
# Execute the task.
function_name = execution_info.function_name
extra_data = {"name": function_name, "task_id": task.task_id().hex()}
if task.actor_id().is_nil():
if task.actor_creation_id().is_nil():
if not task.is_actor_task():
if not task.is_actor_creation_task():
title = "ray_worker:{}()".format(function_name)
next_title = "ray_worker"
else:
Expand Down
36 changes: 31 additions & 5 deletions src/ray/rpc/util.h → src/ray/common/grpc_util.h
Original file line number Diff line number Diff line change
@@ -1,14 +1,41 @@
#ifndef RAY_RPC_UTIL_H
#define RAY_RPC_UTIL_H
#ifndef RAY_COMMON_GRPC_UTIL_H
#define RAY_COMMON_GRPC_UTIL_H

#include <google/protobuf/map.h>
#include <google/protobuf/repeated_field.h>
#include <grpcpp/grpcpp.h>

#include "ray/common/status.h"
#include "status.h"

namespace ray {
namespace rpc {

/// Wrap a protobuf message.
template <class Message>
class MessageWrapper {
public:
/// Construct from a protobuf message object.
/// The input message will be **copied** into this object.
///
/// \param message The protobuf message.
explicit MessageWrapper(const Message message) : message_(std::move(message)) {}

/// Construct from protobuf-serialized binary.
///
/// \param serialized_binary Protobuf-serialized binary.
explicit MessageWrapper(const std::string &serialized_binary) {
message_.ParseFromString(serialized_binary);
}

/// Get reference of the protobuf message.
const Message &GetMessage() const { return message_; }

/// Serialize the message to a string.
const std::string Serialize() const { return message_.SerializeAsString(); }

protected:
/// The wrapped message.
Message message_;
};

/// Helper function that converts a ray status to gRPC status.
inline grpc::Status RayStatusToGrpcStatus(const Status &ray_status) {
Expand Down Expand Up @@ -60,7 +87,6 @@ inline std::unordered_map<K, V> MapFromProtobuf(::google::protobuf::Map<K, V> pb
return std::unordered_map<K, V>(pb_map.begin(), pb_map.end());
}

} // namespace rpc
} // namespace ray

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

namespace ray {

namespace raylet {

FractionalResourceQuantity::FractionalResourceQuantity() { resource_quantity_ = 0; }

FractionalResourceQuantity::FractionalResourceQuantity(double resource_quantity) {
Expand Down Expand Up @@ -285,7 +283,7 @@ const std::unordered_map<std::string, FractionalResourceQuantity>
};

ResourceSet ResourceSet::FindUpdatedResources(
const ray::raylet::ResourceSet &new_resource_set) const {
const ResourceSet &new_resource_set) const {
// Find any new resources and return a ResourceSet with the resource and new capacities
ResourceSet updated_resource_set;
for (const auto &resource_pair : new_resource_set.GetResourceAmountMap()) {
Expand All @@ -307,7 +305,7 @@ ResourceSet ResourceSet::FindUpdatedResources(
}

ResourceSet ResourceSet::FindDeletedResources(
const ray::raylet::ResourceSet &new_resource_set) const {
const ResourceSet &new_resource_set) const {
// Find any new resources and return a ResourceSet with the resource and new capacities
ResourceSet deleted_resource_set;
auto &new_resource_map = new_resource_set.GetResourceAmountMap();
Expand Down Expand Up @@ -823,6 +821,4 @@ std::string SchedulingResources::DebugString() const {
return result.str();
};

} // namespace raylet

} // namespace ray
Loading