Skip to content

Commit

Permalink
Move task to common module and add checks in getter methods (#5147)
Browse files Browse the repository at this point in the history
  • Loading branch information
raulchen authored Jul 11, 2019
1 parent d8b50a5 commit fd835d1
Show file tree
Hide file tree
Showing 51 changed files with 346 additions and 361 deletions.
71 changes: 38 additions & 33 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,42 @@ cc_library(

# === End of rpc definitions ===

cc_library(
name = "ray_common",
srcs = glob(
[
"src/ray/common/**/*.cc",
],
exclude = [
"src/ray/common/**/*_test.cc",
],
),
hdrs = glob(
[
"src/ray/common/**/*.h",
],
),
copts = COPTS,
deps = [
":common_cc_proto",
":node_manager_fbs",
":ray_util",
"@boost//:asio",
"@com_github_grpc_grpc//:grpc++",
"@plasma//:plasma_client",
],
)

cc_test(
name = "common_test",
srcs = glob(["src/ray/common/**/*_test.cc"]),
copts = COPTS,
deps = [
":ray_common",
"@com_google_googletest//:gtest_main",
],
)

cc_binary(
name = "raylet",
srcs = ["src/ray/raylet/main.cc"],
Expand Down Expand Up @@ -307,6 +343,8 @@ cc_library(
":core_worker_cc_proto",
":ray_common",
":ray_util",
# TODO(hchen): After `raylet_client` is migrated to gRPC, `core_worker_lib`
# should only depend on `raylet_client`, instead of the whole `raylet_lib`.
":raylet_lib",
":worker_rpc",
],
Expand Down Expand Up @@ -387,16 +425,6 @@ cc_test(
],
)

cc_test(
name = "task_test",
srcs = ["src/ray/raylet/task_test.cc"],
copts = COPTS,
deps = [
":raylet_lib",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "client_connection_test",
srcs = ["src/ray/raylet/client_connection_test.cc"],
Expand Down Expand Up @@ -495,29 +523,6 @@ cc_library(
],
)

cc_library(
name = "ray_common",
srcs = glob(
[
"src/ray/common/*.cc",
],
exclude = [
"src/ray/common/*_test.cc",
],
),
hdrs = glob(
[
"src/ray/common/*.h",
],
),
copts = COPTS,
deps = [
":ray_util",
"@boost//:asio",
"@plasma//:plasma_client",
],
)

cc_library(
name = "sha256",
srcs = [
Expand Down
4 changes: 2 additions & 2 deletions python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ cdef extern from "ray/protobuf/common.pb.h" namespace "Language" nogil:
cdef CLanguage LANGUAGE_JAVA "Language::JAVA"


cdef extern from "ray/raylet/scheduling_resources.h" \
namespace "ray::raylet" nogil:
cdef extern from "ray/common/task/scheduling_resources.h" \
namespace "ray" nogil:
cdef cppclass ResourceSet "ResourceSet":
ResourceSet()
ResourceSet(const unordered_map[c_string, double] &resource_map)
Expand Down
30 changes: 15 additions & 15 deletions python/ray/includes/task.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ cdef extern from "ray/protobuf/gcs.pb.h" namespace "ray::rpc" nogil:
const c_string &SerializeAsString()


cdef extern from "ray/raylet/task_spec.h" namespace "ray::raylet" nogil:
cdef cppclass CTaskSpec "ray::raylet::TaskSpecification":
cdef extern from "ray/common/task/task_spec.h" namespace "ray" nogil:
cdef cppclass CTaskSpec "ray::TaskSpecification":
CTaskSpec(const RpcTaskSpec message)
CTaskSpec(const c_string &serialized_binary)
const RpcTaskSpec &GetMessage()
Expand All @@ -61,7 +61,7 @@ cdef extern from "ray/raylet/task_spec.h" namespace "ray::raylet" nogil:
const ResourceSet GetRequiredPlacementResources() const
c_bool IsDriverTask() const
CLanguage GetLanguage() const

c_bool IsNormalTask() const
c_bool IsActorCreationTask() const
c_bool IsActorTask() const
CActorID ActorCreationId() const
Expand All @@ -74,38 +74,38 @@ cdef extern from "ray/raylet/task_spec.h" namespace "ray::raylet" nogil:
c_vector[CActorHandleID] NewActorHandles() const


cdef extern from "ray/raylet/task_util.h" namespace "ray::raylet" nogil:
cdef cppclass TaskSpecBuilder "ray::raylet::TaskSpecBuilder":
cdef extern from "ray/common/task/task_util.h" namespace "ray" nogil:
cdef cppclass TaskSpecBuilder "ray::TaskSpecBuilder":
TaskSpecBuilder &SetCommonTaskSpec(
const CLanguage &language, const c_vector[c_string] &function_descriptor,
const CJobID &job_id, const CTaskID &parent_task_id, uint64_t parent_counter,
uint64_t num_returns, const unordered_map[c_string, double] &required_resources,
const unordered_map[c_string, double] &required_placement_resources);
const unordered_map[c_string, double] &required_placement_resources)

TaskSpecBuilder &AddByRefArg(const CObjectID &arg_id);
TaskSpecBuilder &AddByRefArg(const CObjectID &arg_id)

TaskSpecBuilder &AddByValueArg(const c_string &data);
TaskSpecBuilder &AddByValueArg(const c_string &data)

TaskSpecBuilder &SetActorCreationTaskSpec(
const CActorID &actor_id, uint64_t max_reconstructions,
const c_vector[c_string] &dynamic_worker_options);
const c_vector[c_string] &dynamic_worker_options)

TaskSpecBuilder &SetActorTaskSpec(
const CActorID &actor_id, const CActorHandleID &actor_handle_id,
const CObjectID &actor_creation_dummy_object_id, uint64_t actor_counter,
const c_vector[CActorHandleID] &new_handle_ids);
const c_vector[CActorHandleID] &new_handle_ids)

RpcTaskSpec GetMessage();
RpcTaskSpec GetMessage()


cdef extern from "ray/raylet/task_execution_spec.h" namespace "ray::raylet" nogil:
cdef cppclass CTaskExecutionSpec "ray::raylet::TaskExecutionSpecification":
cdef extern from "ray/common/task/task_execution_spec.h" namespace "ray" nogil:
cdef cppclass CTaskExecutionSpec "ray::TaskExecutionSpecification":
CTaskExecutionSpec(RpcTaskExecutionSpec message)
CTaskExecutionSpec(const c_string &serialized_binary)
const RpcTaskExecutionSpec &GetMessage()
c_vector[CObjectID] ExecutionDependencies()
uint64_t NumForwards()

cdef extern from "ray/raylet/task.h" namespace "ray::raylet" nogil:
cdef cppclass CTask "ray::raylet::Task":
cdef extern from "ray/common/task/task.h" namespace "ray" nogil:
cdef cppclass CTask "ray::Task":
CTask(CTaskSpec task_spec, CTaskExecutionSpec task_execution_spec)
26 changes: 23 additions & 3 deletions python/ray/includes/task.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ from ray.includes.task cimport (


cdef class TaskSpec:
"""Cython wrapper class of C++ `ray::raylet::TaskSpecification`."""
"""Cython wrapper class of C++ `ray::TaskSpecification`."""
cdef:
unique_ptr[CTaskSpec] task_spec

Expand Down Expand Up @@ -121,6 +121,18 @@ cdef class TaskSpec:
"""
return self.task_spec.get().Serialize()

def is_normal_task(self):
"""Whether this task is a normal task."""
return self.task_spec.get().IsNormalTask()

def is_actor_task(self):
"""Whether this task is an actor task."""
return self.task_spec.get().IsActorTask()

def is_actor_creation_task(self):
"""Whether this task is an actor creation task."""
return self.task_spec.get().IsActorCreationTask()

def job_id(self):
"""Return the job ID for this task."""
return JobID(self.task_spec.get().JobId().Binary())
Expand Down Expand Up @@ -206,24 +218,32 @@ cdef class TaskSpec:

def actor_creation_id(self):
"""Return the actor creation ID for the task."""
if not self.is_actor_creation_task():
return ActorID.nil()
return ActorID(self.task_spec.get().ActorCreationId().Binary())

def actor_creation_dummy_object_id(self):
"""Return the actor creation dummy object ID for the task."""
if not self.is_actor_task():
return ObjectID.nil()
return ObjectID(
self.task_spec.get().ActorCreationDummyObjectId().Binary())

def actor_id(self):
"""Return the actor ID for this task."""
if not self.is_actor_task():
return ActorID.nil()
return ActorID(self.task_spec.get().ActorId().Binary())

def actor_counter(self):
"""Return the actor counter for this task."""
if not self.is_actor_task():
return 0
return self.task_spec.get().ActorCounter()


cdef class TaskExecutionSpec:
"""Cython wrapper class of C++ `ray::raylet::TaskExecutionSpecification`."""
"""Cython wrapper class of C++ `ray::TaskExecutionSpecification`."""
cdef:
unique_ptr[CTaskExecutionSpec] c_spec

Expand Down Expand Up @@ -259,7 +279,7 @@ cdef class TaskExecutionSpec:


cdef class Task:
"""Cython wrapper class of C++ `ray::raylet::Task`."""
"""Cython wrapper class of C++ `ray::Task`."""
cdef:
unique_ptr[CTask] c_task

Expand Down
19 changes: 8 additions & 11 deletions python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -868,7 +868,7 @@ def _process_task(self, task, function_execution_info):
assert self.current_task_id.is_nil()
assert self.task_context.task_index == 0
assert self.task_context.put_index == 1
if task.actor_id().is_nil():
if not task.is_actor_task():
# If this worker is not an actor, check that `current_job_id`
# was reset when the worker finished the previous task.
assert self.current_job_id.is_nil()
Expand All @@ -887,8 +887,7 @@ def _process_task(self, task, function_execution_info):
task.function_descriptor_list())
args = task.arguments()
return_object_ids = task.returns()
if (not task.actor_id().is_nil()
or not task.actor_creation_id().is_nil()):
if task.is_actor_task() or task.is_actor_creation_task():
dummy_return_id = return_object_ids.pop()
function_executor = function_execution_info.function
function_name = function_execution_info.function_name
Expand All @@ -911,11 +910,10 @@ def _process_task(self, task, function_execution_info):
try:
self._current_task = task
with profiling.profile("task:execute"):
if (task.actor_id().is_nil()
and task.actor_creation_id().is_nil()):
if task.is_normal_task():
outputs = function_executor(*arguments)
else:
if not task.actor_id().is_nil():
if task.is_actor_task():
key = task.actor_id()
else:
key = task.actor_creation_id()
Expand All @@ -924,7 +922,7 @@ def _process_task(self, task, function_execution_info):
except Exception as e:
# Determine whether the exception occured during a task, not an
# actor method.
task_exception = task.actor_id().is_nil()
task_exception = not task.is_actor_task()
traceback_str = ray.utils.format_error_message(
traceback.format_exc(), task_exception=task_exception)
self._handle_process_task_failure(
Expand Down Expand Up @@ -980,8 +978,7 @@ def _wait_for_and_process_task(self, task):

# TODO(rkn): It would be preferable for actor creation tasks to share
# more of the code path with regular task execution.
if not task.actor_creation_id().is_nil():
assert self.actor_id.is_nil()
if task.is_actor_creation_task():
self.actor_id = task.actor_creation_id()
self.actor_creation_task_id = task.task_id()
actor_class = self.function_actor_manager.load_actor_class(
Expand All @@ -999,8 +996,8 @@ def _wait_for_and_process_task(self, task):
# Execute the task.
function_name = execution_info.function_name
extra_data = {"name": function_name, "task_id": task.task_id().hex()}
if task.actor_id().is_nil():
if task.actor_creation_id().is_nil():
if not task.is_actor_task():
if not task.is_actor_creation_task():
title = "ray_worker:{}()".format(function_name)
next_title = "ray_worker"
else:
Expand Down
39 changes: 34 additions & 5 deletions src/ray/rpc/util.h → src/ray/common/grpc_util.h
Original file line number Diff line number Diff line change
@@ -1,14 +1,44 @@
#ifndef RAY_RPC_UTIL_H
#define RAY_RPC_UTIL_H
#ifndef RAY_COMMON_GRPC_UTIL_H
#define RAY_COMMON_GRPC_UTIL_H

#include <google/protobuf/map.h>
#include <google/protobuf/repeated_field.h>
#include <grpcpp/grpcpp.h>

#include "ray/common/status.h"
#include "status.h"

namespace ray {
namespace rpc {

/// Wrap a protobuf message.
template <class Message>
class MessageWrapper {
public:
/// Construct an empty message wrapper. This should not be used directly.
MessageWrapper() {}

/// Construct from a protobuf message object.
/// The input message will be **copied** into this object.
///
/// \param message The protobuf message.
explicit MessageWrapper(const Message message) : message_(std::move(message)) {}

/// Construct from protobuf-serialized binary.
///
/// \param serialized_binary Protobuf-serialized binary.
explicit MessageWrapper(const std::string &serialized_binary) {
message_.ParseFromString(serialized_binary);
}

/// Get reference of the protobuf message.
const Message &GetMessage() const { return message_; }

/// Serialize the message to a string.
const std::string Serialize() const { return message_.SerializeAsString(); }

protected:
/// The wrapped message.
Message message_;
};

/// Helper function that converts a ray status to gRPC status.
inline grpc::Status RayStatusToGrpcStatus(const Status &ray_status) {
Expand Down Expand Up @@ -60,7 +90,6 @@ inline std::unordered_map<K, V> MapFromProtobuf(::google::protobuf::Map<K, V> pb
return std::unordered_map<K, V>(pb_map.begin(), pb_map.end());
}

} // namespace rpc
} // namespace ray

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

namespace ray {

namespace raylet {

FractionalResourceQuantity::FractionalResourceQuantity() { resource_quantity_ = 0; }

FractionalResourceQuantity::FractionalResourceQuantity(double resource_quantity) {
Expand Down Expand Up @@ -785,6 +783,4 @@ std::string SchedulingResources::DebugString() const {
return result.str();
};

} // namespace raylet

} // namespace ray
Loading

0 comments on commit fd835d1

Please sign in to comment.