Skip to content

Commit

Permalink
Add checks in task spec getter methods
Browse files Browse the repository at this point in the history
  • Loading branch information
raulchen committed Jul 9, 2019
1 parent 53342a2 commit 840058a
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 55 deletions.
9 changes: 9 additions & 0 deletions src/ray/common/task/task.cc
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include <sstream>

#include "task.h"

namespace ray {
Expand Down Expand Up @@ -32,4 +34,11 @@ void Task::CopyTaskExecutionSpec(const Task &task) {
ComputeDependencies();
}

std::string Task::DebugString() const {
std::ostringstream stream;
stream << "task_spec={" << task_spec_.DebugString() << "}, task_execution_spec={"
<< task_execution_spec_.DebugString() << "}";
return stream.str();
}

} // namespace ray
2 changes: 2 additions & 0 deletions src/ray/common/task/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class Task {
/// \param task Task structure with updated dynamic information.
void CopyTaskExecutionSpec(const Task &task);

std::string DebugString() const;

private:
void ComputeDependencies();

Expand Down
9 changes: 9 additions & 0 deletions src/ray/common/task/task_execution_spec.cc
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include <sstream>

#include "ray/common/task/task_execution_spec.h"

namespace ray {
Expand All @@ -12,4 +14,11 @@ void TaskExecutionSpecification::IncrementNumForwards() {
message_.set_num_forwards(message_.num_forwards() + 1);
}

std::string TaskExecutionSpecification::DebugString() const {
std::ostringstream stream;
stream << "num_dependencies=" << message_.dependencies_size()
<< ", num_forwards=" << message_.num_forwards();
return stream.str();
}

} // namespace ray
2 changes: 2 additions & 0 deletions src/ray/common/task/task_execution_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class TaskExecutionSpecification : public MessageWrapper<rpc::TaskExecutionSpec>

/// Increment the number of times this task has been forwarded.
void IncrementNumForwards();

std::string DebugString() const;
};

} // namespace ray
Expand Down
98 changes: 53 additions & 45 deletions src/ray/common/task/task_spec.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

#include <sstream>

#include "ray/common/task/task_spec.h"
Expand Down Expand Up @@ -34,20 +33,6 @@ std::vector<std::string> TaskSpecification::FunctionDescriptor() const {
return VectorFromProtobuf(message_.function_descriptor());
}

std::string TaskSpecification::FunctionDescriptorString() const {
auto list = VectorFromProtobuf(message_.function_descriptor());
std::ostringstream stream;
// The 4th is the code hash which is binary bits. No need to output it.
size_t size = std::min(static_cast<size_t>(3), list.size());
for (int i = 0; i < size; ++i) {
if (i != 0) {
stream << ",";
}
stream << list[i];
}
return stream.str();
}

size_t TaskSpecification::NumArgs() const { return message_.args_size(); }

size_t TaskSpecification::NumReturns() const { return message_.num_returns(); }
Expand Down Expand Up @@ -99,66 +84,89 @@ bool TaskSpecification::IsActorTask() const {
return message_.type() == TaskType::ACTOR_TASK;
}

// === Below are getter methods for actor creation tasks.

ActorID TaskSpecification::ActorCreationId() const {
// TODO(hchen) Add a check to make sure this function can only be called if
// task is an actor creation task.
if (!IsActorCreationTask()) {
return ActorID::Nil();
}
RAY_CHECK(IsActorCreationTask());
return ActorID::FromBinary(message_.actor_creation_task_spec().actor_id());
}

ObjectID TaskSpecification::ActorCreationDummyObjectId() const {
if (!IsActorTask()) {
return ObjectID::Nil();
}
return ObjectID::FromBinary(
message_.actor_task_spec().actor_creation_dummy_object_id());
}

uint64_t TaskSpecification::MaxActorReconstructions() const {
if (!IsActorCreationTask()) {
return 0;
}
RAY_CHECK(IsActorCreationTask());
return message_.actor_creation_task_spec().max_actor_reconstructions();
}

std::vector<std::string> TaskSpecification::DynamicWorkerOptions() const {
RAY_CHECK(IsActorCreationTask());
return VectorFromProtobuf(message_.actor_creation_task_spec().dynamic_worker_options());
}

// === Below are getter methods for actor tasks.

