
Commit

clean-up and some test fixes
sfc-gh-jrose committed Dec 12, 2024
1 parent dbb04ae commit 72c910b
Showing 7 changed files with 52 additions and 72 deletions.
1 change: 0 additions & 1 deletion docs/source/snowpark/session.rst
@@ -88,7 +88,6 @@ Snowpark Session
Session.lineage
Session.read
Session.sproc
Session.sql_simplifier_enabled
Session.telemetry_enabled
Session.udaf
Session.udf
4 changes: 1 addition & 3 deletions src/snowflake/snowpark/_internal/config.py
@@ -81,7 +81,7 @@ def __post_init__(self):
class SessionParameter(Setting):
session: Session = field(default=None)
parameter_name: str = field(default=None)
synchronize: bool = field(default=True)
synchronize: bool = field(default=False)
telemetry_hook: Callable = field(default=None)

def __post_init__(self):
@@ -185,5 +185,3 @@ def __setitem__(self, instance, value):


GLOBAL_SETTINGS = SettingStore([])

GLOBAL2 = SettingStore([Setting("bar", default=False)], extend_from=GLOBAL_SETTINGS)
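
For orientation, the hunk above exercises the Setting / SettingStore pattern: a store holds named settings and can extend another store, as the removed GLOBAL2 line did via extend_from. The following is a minimal, self-contained sketch of that idea; the class bodies are illustrative assumptions, not the library's actual implementation.

from dataclasses import dataclass
from typing import Any, Dict, Optional

@dataclass
class Setting:
    # Illustrative stand-in for the Setting dataclass referenced in the diff.
    name: str
    default: Any = None
    value: Any = None

    def get(self) -> Any:
        # Fall back to the default when no explicit value has been set.
        return self.default if self.value is None else self.value

class SettingStore:
    # Illustrative stand-in: resolves settings locally, then falls back to a
    # parent store supplied via extend_from.
    def __init__(self, settings, extend_from: Optional["SettingStore"] = None):
        self._settings: Dict[str, Setting] = {s.name: s for s in settings}
        self._parent = extend_from

    def get(self, name: str, default: Any = None) -> Any:
        if name in self._settings:
            return self._settings[name].get()
        if self._parent is not None:
            return self._parent.get(name, default)
        return default

    def set(self, name: str, value: Any) -> None:
        if name in self._settings:
            self._settings[name].value = value
        elif self._parent is not None:
            self._parent.set(name, value)
        else:
            raise ValueError(f"Unable to set setting. Unknown setting {name}")

GLOBAL_SETTINGS = SettingStore([])
local_store = SettingStore([Setting("bar", default=False)], extend_from=GLOBAL_SETTINGS)
assert local_store.get("bar") is False
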
74 changes: 33 additions & 41 deletions src/snowflake/snowpark/session.py
@@ -552,7 +552,6 @@ def _initialize_config(
parameter_name="PYTHON_SNOWPARK_USE_SCOPED_TEMP_OBJECTS",
session=self,
default=True,
synchronize=False,
),
SessionParameter(
"sql_simplifier_enabled",
@@ -566,38 +565,37 @@
telemetry_hook=self._conn._telemetry_client.send_sql_simplifier_telemetry,
session=self,
default=True,
synchronize=True,
),
SessionParameter(
"_use_logical_type_for_create_df",
parameter_name="PYTHON_SNOWPARK_USE_LOGICAL_TYPE_FOR_CREATE_DATAFRAME",
session=self,
default=True,
synchronize=False,
),
SessionParameter(
"_query_compilation_stage_enabled",
dedent(
"""
parameter used to turn off the whole new query compilation stage in one shot. If turned
off, the plan won't go through the extra optimization and query generation steps.
"""
parameter used to turn off the whole new query compilation stage in one shot. If turned
off, the plan won't go through the extra optimization and query generation steps.
"""
),
parameter_name="PYTHON_SNOWPARK_COMPILATION_STAGE_ENABLED",
session=self,
default=False,
synchronize=False,
),
VersionedSessionParameter(
"cte_optimization_enabled",
dedent(
"""Set to ``True`` to enable the CTE optimization (defaults to ``False``).
The generated SQLs from ``DataFrame`` transformations would have duplicate subquery as CTEs if the CTE optimization is enabled.
"""
"""
Set to ``True`` to enable the CTE optimization (defaults to ``False``).
The generated SQLs from ``DataFrame`` transformations would have duplicate subquery as CTEs if the CTE optimization is enabled.
"""
),
parameter_name="PYTHON_SNOWPARK_USE_CTE_OPTIMIZATION_VERSION",
session=self,
default=False,
synchronize=False,
experimental_since="1.15.0",
),
SessionParameter(
@@ -606,66 +604,63 @@ def _initialize_config(
telemetry_hook=self._conn._telemetry_client.send_eliminate_numeric_sql_value_cast_telemetry,
session=self,
default=False,
synchronize=False,
experimental_since="1.20.0",
),
VersionedSessionParameter(
"auto_clean_up_temp_table_enabled",
dedent(
"""
When setting this parameter to ``True``, Snowpark will automatically clean up temporary tables created by
:meth:`DataFrame.cache_result` in the current session when the DataFrame is no longer referenced (i.e., gets garbage collected).
The default value is ``False``.
Note:
Temporary tables will only be dropped if this parameter is enabled during garbage collection.
If a temporary table is no longer referenced when the parameter is on, it will be dropped during garbage collection.
However, if garbage collection occurs while the parameter is off, the table will not be removed.
Note that Python's garbage collection is triggered opportunistically, with no guaranteed timing.
"""
When setting this parameter to ``True``, Snowpark will automatically clean up temporary tables created by
:meth:`DataFrame.cache_result` in the current session when the DataFrame is no longer referenced (i.e., gets garbage collected).
The default value is ``False``.
Note:
Temporary tables will only be dropped if this parameter is enabled during garbage collection.
If a temporary table is no longer referenced when the parameter is on, it will be dropped during garbage collection.
However, if garbage collection occurs while the parameter is off, the table will not be removed.
Note that Python's garbage collection is triggered opportunistically, with no guaranteed timing.
"""
),
parameter_name="PYTHON_SNOWPARK_AUTO_CLEAN_UP_TEMP_TABLE_ENABLED_VERSION",
telemetry_hook=self._conn._telemetry_client.send_auto_clean_up_temp_table_telemetry,
session=self,
default=False,
synchronize=False,
experimental_since="1.21.0",
),
SessionParameter(
"reduce_describe_query_enabled",
dedent(
"""
When setting this parameter to ``True``, Snowpark will infer the schema of DataFrame locally if possible,
instead of issuing an internal `describe query
<https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-example#retrieving-column-metadata>`_
to get the schema from the Snowflake server. This optimization improves the performance of your workloads by
reducing the number of describe queries issued to the server.
The default value is ``False``.
"""
When setting this parameter to ``True``, Snowpark will infer the schema of DataFrame locally if possible,
instead of issuing an internal `describe query
<https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-example#retrieving-column-metadata>`_
to get the schema from the Snowflake server. This optimization improves the performance of your workloads by
reducing the number of describe queries issued to the server.
The default value is ``False``.
"""
),
parameter_name="PYTHON_SNOWPARK_REDUCE_DESCRIBE_QUERY_ENABLED",
telemetry_hook=self._conn._telemetry_client.send_reduce_describe_query_telemetry,
session=self,
default=False,
synchronize=False,
experimental_since="1.24.0",
),
SettingGroup(
"large_query_breakdown_params",
settings=[
SessionParameter(
VersionedSessionParameter(
"large_query_breakdown_enabled",
dedent(
"""Set the value for large_query_breakdown_enabled. When enabled, the client will automatically detect large query plans and break them down into smaller partitions,
materialize the partitions, and then combine them to execute the query to improve
overall performance.
"""
"""
Set the value for large_query_breakdown_enabled. When enabled, the client will automatically detect large query plans and break them down into smaller partitions,
materialize the partitions, and then combine them to execute the query to improve
overall performance.
"""
),
parameter_name="PYTHON_SNOWPARK_USE_LARGE_QUERY_BREAKDOWN_OPTIMIZATION",
parameter_name="PYTHON_SNOWPARK_USE_LARGE_QUERY_BREAKDOWN_OPTIMIZATION_VERSION",
telemetry_hook=self._conn._telemetry_client.send_large_query_breakdown_telemetry,
session=self,
default=False,
synchronize=False,
experimental_since="1.22.0",
),
# The complexity score lower bound is set to match COMPILATION_MEMORY_LIMIT
@@ -675,15 +670,13 @@ def _initialize_config(
parameter_name="PYTHON_SNOWPARK_LARGE_QUERY_BREAKDOWN_COMPLEXITY_LOWER_BOUND",
session=self,
default=DEFAULT_COMPLEXITY_SCORE_LOWER_BOUND,
synchronize=False,
experimental_since="1.22.0",
),
SessionParameter(
"_large_query_breakdown_complexity_upper_bound",
parameter_name="PYTHON_SNOWPARK_LARGE_QUERY_BREAKDOWN_COMPLEXITY_UPPER_BOUND",
session=self,
default=DEFAULT_COMPLEXITY_SCORE_UPPER_BOUND,
synchronize=False,
experimental_since="1.22.0",
),
],
@@ -693,7 +686,6 @@ def _initialize_config(
parameter_name="PYTHON_SNOWPARK_ENABLE_SCOPED_TEMP_READ_ONLY_TABLE",
session=self,
default=DEFAULT_COMPLEXITY_SCORE_UPPER_BOUND,
synchronize=False,
experimental_since="1.22.0",
),
],
@@ -705,7 +697,7 @@ def _initialize_config(

return conf

def __getattribute__(self, name):
def __getattribute__(self, name) -> Any:
if name in COMPAT_PROPERTIES:
return self._conf.get(name)
return super().__getattribute__(name)
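
The __getattribute__ override above is how the commit keeps backwards-compatible attribute access working: any name listed in COMPAT_PROPERTIES is answered from the configuration store (self._conf) instead of a real instance attribute. A minimal sketch of the same routing, with a plain dict standing in for the config store and an assumed two-entry COMPAT_PROPERTIES, is shown below; it is illustrative only, not the Session implementation.

from typing import Any

COMPAT_PROPERTIES = {"sql_simplifier_enabled", "cte_optimization_enabled"}  # assumed subset

class CompatSession:
    # Toy class demonstrating the forwarding pattern used by Session.__getattribute__.
    def __init__(self) -> None:
        # Stand-in for the Session's underlying config store (self._conf).
        self._conf = {"sql_simplifier_enabled": True, "cte_optimization_enabled": False}

    def __getattribute__(self, name: str) -> Any:
        # Known compatibility properties are served from the config store;
        # everything else goes through normal attribute lookup.
        if name in COMPAT_PROPERTIES:
            return super().__getattribute__("_conf").get(name)
        return super().__getattribute__(name)

s = CompatSession()
assert s.sql_simplifier_enabled is True
assert s.cte_optimization_enabled is False
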
8 changes: 4 additions & 4 deletions tests/integ/test_cte.py
@@ -68,7 +68,7 @@ def check_result(
high_query_count_reason=None,
):
df = df.sort(df.columns)
session._cte_optimization_enabled = False
session.cte_optimization_enabled = False
with SqlCounter(
query_count=query_count,
describe_count=describe_count,
@@ -89,7 +89,7 @@ def check_result(
result_count = df.count()
result_pandas = df.to_pandas() if installed_pandas else None

session._cte_optimization_enabled = True
session.cte_optimization_enabled = True
if cte_union_count is None:
cte_union_count = union_count
if cte_join_count is None:
@@ -236,10 +236,10 @@ def test_join_with_alias_dataframe(session):
.select(col("L", "col1"), col("R", "col2"))
)

session._cte_optimization_enabled = False
session.cte_optimization_enabled = False
result = df_res.collect()

session._cte_optimization_enabled = True
session.cte_optimization_enabled = True
cte_result = df_res.collect()

Utils.check_answer(cte_result, result)
21 changes: 11 additions & 10 deletions tests/integ/test_large_query_breakdown.py
@@ -23,6 +23,7 @@
)
from tests.integ.utils.sql_counter import SqlCounter, sql_count_checker
from tests.utils import IS_IN_STORED_PROC, Utils
from snowflake.snowpark._internal.utils import warning_dict

pytestmark = [
pytest.mark.xfail(
@@ -56,18 +57,18 @@ def setup(session):
cte_optimization_enabled = session.cte_optimization_enabled
is_query_compilation_stage_enabled = session._query_compilation_stage_enabled
session._query_compilation_stage_enabled = True
session._large_query_breakdown_enabled = True
session.large_query_breakdown_enabled = True
session.cte_optimization_enabled = False
set_bounds(session, 300, 600)
yield
session._query_compilation_stage_enabled = is_query_compilation_stage_enabled
session.cte_optimization_enabled = cte_optimization_enabled
session._large_query_breakdown_enabled = large_query_breakdown_enabled
session.large_query_breakdown_enabled = large_query_breakdown_enabled
reset_bounds(session)


def set_bounds(session: Session, lower_bound: int, upper_bound: int):
session._large_query_breakdown_complexity_bounds = (lower_bound, upper_bound)
session.large_query_breakdown_complexity_bounds = (lower_bound, upper_bound)


def reset_bounds(session: Session):
@@ -81,15 +82,15 @@ def reset_bounds(session):
def check_result_with_and_without_breakdown(session, df):
large_query_enabled = session.large_query_breakdown_enabled
try:
session._large_query_breakdown_enabled = True
session.large_query_breakdown_enabled = True
enabled_result = df.collect()

session._large_query_breakdown_enabled = False
session.large_query_breakdown_enabled = False
disabled_result = df.collect()

Utils.check_answer(enabled_result, disabled_result)
finally:
session._large_query_breakdown_enabled = large_query_enabled
session.large_query_breakdown_enabled = large_query_enabled


def check_summary_breakdown_value(patch_send, expected_summary):
@@ -311,7 +312,7 @@ def test_update_delete_merge(session, large_query_df):
pytest.skip(
"without sql simplifier, the plan is too large and hits max recursion depth"
)
session._large_query_breakdown_enabled = True
session.large_query_breakdown_enabled = True
table_name = Utils.random_table_name()
# There is one SELECT CURRENT_TRANSACTION() query and one save_as_table query since large
# query breakdown is not triggered.
@@ -564,6 +565,7 @@ def test_optimization_skipped_with_views_and_dynamic_tables(session, caplog):
source_table = Utils.random_table_name()
table_name = Utils.random_table_name()
view_name = Utils.random_view_name()
session.large_query_breakdown_enabled = True
try:
session.sql("select 1 as a, 2 as b").write.save_as_table(source_table)
df = session.table(source_table)
@@ -781,9 +783,8 @@ def test_complexity_bounds_affect_num_partitions(session, large_query_df):

@sql_count_checker(query_count=0)
def test_large_query_breakdown_enabled_parameter(session, caplog):
with caplog.at_level(logging.WARNING):
session.large_query_breakdown_enabled = True
assert "large_query_breakdown_enabled is experimental" in caplog.text
session.large_query_breakdown_enabled = True
assert warning_dict["large_query_breakdown_enabled"].count >= 1


@pytest.mark.skipif(IS_IN_STORED_PROC, reason="requires graphviz")
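
The setup fixture and check_result_with_and_without_breakdown above both follow the same discipline: save the current value of a public session setting, toggle it for the duration of the check, and restore it afterwards. A hypothetical helper (not part of this test suite) that packages that pattern as a context manager might look like the sketch below.

from contextlib import contextmanager

@contextmanager
def temporary_setting(session, name, value):
    # Hypothetical helper: temporarily set a public session setting such as
    # large_query_breakdown_enabled and restore the previous value on exit.
    old_value = getattr(session, name)
    setattr(session, name, value)
    try:
        yield session
    finally:
        setattr(session, name, old_value)

# Usage sketch, mirroring check_result_with_and_without_breakdown:
#   with temporary_setting(session, "large_query_breakdown_enabled", True):
#       enabled_result = df.collect()
#   with temporary_setting(session, "large_query_breakdown_enabled", False):
#       disabled_result = df.collect()
#   Utils.check_answer(enabled_result, disabled_result)
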
2 changes: 1 addition & 1 deletion tests/integ/test_reduce_describe_query.py
@@ -401,7 +401,7 @@ def test_reduce_describe_query_enabled_on_session(db_parameters):

parameters = db_parameters.copy()
parameters["session_parameters"] = {
"reduce_describe_query_enabled": not default_value
"PYTHON_SNOWPARK_REDUCE_DESCRIBE_QUERY_ENABLED": not default_value
}
with Session.builder.configs(parameters).create() as new_session2:
assert new_session2.reduce_describe_query_enabled is not default_value
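
The test change above passes the server-side parameter name (PYTHON_SNOWPARK_REDUCE_DESCRIBE_QUERY_ENABLED) through session_parameters rather than the client-side setting name. A hedged usage sketch of that flow, assuming db_parameters is a dict of valid connection options:

from snowflake.snowpark import Session

db_parameters: dict = {}  # assumed: account, user, password, etc. supplied elsewhere
parameters = db_parameters.copy()
parameters["session_parameters"] = {
    "PYTHON_SNOWPARK_REDUCE_DESCRIBE_QUERY_ENABLED": True
}
with Session.builder.configs(parameters).create() as new_session:
    # The client-side setting should reflect the server parameter set at creation time.
    assert new_session.reduce_describe_query_enabled is True
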
14 changes: 2 additions & 12 deletions tests/integ/test_session.py
@@ -38,37 +38,27 @@
def test_runtime_config(db_parameters):
session = (
Session.builder.configs(db_parameters)
.config("client_prefetch_threads", 10)
.config("sql_simplifier_enabled", False)
.config("use_constant_subquery_alias", False)
.create()
)
# test conf.get
assert not session.conf.get("nonexistent_client_side_fix", default=False)
assert session.conf.get("client_prefetch_threads") == 10
assert not session.sql_simplifier_enabled
assert not session.conf.get("sql_simplifier_enabled")
assert not session.conf.get("use_constant_subquery_alias")
assert session.conf.get("password") is None

# test conf.is_mutable
assert session.conf.is_mutable("telemetry_enabled")
assert session.conf.is_mutable("sql_simplifier_enabled")
assert session.conf.is_mutable("use_constant_subquery_alias")
assert not session.conf.is_mutable("host")
assert not session.conf.is_mutable("is_pyformat")

# test conf.set
session.conf.set("sql_simplifier_enabled", True)
assert session.sql_simplifier_enabled
assert session.conf.get("sql_simplifier_enabled")
session.conf.set("use_constant_subquery_alias", True)
assert session.conf.get("use_constant_subquery_alias")
with pytest.raises(AttributeError) as err:
with pytest.raises(ValueError) as err:
session.conf.set("use_openssl_only", False)
assert (
'Configuration "use_openssl_only" does not exist or is not mutable in runtime'
in err.value.args[0]
"Unable to set setting. Unknown setting use_openssl_only" in err.value.args[0]
)

session.close()
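
Note that the expected failure mode for unknown settings also changes in this commit: conf.set now raises ValueError with "Unable to set setting. Unknown setting <name>" instead of the old AttributeError. A small, hypothetical assertion helper capturing that contract:

import pytest

def expect_unknown_setting_error(session, name="use_openssl_only"):
    # Hypothetical helper: conf.set should reject settings the store does not know about.
    with pytest.raises(ValueError) as err:
        session.conf.set(name, False)
    assert f"Unable to set setting. Unknown setting {name}" in err.value.args[0]
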
