Add option to control number of nodes & partitions #411

Merged · 5 commits · Jul 21, 2022
58 changes: 37 additions & 21 deletions tools/emr/submit_datagen_job.py
@@ -9,23 +9,23 @@
import __main__

from math import ceil
from botocore.credentials import subprocess
from datagen import lib, util
import subprocess

import argparse

from datagen.util import KeyValue, split_passthrough_args

min_num_workers = 1
max_num_workers = 1000
min_num_threads = 1

defaults = {
'bucket': 'ldbc-snb-datagen-store',
'use_spot': True,
'master_instance_type': 'r6gd.2xlarge',
'instance_type': 'r6gd.4xlarge',
'sf_ratio': 100.0, # ratio of SFs and machines. a ratio of 250.0 for SF1000 yields 4 machines
'instance_type': 'i3.4xlarge',
'sf_per_executor': 3e3,
'sf_per_partition': 10,
'az': 'us-west-2c',
'yes': False,
'ec2_key': None,
@@ -43,15 +43,6 @@
ec2_instances = [dict(row) for row in reader]


def calculate_cluster_config(scale_factor, sf_ratio, vcpu):
num_workers = max(min_num_workers, min(max_num_workers, ceil(scale_factor / sf_ratio)))
num_threads = ceil(num_workers * vcpu * 2)
return {
'num_workers': num_workers,
'num_threads': num_threads
}


def get_instance_info(instance_type):
def parse_vcpu(col):
return int(re.search(r'(\d+) .*', col).group(1))
@@ -76,7 +67,10 @@ def submit_datagen_job(name,
jar,
use_spot,
instance_type,
sf_ratio,
executors,
sf_per_executor,
partitions,
sf_per_partition,
master_instance_type,
az,
emr_release,
@@ -97,10 +91,6 @@
else:
copy_filter = f'.*{build_dir}/{copy_filter}'

exec_info = get_instance_info(instance_type)

cluster_config = calculate_cluster_config(sf, sf_ratio, exec_info['vcpu'])

emr = boto3.client('emr')

ts = datetime.utcnow()
@@ -115,8 +105,15 @@
'maximizeResourceAllocation': 'true'
}

if executors is None:
executors = max(min_num_workers, min(max_num_workers, ceil(sf / sf_per_executor)))

if partitions is None:
partitions = max(min_num_threads, ceil(sf / sf_per_partition))

spark_defaults_config = {
'spark.serializer': 'org.apache.spark.serializer.KryoSerializer',
'spark.default.parallelism': str(partitions),
**(dict(conf) if conf else {})
}
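
When --executors or --partitions are omitted, the hunk above falls back to deriving them from the scale factor. A minimal sketch of that arithmetic, using the defaults introduced in this diff (sf_per_executor = 3e3, sf_per_partition = 10, worker bounds 1 to 1000) and a purely illustrative scale factor of 10000:

    from math import ceil

    # Defaults taken from this diff; the scale factor is only an example input.
    min_num_workers, max_num_workers, min_num_threads = 1, 1000, 1
    sf_per_executor, sf_per_partition = 3e3, 10
    sf = 10_000

    # Same fallback formulas as in submit_datagen_job
    executors = max(min_num_workers, min(max_num_workers, ceil(sf / sf_per_executor)))
    partitions = max(min_num_threads, ceil(sf / sf_per_partition))

    print(executors, partitions)  # 4 1000

Passing --executors or --partitions explicitly skips the corresponding formula, which is what the new options are for.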

@@ -157,7 +154,7 @@ def submit_datagen_job(name,
'Market': market,
'InstanceRole': 'CORE',
'InstanceType': instance_type,
'InstanceCount': cluster_config['num_workers'],
'InstanceCount': executors,
}
],
**ec2_key_dict,
@@ -178,7 +175,7 @@ def submit_datagen_job(name,
'Args': ['spark-submit', '--class', lib.main_class, jar_url,
'--output-dir', build_dir,
'--scale-factor', str(sf),
'--num-threads', str(cluster_config['num_threads']),
'--num-threads', str(partitions),
'--mode', mode,
'--format', format,
*passthrough_args
@@ -263,6 +260,26 @@ def submit_datagen_job(name,
nargs='+',
action=KeyValue,
help="SparkConf as key=value pairs")
executor_args = parser.add_mutually_exclusive_group()
executor_args.add_argument("--executors",
type=int,
help=f"Total number of Spark executors."
)
executor_args.add_argument("--sf-per-executor",
type=float,
default=defaults['sf_per_executor'],
help=f"Number of scale factors per Spark executor. Default: {defaults['sf_per_executor']}"
)
partitioning_args = parser.add_mutually_exclusive_group()
partitioning_args.add_argument("--partitions",
type=int,
help=f"Total number of Spark partitions to use when generating the dataset."
)
partitioning_args.add_argument("--sf-per-partition",
type=float,
default=defaults['sf_per_partition'],
help=f"Number of scale factors per Spark partition. Default: {defaults['sf_per_partition']}"
)
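
The two groups above make the explicit counts and the per-scale-factor ratios mutually exclusive, so --executors cannot be combined with --sf-per-executor (and --partitions cannot be combined with --sf-per-partition). A small, self-contained sketch of that argparse behaviour, reusing the flag names from this diff with illustrative values:

    import argparse

    parser = argparse.ArgumentParser()
    group = parser.add_mutually_exclusive_group()
    group.add_argument("--executors", type=int)
    group.add_argument("--sf-per-executor", type=float, default=3e3)

    args = parser.parse_args(["--executors", "8"])
    print(args.executors, args.sf_per_executor)  # 8 3000.0

    # parser.parse_args(["--executors", "8", "--sf-per-executor", "2000"])
    # would exit with an error: flags from the same mutually exclusive group cannot be combined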

parser.add_argument('--', nargs='*', help='Arguments passed to LDBC SNB Datagen', dest="arg")

@@ -271,6 +288,5 @@
args = parser.parse_args(self_args)

submit_datagen_job(passthrough_args=passthrough_args,
sf_ratio=defaults['sf_ratio'],
master_instance_type=defaults['master_instance_type'],
**args.__dict__)