
Commit

Merge pull request #124 from beckernick/bugfix/update-load-test
[REVIEW] Fix logic and update load test script
beckernick authored Oct 28, 2020
2 parents ec699f8 + e988e97 commit 7068944
Showing 1 changed file with 8 additions and 22 deletions.
30 changes: 8 additions & 22 deletions tpcx_bb/queries/load_test/tpcx_bb_load_test.py
@@ -3,9 +3,9 @@
 import os, subprocess, math, time


-config["data_dir"] = "/".join(config["data_dir"].rstrip("/").split("/")[:-1])
+config = tpcxbb_argparser()

-spark_schema_dir = f"{os.getcwd()}/../../spark_table_schemas/"
+spark_schema_dir = f"{os.getcwd()}/spark_table_schemas/"

 # these tables have extra data produced by bigbench dataGen
 refresh_tables = [
@@ -50,7 +50,7 @@ def read_csv_table(table, chunksize="256 MiB"):
     names, types = get_schema(table)
     dtype = {names[i]: types[i] for i in range(0, len(names))}

-    data_dir = config["data_dir"]
+    data_dir = config["data_dir"].split('parquet_')[0]
     base_path = f"{data_dir}/data/{table}"
     files = os.listdir(base_path)
     # item_marketprices has "audit" files that should be excluded
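Note on the data_dir change above: the new lookup treats config["data_dir"] as a path that may already point at a Parquet output directory whose name starts with "parquet_", and splits on that token to recover the base directory holding the raw CSV data under data/<table>. A minimal sketch of that behavior (the example paths and the csv_table_path helper are hypothetical, not part of the script):

```python
# Sketch only: how splitting on "parquet_" recovers the CSV base directory.
def csv_table_path(data_dir: str, table: str) -> str:
    # Everything before "parquet_" is the base directory; if the token is
    # absent, split() returns the whole string and the path is unchanged.
    base = data_dir.split("parquet_")[0]
    return f"{base}/data/{table}"

print(csv_table_path("/raid/tpcx-bb/sf1000/parquet_3gb/", "item"))
# -> /raid/tpcx-bb/sf1000//data/item  (the doubled slash is harmless)
print(csv_table_path("/raid/tpcx-bb/sf1000", "item"))
# -> /raid/tpcx-bb/sf1000/data/item
```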
@@ -100,7 +100,7 @@ def multiplier(unit):

 # we use size of the CSV data on disk to determine number of Parquet partitions
 def get_size_gb(table):
-    data_dir = config["data_dir"]
+    data_dir = config["data_dir"].split('parquet_')[0]
     path = data_dir + "/data/" + table
     size = subprocess.check_output(["du", "-sh", path]).split()[0].decode("utf-8")
     unit = size[-1]
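get_size_gb shells out to du -sh and converts the human-readable size to GB via the multiplier(unit) helper named in the hunk header; that helper's body is outside this diff, so the unit table below is an assumption. A rough standalone sketch of the same parsing step:

```python
import subprocess

# Assumed unit factors; the script's own multiplier(unit) may differ.
UNIT_TO_GB = {"K": 1e-6, "M": 1e-3, "G": 1.0, "T": 1e3}

def size_gb(path: str) -> float:
    # `du -sh` prints something like "24G\t/path"; keep the size token.
    size = subprocess.check_output(["du", "-sh", path]).split()[0].decode("utf-8")
    unit = size[-1]                 # trailing unit letter, e.g. "G"
    value = float(size[:-1])        # numeric part, e.g. 24 or 1.5
    return value * UNIT_TO_GB.get(unit, 1.0)
```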
@@ -125,28 +125,14 @@ def repartition(table, outdir, npartitions=None, chunksize=None, compression="sn
     print(
         f"Converting {table} of {size} GB to {npartitions} parquet files, chunksize: {chunksize}"
     )
-    # web_clickstreams is particularly memory intensive
-    # we sacrifice a bit of speed for stability, converting half at a time
-    if table in ["web_clickstreams"]:
-        df = read_csv_table(table, chunksize)
-        half = int(df.npartitions / 2)
-        df.partitions[0:half].repartition(npartitions=int(npartitions / 2)).to_parquet(
-            outdir + table, compression=compression
-        )
-        print("Completed first half of web_clickstreams..")
-        df.partitions[half:].repartition(npartitions=int(npartitions / 2)).to_parquet(
-            outdir + table, compression=compression
-        )
-
-    else:
-        read_csv_table(table, chunksize).repartition(
-            npartitions=npartitions
-        ).to_parquet(outdir + table, compression=compression)
+    read_csv_table(table, chunksize).repartition(
+        npartitions=npartitions
+    ).to_parquet(outdir + table, compression=compression)


 def main(client, config):
     # location you want to write Parquet versions of the table data
-    data_dir = "/".join(config["data_dir"].split("/")[:-1])
+    data_dir = config["data_dir"].split('parquet_')[0]
     outdir = f"{data_dir}/parquet_{part_size}gb/"

     total = 0
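With the web_clickstreams special case removed, every table now takes the same read → repartition → to_parquet path. A simplified, self-contained illustration of that flow using dask.dataframe (the real script reads through read_csv_table with an explicit schema and may use a GPU-backed dataframe; the function name, pipe delimiter, and paths here are assumptions):

```python
import dask.dataframe as dd

def csv_to_parquet(csv_glob, outdir, npartitions, chunksize="256 MiB", compression="snappy"):
    # Read the delimited text files in fixed-size chunks, coalesce to the
    # target partition count, and write compressed Parquet in one pass.
    df = dd.read_csv(csv_glob, sep="|", blocksize=chunksize)
    df.repartition(npartitions=npartitions).to_parquet(outdir, compression=compression)

# Hypothetical usage:
# csv_to_parquet("/raid/tpcx-bb/sf1000/data/store_sales/*.dat",
#                "/raid/tpcx-bb/sf1000/parquet_3gb/store_sales", npartitions=100)
```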