ActorID TaskSpecification::ActorId() const {
if (!IsActorTask()) {
return ActorID::Nil();
}
RAY_CHECK(IsActorTask());
return ActorID::FromBinary(message_.actor_task_spec().actor_id());
}

ActorHandleID TaskSpecification::ActorHandleId() const {
if (!IsActorTask()) {
return ActorHandleID::Nil();
}
RAY_CHECK(IsActorTask());
return ActorHandleID::FromBinary(message_.actor_task_spec().actor_handle_id());
}

uint64_t TaskSpecification::ActorCounter() const {
if (!IsActorTask()) {
return 0;
}
RAY_CHECK(IsActorTask());
return message_.actor_task_spec().actor_counter();
}

ObjectID TaskSpecification::ActorCreationDummyObjectId() const {
RAY_CHECK(IsActorTask());
return ObjectID::FromBinary(
message_.actor_task_spec().actor_creation_dummy_object_id());
}

ObjectID TaskSpecification::ActorDummyObject() const {
RAY_CHECK(IsActorTask() || IsActorCreationTask());
return ReturnId(NumReturns() - 1);
}

std::vector<ActorHandleID> TaskSpecification::NewActorHandles() const {
if (!IsActorTask()) {
return {};
}
RAY_CHECK(IsActorTask());
return IdVectorFromProtobuf<ActorHandleID>(
message_.actor_task_spec().new_actor_handles());
}

std::vector<std::string> TaskSpecification::DynamicWorkerOptions() const {
return VectorFromProtobuf(message_.actor_creation_task_spec().dynamic_worker_options());
std::string TaskSpecification::DebugString() const {
std::ostringstream stream;
stream << "Type=" << TaskType_Name(message_.type())
<< ", Language=" << Language_Name(message_.language())
<< ", function_descriptor=";

// Print function descriptor.
auto list = VectorFromProtobuf(message_.function_descriptor());
// The 4th is the code hash which is binary bits. No need to output it.
size_t size = std::min(static_cast<size_t>(3), list.size());
for (int i = 0; i < size; ++i) {
if (i != 0) {
stream << ",";
}
stream << list[i];
}

stream << ", task_id=" << TaskId() << ", job_id=" << JobId()
<< ", num_args=" << NumArgs() << ", num_returns" << NumReturns();

if (IsActorCreationTask()) {
// Print actor creation task spec.
stream << ", actor_creation_task_spec={actor_id=" << ActorCreationId()
<< ", max_reconstructions=" << MaxActorReconstructions() << "}";
} else if (IsActorTask()) {
// Print actor task spec.
stream << ", actor_task_spec={actor_id=" << ActorId()
<< ", actor_handle_id=" << ActorHandleId()
<< ", actor_counter=" << ActorCounter() << "}";
}

return stream.str();
}

} // namespace ray
5 changes: 2 additions & 3 deletions src/ray/common/task/task_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {

std::vector<std::string> FunctionDescriptor() const;

// Output the function descriptor as a string for log purpose.
std::string FunctionDescriptorString() const;

size_t NumArgs() const;

size_t NumReturns() const;
Expand Down Expand Up @@ -111,6 +108,8 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {

std::vector<std::string> DynamicWorkerOptions() const;

std::string DebugString() const;

private:
void ComputeResources();
/// Field storing required resources. Initalized in constructor.
Expand Down
8 changes: 1 addition & 7 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1463,13 +1463,7 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag
stats::TaskCountReceived().Record(1);
const TaskSpecification &spec = task.GetTaskSpecification();
const TaskID &task_id = spec.TaskId();
RAY_LOG(DEBUG) << "Submitting task: task_id=" << task_id
<< ", actor_id=" << spec.ActorId()
<< ", actor_creation_id=" << spec.ActorCreationId()
<< ", actor_handle_id=" << spec.ActorHandleId()
<< ", actor_counter=" << spec.ActorCounter()
<< ", task_descriptor=" << spec.FunctionDescriptorString() << " on node "
<< gcs_client_->client_table().GetLocalClientId();
RAY_LOG(DEBUG) << "Submitting task: " << task.DebugString();

if (local_queues_.HasTask(task_id)) {
RAY_LOG(WARNING) << "Submitted task " << task_id
Expand Down

0 comments on commit 840058a

Please sign in to comment.