diff --git a/tpcx_bb/queries/load_test/tpcx_bb_load_test.py b/tpcx_bb/queries/load_test/tpcx_bb_load_test.py
index 4651d6f0..c5a7acdf 100644
--- a/tpcx_bb/queries/load_test/tpcx_bb_load_test.py
+++ b/tpcx_bb/queries/load_test/tpcx_bb_load_test.py
@@ -3,9 +3,9 @@
 import os, subprocess, math, time
 
-config["data_dir"] = "/".join(config["data_dir"].rstrip("/").split("/")[:-1])
+config = tpcxbb_argparser()
 
-spark_schema_dir = f"{os.getcwd()}/../../spark_table_schemas/"
+spark_schema_dir = f"{os.getcwd()}/spark_table_schemas/"
 
 # these tables have extra data produced by bigbench dataGen
 refresh_tables = [
@@ -50,7 +50,7 @@ def read_csv_table(table, chunksize="256 MiB"):
     names, types = get_schema(table)
     dtype = {names[i]: types[i] for i in range(0, len(names))}
 
-    data_dir = config["data_dir"]
+    data_dir = config["data_dir"].split('parquet_')[0]
     base_path = f"{data_dir}/data/{table}"
     files = os.listdir(base_path)
     # item_marketprices has "audit" files that should be excluded
@@ -100,7 +100,7 @@ def multiplier(unit):
 
 # we use size of the CSV data on disk to determine number of Parquet partitions
 def get_size_gb(table):
-    data_dir = config["data_dir"]
+    data_dir = config["data_dir"].split('parquet_')[0]
     path = data_dir + "/data/" + table
     size = subprocess.check_output(["du", "-sh", path]).split()[0].decode("utf-8")
     unit = size[-1]
@@ -125,28 +125,14 @@ def repartition(table, outdir, npartitions=None, chunksize=None, compression="sn
     print(
         f"Converting {table} of {size} GB to {npartitions} parquet files, chunksize: {chunksize}"
     )
-    # web_clickstreams is particularly memory intensive
-    # we sacrifice a bit of speed for stability, converting half at a time
-    if table in ["web_clickstreams"]:
-        df = read_csv_table(table, chunksize)
-        half = int(df.npartitions / 2)
-        df.partitions[0:half].repartition(npartitions=int(npartitions / 2)).to_parquet(
-            outdir + table, compression=compression
-        )
-        print("Completed first half of web_clickstreams..")
-        df.partitions[half:].repartition(npartitions=int(npartitions / 2)).to_parquet(
-            outdir + table, compression=compression
-        )
-
-    else:
-        read_csv_table(table, chunksize).repartition(
-            npartitions=npartitions
-        ).to_parquet(outdir + table, compression=compression)
+    read_csv_table(table, chunksize).repartition(
+        npartitions=npartitions
+    ).to_parquet(outdir + table, compression=compression)
 
 
 def main(client, config):
     # location you want to write Parquet versions of the table data
-    data_dir = "/".join(config["data_dir"].split("/")[:-1])
+    data_dir = config["data_dir"].split('parquet_')[0]
     outdir = f"{data_dir}/parquet_{part_size}gb/"
 
     total = 0
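
A brief sketch of how the new data_dir derivation behaves, assuming a hypothetical configured path (the real value comes from tpcxbb_argparser(), and the path below is illustrative only): everything before the "parquet_" component is treated as the dataset root, and the raw CSV output produced by the bigbench dataGen is expected under <root>/data/<table>.

# Hypothetical example path; in the patched script this comes from tpcxbb_argparser().
configured_data_dir = "/datasets/tpcx-bb/sf1000/parquet_2gb/"

# Splitting on "parquet_" keeps everything before that component,
# i.e. the dataset root that holds the raw CSV data.
csv_root = configured_data_dir.split('parquet_')[0]   # "/datasets/tpcx-bb/sf1000/"

# CSV input path for one table, mirroring the f"{data_dir}/data/{table}" pattern
# used in read_csv_table() and get_size_gb().
csv_path = f"{csv_root}/data/item_marketprices"
print(csv_path)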