Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory scheduling 2 #1

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions OSDI22/buildRay
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/usr/bin/env bash
# Build the Ray dashboard frontend, then install the Python package in
# editable (development) mode. Paths are relative to this script's directory.
set -e  # abort on the first failing step instead of continuing with a broken build

# Build the dashboard's JS client.
pushd ../dashboard/client
npm install
npm run build
popd

# Install Ray's Python package (editable install so source edits take effect).
# Fix: use pushd/popd consistently instead of a bare `cd` that leaks the cwd change.
pushd ../python
pip install -e . --verbose
popd
27 changes: 27 additions & 0 deletions OSDI22/microbench/a.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import ray
import numpy as np

NUM_PARTITION = 4  # number of map partitions (and reduce tasks)

@ray.remote
def map(data, npartitions):
    """Bucket `data` (floats in [0, 1)) into `npartitions` sorted partitions.

    A row's bucket is floor(row * npartitions). Returns a tuple so Ray can
    split it into `npartitions` separate return values (num_returns).

    Fix: use the `npartitions` parameter instead of the module-level
    NUM_PARTITION it previously ignored; callers pass NUM_PARTITION, so
    behavior is unchanged.
    """
    # NOTE: `map` shadows the builtin; name kept for call-site compatibility.
    outputs = [[] for _ in range(npartitions)]
    for row in data:
        outputs[int(row * npartitions)].append(row)
    return tuple(sorted(output) for output in outputs)

@ray.remote
def reduce(*partitions):
    """Merge the given (already sorted) partitions into one sorted list."""
    merged = []
    for part in partitions:
        merged.extend(part)
    merged.sort()
    return merged

ray.init()

# One partition of random floats in [0, 1) per map task.
dataset = [np.random.rand(12) for _ in range(NUM_PARTITION)]

# Each map task splits its partition into NUM_PARTITION sorted buckets,
# returned as NUM_PARTITION separate ObjectRefs.
map_outputs = [
    map.options(num_returns=NUM_PARTITION).remote(partition, NUM_PARTITION)
    for partition in dataset
]

# Reduce task i merges the i-th bucket from every map task.
outputs = [
    reduce.remote(*[partition[i] for partition in map_outputs])
    for i in range(NUM_PARTITION)
]
print(ray.get(outputs))
39 changes: 39 additions & 0 deletions OSDI22/microbench/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#! /bin/bash
# Pipeline microbenchmark driver.
# Sweeps test_pipeline.py over a grid of (working-set ratio, object-store
# size, object size), once against production Ray and once against the
# memory-scheduled build, appending one CSV per build.

NUM_STAGES=1

# run_suite RESULT_CSV — run the full parameter sweep, appending rows to RESULT_CSV.
# Fix: the sweep was copy-pasted verbatim for both builds; factored into one function.
run_suite() {
    local result=$1
    test -f "$result" && rm "$result"
    echo "working_set_ratio, num_stages, object_store_size,object_size,baseline_pipeline,ray_pipeline" >>"$result"
    for w in 1 2 4 8 16
    do
        for o in 1000000000 5000000000 10000000000
        do
            for ((os=10000000; os<=160000000; os *= 2))
            do
                echo -n -e "test_pipeline.py -w $w -o $o -os $os\n"
                python test_pipeline.py -w $w -o $o -os $os -r "$result" -ns $NUM_STAGES
                # Clean up leftover Ray session dirs so each run starts fresh.
                rm -rf /tmp/ray/session_2*
            done
        done
    done
}

# Pipeline Test — production Ray
./../script/install_production_ray.sh
run_suite ../data/pipeline_production.csv

# Pipeline Test — memory-scheduled Ray
./../script/install_scheduler_ray.sh
run_suite ../data/pipeline_memory.csv
47 changes: 47 additions & 0 deletions OSDI22/microbench/shuffle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import ray
import argparse
import numpy as np

####################
## Argument Parse ##
####################
# All knobs are integers; declare them as a spec table and register in a loop.
parser = argparse.ArgumentParser()
for flag, short, default in (
        ('--NUM_PARTITION', '-p', 4),
        ('--NUM_LAYERS', '-l', 1),
        ('--OBJECT_STORE_SIZE', '-o', 1_000_000_000),
        ('--OBJECT_SIZE', '-os', 50_000_000),
):
    parser.add_argument(flag, short, type=int, default=default)
args = parser.parse_args()
params = vars(args)

# Benchmark configuration, read once from the CLI.
NUM_PARTITION = params['NUM_PARTITION']
NUM_LAYERS = params['NUM_LAYERS']
OBJECT_SIZE = params['OBJECT_SIZE']
OBJECT_STORE_SIZE = params['OBJECT_STORE_SIZE']

def test_ray_shuffle():
    """Sort NUM_PARTITION random partitions via a map/reduce shuffle.

    Each map task buckets its rows by floor(row * npartitions) and returns
    npartitions sorted buckets; reduce task i merges bucket i from every
    map task, yielding NUM_PARTITION globally ordered chunks.

    Fix: removed a leftover debug print loop followed by quit(), which
    terminated the process before the shuffle ever ran (everything after it
    was dead code); also use the previously ignored `npartitions` parameter.
    """
    @ray.remote
    def map(data, npartitions):
        # Bucket each row by its target partition, then sort each bucket.
        outputs = [[] for _ in range(npartitions)]
        for row in data:
            outputs[int(row * npartitions)].append(row)
        # Tuple so Ray can split it into `npartitions` return values.
        return tuple(sorted(output) for output in outputs)

    @ray.remote
    def reduce(*partitions):
        # Flatten and sort the partitions.
        return sorted(row for partition in partitions for row in partition)

    ray.init(object_store_memory=OBJECT_STORE_SIZE)
    # Random floats from the range [0, 1).
    dataset = [np.random.rand(8) for _ in range(NUM_PARTITION)]
    map_outputs = [
        map.options(num_returns=NUM_PARTITION).remote(partition, NUM_PARTITION)
        for partition in dataset]
    outputs = []
    for i in range(NUM_PARTITION):
        # Gather one output from each map task.
        outputs.append(reduce.remote(*[partition[i] for partition in map_outputs]))
    print(ray.get(outputs))
115 changes: 115 additions & 0 deletions OSDI22/microbench/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import ray
import time
import sys
import argparse
import csv
import numpy as np
from time import perf_counter
from time import perf_counter

####################
## Argument Parse ##
####################
# Declare options as a spec table (flag, short, type, default) and register in a loop.
parser = argparse.ArgumentParser()
for flag, short, typ, default in (
        ('--WORKING_SET_RATIO', '-w', int, 4),
        ('--OBJECT_STORE_SIZE', '-o', int, 1_000_000_000),
        ('--OBJECT_SIZE', '-os', int, 10_000_000),
        ('--RESULT_PATH', '-r', str, "../data/pipeline.csv"),
        ('--NUM_STAGES', '-ns', int, 1),
        ('--NUM_TRIAL', '-t', int, 10),
):
    parser.add_argument(flag, short, type=typ, default=default)
args = parser.parse_args()
params = vars(args)

# Benchmark configuration, read once from the CLI.
OBJECT_STORE_SIZE = params['OBJECT_STORE_SIZE']
OBJECT_SIZE = params['OBJECT_SIZE']
WORKING_SET_RATIO = params['WORKING_SET_RATIO']
RESULT_PATH = params['RESULT_PATH']
NUM_STAGES = params['NUM_STAGES']
NUM_TRIAL = params['NUM_TRIAL']

