Skip to content

Commit

Permalink
blocktasks implemented #1
Browse files Browse the repository at this point in the history
  • Loading branch information
jaewan committed Nov 12, 2021
1 parent 4506300 commit b099e84
Show file tree
Hide file tree
Showing 14 changed files with 101 additions and 29 deletions.
1 change: 1 addition & 0 deletions OSDI22/microbench/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def consumer(obj_ref):

@ray.remote(num_cpus=1)
def producer():
time.sleep(0.1)
for i in range(1000000):
pass
return np.zeros(OBJECT_SIZE // 8)
Expand Down
2 changes: 1 addition & 1 deletion src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ RAY_CONFIG(int64_t, min_spilling_size, 100 * 1024 * 1024)

/// If set to less than 1.0, Ray will start spilling objects when existing objects
/// take more than this percentage of the available memory.
RAY_CONFIG(float, object_spilling_threshold, 0.8)
RAY_CONFIG(float, object_spilling_threshold, 1.0)

/// Maximum number of objects that can be fused into a single file.
RAY_CONFIG(int64_t, max_fused_object_count, 2000)
Expand Down
25 changes: 15 additions & 10 deletions src/ray/core_worker/reference_count.cc
Original file line number Diff line number Diff line change
Expand Up @@ -263,21 +263,26 @@ void ReferenceCounter::RemoveLocalReference(const ObjectID &object_id,
}

Priority& ReferenceCounter::GetObjectPriority(const ObjectID &object_id){
auto it = object_id_priority_.find(object_id);
if (it == object_id_priority_.end()) {
// This happens if a large argument is transparently passed by reference
// because we don't hold a Python reference to its ObjectID.
// When an object is made with Put() Priority is not set. Should to this Jae
return Priority();
it = object_id_priority_.emplace(object_id, Priority()).first;
}
return it->second;
absl::MutexLock lock(&mutex_);
auto it = object_id_priority_.find(object_id);
if (it == object_id_priority_.end()) {
// This happens if a large argument is transparently passed by reference
// because we don't hold a Python reference to its ObjectID.
// When an object is made with Put() Priority is not set. Should to this Jae
/*
Priority pri = Priority();
return pri;
it = object_id_priority_.emplace(object_id, Priority()).first;
*/
}
return it->second;
}

void ReferenceCounter::UpdateObjectPriority(
const ObjectID &object_id,
const Priority &priority){
object_id_priority_.[object_id] = priority;
absl::MutexLock lock(&mutex_);
object_id_priority_[object_id] = priority;
}

void ReferenceCounter::UpdateSubmittedTaskReferences(
Expand Down
4 changes: 3 additions & 1 deletion src/ray/object_manager/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ using SpillObjectsCallback = std::function<bool()>;

/// Callback when the creation of object(s) is blocked. The priority is the
/// highest priority of a blocked object.
using ObjectCreationBlockedCallback = std::function<bool(const ray::Priority &priority)>;
using ObjectCreationBlockedCallback = std::function<void(const ray::Priority &priority)>;

using SetShouldSpillCallback = std::function<void(bool should_spill)>;

/// A callback to call when space has been released.
using SpaceReleasedCallback = std::function<void()>;
Expand Down
4 changes: 4 additions & 0 deletions src/ray/object_manager/object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,10 @@ class ObjectManager : public ObjectManagerInterface,

bool PullManagerHasPullsQueued() const { return pull_manager_->HasPullsQueued(); }

void SetShouldSpill(bool should_spill){
plasma::plasma_store_runner->SetShouldSpill(should_spill);
}

private:
friend class TestObjectManager;

Expand Down
13 changes: 10 additions & 3 deletions src/ray/object_manager/plasma/create_request_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ Status CreateRequestQueue::ProcessFirstRequest() {
return Status::OK();
}

void CreateRequestQueue::SetShouldSpill(bool should_spill){
should_spill_ = should_spill;
}

Status CreateRequestQueue::ProcessRequests() {
// Suppress OOM dump to once per grace period.
bool logged_oom = false;
Expand All @@ -169,8 +173,11 @@ Status CreateRequestQueue::ProcessRequests() {
oom_start_time_ns_ = now;
}

bool wait = on_object_creation_blocked_callback_(queue_it->first.first);
if (wait) {
//bool wait = on_object_creation_blocked_callback_(queue_it->first.first);
//if (wait) {
on_object_creation_blocked_callback_(queue_it->first.first);
if(!should_spill_){
//TODO(Jae actually call BlockTasks)
RAY_LOG(INFO) << "Object creation of priority " << queue_it->first.first << " blocked";
return Status::TransientObjectStoreFull("Waiting for higher priority tasks to finish");
}
Expand Down Expand Up @@ -215,7 +222,7 @@ Status CreateRequestQueue::ProcessRequests() {
}

void CreateRequestQueue::FinishRequest(
absl::btree_map<ray::TaskKey, std::unique_ptr<CreateRequest>>::iterator queue_it) {
absl::btree_map<ray::TaskKey, std::unique_ptr<CreateRequest>>::iterator queue_it) {
// Fulfill the request.
//auto &request = *(queue_it->second);
auto it = fulfilled_requests_.find(queue_it->second->request_id);
Expand Down
5 changes: 5 additions & 0 deletions src/ray/object_manager/plasma/create_request_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ class CreateRequestQueue {

size_t NumPendingBytes() const { return num_bytes_pending_; }

void SetShouldSpill(bool should_spill);

private:
struct CreateRequest {
CreateRequest(const ObjectID &object_id, uint64_t request_id,
Expand Down Expand Up @@ -213,6 +215,9 @@ class CreateRequestQueue {

size_t num_bytes_pending_ = 0;

//Shared between the object store thread and the scheduler thread.
bool should_spill_ = false;

friend class CreateRequestQueueTest;
};

Expand Down
19 changes: 16 additions & 3 deletions src/ray/object_manager/plasma/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ PlasmaStore::PlasmaStore(instrumented_io_context &main_service, IAllocator &allo
std::function<void()> object_store_full_callback,
ray::AddObjectCallback add_object_callback,
ray::DeleteObjectCallback delete_object_callback,
ray::ObjectCreationBlockedCallback on_object_creation_blocked_callback)
ray::ObjectCreationBlockedCallback on_object_creation_blocked_callback)
: io_context_(main_service),
socket_name_(socket_name),
acceptor_(main_service, ParseUrlEndpoint(socket_name)),
Expand Down Expand Up @@ -160,13 +160,26 @@ PlasmaError PlasmaStore::HandleCreateObjectRequest(const std::shared_ptr<Client>
<< ", data_size=" << object_info.data_size
<< ", metadata_size=" << object_info.metadata_size;
}
//TODO(Jae) Erase this later
/*
const int64_t footprint_limit = allocator_.GetFootprintLimit();
const float allocated_percentage =
static_cast<float>(allocator_.Allocated()) / footprint_limit;
if(allocated_percentage >= block_tasks_threshold_){
blockTasks();
}
if(allocated_percentage >= evict_tasks_threshold_){
evictTasks();
}
*/

// Trigger object spilling if current usage is above the specified threshold.
if (spilling_required != nullptr) {
const int64_t footprint_limit = allocator_.GetFootprintLimit();
if (footprint_limit != 0) {
const float allocated_percentage =
static_cast<float>(allocator_.Allocated()) / footprint_limit;
const int64_t footprint_limit = allocator_.GetFootprintLimit();
const float allocated_percentage =
static_cast<float>(allocator_.Allocated()) / footprint_limit;
if (allocated_percentage > object_spilling_threshold_) {
RAY_LOG(DEBUG) << "Triggering object spilling because current usage "
<< allocated_percentage << "% is above threshold "
Expand Down
10 changes: 10 additions & 0 deletions src/ray/object_manager/plasma/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ class PlasmaStore {
callback(available);
}

void SetShouldSpill(bool should_spill){
absl::MutexLock lock(&mutex_);
return create_request_queue_.SetShouldSpill(should_spill);
}

private:
/// Create a new object. The client must do a call to release_object to tell
/// the store when it is done with the object.
Expand Down Expand Up @@ -262,6 +267,11 @@ class PlasmaStore {
/// The percentage of object store memory used above which spilling is triggered.
const float object_spilling_threshold_;

/// The percentage of object store memory used above which
//blocking new tasks is triggerted
const float block_request_threshold_ = 0.8;
const float evict_request_threshold_ = 0.8;

/// A timer that is set when the first request in the queue is not
/// serviceable because there is not enough memory. The request will be
/// retried when this timer expires.
Expand Down
4 changes: 4 additions & 0 deletions src/ray/object_manager/plasma/store_runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ class PlasmaStoreRunner {
"PlasmaStoreRunner.GetAvailableMemory");
}

void SetShouldSpill(bool should_spill){
store_->SetShouldSpill(should_spill);
}

private:
void Shutdown();
mutable absl::Mutex store_runner_mutex_;
Expand Down
15 changes: 12 additions & 3 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,15 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
return GetLocalObjectManager().IsSpillingInProgress();
},
/*on_object_creation_blocked_callback=*/
[this](const Priority &priority) {
[this](const Priority &base_priority) {
//cluster_task_manager_->BlockTasks(priority);
return false;
//TODO(Jae) Remove this line and set the actual value for should_spill
//from the ClusterTaskManager
cluster_task_manager_->BlockTasks(base_priority);
bool should_spill = cluster_task_manager_->EvictTasks(base_priority);
io_service_.post([this, should_spill](){
object_manager_.SetShouldSpill(should_spill);
},"");
},
/*object_store_full_callback=*/
[this]() {
Expand Down Expand Up @@ -344,7 +350,10 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
std::vector<std::unique_ptr<RayObject>> *results) {
return GetObjectsFromPlasma(object_ids, results);
},
max_task_args_memory));
max_task_args_memory,
[this](bool should_spill){
object_manager_.SetShouldSpill(should_spill);
}));
placement_group_resource_manager_ = std::make_shared<NewPlacementGroupResourceManager>(
std::dynamic_pointer_cast<ClusterResourceScheduler>(cluster_resource_scheduler_),
// TODO (Alex): Ideally we could do these in a more robust way (retry
Expand Down
18 changes: 13 additions & 5 deletions src/ray/raylet/scheduling/cluster_task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ ClusterTaskManager::ClusterTaskManager(
std::function<bool(const std::vector<ObjectID> &object_ids,
std::vector<std::unique_ptr<RayObject>> *results)>
get_task_arguments,
size_t max_pinned_task_arguments_bytes)
size_t max_pinned_task_arguments_bytes,
SetShouldSpillCallback set_should_spill)
: self_node_id_(self_node_id),
cluster_resource_scheduler_(cluster_resource_scheduler),
task_dependency_manager_(task_dependency_manager),
Expand All @@ -53,11 +54,13 @@ ClusterTaskManager::ClusterTaskManager(
report_worker_backlog_(RayConfig::instance().report_worker_backlog()),
worker_pool_(worker_pool),
leased_workers_(leased_workers),
block_requested_priority_(Priority()),
get_task_arguments_(get_task_arguments),
max_pinned_task_arguments_bytes_(max_pinned_task_arguments_bytes),
metric_tasks_queued_(0),
metric_tasks_dispatched_(0),
metric_tasks_spilled_(0) {}
metric_tasks_spilled_(0),
set_should_spill_(set_should_spill){}

bool ClusterTaskManager::SchedulePendingTasks() {
// Always try to schedule infeasible tasks in case they are now feasible.
Expand Down Expand Up @@ -1194,9 +1197,8 @@ bool ClusterTaskManager::ReturnCpuResourcesToBlockedWorker(
return false;
}

void ClusterTaskManager::BlockTasks(Priority base_priority) {
block_requested_priority_ = base_priority;

bool ClusterTaskManager::EvictTasks(Priority base_priority) {
bool should_spill = true;
for (auto &entry : leased_workers_) {
std::shared_ptr<WorkerInterface> worker = entry.second;
Priority priority = worker->GetAssignedTask().GetTaskSpecification().GetPriority();
Expand All @@ -1205,10 +1207,16 @@ void ClusterTaskManager::BlockTasks(Priority base_priority) {
if(priority >= base_priority){
//Consider Using CancelTask instead of DestroyWorker
destroy_worker_(worker, rpc::WorkerExitType::INTENDED_EXIT);
should_spill = false;
}
}
//Check Deadlock corner cases
//Finer granularity preemption is not considered, kill all the lower priorities
return should_spill;
}

void ClusterTaskManager::BlockTasks(Priority base_priority) {
block_requested_priority_ = base_priority;
}

void ClusterTaskManager::ScheduleAndDispatchTasks() {
Expand Down
7 changes: 5 additions & 2 deletions src/ray/raylet/scheduling/cluster_task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,13 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {
std::function<bool(const std::vector<ObjectID> &object_ids,
std::vector<std::unique_ptr<RayObject>> *results)>
get_task_arguments,
size_t max_pinned_task_arguments_bytes);
size_t max_pinned_task_arguments_bytes,
SetShouldSpillCallback set_should_spill);

//Preempt currently running tasks with a lower priority
//Block new tasks from being scheduled with this priority
void BlockTasks(ray::Priority) override;
void BlockTasks(Priority) override;
bool EvictTasks(Priority) override;

/// (Step 1) Queue tasks and schedule.
/// Queue task and schedule. This hanppens when processing the worker lease request.
Expand Down Expand Up @@ -356,6 +358,7 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {
uint64_t metric_tasks_dispatched_;
uint64_t metric_tasks_spilled_;

const SetShouldSpillCallback set_should_spill_;
/// Determine whether a task should be immediately dispatched,
/// or placed on a wait queue.
///
Expand Down
3 changes: 2 additions & 1 deletion src/ray/raylet/scheduling/cluster_task_manager_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ class ClusterTaskManagerInterface {
///
/// \return True if task was successfully removed. This function will return
/// false if the task is already running.
virtual void BlockTasks(ray::Priority) = 0;
virtual void BlockTasks(Priority) = 0;
virtual bool EvictTasks(Priority) = 0;
virtual bool CancelTask(const TaskID &task_id,
bool runtime_env_setup_failed = false) = 0;

Expand Down

0 comments on commit b099e84

Please sign in to comment.