Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add time zone config to set non-UTC [databricks] #9652

Merged
merged 9 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions integration_tests/run_pyspark_from_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,14 @@ else
export PYSP_TEST_spark_jars="${ALL_JARS//:/,}"
fi

# time zone will be tested; use export TZ=time_zone_name before run this script
TZ=${TZ:-UTC}

# Set the Delta log cache size to prevent the driver from caching every Delta log indefinitely
export PYSP_TEST_spark_driver_extraJavaOptions="-ea -Duser.timezone=UTC -Ddelta.log.cacheSize=10 $COVERAGE_SUBMIT_FLAGS"
export PYSP_TEST_spark_executor_extraJavaOptions='-ea -Duser.timezone=UTC'
export PYSP_TEST_spark_driver_extraJavaOptions="-ea -Duser.timezone=$TZ -Ddelta.log.cacheSize=10 $COVERAGE_SUBMIT_FLAGS"
export PYSP_TEST_spark_executor_extraJavaOptions="-ea -Duser.timezone=$TZ"
export PYSP_TEST_spark_ui_showConsoleProgress='false'
export PYSP_TEST_spark_sql_session_timeZone='UTC'
export PYSP_TEST_spark_sql_session_timeZone=$TZ
export PYSP_TEST_spark_sql_shuffle_partitions='4'
# prevent cluster shape to change
export PYSP_TEST_spark_dynamicAllocation_enabled='false'
Expand Down
9 changes: 9 additions & 0 deletions integration_tests/src/main/python/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ def is_emr_runtime():
def is_dataproc_runtime():
return runtime_env() == "dataproc"

def get_test_tz():
Copy link
Collaborator

@pxLi pxLi Nov 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: where would all these funcs be used? also pytest conf would rely on run_pyspark script seems weird

can you at least try os.environ.get('PYSP_TEST_spark_sql_session_timeZone', 'UTC') to make sure it could have a default?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these funcs will be used in pytest xfail. For an existing test case before operator supports non-utc: add xfail(is_non_utc()).

also pytest conf would relying on run_pyspark script seems weird.

I tested we can get this config in the conftest.
conftest is internal file of IT, so it's safe to get Env variable from run_pyspark.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

conftest is internal file of IT, so it's safe to get Env variable from run_pyspark.

hmm OK, for me pytest code itself should at least provide the defaults here

I would like to hear more feedback from other developers~

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If does not set it, then an error throws:

>>> os.environ["PYSP_TEST_spark_sql_session_timeZone"]
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<frozen os>", line 679, in __getitem__
KeyError: 'PYSP_TEST_spark_sql_session_timeZone'

This error will force us to set this cfg.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just threw the code together so a get with a default looks like a great addition for safety/robustness.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can not get the TZ env variable in conftest.py.

Can you elaborate on this? I'm confused how we can export one variable and read it in conftest but somehow can't do the same to another. Is something in the shell startup environment bashing the TZ variable?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@res-life wrapping TZ with PYSP_TEST_spark_sql_session_timeZone wouldn't be same as original way we did in is_tz_utc?

Java systemDefault will respect TZ environment.

import java.time.ZoneId;

public class Test{
   public static void main(String [] args) {
     System.out.println("time zone is " + ZoneId.systemDefault());
   }

}
$ export TZ="UTC" && java Test
time zone is UTC
$ export TZ="Asia/Shanghai" && java Test
time zone is Asia/Shanghai

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Offline discussed with @res-life , let's add comments that we need to get utc time before spark session starts.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can not use the following code when the Spark session is not started.

jvm = spark.sparkContext._jvm
    utc = jvm.java.time.ZoneId.of('UTC').normalized()
    sys_tz = jvm.java.time.ZoneId.systemDefault().normalized()

Copy link
Collaborator Author

@res-life res-life Nov 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can not get the TZ env variable in conftest.py.

Because TZ is rewritten to UTC in data_gen.py

os.environ['TZ'] = 'UTC'
time.tzset()

Removed the above code. See my last commit:
I use Env variable TZ now.
And I updated TimestampGen to avoid generate out of range timestamp

return os.environ.get('TZ', 'UTC')

