From 0b112edf930fd4dd7ddcb2a4c0c32390eaf02171 Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Tue, 7 Nov 2023 10:16:14 +0800
Subject: [PATCH 1/8] Add config to set a non-UTC time zone

Signed-off-by: Chong Gao
---
 integration_tests/run_pyspark_from_build.sh   | 9 ++++++---
 integration_tests/src/main/python/conftest.py | 9 +++++++++
 2 files changed, 15 insertions(+), 3 deletions(-)

diff --git a/integration_tests/run_pyspark_from_build.sh b/integration_tests/run_pyspark_from_build.sh
index 63331d6cab3..3ee7f0b615c 100755
--- a/integration_tests/run_pyspark_from_build.sh
+++ b/integration_tests/run_pyspark_from_build.sh
@@ -223,11 +223,14 @@ else
         export PYSP_TEST_spark_jars="${ALL_JARS//:/,}"
     fi
 
+    # time zone will be tested
+    TEST_TZ=${TEST_TZ:-UTC}
+
     # Set the Delta log cache size to prevent the driver from caching every Delta log indefinitely
-    export PYSP_TEST_spark_driver_extraJavaOptions="-ea -Duser.timezone=UTC -Ddelta.log.cacheSize=10 $COVERAGE_SUBMIT_FLAGS"
-    export PYSP_TEST_spark_executor_extraJavaOptions='-ea -Duser.timezone=UTC'
+    export PYSP_TEST_spark_driver_extraJavaOptions="-ea -Duser.timezone=$TEST_TZ -Ddelta.log.cacheSize=10 $COVERAGE_SUBMIT_FLAGS"
+    export PYSP_TEST_spark_executor_extraJavaOptions="-ea -Duser.timezone=$TEST_TZ"
     export PYSP_TEST_spark_ui_showConsoleProgress='false'
-    export PYSP_TEST_spark_sql_session_timeZone='UTC'
+    export PYSP_TEST_spark_sql_session_timeZone=$TEST_TZ
     export PYSP_TEST_spark_sql_shuffle_partitions='4'
     # prevent cluster shape to change
     export PYSP_TEST_spark_dynamicAllocation_enabled='false'
diff --git a/integration_tests/src/main/python/conftest.py b/integration_tests/src/main/python/conftest.py
index 2ce21505686..e06233465ea 100644
--- a/integration_tests/src/main/python/conftest.py
+++ b/integration_tests/src/main/python/conftest.py
@@ -77,6 +77,15 @@ def is_emr_runtime():
 def is_dataproc_runtime():
     return runtime_env() == "dataproc"
 
+def get_test_tz():
+    return os.environ["PYSP_TEST_spark_sql_session_timeZone"]
+
+def is_utc():
+    return get_test_tz() == "UTC"
+
+def is_not_utc():
+    return not is_utc()
+
 _is_nightly_run = False
 _is_precommit_run = False
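With these helpers in place, tests can branch or skip based on the configured zone instead of probing the JVM. A minimal sketch of how a test could consume them; the test name below is hypothetical, not part of this patch:

```python
import pytest
from conftest import is_not_utc  # helper added by this patch

# hypothetical test, shown only to illustrate the helpers
@pytest.mark.skipif(is_not_utc(), reason='requires a UTC session time zone')
def test_utc_only_behavior():
    assert True
```

Note that at this stage `get_test_tz()` raises `KeyError` when the variable is unset; patch 3 adds a default for that case.
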
From cbba2e307547973e78ad14c926b3924af3726a4e Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Tue, 7 Nov 2023 15:52:42 +0800
Subject: [PATCH 2/8] Remove the skip logic when the time zone is not UTC

---
 integration_tests/src/main/python/data_gen.py | 11 +----------
 1 file changed, 1 insertion(+), 10 deletions(-)

diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py
index fdfc595bf4e..5d0e8b8920c 100644
--- a/integration_tests/src/main/python/data_gen.py
+++ b/integration_tests/src/main/python/data_gen.py
@@ -21,10 +21,9 @@
 from pyspark.sql.types import *
 import pyspark.sql.functions as f
 import random
-from spark_session import is_tz_utc, is_before_spark_340, with_cpu_session
+from spark_session import is_before_spark_340, with_cpu_session
 import sre_yield
 import struct
-from conftest import skip_unless_precommit_tests
 import time
 import os
 from functools import lru_cache
@@ -749,10 +748,6 @@ def gen_bytes():
             return bytes([ rand.randint(0, 255) for _ in range(length) ])
         self._start(rand, gen_bytes)
 
-def skip_if_not_utc():
-    if (not is_tz_utc()):
-        skip_unless_precommit_tests('The java system time zone is not set to UTC')
-
 # Note: Current(2023/06/06) maxmium IT data size is 7282688 bytes, so LRU cache with maxsize 128
 # will lead to 7282688 * 128 = 932 MB additional memory usage in edge case, which is acceptable.
 @lru_cache(maxsize=128, typed=True)
@@ -771,10 +766,6 @@ def gen_df(spark, data_gen, length=2048, seed=0, num_slices=None):
     # we cannot create a data frame from a nullable struct
     assert not data_gen.nullable
 
-    # Before we get too far we need to verify that we can run with timestamps
-    if src.contains_ts():
-        skip_if_not_utc()
-
     data = gen_df_help(src, length, seed)
 
     # We use `numSlices` to create an RDD with the specific number of partitions,

From ee930f3a123c8d4bf42ba1c6e30cb2c32370dff0 Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Fri, 10 Nov 2023 09:48:38 +0800
Subject: [PATCH 3/8] Add default value for get_test_tz

---
 integration_tests/src/main/python/conftest.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/integration_tests/src/main/python/conftest.py b/integration_tests/src/main/python/conftest.py
index e06233465ea..0f974a480ad 100644
--- a/integration_tests/src/main/python/conftest.py
+++ b/integration_tests/src/main/python/conftest.py
@@ -78,7 +78,7 @@ def is_dataproc_runtime():
     return runtime_env() == "dataproc"
 
 def get_test_tz():
-    return os.environ["PYSP_TEST_spark_sql_session_timeZone"]
+    os.environ.get('PYSP_TEST_spark_sql_session_timeZone', 'UTC')
 
 def is_utc():
     return get_test_tz() == "UTC"

From f1267de3e9fca1fbb74ccc9e70667d8da5196b82 Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Fri, 10 Nov 2023 09:55:38 +0800
Subject: [PATCH 4/8] Remove unused is_tz_utc

---
 integration_tests/src/main/python/spark_session.py | 10 ----------
 1 file changed, 10 deletions(-)

diff --git a/integration_tests/src/main/python/spark_session.py b/integration_tests/src/main/python/spark_session.py
index df6f1329471..116b30d3b87 100644
--- a/integration_tests/src/main/python/spark_session.py
+++ b/integration_tests/src/main/python/spark_session.py
@@ -55,16 +55,6 @@ def _from_scala_map(scala_map):
     'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true',
 }
 
-def is_tz_utc(spark=_spark):
-    """
-    true if the tz is UTC else false
-    """
-    # Now we have to do some kind of ugly internal java stuff
-    jvm = spark.sparkContext._jvm
-    utc = jvm.java.time.ZoneId.of('UTC').normalized()
-    sys_tz = jvm.java.time.ZoneId.systemDefault().normalized()
-    return utc == sys_tz
-
 def _set_all_confs(conf):
     newconf = _default_conf.copy()
     if (should_inject_oom()):

From d7bb5e6eada4a402e4dda96ff14ecd52ef3a22ca Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Tue, 14 Nov 2023 17:26:27 +0800
Subject: [PATCH 5/8] Fix bug: add missing return in get_test_tz

---
 integration_tests/src/main/python/conftest.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/integration_tests/src/main/python/conftest.py b/integration_tests/src/main/python/conftest.py
index 0f974a480ad..5157edf1940 100644
--- a/integration_tests/src/main/python/conftest.py
+++ b/integration_tests/src/main/python/conftest.py
@@ -78,7 +78,7 @@ def is_dataproc_runtime():
     return runtime_env() == "dataproc"
 
 def get_test_tz():
-    os.environ.get('PYSP_TEST_spark_sql_session_timeZone', 'UTC')
+    return os.environ.get('PYSP_TEST_spark_sql_session_timeZone', 'UTC')
 
 def is_utc():
     return get_test_tz() == "UTC"
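Patch 3 dropped the `return` while adding the default, so `get_test_tz()` evaluated the lookup and then implicitly returned `None`, which made `is_utc()` always false. A minimal standard-library illustration of the broken and fixed versions:

```python
import os

os.environ.pop('PYSP_TEST_spark_sql_session_timeZone', None)  # exercise the default path

def get_test_tz_broken():
    # the lookup runs, but its result is discarded: implicit return None
    os.environ.get('PYSP_TEST_spark_sql_session_timeZone', 'UTC')

def get_test_tz_fixed():
    return os.environ.get('PYSP_TEST_spark_sql_session_timeZone', 'UTC')

assert get_test_tz_broken() is None   # so is_utc() was always False
assert get_test_tz_fixed() == 'UTC'
```
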
From 28591d41ec75a711626adae85cdf97518e7643c0 Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Wed, 15 Nov 2023 14:31:55 +0800
Subject: [PATCH 6/8] Fix bug: remove remaining skip code for non-UTC time
 zones

---
 integration_tests/src/main/python/data_gen.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py
index 5d0e8b8920c..8ace35b4eb6 100644
--- a/integration_tests/src/main/python/data_gen.py
+++ b/integration_tests/src/main/python/data_gen.py
@@ -813,10 +813,6 @@ def _gen_scalars_common(data_gen, count, seed=0):
     else:
         src = data_gen
 
-    # Before we get too far we need to verify that we can run with timestamps
-    if src.contains_ts():
-        skip_if_not_utc()
-
     rand = random.Random(seed)
     src.start(rand)
     return src

From 4c8fa96957ef49cf90fe0e30f998e22fc5de20ca Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Thu, 16 Nov 2023 06:04:24 +0000
Subject: [PATCH 7/8] Use TZ instead of TEST_TZ to set the time zone

---
 integration_tests/run_pyspark_from_build.sh | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/integration_tests/run_pyspark_from_build.sh b/integration_tests/run_pyspark_from_build.sh
index f745aec9892..b5cd2758089 100755
--- a/integration_tests/run_pyspark_from_build.sh
+++ b/integration_tests/run_pyspark_from_build.sh
@@ -223,14 +223,14 @@ else
         export PYSP_TEST_spark_jars="${ALL_JARS//:/,}"
     fi
 
-    # time zone will be tested
-    TEST_TZ=${TEST_TZ:-UTC}
+    # time zone will be tested; use `export TZ=time_zone_name` before running this script
+    TZ=${TZ:-UTC}
 
     # Set the Delta log cache size to prevent the driver from caching every Delta log indefinitely
-    export PYSP_TEST_spark_driver_extraJavaOptions="-ea -Duser.timezone=$TEST_TZ -Ddelta.log.cacheSize=10 $COVERAGE_SUBMIT_FLAGS"
-    export PYSP_TEST_spark_executor_extraJavaOptions="-ea -Duser.timezone=$TEST_TZ"
+    export PYSP_TEST_spark_driver_extraJavaOptions="-ea -Duser.timezone=$TZ -Ddelta.log.cacheSize=10 $COVERAGE_SUBMIT_FLAGS"
+    export PYSP_TEST_spark_executor_extraJavaOptions="-ea -Duser.timezone=$TZ"
     export PYSP_TEST_spark_ui_showConsoleProgress='false'
-    export PYSP_TEST_spark_sql_session_timeZone=$TEST_TZ
+    export PYSP_TEST_spark_sql_session_timeZone=$TZ
     export PYSP_TEST_spark_sql_shuffle_partitions='4'
     # prevent cluster shape to change
     export PYSP_TEST_spark_dynamicAllocation_enabled='false'
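With `TZ` as the knob, a non-UTC run can be launched as, for example, `TZ=Asia/Shanghai ./integration_tests/run_pyspark_from_build.sh` (any IANA zone name works; Asia/Shanghai is only an example). Because the same value feeds both `-Duser.timezone` and `spark.sql.session.timeZone`, the JVM default zone and the SQL session zone stay aligned. A minimal sketch of checking the session side, assuming a local pyspark install:

```python
import os
from pyspark.sql import SparkSession

# mirror the script: fall back to UTC when TZ is unset
tz = os.environ.get('TZ', 'UTC')
spark = (SparkSession.builder
         .master('local[1]')
         .config('spark.sql.session.timeZone', tz)
         .getOrCreate())
assert spark.conf.get('spark.sql.session.timeZone') == tz
```
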
From a449e955282265ebc0286c5fd59497a1b3bb0517 Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Fri, 17 Nov 2023 05:36:52 +0000
Subject: [PATCH 8/8] Remove the TZ overwrite logic from data_gen.py

---
 integration_tests/src/main/python/conftest.py |  2 +-
 integration_tests/src/main/python/data_gen.py | 19 ++++++++++---------
 2 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/integration_tests/src/main/python/conftest.py b/integration_tests/src/main/python/conftest.py
index 23d86063795..cf21c405899 100644
--- a/integration_tests/src/main/python/conftest.py
+++ b/integration_tests/src/main/python/conftest.py
@@ -78,7 +78,7 @@ def is_dataproc_runtime():
     return runtime_env() == "dataproc"
 
 def get_test_tz():
-    return os.environ.get('PYSP_TEST_spark_sql_session_timeZone', 'UTC')
+    return os.environ.get('TZ', 'UTC')
 
 def is_utc():
     return get_test_tz() == "UTC"
diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py
index 6ebb6808641..4b7ba5f4b45 100644
--- a/integration_tests/src/main/python/data_gen.py
+++ b/integration_tests/src/main/python/data_gen.py
@@ -30,11 +30,6 @@
 from functools import lru_cache
 import hashlib
 
-# set time zone to UTC for timestamp test cases to avoid `datetime` out-of-range error:
-# refer to: https://github.com/NVIDIA/spark-rapids/issues/7535
-os.environ['TZ'] = 'UTC'
-time.tzset()
-
 class DataGen:
     """Base class for data generation"""
 
@@ -584,12 +579,18 @@ def __init__(self, start=None, end=None, nullable=True, tzinfo=timezone.utc):
         elif not isinstance(start, datetime):
             raise RuntimeError('Unsupported type passed in for start {}'.format(start))
 
+        # Spark supports time through "9999-12-31 23:59:59.999999",
+        # but to avoid out-of-range errors in non-UTC time zones, use 9999-12-30 instead of 12-31 as the max end.
+        # For details, refer to https://github.com/NVIDIA/spark-rapids/issues/7535
+        max_end = datetime(9999, 12, 30, 23, 59, 59, 999999, tzinfo=tzinfo)
         if end is None:
-            # Spark supports time through
-            # "9999-12-31 23:59:59.999999"
-            end = datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=tzinfo)
+            end = max_end
         elif isinstance(end, timedelta):
-            end = start + end
+            max_timedelta = max_end - start
+            if end >= max_timedelta:
+                end = max_end
+            else:
+                end = start + end
         elif not isinstance(start, date):
             raise RuntimeError('Unsupported type passed in for end {}'.format(end))
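The 9999-12-30 cap exists because Spark's true maximum timestamp, rendered in a zone east of UTC, lands past year 9999 and overflows Python's `datetime` (the failure tracked in the linked issue #7535). A standard-library sketch of the error the cap avoids:

```python
from datetime import datetime, timedelta, timezone

max_ts = datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=timezone.utc)
east_of_utc = timezone(timedelta(hours=8))  # e.g. a +08:00 zone

try:
    max_ts.astimezone(east_of_utc)          # wall clock would land in year 10000
except OverflowError as e:
    print(e)                                # date value out of range

# 9999-12-30 leaves a full day of headroom, enough for any real offset (within +/-14h)
safe_max = datetime(9999, 12, 30, 23, 59, 59, 999999, tzinfo=timezone.utc)
print(safe_max.astimezone(east_of_utc))    # converts cleanly
```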