Skip to content

Commit

Permalink
SNOW-1801330: Map numpy functions in apply to builtin snowflake funct…
Browse files Browse the repository at this point in the history
…ions
  • Loading branch information
sfc-gh-nkumar committed Dec 12, 2024
1 parent ae1fe68 commit 5e77256
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 55 deletions.
126 changes: 74 additions & 52 deletions src/snowflake/snowpark/modin/plugin/_internal/apply_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,7 @@
pandas_lit,
is_compatible_snowpark_types,
)
from snowflake.snowpark.functions import (
builtin,
col,
dense_rank,
ln,
log,
_log2,
_log10,
sin,
snowflake_cortex_summarize,
udf,
to_variant,
when,
udtf,
exp,
cos,
tan,
sinh,
cosh,
tanh,
ceil,
floor,
trunc,
sqrt,
)
from snowflake.snowpark import functions as sp_func
from snowflake.snowpark.modin.plugin._internal.frame import InternalFrame
from snowflake.snowpark.modin.plugin._internal.ordered_dataframe import (
OrderedDataFrame,
Expand Down Expand Up @@ -95,22 +71,66 @@
cloudpickle.register_pickle_by_value(sys.modules[__name__])

SUPPORTED_SNOWPARK_PYTHON_FUNCTIONS_IN_APPLY = {
exp,
ln,
log,
_log2,
_log10,
sin,
cos,
tan,
sinh,
cosh,
tanh,
ceil,
floor,
trunc,
sqrt,
snowflake_cortex_summarize,
sp_func.exp,
sp_func.ln,
sp_func.log,
sp_func._log2,
sp_func._log10,
sp_func.sin,
sp_func.cos,
sp_func.tan,
sp_func.sinh,
sp_func.cosh,
sp_func.tanh,
sp_func.ceil,
sp_func.floor,
sp_func.trunc,
sp_func.sqrt,
sp_func.snowflake_cortex_summarize,
}

NUMPY_FUNCTION_TO_SNOWFLAKE_FUNCTION = {
# Math operations
np.absolute: sp_func.abs,
np.sign: sp_func.sign,
np.negative: sp_func.negate,
np.positive: lambda col: col,
np.sqrt: sp_func.sqrt,
np.square: lambda col: sp_func.builtin("square")(col),
np.cbrt: lambda col: sp_func.builtin("cbrt")(col),
np.reciprocal: lambda col: 1 / col,
np.exp: sp_func.exp,
np.exp2: lambda col: sp_func.pow(2, col),
np.expm1: lambda col: sp_func.exp(col) - 1,
np.log: sp_func.ln,
np.log2: sp_func._log2,
np.log10: sp_func._log10,
np.log1p: lambda col: sp_func.ln(col + 1),
# Aggregate functions translate to identity functions when applied element wise
np.sum: lambda col: col,
np.min: lambda col: col,
np.max: lambda col: col,
# Trigonometric functions
np.sin: sp_func.sin,
np.cos: sp_func.cos,
np.tan: sp_func.tan,
np.sinh: sp_func.sinh,
np.cosh: sp_func.cosh,
np.tanh: sp_func.tanh,
np.arcsin: lambda col: sp_func.builtin("asin")(col),
np.arccos: lambda col: sp_func.builtin("acos")(col),
np.arctan: lambda col: sp_func.builtin("atan")(col),
np.arctan2: lambda col: sp_func.builtin("atan2")(col),
np.arcsinh: lambda col: sp_func.builtin("asinh")(col),
np.arccosh: lambda col: sp_func.builtin("acosh")(col),
np.arctanh: lambda col: sp_func.builtin("atanh")(col),
np.degrees: lambda col: sp_func.builtin("degrees")(col),
np.radians: lambda col: sp_func.builtin("radians")(col),
# Floating functions
np.ceil: sp_func.ceil,
np.floor: sp_func.floor,
np.trunc: sp_func.trunc,
np.isnan: sp_func.is_null,
}


Expand Down Expand Up @@ -285,7 +305,7 @@ def end_partition(self, df): # type: ignore[no-untyped-def] # pragma: no cover
ApplyFunc.end_partition._sf_vectorized_input = native_pd.DataFrame # type: ignore[attr-defined]

packages = list(session.get_packages().values()) + udf_packages
func_udtf = udtf(
func_udtf = sp_func.udtf(
ApplyFunc,
output_schema=PandasDataFrameType(
[LongType(), StringType(), VariantType()],
Expand Down Expand Up @@ -707,7 +727,7 @@ def end_partition(self, df: native_pd.DataFrame): # type: ignore[no-untyped-def
excluded=existing_identifiers,
wrap_double_underscore=False,
)
return udtf(
return sp_func.udtf(
ApplyFunc,
output_schema=PandasDataFrameType(
[StringType(), IntegerType(), VariantType(), IntegerType(), IntegerType()],
Expand Down Expand Up @@ -781,7 +801,7 @@ def apply_func(x): # type: ignore[no-untyped-def] # pragma: no cover
def apply_func(x): # type: ignore[no-untyped-def] # pragma: no cover
return x.apply(func, args=args, **kwargs)

func_udf = udf(
func_udf = sp_func.udf(
apply_func,
return_type=PandasSeriesType(return_type),
input_types=[PandasSeriesType(input_type)],
Expand Down Expand Up @@ -1185,12 +1205,12 @@ def groupby_apply_pivot_result_to_final_ordered_dataframe(
# in GROUP_KEY_APPEARANCE_ORDER) and assign the
# label i to all rows that came from func(group_i).
[
col(original_row_position_snowflake_quoted_identifier).as_(
sp_func.col(original_row_position_snowflake_quoted_identifier).as_(
new_index_identifier
)
if sort_method is GroupbyApplySortMethod.ORIGINAL_ROW_ORDER
else (
dense_rank().over(
sp_func.dense_rank().over(
Window.order_by(
*(
SnowparkColumn(col).asc_nulls_last()
Expand All @@ -1211,9 +1231,11 @@ def groupby_apply_pivot_result_to_final_ordered_dataframe(
),
*[
(
col(old_quoted_identifier).as_(quoted_identifier)
sp_func.col(old_quoted_identifier).as_(quoted_identifier)
if return_variant
else col(old_quoted_identifier).cast(return_type).as_(quoted_identifier)
else sp_func.col(old_quoted_identifier)
.cast(return_type)
.as_(quoted_identifier)
)
for old_quoted_identifier, quoted_identifier in zip(
data_column_snowflake_quoted_identifiers
Expand Down Expand Up @@ -1398,7 +1420,7 @@ def groupby_apply_sort_method(
# Need to wrap column name in IDENTIFIER, or else bool agg function
# will treat the name as a string literal
is_transform: bool = not ordered_dataframe_before_sort.agg(
builtin("boolor_agg")(
sp_func.builtin("boolor_agg")(
SnowparkColumn(original_row_position_quoted_identifier) == -1
).as_("is_transform")
).collect()[0][0]
Expand Down Expand Up @@ -1473,7 +1495,7 @@ def make_condition(key: Any) -> SnowparkColumn:
# Cast one of the values in the comparison to variant so that we
# we can compare types that are otherwise not comparable in
# Snowflake, like timestamp and int.
return col.equal_null(to_variant(pandas_lit(key)))
return col.equal_null(sp_func.to_variant(pandas_lit(key)))

# If any of the values we are mapping to have types that are
# incompatible with the current column's type, we have to cast the new
Expand All @@ -1496,7 +1518,7 @@ def make_condition(key: Any) -> SnowparkColumn:
def make_result(value: Any) -> SnowparkColumn:
value_expression = pandas_lit(value)
return (
to_variant(value_expression)
sp_func.to_variant(value_expression)
if should_cast_result_to_variant
else value_expression
)
Expand All @@ -1508,7 +1530,7 @@ def make_result(value: Any) -> SnowparkColumn:
make_condition(key_and_value[0]), make_result(key_and_value[1])
),
itertools.islice(map_items, 1, None),
when(make_condition(first_key), make_result(first_value)),
sp_func.when(make_condition(first_key), make_result(first_value)),
)
if isinstance(mapping, defaultdict):
case_expression = case_expression.otherwise(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@
APPLY_LABEL_COLUMN_QUOTED_IDENTIFIER,
APPLY_VALUE_COLUMN_QUOTED_IDENTIFIER,
DEFAULT_UDTF_PARTITION_SIZE,
NUMPY_FUNCTION_TO_SNOWFLAKE_FUNCTION,
GroupbyApplySortMethod,
check_return_variant_and_get_return_type,
create_udf_for_series_apply,
Expand Down Expand Up @@ -8755,6 +8756,12 @@ def applymap(
f"Snowpark pandas applymap API doesn't yet support Snowpark Python function `{func.__name__}` with args = '{args}'."
)
return self._apply_snowpark_python_function_to_columns(func, kwargs)

# Check if the function is a known numpy function that can be translated to Snowflake function.
sf_func = NUMPY_FUNCTION_TO_SNOWFLAKE_FUNCTION.get(func)
if sf_func is not None:
return self._apply_snowpark_python_function_to_columns(sf_func, kwargs)

# Currently, NULL values are always passed into the udtf even if strict=True,
# which is a bug on the server side SNOW-880105.
# The fix will not land soon, so we are going to raise not implemented error for now.
Expand Down
2 changes: 1 addition & 1 deletion tests/integ/modin/frame/test_applymap.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def test_applymap_numpy(func):
native_df = native_pd.DataFrame(data)
snow_df = pd.DataFrame(data)

with SqlCounter(query_count=7, udf_count=1):
with SqlCounter(query_count=1):
eval_snowpark_pandas_result(snow_df, native_df, lambda x: x.applymap(func))


Expand Down
4 changes: 2 additions & 2 deletions tests/integ/modin/series/test_apply_and_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def create_func_with_return_type_hint(func: Callable, return_type: str) -> Calla
return d["f"]


TEST_NUMPY_FUNCS = [np.min, np.sqrt, np.tan, np.sum, np.median]
TEST_NUMPY_FUNCS = [np.min, np.sqrt, np.tan, np.sum, np.square, np.log1p, np.exp2]


@pytest.mark.parametrize("method", ["apply", "map"])
Expand Down Expand Up @@ -412,7 +412,7 @@ def test_builtin_function(self, method, func):
)

@pytest.mark.parametrize("func", TEST_NUMPY_FUNCS)
@sql_count_checker(query_count=4, udf_count=1)
@sql_count_checker(query_count=1)
def test_apply_and_map_numpy(self, method, func):
data = [1.0, 2.0, 3.0]
native_series = native_pd.Series(data)
Expand Down

0 comments on commit 5e77256

Please sign in to comment.