Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory scheduling 2 DFS with Priority #3

Merged
merged 19 commits into from
Nov 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion benchmarks/single_node/test_single_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
def assert_no_leaks():
total = ray.cluster_resources()
current = ray.available_resources()
print(total, current)
total.pop("memory")
total.pop("object_store_memory")
current.pop("memory")
Expand Down Expand Up @@ -137,7 +138,8 @@ def test_large_object():
assert big_obj[-1] == 0


ray.init(address="auto")
#ray.init(address="auto")
ray.init()

args_start = perf_counter()
test_many_args()
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/ray/runtime/task/local_mode_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation,
local_mode_ray_tuntime_.GetCurrentTaskId(), 0,
local_mode_ray_tuntime_.GetCurrentTaskId(), address, 1,
required_resources, required_placement_resources,
std::make_pair(PlacementGroupID::Nil(), -1), true, "");
std::make_pair(PlacementGroupID::Nil(), -1), true, "", Priority());
if (invocation.task_type == TaskType::NORMAL_TASK) {
} else if (invocation.task_type == TaskType::ACTOR_CREATION_TASK) {
invocation.actor_id = local_mode_ray_tuntime_.GetNextActorID();
Expand Down
15,252 changes: 0 additions & 15,252 deletions dashboard/client/package-lock.json

This file was deleted.

67 changes: 67 additions & 0 deletions python/ray/tests/debug_block_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import ray
import time
import sys
import argparse
import csv
import numpy as np
from time import perf_counter
from time import perf_counter

####################
## Argument Parse ##
####################
parser = argparse.ArgumentParser()
parser.add_argument('--WORKING_SET_RATIO', '-w', type=int, default=2)
parser.add_argument('--OBJECT_STORE_SIZE', '-o', type=int, default=1_000_000_000)
parser.add_argument('--OBJECT_SIZE', '-os', type=int, default=200_000_000)
parser.add_argument('--NUM_STAGES', '-ns', type=int, default=1)
args = parser.parse_args()
params = vars(args)

OBJECT_STORE_SIZE = params['OBJECT_STORE_SIZE']
OBJECT_SIZE = params['OBJECT_SIZE']
WORKING_SET_RATIO = params['WORKING_SET_RATIO']
NUM_STAGES = params['NUM_STAGES']

def test_ray_pipeline():
ray_pipeline_begin = perf_counter()

@ray.remote(num_cpus=1)
def consumer(obj_ref):
#args = ray.get(obj_ref)
return True

@ray.remote(num_cpus=1)
def producer():
time.sleep(0.1)
for i in range(1000000):
pass
return np.zeros(OBJECT_SIZE // 8)

num_fill_object_store = OBJECT_STORE_SIZE//OBJECT_SIZE
produced_objs = [producer.remote() for _ in range(WORKING_SET_RATIO*num_fill_object_store)]
refs = [[] for _ in range(NUM_STAGES)]

for obj in produced_objs:
refs[0].append(consumer.remote(obj))
'''
for stage in range(1, NUM_STAGES):
for r in refs[stage-1]:
refs[stage].append(consumer.remote(r))
'''
del produced_objs
#ray.get(refs[-1])
for r in refs[0]:
print(r)
ray.get(r)
'''
for ref in refs:
for r in ref:
ray.get(r)
'''
ray_pipeline_end = perf_counter()

return ray_pipeline_end - ray_pipeline_begin

ray.init(object_store_memory=OBJECT_STORE_SIZE)
print(test_ray_pipeline())
2 changes: 1 addition & 1 deletion src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ RAY_CONFIG(int64_t, min_spilling_size, 100 * 1024 * 1024)

/// If set to less than 1.0, Ray will start spilling objects when existing objects
/// take more than this percentage of the available memory.
RAY_CONFIG(float, object_spilling_threshold, 0.8)
RAY_CONFIG(float, object_spilling_threshold, 1.0)

/// Maximum number of objects that can be fused into a single file.
RAY_CONFIG(int64_t, max_fused_object_count, 2000)
Expand Down
23 changes: 23 additions & 0 deletions src/ray/common/task/task_priority.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ void Priority::extend(int64_t size) const {
}
}

void Priority::SetFromParentPriority(Priority &parent, int s){
//param s id the last score to add
if(parent.score.size() == 1 && parent.score[0] == INT_MAX){
score[0] = s;
}else{
score = parent.score;
score.push_back(s);
}
}

bool Priority::operator<(const Priority &rhs) const {
rhs.extend(score.size());
extend(rhs.score.size());
Expand All @@ -25,6 +35,19 @@ bool Priority::operator<=(const Priority &rhs) const {
return score <= rhs.score;
}

bool Priority::operator>(const Priority &rhs) const {
rhs.extend(score.size());
extend(rhs.score.size());

return score > rhs.score;
}

bool Priority::operator>=(const Priority &rhs) const {
rhs.extend(score.size());
extend(rhs.score.size());

return score >= rhs.score;
}

std::ostream &operator<<(std::ostream &os, const Priority &p) {
os << "[ ";
Expand Down
6 changes: 6 additions & 0 deletions src/ray/common/task/task_priority.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ struct Priority {

void extend(int64_t size) const;

void SetFromParentPriority(Priority &parent, int);

bool operator==(const Priority &rhs) const {
rhs.extend(score.size());
extend(rhs.score.size());
Expand All @@ -37,6 +39,10 @@ struct Priority {

bool operator<=(const Priority &rhs) const;

bool operator>(const Priority &rhs) const;

bool operator>=(const Priority &rhs) const;

int GetScore(int64_t depth) const {
extend(depth + 1);
return score[depth];
Expand Down
7 changes: 7 additions & 0 deletions src/ray/common/task/task_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ class TaskSpecBuilder {
const std::unordered_map<std::string, double> &required_placement_resources,
const BundleID &bundle_id, bool placement_group_capture_child_tasks,
const std::string &debugger_breakpoint,
const Priority &priority,
const std::string &serialized_runtime_env = "{}",
const std::vector<std::string> &runtime_env_uris = {},
const std::string &concurrency_group_name = "") {
Expand All @@ -129,6 +130,12 @@ class TaskSpecBuilder {
placement_group_capture_child_tasks);
message_->set_debugger_breakpoint(debugger_breakpoint);
message_->mutable_runtime_env()->set_serialized_runtime_env(serialized_runtime_env);

auto pri = message_->mutable_priority();
for (auto &s : priority.score) {
pri->Add(s);
}

for (const std::string &uri : runtime_env_uris) {
message_->mutable_runtime_env()->add_uris(uri);
}
Expand Down
16 changes: 13 additions & 3 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,19 @@ void BuildCommonTaskSpec(
const std::unordered_map<std::string, double> &required_resources,
const std::unordered_map<std::string, double> &required_placement_resources,
const BundleID &bundle_id, bool placement_group_capture_child_tasks,
const std::string debugger_breakpoint, const std::string &serialized_runtime_env,
const std::string debugger_breakpoint,
const Priority &priority,
const std::string &serialized_runtime_env,
const std::vector<std::string> &runtime_env_uris,
const std::string &concurrency_group_name = "") {
// Build common task spec.
builder.SetCommonTaskSpec(
task_id, name, function.GetLanguage(), function.GetFunctionDescriptor(), job_id,
current_task_id, task_index, caller_id, address, num_returns, required_resources,
required_placement_resources, bundle_id, placement_group_capture_child_tasks,
debugger_breakpoint, serialized_runtime_env, runtime_env_uris,
debugger_breakpoint,
priority,
serialized_runtime_env, runtime_env_uris,
concurrency_group_name);
// Set task arguments.
for (const auto &arg : args) {
Expand Down Expand Up @@ -681,7 +685,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
std::move(lease_policy), memory_store_, task_manager_, local_raylet_id,
RayConfig::instance().worker_lease_timeout_milliseconds(), actor_creator_,
/*get_task_priority=*/[](const TaskSpecification &spec) {
return Priority();
return spec.GetPriority();
},
RayConfig::instance().max_tasks_in_flight_per_worker(),
boost::asio::steady_timer(io_service_));
Expand Down Expand Up @@ -1671,9 +1675,11 @@ std::vector<rpc::ObjectReference> CoreWorker::SubmitTask(
rpc_address_, function, args, task_options.num_returns,
constrained_resources, required_resources, placement_options,
placement_group_capture_child_tasks, debugger_breakpoint,
Priority(),
task_options.serialized_runtime_env, task_options.runtime_env_uris);
builder.SetNormalTaskSpec(max_retries, retry_exceptions);
TaskSpecification task_spec = builder.Build();
//priority = task_manager_->GenerateTaskPriority(task_spec);
RAY_LOG(DEBUG) << "Submit task " << task_spec.DebugString();
std::vector<rpc::ObjectReference> returned_refs;
if (options_.is_local_mode) {
Expand All @@ -1683,6 +1689,7 @@ std::vector<rpc::ObjectReference> CoreWorker::SubmitTask(
CurrentCallSite(), max_retries);
io_service_.post(
[this, task_spec]() {
//(Jae) This is the reason why tasks are not placed with priority
RAY_UNUSED(direct_task_submitter_->SubmitTask(task_spec));
},
"CoreWorker.SubmitTask");
Expand Down Expand Up @@ -1727,6 +1734,7 @@ Status CoreWorker::CreateActor(const RayFunction &function,
new_placement_resources, actor_creation_options.placement_options,
actor_creation_options.placement_group_capture_child_tasks,
"", /* debugger_breakpoint */
Priority(),
actor_creation_options.serialized_runtime_env,
actor_creation_options.runtime_env_uris);

Expand Down Expand Up @@ -1911,6 +1919,7 @@ std::vector<rpc::ObjectReference> CoreWorker::SubmitActorTask(
required_resources, std::make_pair(PlacementGroupID::Nil(), -1),
true, /* placement_group_capture_child_tasks */
"", /* debugger_breakpoint */
Priority(),
"{}", /* serialized_runtime_env */
{}, /* runtime_env_uris */
task_options.concurrency_group_name);
Expand All @@ -1923,6 +1932,7 @@ std::vector<rpc::ObjectReference> CoreWorker::SubmitActorTask(

// Submit task.
TaskSpecification task_spec = builder.Build();
std::vector<ObjectID> task_deps;
std::vector<rpc::ObjectReference> returned_refs;
if (options_.is_local_mode) {
returned_refs = ExecuteTaskLocalMode(task_spec, actor_id);
Expand Down
14 changes: 14 additions & 0 deletions src/ray/core_worker/reference_count.cc
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,20 @@ void ReferenceCounter::RemoveLocalReference(const ObjectID &object_id,
}
}

Priority& ReferenceCounter::GetObjectPriority(const ObjectID &object_id){
stephanie-wang marked this conversation as resolved.
Show resolved Hide resolved
absl::MutexLock lock(&mutex_);
auto it = object_id_priority_.find(object_id);
RAY_CHECK(it != object_id_priority_.end()) << "Object priority not found " << object_id;
return it->second;
}

void ReferenceCounter::UpdateObjectPriority(
const ObjectID &object_id,
const Priority &priority){
absl::MutexLock lock(&mutex_);
object_id_priority_[object_id] = priority;
}

void ReferenceCounter::UpdateSubmittedTaskReferences(
const std::vector<ObjectID> &argument_ids_to_add,
const std::vector<ObjectID> &argument_ids_to_remove, std::vector<ObjectID> *deleted) {
Expand Down
10 changes: 10 additions & 0 deletions src/ray/core_worker/reference_count.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ class ReferenceCounter : public ReferenceCounterInterface,
void RemoveLocalReference(const ObjectID &object_id, std::vector<ObjectID> *deleted)
LOCKS_EXCLUDED(mutex_);

Priority& GetObjectPriority(const ObjectID &object_id);
void UpdateObjectPriority(
const ObjectID &object_id,
const Priority &priority);

/// Add references for the provided object IDs that correspond to them being
/// dependencies to a submitted task. If lineage pinning is enabled, then
/// this will also pin the Reference entry for each new argument until the
Expand Down Expand Up @@ -633,6 +638,8 @@ class ReferenceCounter : public ReferenceCounterInterface,

using ReferenceTable = absl::flat_hash_map<ObjectID, Reference>;

using PriorityTable = absl::flat_hash_map<ObjectID, Priority>;

void SetNestedRefInUseRecursive(ReferenceTable::iterator inner_ref_it)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);

Expand Down Expand Up @@ -799,6 +806,9 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// Holds all reference counts and dependency information for tracked ObjectIDs.
ReferenceTable object_id_refs_ GUARDED_BY(mutex_);

/// Holds priority of tracked ObjectIDs.
PriorityTable object_id_priority_ GUARDED_BY(mutex_);

/// Objects whose values have been freed by the language frontend.
/// The values in plasma will not be pinned. An object ID is
/// removed from this set once its Reference has been deleted
Expand Down
23 changes: 22 additions & 1 deletion src/ray/core_worker/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,27 @@ const int64_t kTaskFailureThrottlingThreshold = 50;
// Throttle task failure logs to once this interval.
const int64_t kTaskFailureLoggingFrequencyMillis = 5000;

//TODO(Jae) Delete object priority when a task is finished
Priority TaskManager::GenerateTaskPriority(
TaskSpecification &spec, std::vector<ObjectID> &task_deps) {
RAY_LOG(DEBUG) << "Generating priority of task " << spec.TaskId();
Priority dummy_pri = Priority();
Priority &max_priority = dummy_pri;
for (const ObjectID &argument_id : task_deps) {
Priority &p = reference_counter_->GetObjectPriority(argument_id);
if(max_priority > p){
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we want min_priority, not max?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's little messed up.
Btree_map picks the lowest value and from priority queue perspective it is the highest priority.
Not sure how to name it to make it clear max or min

max_priority = p;
}
}

Priority pri;
pri.SetFromParentPriority(max_priority, new_priority_s++);
spec.SetPriority(pri);
return pri;
}

std::vector<rpc::ObjectReference> TaskManager::AddPendingTask(
const rpc::Address &caller_address, const TaskSpecification &spec,
const rpc::Address &caller_address, TaskSpecification &spec,
const std::string &call_site, int max_retries) {
RAY_LOG(DEBUG) << "Adding pending task " << spec.TaskId() << " with " << max_retries
<< " retries";
Expand Down Expand Up @@ -63,6 +82,7 @@ std::vector<rpc::ObjectReference> TaskManager::AddPendingTask(
num_returns--;
}
std::vector<rpc::ObjectReference> returned_refs;
Priority task_priority = GenerateTaskPriority(spec, task_deps);
for (size_t i = 0; i < num_returns; i++) {
if (!spec.IsActorCreationTask()) {
// We pass an empty vector for inner IDs because we do not know the return
Expand All @@ -79,6 +99,7 @@ std::vector<rpc::ObjectReference> TaskManager::AddPendingTask(
ref.set_object_id(spec.ReturnId(i).Binary());
ref.mutable_owner_address()->CopyFrom(caller_address);
ref.set_call_site(call_site);
reference_counter_->UpdateObjectPriority(spec.ReturnId(i), task_priority);
returned_refs.push_back(std::move(ref));
}

Expand Down
9 changes: 8 additions & 1 deletion src/ray/core_worker/task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,11 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
RemoveLineageReference(object_id, ids_to_release);
ShutdownIfNeeded();
});
new_priority_s = 0;
}

Priority GenerateTaskPriority(TaskSpecification &spec, std::vector<ObjectID> &task_deps);

/// Add a task that is pending execution.
///
/// \param[in] caller_address The rpc address of the calling task.
Expand All @@ -96,7 +99,7 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
/// on failure.
/// \return ObjectRefs returned by this task.
std::vector<rpc::ObjectReference> AddPendingTask(const rpc::Address &caller_address,
const TaskSpecification &spec,
TaskSpecification &spec,
const std::string &call_site,
int max_retries = 0);

Expand Down Expand Up @@ -258,6 +261,10 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
/// Used to store task results.
std::shared_ptr<CoreWorkerMemoryStore> in_memory_store_;

// Priority id to assign when a new task is invoked.
// Sequentially increase new_priority_s after assign this to a new priority
int new_priority_s;

/// Used for reference counting objects.
/// The task manager is responsible for managing all references related to
/// submitted tasks (dependencies and return objects).
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/test/core_worker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) {
builder.SetCommonTaskSpec(RandomTaskId(), options.name, function.GetLanguage(),
function.GetFunctionDescriptor(), job_id, RandomTaskId(), 0,
RandomTaskId(), address, num_returns, resources, resources,
std::make_pair(PlacementGroupID::Nil(), -1), true, "");
std::make_pair(PlacementGroupID::Nil(), -1), true, "", Priority());
// Set task arguments.
for (const auto &arg : args) {
builder.AddArg(*arg);
Expand Down
Loading