def test_ray_pipeline():
    """Time one pipelined pass over WORKING_SET_RATIO x the object-store size.

    Launches enough producers to fill the object store WORKING_SET_RATIO
    times over, feeds every produced object to a consumer task, and returns
    the wall-clock seconds until all consumers finish. All producers are
    launched up front, so the working set exceeds store capacity and forces
    eviction/scheduling behavior.

    Fix: removed commented-out multi-stage wiring that was left behind as
    stray triple-quoted string statements (dead code).
    """
    ray_pipeline_begin = perf_counter()

    @ray.remote(num_cpus=1)
    def consumer(obj_ref):
        # The object is fetched implicitly when passed as a task argument;
        # the consumer itself does no work.
        return True

    @ray.remote(num_cpus=1)
    def producer():
        # float64 zeros: OBJECT_SIZE bytes per produced object.
        return np.zeros(OBJECT_SIZE // 8)

    num_fill_object_store = OBJECT_STORE_SIZE // OBJECT_SIZE
    produced_objs = [producer.remote()
                     for _ in range(WORKING_SET_RATIO * num_fill_object_store)]

    refs = [[] for _ in range(NUM_STAGES)]
    for obj in produced_objs:
        refs[0].append(consumer.remote(obj))

    # Drop the driver's references so the store may evict produced objects.
    del produced_objs
    # NOTE(review): only stage 0 is ever populated; with NUM_STAGES > 1,
    # refs[-1] is an empty list and this get() waits on nothing — confirm
    # NUM_STAGES is always 1 as in run.sh.
    ray.get(refs[-1])

    ray_pipeline_end = perf_counter()

    return ray_pipeline_end - ray_pipeline_begin

def test_baseline_pipeline():
    """Time WORKING_SET_RATIO sequential batches, each one store-full of objects.

    Unlike test_ray_pipeline, this blocks on every batch (ray.get inside the
    loop) before producing the next, so at most one store-full of objects is
    in flight at a time — the non-pipelined baseline.

    Fix: removed commented-out multi-stage wiring left behind as stray
    triple-quoted string statements (dead code).
    """
    baseline_start = perf_counter()

    @ray.remote(num_cpus=1)
    def consumer(obj_ref):
        # Object is fetched implicitly via the task argument; no work done.
        return True

    @ray.remote(num_cpus=1)
    def producer():
        # float64 zeros: OBJECT_SIZE bytes per produced object.
        return np.zeros(OBJECT_SIZE // 8)

    num_fill_object_store = OBJECT_STORE_SIZE // OBJECT_SIZE
    for i in range(WORKING_SET_RATIO):
        produced_objs = [producer.remote() for _ in range(num_fill_object_store)]

        refs = [[] for _ in range(NUM_STAGES)]
        for obj in produced_objs:
            refs[0].append(consumer.remote(obj))

        # Release producer refs so the store can evict between batches.
        del produced_objs
        # NOTE(review): only stage 0 is filled; with NUM_STAGES > 1 this
        # would wait on an empty list — confirm NUM_STAGES is always 1.
        ray.get(refs[-1])

    baseline_end = perf_counter()
    return baseline_end - baseline_start

# Driver: size the local object store to the benchmark parameter.
ray.init(object_store_memory=OBJECT_STORE_SIZE)

# Warm up tasks (first run pays worker-startup cost; excluded from timing).
test_ray_pipeline()

ray_time = []
base_time = []  # NOTE(review): stays empty — baseline run is disabled below.
for i in range(NUM_TRIAL):
    ray_time.append(test_ray_pipeline())
    #base_time.append(test_baseline_pipeline())

# NOTE(review): CSV output is commented out, but run.sh writes a CSV header
# and passes -r/RESULT_PATH expecting rows to be appended here — confirm
# whether this should be re-enabled. Also note sum(base_time) would be 0
# while the baseline run above is disabled.
#header = ['working_set_ratio', 'num_stages', 'object_store_size','object_size','baseline_pipeline','ray_pipeline']
'''
data = [WORKING_SET_RATIO, NUM_STAGES, OBJECT_STORE_SIZE, OBJECT_SIZE, sum(base_time)/NUM_TRIAL, sum(ray_time)/NUM_TRIAL]
with open(RESULT_PATH, 'a', encoding='UTF-8', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(data)
'''

# NOTE(review): "Pipieline" is a typo in the output string (left as-is here).
#print(f"Baseline Pipieline time: {sum(base_time)/NUM_TRIAL}")
print(f"Ray Pipieline time: {sum(ray_time)/NUM_TRIAL}")
80 changes: 80 additions & 0 deletions OSDI22/microbench/test_pipeline_nested.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import ray
import time
import sys
import argparse
from time import perf_counter
from time import perf_counter

#Driver call all the tasks

####################
## Argument Parse ##
####################
parser = argparse.ArgumentParser()
parser.add_argument('--LIST_SIZE', '-l', type=int, default=1000)
args = parser.parse_args()
params = vars(args)

# Side length of the LIST_SIZE x LIST_SIZE list-of-lists each producer builds.
# Fix: honor the parsed CLI flag — it was previously overridden by a
# hard-coded 1000 (the default is still 1000, so behavior without -l is unchanged).
LIST_SIZE = params['LIST_SIZE']
OBJECT_STORE_SIZE = 1000_000_000 # 1GB


def test_ray_pipeline():
    """Time producers that hand data to consumers through the object store.

    One producer per CPU; each builds a LIST_SIZE x LIST_SIZE list of ones,
    ray.put()s it, and launches a nested consumer task that sums it. Each
    producer asserts the sum equals LIST_SIZE**2. Returns wall-clock seconds.

    Fix: the producer definition was garbled onto a single line (decorator,
    def, and two statements fused together); reconstructed the intended
    structure. Also dropped a leftover commented-out line.
    """
    ray_pipeline_begin = perf_counter()

    @ray.remote(num_cpus=1)
    def consumer(args):
        # `args` is the list-of-lists, fetched implicitly from the ObjectRef.
        time.sleep(5)
        return sum(sum(arg) for arg in args)

    @ray.remote(num_cpus=1)
    def producer():
        args = [[1 for _ in range(LIST_SIZE)] for _ in range(LIST_SIZE)]
        obj_ref = ray.put(args)
        result = consumer.remote(obj_ref)
        return ray.get(result) == LIST_SIZE * LIST_SIZE

    num_cpus = int(ray.cluster_resources()["CPU"])
    refs = []
    for _ in range(num_cpus):
        refs.append(producer.remote())

    for ref in refs:
        assert ray.get(ref)
    ray_pipeline_end = perf_counter()

    return ray_pipeline_end - ray_pipeline_begin

def test_baseline_pipeline():
    """Same workload as test_ray_pipeline, but the consumer runs inline
    inside each producer task — no object-store handoff."""
    baseline_start = perf_counter()

    def consumer(*args):
        time.sleep(5)
        return sum(sum(arg) for arg in args)

    @ray.remote(num_cpus=1)
    def producer():
        args = [[1 for _ in range(LIST_SIZE)] for _ in range(LIST_SIZE)]
        result = consumer(*args)
        return result == LIST_SIZE * LIST_SIZE

    # One producer per available CPU.
    cpu_count = int(ray.cluster_resources()["CPU"])
    refs = [producer.remote() for _ in range(cpu_count)]

    for ref in refs:
        assert ray.get(ref)

    baseline_end = perf_counter()
    return baseline_end - baseline_start

# Driver. address='auto' attaches to an already-running Ray cluster; it does
# not start one, so this script requires `ray start` to have been run first.
ray.init(address='auto', object_store_memory=OBJECT_STORE_SIZE)

baseline_time = test_baseline_pipeline()
ray_pipeline_time = test_ray_pipeline()

# Rebuilt only to estimate the per-task payload for the report below.
# NOTE(review): sys.getsizeof is shallow (outer list only), so the printed
# "Memory Used" figure underestimates the true footprint — confirm intent.
args = [[1 for _ in range(LIST_SIZE)] for _ in range(LIST_SIZE)]

# NOTE(review): "Pipieline" is a typo in the output strings (left as-is here).
print(f"Baseline Pipieline time: {baseline_time}")
print(f"Ray Pipieline time: {ray_pipeline_time} Memory Used:{sys.getsizeof(args)*int(ray.cluster_resources()['CPU'])*LIST_SIZE}")
4 changes: 3 additions & 1 deletion benchmarks/single_node/test_single_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
def assert_no_leaks():
total = ray.cluster_resources()
current = ray.available_resources()
print(total, current)
total.pop("memory")
total.pop("object_store_memory")
current.pop("memory")
Expand Down Expand Up @@ -137,7 +138,8 @@ def test_large_object():
assert big_obj[-1] == 0


ray.init(address="auto")
#ray.init(address="auto")
ray.init()

args_start = perf_counter()
test_many_args()
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/ray/runtime/task/local_mode_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation,
local_mode_ray_tuntime_.GetCurrentTaskId(), 0,
local_mode_ray_tuntime_.GetCurrentTaskId(), address, 1,
required_resources, required_placement_resources,
std::make_pair(PlacementGroupID::Nil(), -1), true, "");
std::make_pair(PlacementGroupID::Nil(), -1), true, "", Priority());
if (invocation.task_type == TaskType::NORMAL_TASK) {
} else if (invocation.task_type == TaskType::ACTOR_CREATION_TASK) {
invocation.actor_id = local_mode_ray_tuntime_.GetNextActorID();
Expand Down
Loading