From 17343d5f242f28cf3aecdd18472a8a76237aad3d Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Thu, 5 Jan 2023 10:43:01 -0600 Subject: [PATCH] Sort Delta log objects when comparing and avoid caching all logs (#7456) Signed-off-by: Jason Lowe Signed-off-by: Jason Lowe --- integration_tests/run_pyspark_from_build.sh | 5 +++-- integration_tests/src/main/python/delta_lake_write_test.py | 6 ++++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/integration_tests/run_pyspark_from_build.sh b/integration_tests/run_pyspark_from_build.sh index 7b0abd06af0..e61d682518c 100755 --- a/integration_tests/run_pyspark_from_build.sh +++ b/integration_tests/run_pyspark_from_build.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright (c) 2020-2022, NVIDIA CORPORATION. +# Copyright (c) 2020-2023, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -220,7 +220,8 @@ else export PYSP_TEST_spark_jars="${ALL_JARS//:/,}" fi - export PYSP_TEST_spark_driver_extraJavaOptions="-ea -Duser.timezone=UTC $COVERAGE_SUBMIT_FLAGS" + # Set the Delta log cache size to prevent the driver from caching every Delta log indefinitely + export PYSP_TEST_spark_driver_extraJavaOptions="-ea -Duser.timezone=UTC -Ddelta.log.cacheSize=10 $COVERAGE_SUBMIT_FLAGS" export PYSP_TEST_spark_executor_extraJavaOptions='-ea -Duser.timezone=UTC' export PYSP_TEST_spark_ui_showConsoleProgress='false' export PYSP_TEST_spark_sql_session_timeZone='UTC' diff --git a/integration_tests/src/main/python/delta_lake_write_test.py b/integration_tests/src/main/python/delta_lake_write_test.py index cae4df1245a..094dded6872 100644 --- a/integration_tests/src/main/python/delta_lake_write_test.py +++ b/integration_tests/src/main/python/delta_lake_write_test.py @@ -90,6 +90,12 @@ def decode_jsons(json_data): # Skip whitespace between records while idx < len(json_data) and json_data[idx].isspace(): idx += 1 + # reorder to produce a consistent output for comparison + def json_to_sort_key(j): + keys = sorted(j.keys()) + paths = sorted([ v.get("path", "") for v in j.values() ]) + return ','.join(keys + paths) + jsons.sort(key=json_to_sort_key) return jsons def assert_gpu_and_cpu_delta_logs_equivalent(spark, data_path):