Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove dependencies from TaskExecutionSpecification #5166

Merged
merged 13 commits into from
Jul 16, 2019
7 changes: 2 additions & 5 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -232,14 +232,11 @@ cdef class RayletClient:
def disconnect(self):
check_status(self.client.get().Disconnect())

def submit_task(self, TaskSpec task_spec, execution_dependencies):
def submit_task(self, TaskSpec task_spec):
cdef:
CObjectID c_id
c_vector[CObjectID] c_dependencies
for dep in execution_dependencies:
c_dependencies.push_back((<ObjectID>dep).native())
check_status(self.client.get().SubmitTask(
c_dependencies, task_spec.task_spec.get()[0]))
task_spec.task_spec.get()[0]))

def get_task(self):
cdef:
Expand Down
2 changes: 1 addition & 1 deletion python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ def _actor_method_call(self,
actor_counter=self._ray_actor_counter,
actor_creation_dummy_object_id=(
self._ray_actor_creation_dummy_object_id),
execution_dependencies=[self._ray_actor_cursor],
previous_actor_task_dummy_object_id=self._ray_actor_cursor,
new_actor_handles=self._ray_new_actor_handles,
# We add one for the dummy return ID.
num_return_vals=num_return_vals + 1,
Expand Down
4 changes: 1 addition & 3 deletions python/ray/includes/libraylet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,7 @@ cdef extern from "ray/raylet/raylet_client.h" nogil:
c_bool is_worker, const CJobID &job_id,
const CLanguage &language)
CRayStatus Disconnect()
CRayStatus SubmitTask(
const c_vector[CObjectID] &execution_dependencies,
const CTaskSpec &task_spec)
CRayStatus SubmitTask(const CTaskSpec &task_spec)
CRayStatus GetTask(unique_ptr[CTaskSpec] *task_spec)
CRayStatus TaskDone()
CRayStatus FetchOrReconstruct(c_vector[CObjectID] &object_ids,
Expand Down
7 changes: 5 additions & 2 deletions python/ray/includes/task.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ cdef extern from "ray/common/task/task_spec.h" namespace "ray" nogil:
c_bool IsActorTask() const
CActorID ActorCreationId() const
CObjectID ActorCreationDummyObjectId() const
CObjectID PreviousActorTaskDummyObjectId() const
uint64_t MaxActorReconstructions() const
CActorID ActorId() const
CActorHandleID ActorHandleId() const
Expand All @@ -92,8 +93,10 @@ cdef extern from "ray/common/task/task_util.h" namespace "ray" nogil:

TaskSpecBuilder &SetActorTaskSpec(
const CActorID &actor_id, const CActorHandleID &actor_handle_id,
const CObjectID &actor_creation_dummy_object_id, uint64_t actor_counter,
const c_vector[CActorHandleID] &new_handle_ids)
const CObjectID &actor_creation_dummy_object_id,
const CObjectID &previous_actor_task_dummy_object_id,
uint64_t actor_counter,
const c_vector[CActorHandleID] &new_handle_ids);

RpcTaskSpec GetMessage()

Expand Down
24 changes: 10 additions & 14 deletions python/ray/includes/task.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ cdef class TaskSpec:
int num_returns, TaskID parent_task_id, int parent_counter,
ActorID actor_creation_id,
ObjectID actor_creation_dummy_object_id,
ObjectID previous_actor_task_dummy_object_id,
int32_t max_actor_reconstructions, ActorID actor_id,
ActorHandleID actor_handle_id, int actor_counter,
new_actor_handles, resource_map, placement_resource_map):
Expand Down Expand Up @@ -85,6 +86,7 @@ cdef class TaskSpec:
actor_id.native(),
actor_handle_id.native(),
actor_creation_dummy_object_id.native(),
previous_actor_task_dummy_object_id.native(),
actor_counter,
c_new_actor_handles,
)
Expand Down Expand Up @@ -229,6 +231,13 @@ cdef class TaskSpec:
return ObjectID(
self.task_spec.get().ActorCreationDummyObjectId().Binary())

