From b099e841bcb5300f73e40756c50f291ea7adee3f Mon Sep 17 00:00:00 2001
From: Jaewan Hong
Date: Fri, 12 Nov 2021 14:08:53 +0000
Subject: [PATCH] blocktasks implemented #1

---
 OSDI22/microbench/test_pipeline.py            |  1 +
 src/ray/common/ray_config_def.h               |  2 +-
 src/ray/core_worker/reference_count.cc        | 25 +++++++++++--------
 src/ray/object_manager/common.h               |  4 ++-
 src/ray/object_manager/object_manager.h       |  4 +++
 .../plasma/create_request_queue.cc            | 13 +++++++---
 .../plasma/create_request_queue.h             |  5 ++++
 src/ray/object_manager/plasma/store.cc        | 19 +++++++++++---
 src/ray/object_manager/plasma/store.h         | 10 ++++++++
 src/ray/object_manager/plasma/store_runner.h  |  4 +++
 src/ray/raylet/node_manager.cc                | 15 ++++++++---
 .../raylet/scheduling/cluster_task_manager.cc | 18 +++++++++----
 .../raylet/scheduling/cluster_task_manager.h  |  7 ++++--
 .../cluster_task_manager_interface.h          |  3 ++-
 14 files changed, 101 insertions(+), 29 deletions(-)

diff --git a/OSDI22/microbench/test_pipeline.py b/OSDI22/microbench/test_pipeline.py
index cb970c6d255b..00f5f359dff5 100644
--- a/OSDI22/microbench/test_pipeline.py
+++ b/OSDI22/microbench/test_pipeline.py
@@ -37,6 +37,7 @@ def consumer(obj_ref):
 
 @ray.remote(num_cpus=1)
 def producer():
+    time.sleep(0.1)
     for i in range(1000000):
         pass
     return np.zeros(OBJECT_SIZE // 8)
diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h
index e0019098d919..863ea11c8007 100644
--- a/src/ray/common/ray_config_def.h
+++ b/src/ray/common/ray_config_def.h
@@ -374,7 +374,7 @@ RAY_CONFIG(int64_t, min_spilling_size, 100 * 1024 * 1024)
 
 /// If set to less than 1.0, Ray will start spilling objects when existing objects
 /// take more than this percentage of the available memory.
-RAY_CONFIG(float, object_spilling_threshold, 0.8)
+RAY_CONFIG(float, object_spilling_threshold, 1.0)
 
 /// Maximum number of objects that can be fused into a single file.
 RAY_CONFIG(int64_t, max_fused_object_count, 2000)
diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc
index 8c1bda9b2f2f..ae57e0052e48 100644
--- a/src/ray/core_worker/reference_count.cc
+++ b/src/ray/core_worker/reference_count.cc
@@ -263,21 +263,26 @@ void ReferenceCounter::RemoveLocalReference(const ObjectID &object_id,
 }
 
 Priority& ReferenceCounter::GetObjectPriority(const ObjectID &object_id){
-  auto it = object_id_priority_.find(object_id);
-  if (it == object_id_priority_.end()) {
-    // This happens if a large argument is transparently passed by reference
-    // because we don't hold a Python reference to its ObjectID.
-    // When an object is made with Put() Priority is not set. Should to this Jae
-    return Priority();
-    it = object_id_priority_.emplace(object_id, Priority()).first;
-  }
-  return it->second;
+  absl::MutexLock lock(&mutex_);
+  auto it = object_id_priority_.find(object_id);
+  if (it == object_id_priority_.end()) {
+    // This happens if a large argument is transparently passed by reference
+    // because we don't hold a Python reference to its ObjectID.
+    // When an object is made with Put() Priority is not set. Should to this Jae
+    /*
+    Priority pri = Priority();
+    return pri;
+    it = object_id_priority_.emplace(object_id, Priority()).first;
+    */
+  }
+  return it->second;
 }
 
 void ReferenceCounter::UpdateObjectPriority(
     const ObjectID &object_id, const Priority &priority){
-  object_id_priority_.[object_id] = priority;
+  absl::MutexLock lock(&mutex_);
+  object_id_priority_[object_id] = priority;
 }
 
 void ReferenceCounter::UpdateSubmittedTaskReferences(
diff --git a/src/ray/object_manager/common.h b/src/ray/object_manager/common.h
index d95c904aa8f7..8774ad034d0d 100644
--- a/src/ray/object_manager/common.h
+++ b/src/ray/object_manager/common.h
@@ -29,7 +29,9 @@ using SpillObjectsCallback = std::function;
 
 /// Callback when the creation of object(s) is blocked. The priority is the
 /// highest priority of a blocked object.
-using ObjectCreationBlockedCallback = std::function;
+using ObjectCreationBlockedCallback = std::function;
+
+using SetShouldSpillCallback = std::function;
 
 /// A callback to call when space has been released.
 using SpaceReleasedCallback = std::function;
diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h
index 3bd7b1fefec6..fc16764acdd7 100644
--- a/src/ray/object_manager/object_manager.h
+++ b/src/ray/object_manager/object_manager.h
@@ -263,6 +263,10 @@ class ObjectManager : public ObjectManagerInterface,
 
   bool PullManagerHasPullsQueued() const { return pull_manager_->HasPullsQueued(); }
 
+  void SetShouldSpill(bool should_spill){
+    plasma::plasma_store_runner->SetShouldSpill(should_spill);
+  }
+
  private:
   friend class TestObjectManager;
diff --git a/src/ray/object_manager/plasma/create_request_queue.cc b/src/ray/object_manager/plasma/create_request_queue.cc
index 0e6f7ceb7f63..ad0288991670 100644
--- a/src/ray/object_manager/plasma/create_request_queue.cc
+++ b/src/ray/object_manager/plasma/create_request_queue.cc
@@ -143,6 +143,10 @@ Status CreateRequestQueue::ProcessFirstRequest() {
   return Status::OK();
 }
 
+void CreateRequestQueue::SetShouldSpill(bool should_spill){
+  should_spill_ = should_spill;
+}
+
 Status CreateRequestQueue::ProcessRequests() {
   // Suppress OOM dump to once per grace period.
   bool logged_oom = false;
@@ -169,8 +173,11 @@ Status CreateRequestQueue::ProcessRequests() {
         oom_start_time_ns_ = now;
       }
-      bool wait = on_object_creation_blocked_callback_(queue_it->first.first);
-      if (wait) {
+      //bool wait = on_object_creation_blocked_callback_(queue_it->first.first);
+      //if (wait) {
+      on_object_creation_blocked_callback_(queue_it->first.first);
+      if(!should_spill_){
+        //TODO(Jae actually call BlockTasks)
         RAY_LOG(INFO) << "Object creation of priority " << queue_it->first.first
                       << " blocked";
         return Status::TransientObjectStoreFull("Waiting for higher priority tasks to finish");
       }
@@ -215,7 +222,7 @@ Status CreateRequestQueue::ProcessRequests() {
 }
 
 void CreateRequestQueue::FinishRequest(
-    absl::btree_map>::iterator queue_it) {
+    absl::btree_map>::iterator queue_it) {
   // Fulfill the request.
   //auto &request = *(queue_it->second);
   auto it = fulfilled_requests_.find(queue_it->second->request_id);
diff --git a/src/ray/object_manager/plasma/create_request_queue.h b/src/ray/object_manager/plasma/create_request_queue.h
index 3bea99a06b36..cdcc47cec7cd 100644
--- a/src/ray/object_manager/plasma/create_request_queue.h
+++ b/src/ray/object_manager/plasma/create_request_queue.h
@@ -119,6 +119,8 @@ class CreateRequestQueue {
 
   size_t NumPendingBytes() const { return num_bytes_pending_; }
 
+  void SetShouldSpill(bool should_spill);
+
  private:
   struct CreateRequest {
     CreateRequest(const ObjectID &object_id, uint64_t request_id,
@@ -213,6 +215,9 @@ class CreateRequestQueue {
 
   size_t num_bytes_pending_ = 0;
 
+  //Shared between the object store thread and the scheduler thread.
+  bool should_spill_ = false;
+
   friend class CreateRequestQueueTest;
 };
diff --git a/src/ray/object_manager/plasma/store.cc b/src/ray/object_manager/plasma/store.cc
index 3ef89d289f47..74398edc2405 100644
--- a/src/ray/object_manager/plasma/store.cc
+++ b/src/ray/object_manager/plasma/store.cc
@@ -74,7 +74,7 @@ PlasmaStore::PlasmaStore(instrumented_io_context &main_service, IAllocator &allo
                          std::function object_store_full_callback,
                          ray::AddObjectCallback add_object_callback,
                          ray::DeleteObjectCallback delete_object_callback,
-                         ray::ObjectCreationBlockedCallback on_object_creation_blocked_callback)
+                         ray::ObjectCreationBlockedCallback on_object_creation_blocked_callback)
     : io_context_(main_service),
       socket_name_(socket_name),
       acceptor_(main_service, ParseUrlEndpoint(socket_name)),
@@ -160,13 +160,26 @@ PlasmaError PlasmaStore::HandleCreateObjectRequest(const std::shared_ptr
                    << ", data_size=" << object_info.data_size
                    << ", metadata_size=" << object_info.metadata_size;
   }
+  //TODO(Jae) Erase this later
+  /*
+  const int64_t footprint_limit = allocator_.GetFootprintLimit();
+  const float allocated_percentage =
+      static_cast(allocator_.Allocated()) / footprint_limit;
+  if(allocated_percentage >= block_tasks_threshold_){
+    blockTasks();
+  }
+  if(allocated_percentage >= evict_tasks_threshold_){
+    evictTasks();
+  }
+  */
   // Trigger object spilling if current usage is above the specified threshold.
   if (spilling_required != nullptr) {
     const int64_t footprint_limit = allocator_.GetFootprintLimit();
     if (footprint_limit != 0) {
-      const float allocated_percentage =
-          static_cast(allocator_.Allocated()) / footprint_limit;
+      const int64_t footprint_limit = allocator_.GetFootprintLimit();
+      const float allocated_percentage =
+          static_cast(allocator_.Allocated()) / footprint_limit;
       if (allocated_percentage > object_spilling_threshold_) {
         RAY_LOG(DEBUG) << "Triggering object spilling because current usage "
                        << allocated_percentage << "% is above threshold "
diff --git a/src/ray/object_manager/plasma/store.h b/src/ray/object_manager/plasma/store.h
index ac712fe5f624..c16863cc9706 100644
--- a/src/ray/object_manager/plasma/store.h
+++ b/src/ray/object_manager/plasma/store.h
@@ -105,6 +105,11 @@ class PlasmaStore {
     callback(available);
   }
 
+  void SetShouldSpill(bool should_spill){
+    absl::MutexLock lock(&mutex_);
+    return create_request_queue_.SetShouldSpill(should_spill);
+  }
+
  private:
   /// Create a new object. The client must do a call to release_object to tell
   /// the store when it is done with the object.
@@ -262,6 +267,11 @@ class PlasmaStore {
   /// The percentage of object store memory used above which spilling is triggered.
   const float object_spilling_threshold_;
 
+  /// The percentage of object store memory used above which
+  /// blocking new tasks is triggered.
+  const float block_request_threshold_ = 0.8;
+  const float evict_request_threshold_ = 0.8;
+
   /// A timer that is set when the first request in the queue is not
   /// serviceable because there is not enough memory. The request will be
   /// retried when this timer expires.
diff --git a/src/ray/object_manager/plasma/store_runner.h b/src/ray/object_manager/plasma/store_runner.h
index 1165c609cdd0..a9952536f623 100644
--- a/src/ray/object_manager/plasma/store_runner.h
+++ b/src/ray/object_manager/plasma/store_runner.h
@@ -32,6 +32,10 @@ class PlasmaStoreRunner {
                                            "PlasmaStoreRunner.GetAvailableMemory");
   }
 
+  void SetShouldSpill(bool should_spill){
+    store_->SetShouldSpill(should_spill);
+  }
+
  private:
   void Shutdown();
   mutable absl::Mutex store_runner_mutex_;
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index 3963f4737c9c..210dc06b2895 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -227,9 +227,15 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
         return GetLocalObjectManager().IsSpillingInProgress();
       },
       /*on_object_creation_blocked_callback=*/
-      [this](const Priority &priority) {
+      [this](const Priority &base_priority) {
         //cluster_task_manager_->BlockTasks(priority);
-        return false;
+        //TODO(Jae) Remove this line and set the actual value for should_spill
+        //from the ClusterTaskManager
+        cluster_task_manager_->BlockTasks(base_priority);
+        bool should_spill = cluster_task_manager_->EvictTasks(base_priority);
+        io_service_.post([this, should_spill](){
+          object_manager_.SetShouldSpill(should_spill);
+        },"");
       },
       /*object_store_full_callback=*/
       [this]() {
@@ -344,7 +350,10 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
                  std::vector> *results) {
         return GetObjectsFromPlasma(object_ids, results);
       },
-      max_task_args_memory));
+      max_task_args_memory,
+      [this](bool should_spill){
+        object_manager_.SetShouldSpill(should_spill);
+      }));
   placement_group_resource_manager_ = std::make_shared(
       std::dynamic_pointer_cast(cluster_resource_scheduler_),
       // TODO (Alex): Ideally we could do these in a more robust way (retry
diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc
index 9860e31e30ad..6d2549f3a33e 100644
--- a/src/ray/raylet/scheduling/cluster_task_manager.cc
+++ b/src/ray/raylet/scheduling/cluster_task_manager.cc
@@ -40,7 +40,8 @@ ClusterTaskManager::ClusterTaskManager(
     std::function &object_ids,
                        std::vector> *results)>
         get_task_arguments,
-    size_t max_pinned_task_arguments_bytes)
+    size_t max_pinned_task_arguments_bytes,
+    SetShouldSpillCallback set_should_spill)
     : self_node_id_(self_node_id),
       cluster_resource_scheduler_(cluster_resource_scheduler),
       task_dependency_manager_(task_dependency_manager),
@@ -53,11 +54,13 @@ ClusterTaskManager::ClusterTaskManager(
       report_worker_backlog_(RayConfig::instance().report_worker_backlog()),
       worker_pool_(worker_pool),
       leased_workers_(leased_workers),
+      block_requested_priority_(Priority()),
       get_task_arguments_(get_task_arguments),
       max_pinned_task_arguments_bytes_(max_pinned_task_arguments_bytes),
       metric_tasks_queued_(0),
       metric_tasks_dispatched_(0),
-      metric_tasks_spilled_(0) {}
+      metric_tasks_spilled_(0),
+      set_should_spill_(set_should_spill){}
 
 bool ClusterTaskManager::SchedulePendingTasks() {
   // Always try to schedule infeasible tasks in case they are now feasible.
@@ -1194,9 +1197,8 @@ bool ClusterTaskManager::ReturnCpuResourcesToBlockedWorker(
   return false;
 }
 
-void ClusterTaskManager::BlockTasks(Priority base_priority) {
-  block_requested_priority_ = base_priority;
-
+bool ClusterTaskManager::EvictTasks(Priority base_priority) {
+  bool should_spill = true;
   for (auto &entry : leased_workers_) {
     std::shared_ptr worker = entry.second;
     Priority priority = worker->GetAssignedTask().GetTaskSpecification().GetPriority();
@@ -1205,10 +1207,16 @@ void ClusterTaskManager::BlockTasks(Priority base_priority) {
 
     if(priority >= base_priority){
       //Consider Using CancelTask instead of DestroyWorker
       destroy_worker_(worker, rpc::WorkerExitType::INTENDED_EXIT);
+      should_spill = false;
     }
   }
   //Check Deadlock corner cases
   //Finer granularity preemption is not considered, kill all the lower priorities
+  return should_spill;
+}
+
+void ClusterTaskManager::BlockTasks(Priority base_priority) {
+  block_requested_priority_ = base_priority;
 }
 
 void ClusterTaskManager::ScheduleAndDispatchTasks() {
diff --git a/src/ray/raylet/scheduling/cluster_task_manager.h b/src/ray/raylet/scheduling/cluster_task_manager.h
index 0cea73c7dcc9..c74e232ade08 100644
--- a/src/ray/raylet/scheduling/cluster_task_manager.h
+++ b/src/ray/raylet/scheduling/cluster_task_manager.h
@@ -102,11 +102,13 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {
       std::function &object_ids,
                          std::vector> *results)>
           get_task_arguments,
-      size_t max_pinned_task_arguments_bytes);
+      size_t max_pinned_task_arguments_bytes,
+      SetShouldSpillCallback set_should_spill);
 
   //Preempt currently running tasks with a lower priority
   //Block new tasks from being scheduled with this priority
-  void BlockTasks(ray::Priority) override;
+  void BlockTasks(Priority) override;
+  bool EvictTasks(Priority) override;
 
   /// (Step 1) Queue tasks and schedule.
   /// Queue task and schedule. This hanppens when processing the worker lease request.
@@ -356,6 +358,7 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {
   uint64_t metric_tasks_dispatched_;
   uint64_t metric_tasks_spilled_;
 
+  const SetShouldSpillCallback set_should_spill_;
   /// Determine whether a task should be immediately dispatched,
   /// or placed on a wait queue.
   ///
diff --git a/src/ray/raylet/scheduling/cluster_task_manager_interface.h b/src/ray/raylet/scheduling/cluster_task_manager_interface.h
index 8d4281e4f1dc..f287ddb00653 100644
--- a/src/ray/raylet/scheduling/cluster_task_manager_interface.h
+++ b/src/ray/raylet/scheduling/cluster_task_manager_interface.h
@@ -93,7 +93,8 @@ class ClusterTaskManagerInterface {
   ///
   /// \return True if task was successfully removed. This function will return
   /// false if the task is already running.
-  virtual void BlockTasks(ray::Priority) = 0;
+  virtual void BlockTasks(Priority) = 0;
+  virtual bool EvictTasks(Priority) = 0;
 
   virtual bool CancelTask(const TaskID &task_id,
                          bool runtime_env_setup_failed = false) = 0;
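
For readers following the new control flow end to end, below is a minimal, self-contained sketch of the back-pressure loop this patch wires together: when a plasma create request stalls, the raylet blocks lower-priority tasks, tries to evict leased workers, and pushes the resulting should_spill decision back into the create-request queue. Every type in the sketch (Priority with a single integer score, FakeClusterTaskManager, FakeCreateRequestQueue) is a simplified, hypothetical stand-in for the real NodeManager / ClusterTaskManager / CreateRequestQueue classes touched above, not the actual Ray implementation.

// Minimal, self-contained sketch (hypothetical stand-ins, not the real Ray classes).
#include <iostream>
#include <vector>

// Stand-in for ray::Priority; a single integer score is assumed for illustration.
struct Priority {
  int score = 0;
  bool operator>=(const Priority &other) const { return score >= other.score; }
};

// Plasma side: the flag toggled by CreateRequestQueue::SetShouldSpill() in the patch.
struct FakeCreateRequestQueue {
  bool should_spill = false;
  void SetShouldSpill(bool value) { should_spill = value; }
};

// Scheduler side: mirrors the BlockTasks()/EvictTasks() pair added to ClusterTaskManager.
struct FakeClusterTaskManager {
  Priority block_requested_priority;
  std::vector<Priority> leased_worker_priorities;

  // Remember the priority at or below which new tasks should not be dispatched.
  void BlockTasks(Priority base) { block_requested_priority = base; }

  // Returns true ("spill instead") only if no leased worker was eligible for
  // eviction; returns false as soon as one lower-or-equal-priority worker would
  // have been destroyed, mirroring the EvictTasks() logic in the patch.
  bool EvictTasks(Priority base) {
    bool should_spill = true;
    for (const Priority &worker_priority : leased_worker_priorities) {
      if (worker_priority >= base) {
        // The real code calls destroy_worker_(); the sketch only records the decision.
        should_spill = false;
      }
    }
    return should_spill;
  }
};

int main() {
  FakeClusterTaskManager scheduler;
  FakeCreateRequestQueue create_queue;
  scheduler.leased_worker_priorities = {Priority{5}, Priority{9}};

  // What on_object_creation_blocked_callback now does when a creation request stalls:
  Priority blocked{7};
  scheduler.BlockTasks(blocked);
  bool should_spill = scheduler.EvictTasks(blocked);
  // In the patch this hop happens via io_service_.post() and ObjectManager::SetShouldSpill().
  create_queue.SetShouldSpill(should_spill);

  std::cout << "should_spill=" << std::boolalpha << create_queue.should_spill << std::endl;
  return 0;
}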