From 622d1bcff4b10c4047aa41356d5df641b52522ae Mon Sep 17 00:00:00 2001 From: Nick Becker Date: Wed, 28 Oct 2020 09:56:51 -0700 Subject: [PATCH 1/2] updated load test --- .../queries/load_test/tpcx_bb_load_test.py | 32 ++++++------------- 1 file changed, 9 insertions(+), 23 deletions(-) diff --git a/tpcx_bb/queries/load_test/tpcx_bb_load_test.py b/tpcx_bb/queries/load_test/tpcx_bb_load_test.py index 4651d6f0..27f2c766 100644 --- a/tpcx_bb/queries/load_test/tpcx_bb_load_test.py +++ b/tpcx_bb/queries/load_test/tpcx_bb_load_test.py @@ -3,9 +3,9 @@ import os, subprocess, math, time -config["data_dir"] = "/".join(config["data_dir"].rstrip("/").split("/")[:-1]) +config = tpcxbb_argparser() -spark_schema_dir = f"{os.getcwd()}/../../spark_table_schemas/" +spark_schema_dir = f"{os.getcwd()}/spark_table_schemas/" # these tables have extra data produced by bigbench dataGen refresh_tables = [ @@ -50,7 +50,7 @@ def read_csv_table(table, chunksize="256 MiB"): names, types = get_schema(table) dtype = {names[i]: types[i] for i in range(0, len(names))} - data_dir = config["data_dir"] + data_dir = config["data_dir"].split('parquet_')[0] base_path = f"{data_dir}/data/{table}" files = os.listdir(base_path) # item_marketprices has "audit" files that should be excluded @@ -100,7 +100,7 @@ def multiplier(unit): # we use size of the CSV data on disk to determine number of Parquet partitions def get_size_gb(table): - data_dir = config["data_dir"] + data_dir = config["data_dir"].split('parquet_')[0] path = data_dir + "/data/" + table size = subprocess.check_output(["du", "-sh", path]).split()[0].decode("utf-8") unit = size[-1] @@ -125,29 +125,15 @@ def repartition(table, outdir, npartitions=None, chunksize=None, compression="sn print( f"Converting {table} of {size} GB to {npartitions} parquet files, chunksize: {chunksize}" ) - # web_clickstreams is particularly memory intensive - # we sacrifice a bit of speed for stability, converting half at a time - if table in ["web_clickstreams"]: - df = read_csv_table(table, chunksize) - half = int(df.npartitions / 2) - df.partitions[0:half].repartition(npartitions=int(npartitions / 2)).to_parquet( - outdir + table, compression=compression - ) - print("Completed first half of web_clickstreams..") - df.partitions[half:].repartition(npartitions=int(npartitions / 2)).to_parquet( - outdir + table, compression=compression - ) - - else: - read_csv_table(table, chunksize).repartition( - npartitions=npartitions - ).to_parquet(outdir + table, compression=compression) + read_csv_table(table, chunksize).repartition( + npartitions=npartitions + ).to_parquet(outdir + table, compression=compression) def main(client, config): # location you want to write Parquet versions of the table data - data_dir = "/".join(config["data_dir"].split("/")[:-1]) - outdir = f"{data_dir}/parquet_{part_size}gb/" + data_dir = config["data_dir"].split('parquet_')[0] + outdir = f"{data_dir}/parquet_{part_size}gb_test/" total = 0 for table in tables: From e988e977bc7b418f9c0761e5ff732747906c3498 Mon Sep 17 00:00:00 2001 From: Nick Becker Date: Wed, 28 Oct 2020 21:23:17 +0000 Subject: [PATCH 2/2] fix trailing _test in load test --- tpcx_bb/queries/load_test/tpcx_bb_load_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tpcx_bb/queries/load_test/tpcx_bb_load_test.py b/tpcx_bb/queries/load_test/tpcx_bb_load_test.py index 27f2c766..c5a7acdf 100644 --- a/tpcx_bb/queries/load_test/tpcx_bb_load_test.py +++ b/tpcx_bb/queries/load_test/tpcx_bb_load_test.py @@ -133,7 +133,7 @@ def repartition(table, outdir, npartitions=None, chunksize=None, compression="sn def main(client, config): # location you want to write Parquet versions of the table data data_dir = config["data_dir"].split('parquet_')[0] - outdir = f"{data_dir}/parquet_{part_size}gb_test/" + outdir = f"{data_dir}/parquet_{part_size}gb/" total = 0 for table in tables: