Commit

Merge pull request #9060 from ministryofjustice/ELM-3031_generate_partitioned_rowhash_4

long running gluejob in prod - fix - 1212 - 1
madhu-k-sr2 authored Dec 12, 2024
2 parents c14265a + 1bc9c83 commit 20de261
Showing 1 changed file with 12 additions and 10 deletions.
@@ -384,30 +384,32 @@ def write_rds_df_to_s3_parquet(df_rds_write: DataFrame,
 coalesce_int = int(args.get('coalesce_int', 0))
 if coalesce_int != 0:
     LOGGER.warn(f"""WARNING ! >> Given coalesce_int = {coalesce_int}""")
-    rds_hashed_rows_df_write = rds_hashed_rows_df.coalesce(coalesce_int).cache()
+    rds_hashed_rows_df_write = rds_hashed_rows_df.coalesce(coalesce_int)
 else:
-    rds_hashed_rows_df_write = rds_hashed_rows_df.alias("rds_hashed_rows_df_write").cache()
+    rds_hashed_rows_df_write = rds_hashed_rows_df.alias("rds_hashed_rows_df_write")
 # ----------------------------------------------------------

-unique_partitions_df = rds_hashed_rows_df_write\
-    .select(*yyyy_mm_partition_by_cols)\
-    .distinct()\
-    .orderBy(yyyy_mm_partition_by_cols, ascending=True)
+# rds_hashed_rows_df_write = rds_hashed_rows_df_write.cache()
+# unique_partitions_df = rds_hashed_rows_df_write\
+#     .select(*yyyy_mm_partition_by_cols)\
+#     .distinct()\
+#     .orderBy(yyyy_mm_partition_by_cols, ascending=True)

-for row in unique_partitions_df.toLocalIterator():
-    LOGGER.info(f"""year: {row[yyyy_mm_partition_by_cols[0]]},
-                    month: {row[yyyy_mm_partition_by_cols[1]]}""")
+# for row in unique_partitions_df.toLocalIterator():
+#     LOGGER.info(f"""year: {row[yyyy_mm_partition_by_cols[0]]},
+#                     month: {row[yyyy_mm_partition_by_cols[1]]}""")

 # write_rds_df_to_s3_parquet_v2(rds_hashed_rows_df_write,
 #                               yyyy_mm_partition_by_cols,
 #                               prq_table_folder_path)

+LOGGER.info(f"""write_rds_df_to_s3_parquet() - function called.""")
 write_rds_df_to_s3_parquet(rds_hashed_rows_df_write,
                            yyyy_mm_partition_by_cols,
                            prq_table_folder_path)

 LOGGER.info(f"""'{prq_table_folder_path}' writing completed.""")
-rds_hashed_rows_df_write.unpersist()
+# rds_hashed_rows_df_write.unpersist()


 total_files, total_size = S3Methods.get_s3_folder_info(HASHED_OUTPUT_S3_BUCKET_NAME,
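In effect, the change drops the eager .cache()/.unpersist() pair and the distinct()/orderBy()/toLocalIterator() pass that existed only to log the year/month partitions, leaving the partitioned Parquet write as the sole Spark action over the hashed rows. Below is a minimal sketch of that single-pass pattern, assuming a plain SparkSession, placeholder year/month/payload columns, and a local output path rather than the job's real helpers and S3 bucket.

# Sketch only: illustrates removing the extra logging action and cache;
# this is not the repository's actual Glue job code.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("single_pass_write_sketch").getOrCreate()

# Placeholder data standing in for the hashed-rows DataFrame.
df = spark.createDataFrame(
    [(2023, 1, "a"), (2023, 2, "b"), (2024, 1, "c")],
    ["year", "month", "payload"],
)

# Before the fix (conceptually): caching the DataFrame and collecting the
# distinct partitions to the driver forces a full evaluation of df before
# the write even starts, roughly:
#
#   df = df.cache()
#   rows = (df.select("year", "month").distinct()
#             .orderBy("year", "month").toLocalIterator())
#   for row in rows:
#       print(row["year"], row["month"])
#
# After the fix: skip that pass, so the partitioned write below is the
# only action and the data is evaluated exactly once.
(df.write
   .mode("overwrite")
   .partitionBy("year", "month")
   .parquet("/tmp/single_pass_write_sketch"))

The trade-off is losing the per-partition log lines; if they are still wanted, listing the partition folders after the write (for example from the S3 prefix) avoids re-evaluating the source DataFrame.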
