From 840058a4d4a7c23f92a1a808c47ba9115b94b63a Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Tue, 9 Jul 2019 17:01:32 +0800 Subject: [PATCH] Add checks in task spec getter methods --- src/ray/common/task/task.cc | 9 ++ src/ray/common/task/task.h | 2 + src/ray/common/task/task_execution_spec.cc | 9 ++ src/ray/common/task/task_execution_spec.h | 2 + src/ray/common/task/task_spec.cc | 98 ++++++++++++---------- src/ray/common/task/task_spec.h | 5 +- src/ray/raylet/node_manager.cc | 8 +- 7 files changed, 78 insertions(+), 55 deletions(-) diff --git a/src/ray/common/task/task.cc b/src/ray/common/task/task.cc index 35cde9dd75ea..3a603d72da08 100644 --- a/src/ray/common/task/task.cc +++ b/src/ray/common/task/task.cc @@ -1,3 +1,5 @@ +#include + #include "task.h" namespace ray { @@ -32,4 +34,11 @@ void Task::CopyTaskExecutionSpec(const Task &task) { ComputeDependencies(); } +std::string Task::DebugString() const { + std::ostringstream stream; + stream << "task_spec={" << task_spec_.DebugString() << "}, task_execution_spec={" + << task_execution_spec_.DebugString() << "}"; + return stream.str(); +} + } // namespace ray diff --git a/src/ray/common/task/task.h b/src/ray/common/task/task.h index 39bd4b8b6322..d1ef7c971de4 100644 --- a/src/ray/common/task/task.h +++ b/src/ray/common/task/task.h @@ -58,6 +58,8 @@ class Task { /// \param task Task structure with updated dynamic information. void CopyTaskExecutionSpec(const Task &task); + std::string DebugString() const; + private: void ComputeDependencies(); diff --git a/src/ray/common/task/task_execution_spec.cc b/src/ray/common/task/task_execution_spec.cc index 0d6c9cacbd76..8494143893ae 100644 --- a/src/ray/common/task/task_execution_spec.cc +++ b/src/ray/common/task/task_execution_spec.cc @@ -1,3 +1,5 @@ +#include + #include "ray/common/task/task_execution_spec.h" namespace ray { @@ -12,4 +14,11 @@ void TaskExecutionSpecification::IncrementNumForwards() { message_.set_num_forwards(message_.num_forwards() + 1); } +std::string TaskExecutionSpecification::DebugString() const { + std::ostringstream stream; + stream << "num_dependencies=" << message_.dependencies_size() + << ", num_forwards=" << message_.num_forwards(); + return stream.str(); +} + } // namespace ray diff --git a/src/ray/common/task/task_execution_spec.h b/src/ray/common/task/task_execution_spec.h index 167f70030fa9..217486da1a39 100644 --- a/src/ray/common/task/task_execution_spec.h +++ b/src/ray/common/task/task_execution_spec.h @@ -38,6 +38,8 @@ class TaskExecutionSpecification : public MessageWrapper /// Increment the number of times this task has been forwarded. void IncrementNumForwards(); + + std::string DebugString() const; }; } // namespace ray diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index 2239c4af8dac..6370b4aeaa59 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -1,4 +1,3 @@ - #include #include "ray/common/task/task_spec.h" @@ -34,20 +33,6 @@ std::vector TaskSpecification::FunctionDescriptor() const { return VectorFromProtobuf(message_.function_descriptor()); } -std::string TaskSpecification::FunctionDescriptorString() const { - auto list = VectorFromProtobuf(message_.function_descriptor()); - std::ostringstream stream; - // The 4th is the code hash which is binary bits. No need to output it. - size_t size = std::min(static_cast(3), list.size()); - for (int i = 0; i < size; ++i) { - if (i != 0) { - stream << ","; - } - stream << list[i]; - } - return stream.str(); -} - size_t TaskSpecification::NumArgs() const { return message_.args_size(); } size_t TaskSpecification::NumReturns() const { return message_.num_returns(); } @@ -99,66 +84,89 @@ bool TaskSpecification::IsActorTask() const { return message_.type() == TaskType::ACTOR_TASK; } +// === Below are getter methods for actor creation tasks. + ActorID TaskSpecification::ActorCreationId() const { - // TODO(hchen) Add a check to make sure this function can only be called if - // task is an actor creation task. - if (!IsActorCreationTask()) { - return ActorID::Nil(); - } + RAY_CHECK(IsActorCreationTask()); return ActorID::FromBinary(message_.actor_creation_task_spec().actor_id()); } -ObjectID TaskSpecification::ActorCreationDummyObjectId() const { - if (!IsActorTask()) { - return ObjectID::Nil(); - } - return ObjectID::FromBinary( - message_.actor_task_spec().actor_creation_dummy_object_id()); -} - uint64_t TaskSpecification::MaxActorReconstructions() const { - if (!IsActorCreationTask()) { - return 0; - } + RAY_CHECK(IsActorCreationTask()); return message_.actor_creation_task_spec().max_actor_reconstructions(); } +std::vector TaskSpecification::DynamicWorkerOptions() const { + RAY_CHECK(IsActorCreationTask()); + return VectorFromProtobuf(message_.actor_creation_task_spec().dynamic_worker_options()); +} + +// === Below are getter methods for actor tasks. + ActorID TaskSpecification::ActorId() const { - if (!IsActorTask()) { - return ActorID::Nil(); - } + RAY_CHECK(IsActorTask()); return ActorID::FromBinary(message_.actor_task_spec().actor_id()); } ActorHandleID TaskSpecification::ActorHandleId() const { - if (!IsActorTask()) { - return ActorHandleID::Nil(); - } + RAY_CHECK(IsActorTask()); return ActorHandleID::FromBinary(message_.actor_task_spec().actor_handle_id()); } uint64_t TaskSpecification::ActorCounter() const { - if (!IsActorTask()) { - return 0; - } + RAY_CHECK(IsActorTask()); return message_.actor_task_spec().actor_counter(); } +ObjectID TaskSpecification::ActorCreationDummyObjectId() const { + RAY_CHECK(IsActorTask()); + return ObjectID::FromBinary( + message_.actor_task_spec().actor_creation_dummy_object_id()); +} + ObjectID TaskSpecification::ActorDummyObject() const { RAY_CHECK(IsActorTask() || IsActorCreationTask()); return ReturnId(NumReturns() - 1); } std::vector TaskSpecification::NewActorHandles() const { - if (!IsActorTask()) { - return {}; - } + RAY_CHECK(IsActorTask()); return IdVectorFromProtobuf( message_.actor_task_spec().new_actor_handles()); } -std::vector TaskSpecification::DynamicWorkerOptions() const { - return VectorFromProtobuf(message_.actor_creation_task_spec().dynamic_worker_options()); +std::string TaskSpecification::DebugString() const { + std::ostringstream stream; + stream << "Type=" << TaskType_Name(message_.type()) + << ", Language=" << Language_Name(message_.language()) + << ", function_descriptor="; + + // Print function descriptor. + auto list = VectorFromProtobuf(message_.function_descriptor()); + // The 4th is the code hash which is binary bits. No need to output it. + size_t size = std::min(static_cast(3), list.size()); + for (int i = 0; i < size; ++i) { + if (i != 0) { + stream << ","; + } + stream << list[i]; + } + + stream << ", task_id=" << TaskId() << ", job_id=" << JobId() + << ", num_args=" << NumArgs() << ", num_returns" << NumReturns(); + + if (IsActorCreationTask()) { + // Print actor creation task spec. + stream << ", actor_creation_task_spec={actor_id=" << ActorCreationId() + << ", max_reconstructions=" << MaxActorReconstructions() << "}"; + } else if (IsActorTask()) { + // Print actor task spec. + stream << ", actor_task_spec={actor_id=" << ActorId() + << ", actor_handle_id=" << ActorHandleId() + << ", actor_counter=" << ActorCounter() << "}"; + } + + return stream.str(); } } // namespace ray diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index b811735eea97..2c86b533d65c 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -47,9 +47,6 @@ class TaskSpecification : public MessageWrapper { std::vector FunctionDescriptor() const; - // Output the function descriptor as a string for log purpose. - std::string FunctionDescriptorString() const; - size_t NumArgs() const; size_t NumReturns() const; @@ -111,6 +108,8 @@ class TaskSpecification : public MessageWrapper { std::vector DynamicWorkerOptions() const; + std::string DebugString() const; + private: void ComputeResources(); /// Field storing required resources. Initalized in constructor. diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 9859695754c7..2edba0895522 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1463,13 +1463,7 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag stats::TaskCountReceived().Record(1); const TaskSpecification &spec = task.GetTaskSpecification(); const TaskID &task_id = spec.TaskId(); - RAY_LOG(DEBUG) << "Submitting task: task_id=" << task_id - << ", actor_id=" << spec.ActorId() - << ", actor_creation_id=" << spec.ActorCreationId() - << ", actor_handle_id=" << spec.ActorHandleId() - << ", actor_counter=" << spec.ActorCounter() - << ", task_descriptor=" << spec.FunctionDescriptorString() << " on node " - << gcs_client_->client_table().GetLocalClientId(); + RAY_LOG(DEBUG) << "Submitting task: " << task.DebugString(); if (local_queues_.HasTask(task_id)) { RAY_LOG(WARNING) << "Submitted task " << task_id