def previous_actor_task_dummy_object_id(self):
"""Return the object ID of the previously executed actor task."""
if not self.is_actor_task():
return ObjectID.nil()
return ObjectID(
edoakes marked this conversation as resolved.
Show resolved Hide resolved
self.task_spec.get().PreviousActorTaskDummyObjectId().Binary())

def actor_id(self):
"""Return the actor ID for this task."""
if not self.is_actor_task():
Expand All @@ -247,13 +256,10 @@ cdef class TaskExecutionSpec:
cdef:
unique_ptr[CTaskExecutionSpec] c_spec

def __init__(self, execution_dependencies):
def __init__(self):
cdef:
RpcTaskExecutionSpec message;

for dependency in execution_dependencies:
message.add_dependencies(
(<ObjectID?>dependency).binary())
self.c_spec.reset(new CTaskExecutionSpec(message))

@staticmethod
Expand All @@ -264,16 +270,6 @@ cdef class TaskExecutionSpec:
self.c_spec.reset(new CTaskExecutionSpec(string))
return self

def dependencies(self):
cdef:
CObjectID c_id
c_vector[CObjectID] dependencies = (
self.c_spec.get().ExecutionDependencies())
ret = []
for c_id in dependencies:
ret.append(ObjectID(c_id.Binary()))
return ret

def num_forwards(self):
return self.c_spec.get().NumForwards()

