Skip to content

Commit

Permalink
Add parallel processing for image resizing
Browse files Browse the repository at this point in the history
  • Loading branch information
Luodian committed Dec 10, 2023
1 parent f324cb7 commit 1e09ecf
Showing 1 changed file with 26 additions and 6 deletions.
32 changes: 26 additions & 6 deletions pipeline/utils/convert_to_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import argparse
import orjson
import dask.dataframe as dd
from concurrent.futures import ThreadPoolExecutor, as_completed

def process_images(base64_str, resize_res=-1):
import base64
Expand Down Expand Up @@ -44,21 +45,40 @@ def convert_json_to_parquet(input_path, output_path, max_partition_size):
data = f.read()
data_dict = orjson.loads(data)

# Estimate the size of the JSON dictionary in bytes
total_size = len(data)
print(f"Total size of the JSON data: {total_size} bytes")

# Calculate the number of partitions needed
nparitions = max(1, total_size // max_partition_size)
nparitions = int(max(1, total_size // max_partition_size))
print(f"Number of partitions: {nparitions}")

resized_data_dict = {}
dropped_keys = []
for key, value in tqdm(data_dict.items(), desc=f"Processing {input_path}"):

# Initialize the progress bar
progress_bar = tqdm(total=len(data_dict), unit="item", desc="Processing items")

# Define a function to process a single item and update the progress bar
def process_item(key, value):
    """Process the image stored under ``key`` and return ``(key, result)``.

    If ``value`` is a list, only its first element is processed. The element
    is passed to ``process_images`` and the result is returned together with
    the key. The function deliberately touches no shared state (result dict,
    progress bar): all bookkeeping is done by the consumer loop below, in the
    submitting thread.

    Any exception raised by ``process_images`` propagates to the caller via
    the future.
    """
    if isinstance(value, list):
        value = value[0]
    resized_base64 = process_images(value)
    return key, resized_base64

with ThreadPoolExecutor(max_workers=256) as executor:
    future_to_key = {executor.submit(process_item, key, value): key for key, value in data_dict.items()}

    for future in as_completed(future_to_key):
        key = future_to_key[future]
        try:
            # process_item returns a (key, resized_base64) pair. Store only
            # the payload: storing the whole pair would give every row two
            # values, which does not fit a single "base64" column.
            _, resized_base64 = future.result()
        except Exception as e:
            # Best-effort conversion: report the failure, remember the key,
            # and keep going with the remaining items.
            print(f"Warning: Failed to process key {key}. Error: {e}")
            dropped_keys.append(key)
        else:
            resized_data_dict[key] = resized_base64
        finally:
            # Exactly one tick per item, success or failure, always issued
            # from this thread.
            progress_bar.update(1)

# Close the progress bar after all tasks are done
progress_bar.close()

ddf = dd.from_pandas(pd.DataFrame.from_dict(resized_data_dict, orient="index", columns=["base64"]), npartitions=nparitions)
ddf.to_parquet(output_path, engine="pyarrow")
Expand Down

0 comments on commit 1e09ecf

Please sign in to comment.