def is_utc():
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't realize is_tz_utc existed. I think we should probably only keep one way of checking if the timezone is UTC or not. is_tz_utc has the problem that we need a spark session to make it work. That is fine, but it also makes it difficult to use it to skip a test unless it happens after the test starts to run. This patch removes all uses of is_tz_utc so perhaps we should also delete the implementation too.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in #9482

  • default value with PYSP_TEST_spark_sql_session_timeZone
  • Remove useless is_tz_utc

return get_test_tz() == "UTC"

def is_not_utc():
return not is_utc()

_is_nightly_run = False
_is_precommit_run = False

Expand Down
33 changes: 11 additions & 22 deletions integration_tests/src/main/python/data_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from pyspark.sql.types import *
import pyspark.sql.functions as f
import random
from spark_session import is_tz_utc, is_before_spark_340, with_cpu_session
from spark_session import is_before_spark_340, with_cpu_session
import sre_yield
import struct
from conftest import skip_unless_precommit_tests,get_datagen_seed
Expand All @@ -30,11 +30,6 @@
from functools import lru_cache
import hashlib

# set time zone to UTC for timestamp test cases to avoid `datetime` out-of-range error:
# refer to: https://github.com/NVIDIA/spark-rapids/issues/7535
os.environ['TZ'] = 'UTC'
time.tzset()

class DataGen:
"""Base class for data generation"""

Expand Down Expand Up @@ -584,12 +579,18 @@ def __init__(self, start=None, end=None, nullable=True, tzinfo=timezone.utc):
elif not isinstance(start, datetime):
raise RuntimeError('Unsupported type passed in for start {}'.format(start))

# Spark supports time through: "9999-12-31 23:59:59.999999"
# but in order to avoid out-of-range error in non-UTC time zone, here use 9999-12-30 instead of 12-31 as max end
# for details, refer to https://github.com/NVIDIA/spark-rapids/issues/7535
max_end = datetime(9999, 12, 30, 23, 59, 59, 999999, tzinfo=tzinfo)
if end is None:
# Spark supports time through
# "9999-12-31 23:59:59.999999"
end = datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=tzinfo)
end = max_end
elif isinstance(end, timedelta):
end = start + end
max_timedelta = max_end - start
if ( end >= max_timedelta):
end = max_end
else:
end = start + end
elif not isinstance(start, date):
raise RuntimeError('Unsupported type passed in for end {}'.format(end))

Expand Down Expand Up @@ -749,10 +750,6 @@ def gen_bytes():
return bytes([ rand.randint(0, 255) for _ in range(length) ])
self._start(rand, gen_bytes)

def skip_if_not_utc():
if (not is_tz_utc()):
skip_unless_precommit_tests('The java system time zone is not set to UTC')

# Note: Current(2023/06/06) maxmium IT data size is 7282688 bytes, so LRU cache with maxsize 128
# will lead to 7282688 * 128 = 932 MB additional memory usage in edge case, which is acceptable.
@lru_cache(maxsize=128, typed=True)
Expand All @@ -776,10 +773,6 @@ def gen_df(spark, data_gen, length=2048, seed=None, num_slices=None):
# we cannot create a data frame from a nullable struct
assert not data_gen.nullable

# Before we get too far we need to verify that we can run with timestamps
if src.contains_ts():
skip_if_not_utc()

data = gen_df_help(src, length, seed_value)

# We use `numSlices` to create an RDD with the specific number of partitions,
Expand Down Expand Up @@ -832,10 +825,6 @@ def _gen_scalars_common(data_gen, count, seed=None):
else:
seed_value = seed

# Before we get too far we need to verify that we can run with timestamps
if src.contains_ts():
skip_if_not_utc()

rand = random.Random(seed_value)
src.start(rand)
return src
Expand Down
10 changes: 0 additions & 10 deletions integration_tests/src/main/python/spark_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,6 @@ def _from_scala_map(scala_map):
'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true',
}

def is_tz_utc(spark=_spark):
"""
true if the tz is UTC else false
"""
# Now we have to do some kind of ugly internal java stuff
jvm = spark.sparkContext._jvm
utc = jvm.java.time.ZoneId.of('UTC').normalized()
sys_tz = jvm.java.time.ZoneId.systemDefault().normalized()
return utc == sys_tz

def _set_all_confs(conf):
newconf = _default_conf.copy()
if (should_inject_oom()):
Expand Down