Expand Down
3 changes: 2 additions & 1 deletion python/ray/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,8 @@ def _task_table(self, task_id):
"ActorCreationID": task.actor_creation_id().hex(),
"ActorCreationDummyObjectID": (
task.actor_creation_dummy_object_id().hex()),
"PreviousActorTaskDummyObjectID": (
task.previous_actor_task_dummy_object_id().hex()),
"ActorCounter": task.actor_counter(),
"Args": task.arguments(),
"ReturnObjectIDs": task.returns(),
Expand All @@ -356,7 +358,6 @@ def _task_table(self, task_id):
task_table_data.task.task_execution_spec.SerializeToString())
return {
"ExecutionSpec": {
"Dependencies": execution_spec.dependencies(),
"NumForwards": execution_spec.num_forwards(),
},
"TaskSpec": task_spec_info
Expand Down
14 changes: 7 additions & 7 deletions python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,8 +586,8 @@ def submit_task(self,
actor_counter=0,
actor_creation_id=None,
actor_creation_dummy_object_id=None,
previous_actor_task_dummy_object_id=None,
max_actor_reconstructions=0,
execution_dependencies=None,
new_actor_handles=None,
num_return_vals=None,
resources=None,
Expand All @@ -611,7 +611,9 @@ def submit_task(self,
actor_creation_dummy_object_id: If this task is an actor method,
then this argument is the dummy object ID associated with the
actor creation task for the corresponding actor.
execution_dependencies: The execution dependencies for this task.
previous_actor_task_dummy_object_id: If this task is an actor,
then this argument is the dummy object ID associated with the
task previously submitted to the corresponding actor.
num_return_vals: The number of return values this function should
have.
resources: The resource requirements for this task.
Expand Down Expand Up @@ -652,10 +654,6 @@ def submit_task(self,
else:
args_for_raylet.append(put(arg))

# By default, there are no execution dependencies.
if execution_dependencies is None:
execution_dependencies = []

if new_actor_handles is None:
new_actor_handles = []

Expand Down Expand Up @@ -705,6 +703,7 @@ def submit_task(self,
self.task_context.task_index,
actor_creation_id,
actor_creation_dummy_object_id,
previous_actor_task_dummy_object_id,
max_actor_reconstructions,
actor_id,
actor_handle_id,
Expand All @@ -713,7 +712,7 @@ def submit_task(self,
resources,
placement_resources,
)
self.raylet_client.submit_task(task, execution_dependencies)
self.raylet_client.submit_task(task)

return task.returns()

Expand Down Expand Up @@ -1887,6 +1886,7 @@ def connect(node,
0, # parent_counter.
ActorID.nil(), # actor_creation_id.
ObjectID.nil(), # actor_creation_dummy_object_id.
ObjectID.nil(), # previous_actor_task_dummy_object_id.
0, # max_actor_reconstructions.
ActorID.nil(), # actor_id.
ActorHandleID.nil(), # actor_handle_id.
Expand Down
9 changes: 3 additions & 6 deletions src/ray/common/task/task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,13 @@ void Task::ComputeDependencies() {
dependencies_.push_back(task_spec_.ArgId(i, j));
}
}
// TODO(atumanov): why not just return a const reference to ExecutionDependencies() and
// avoid a copy.
auto execution_dependencies = task_execution_spec_.ExecutionDependencies();
dependencies_.insert(dependencies_.end(), execution_dependencies.begin(),
execution_dependencies.end());
if (task_spec_.IsActorTask()) {
dependencies_.push_back(task_spec_.PreviousActorTaskDummyObjectId());
}
}

void Task::CopyTaskExecutionSpec(const Task &task) {
task_execution_spec_ = task.task_execution_spec_;
ComputeDependencies();
}

std::string Task::DebugString() const {
Expand Down
2 changes: 1 addition & 1 deletion src/ray/common/task/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class Task {
/// dependencies, etc.
TaskSpecification task_spec_;
/// Task execution specification, consisting of all dynamic/mutable
/// information about this task determined at execution time..
/// information about this task determined at execution time.
TaskExecutionSpecification task_execution_spec_;
/// A cached copy of the task's object dependencies, including arguments from
/// the TaskSpecification and execution dependencies from the
Expand Down
7 changes: 1 addition & 6 deletions src/ray/common/task/task_execution_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@

namespace ray {

const std::vector<ObjectID> TaskExecutionSpecification::ExecutionDependencies() const {
return IdVectorFromProtobuf<ObjectID>(message_.dependencies());
}

size_t TaskExecutionSpecification::NumForwards() const { return message_.num_forwards(); }

void TaskExecutionSpecification::IncrementNumForwards() {
Expand All @@ -16,8 +12,7 @@ void TaskExecutionSpecification::IncrementNumForwards() {

std::string TaskExecutionSpecification::DebugString() const {
std::ostringstream stream;
stream << "num_dependencies=" << message_.dependencies_size()
<< ", num_forwards=" << message_.num_forwards();
stream << "num_forwards=" << message_.num_forwards();
return stream.str();
}

Expand Down
6 changes: 0 additions & 6 deletions src/ray/common/task/task_execution_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,6 @@ class TaskExecutionSpecification : public MessageWrapper<rpc::TaskExecutionSpec>
explicit TaskExecutionSpecification(const std::string &serialized_binary)
: MessageWrapper(serialized_binary) {}

/// Get the task's execution dependencies.
///
/// \return A vector of object IDs representing this task's execution
/// dependencies.
const std::vector<ObjectID> ExecutionDependencies() const;

/// Get the number of times this task has been forwarded.
///
/// \return The number of times this task has been forwarded.
Expand Down
7 changes: 7 additions & 0 deletions src/ray/common/task/task_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,13 @@ ObjectID TaskSpecification::ActorCreationDummyObjectId() const {
message_.actor_task_spec().actor_creation_dummy_object_id());
}

ObjectID TaskSpecification::PreviousActorTaskDummyObjectId() const {
RAY_CHECK(IsActorTask());
return ObjectID::FromBinary(
message_.actor_task_spec().previous_actor_task_dummy_object_id());
}


ObjectID TaskSpecification::ActorDummyObject() const {
RAY_CHECK(IsActorTask() || IsActorCreationTask());
return ReturnId(NumReturns() - 1);
Expand Down
2 changes: 2 additions & 0 deletions src/ray/common/task/task_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {

ObjectID ActorCreationDummyObjectId() const;

ObjectID PreviousActorTaskDummyObjectId() const;

std::vector<ActorHandleID> NewActorHandles() const;

ObjectID ActorDummyObject() const;
Expand Down
6 changes: 5 additions & 1 deletion src/ray/common/task/task_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,18 @@ class TaskSpecBuilder {
/// \return Reference to the builder object itself.
TaskSpecBuilder &SetActorTaskSpec(
const ActorID &actor_id, const ActorHandleID &actor_handle_id,
const ObjectID &actor_creation_dummy_object_id, uint64_t actor_counter,
const ObjectID &actor_creation_dummy_object_id,
const ObjectID &previous_actor_task_dummy_object_id,
uint64_t actor_counter,
const std::vector<ActorHandleID> &new_handle_ids = {}) {
message_.set_type(TaskType::ACTOR_TASK);
auto actor_spec = message_.mutable_actor_task_spec();
actor_spec->set_actor_id(actor_id.Binary());
actor_spec->set_actor_handle_id(actor_handle_id.Binary());
actor_spec->set_actor_creation_dummy_object_id(
actor_creation_dummy_object_id.Binary());
actor_spec->set_previous_actor_task_dummy_object_id(
previous_actor_task_dummy_object_id.Binary());
actor_spec->set_actor_counter(actor_counter);
for (const auto &id : new_handle_ids) {
actor_spec->add_new_actor_handles(id.Binary());
Expand Down
24 changes: 0 additions & 24 deletions src/ray/core_worker/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,30 +75,6 @@ struct TaskInfo {
const TaskType task_type;
};

/// Task specification, which includes the immutable information about the task
/// which are determined at the submission time.
/// TODO(zhijunfu): this can be removed after everything is moved to protobuf.
class TaskSpec {
public:
TaskSpec(const TaskSpecification &task_spec, const std::vector<ObjectID> &dependencies)
: task_spec_(task_spec), dependencies_(dependencies) {}

TaskSpec(const TaskSpecification &&task_spec,
const std::vector<ObjectID> &&dependencies)
: task_spec_(task_spec), dependencies_(dependencies) {}

const TaskSpecification &GetTaskSpecification() const { return task_spec_; }

const std::vector<ObjectID> &GetDependencies() const { return dependencies_; }

private:
/// Raylet task specification.
TaskSpecification task_spec_;

/// Dependencies.
std::vector<ObjectID> dependencies_;
};

enum class StoreProviderType { PLASMA };

enum class TaskTransportType { RAYLET };
Expand Down
10 changes: 6 additions & 4 deletions src/ray/core_worker/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ struct WorkerThreadContext {
put_index = 0;
}

void SetCurrentTask(const TaskSpecification &spec) { SetCurrentTask(spec.TaskId()); }
void SetCurrentTask(const TaskSpecification &task_spec) {
SetCurrentTask(task_spec.TaskId());
}

private:
/// The task ID for current task.
Expand Down Expand Up @@ -62,9 +64,9 @@ const TaskID &WorkerContext::GetCurrentTaskID() const {
return GetThreadContext().GetCurrentTaskID();
}

void WorkerContext::SetCurrentTask(const TaskSpecification &spec) {
current_job_id = spec.JobId();
GetThreadContext().SetCurrentTask(spec);
void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) {
current_job_id = task_spec.JobId();
GetThreadContext().SetCurrentTask(task_spec);
}

WorkerThreadContext &WorkerContext::GetThreadContext() {
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class WorkerContext {

const TaskID &GetCurrentTaskID() const;

void SetCurrentTask(const TaskSpecification &spec);
void SetCurrentTask(const TaskSpecification &task_spec);

int GetNextTaskIndex();

Expand Down
Loading