Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1805840: Augment telemetry with method_call_count #2804

Merged
merged 18 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
9810e24
SNOW-1805840: add method_call_count and interchange_call_count to tel…
sfc-gh-lmukhopadhyay Dec 20, 2024
d6d1a6e
Merge branch 'main' into lmukhopadhyay-SNOW-1805840-telem-call-count
sfc-gh-lmukhopadhyay Dec 20, 2024
733f4b0
update changelog
sfc-gh-lmukhopadhyay Dec 20, 2024
ed2bd25
fix telem tests
sfc-gh-lmukhopadhyay Dec 20, 2024
0801060
fix unit telem tests
sfc-gh-lmukhopadhyay Jan 4, 2025
13c3f5e
Merge branch 'main' into lmukhopadhyay-SNOW-1805840-telem-call-count
sfc-gh-lmukhopadhyay Jan 4, 2025
69a2cf2
fix telem unit error test
sfc-gh-lmukhopadhyay Jan 4, 2025
cccc1e2
remove interchange call count and address comments
sfc-gh-lmukhopadhyay Jan 6, 2025
857e249
Merge branch 'main' into lmukhopadhyay-SNOW-1805840-telem-call-count
sfc-gh-lmukhopadhyay Jan 6, 2025
e04b091
update changelog
sfc-gh-lmukhopadhyay Jan 6, 2025
05204b0
add test for multiple funcs on same qc
sfc-gh-lmukhopadhyay Jan 7, 2025
b38e304
address comments
sfc-gh-lmukhopadhyay Jan 8, 2025
2509c2e
Merge branch 'main' into lmukhopadhyay-SNOW-1805840-telem-call-count
sfc-gh-lmukhopadhyay Jan 8, 2025
7062bd0
resolve conf
sfc-gh-lmukhopadhyay Jan 8, 2025
b3caf57
Merge branch 'lmukhopadhyay-SNOW-1805840-telem-call-count' of github.…
sfc-gh-lmukhopadhyay Jan 8, 2025
4cb5589
Merge branch 'main' into lmukhopadhyay-SNOW-1805840-telem-call-count
sfc-gh-lmukhopadhyay Jan 9, 2025
f157851
Merge branch 'main' into lmukhopadhyay-SNOW-1805840-telem-call-count
sfc-gh-lmukhopadhyay Jan 13, 2025
19282ee
Merge branch 'main' into lmukhopadhyay-SNOW-1805840-telem-call-count
sfc-gh-lmukhopadhyay Jan 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@
- Updated integration testing for `session.lineage.trace` to exclude deleted objects
- Added documentation for `DataFrame.map`.
- Improve performance of `DataFrame.apply` by mapping numpy functions to snowpark functions if possible.
- Added documentation on the extent of Snowpark pandas interoperability with scikit-learn
- Added documentation on the extent of Snowpark pandas interoperability with scikit-learn.
- Added `method_call_count` and `interchange_call_count` to telemetry.
sfc-gh-mvashishtha marked this conversation as resolved.
Show resolved Hide resolved

## 1.26.0 (2024-12-05)

Expand Down
36 changes: 35 additions & 1 deletion src/snowflake/snowpark/modin/plugin/_internal/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from snowflake.snowpark.modin.plugin._internal.utils import (
is_snowpark_pandas_dataframe_or_series_type,
)
from collections import Counter
from snowflake.snowpark.query_history import QueryHistory
from snowflake.snowpark.session import Session

Expand All @@ -36,6 +37,8 @@ class SnowparkPandasTelemetryField(Enum):
ARGS = "argument"
# fallback flag
IS_FALLBACK = "is_fallback"
CALL_COUNT = "call_count"
sfc-gh-mvashishtha marked this conversation as resolved.
Show resolved Hide resolved
INTERCHANGE_CALL_COUNT = "interchange_call_count"


# Argument truncating size after converted to str. Size amount can be later specified after analysis and needs.
Expand All @@ -58,6 +61,8 @@ def _send_snowpark_pandas_telemetry_helper(
func_name: str,
query_history: Optional[QueryHistory],
api_calls: Union[str, list[dict[str, Any]]],
method_call_count: Counter[str],
interchange_call_count: Counter[str],
) -> None:
"""
A helper function that sends Snowpark pandas API telemetry data.
Expand All @@ -71,14 +76,30 @@ def _send_snowpark_pandas_telemetry_helper(
query_history: The query history context manager to record queries that are pushed down to the Snowflake
database in the session.
api_calls: Optional list of Snowpark pandas API calls made during the function execution.
method_call_count: Number of times a method has been called.
interchange_call_count: Number of times __dataframe__ has been called.

Returns:
None
"""
data: dict[str, Union[str, list[dict[str, Any]], list[str], Optional[str]]] = {
data: dict[
sfc-gh-mvashishtha marked this conversation as resolved.
Show resolved Hide resolved
str, Union[str, list[dict[str, Any]], list[str], Optional[str], Counter[str]]
] = {
TelemetryField.KEY_FUNC_NAME.value: func_name,
TelemetryField.KEY_CATEGORY.value: SnowparkPandasTelemetryField.FUNC_CATEGORY_SNOWPARK_PANDAS.value,
TelemetryField.KEY_ERROR_MSG.value: error_msg,
**(
{SnowparkPandasTelemetryField.CALL_COUNT.value: method_call_count}
if method_call_count is not None
else {}
),
**(
{
SnowparkPandasTelemetryField.INTERCHANGE_CALL_COUNT.value: interchange_call_count
}
if interchange_call_count is not None
else {}
),
}
if len(api_calls) > 0:
data[TelemetryField.KEY_API_CALLS.value] = api_calls
Expand Down Expand Up @@ -274,6 +295,8 @@ def _telemetry_helper(
# Moving existing api call out first can avoid to generate duplicates.
existing_api_calls = []
need_to_restore_args0_api_calls = False
method_call_count = None
interchange_call_count = None

# If the decorated func is a class method or a standalone function, we need to get an active session:
if is_standalone_function or (len(args) > 0 and isinstance(args[0], type)):
Expand All @@ -295,6 +318,13 @@ def _telemetry_helper(
need_to_restore_args0_api_calls = True
session = args[0]._query_compiler._modin_frame.ordered_dataframe.session
class_prefix = args[0].__class__.__name__
args[0]._query_compiler._method_call_counts[func.__qualname__] += 1
sfc-gh-lmukhopadhyay marked this conversation as resolved.
Show resolved Hide resolved
method_call_count = args[0]._query_compiler._method_call_counts[
func.__qualname__
]
interchange_call_count = args[0]._query_compiler._method_call_counts[
"__dataframe__"
]
except (TypeError, IndexError, AttributeError):
# TypeError: args might not support indexing; IndexError: args is empty; AttributeError: args[0] might not
# have _query_compiler attribute.
Expand Down Expand Up @@ -337,6 +367,8 @@ def _telemetry_helper(
func_name=func_name,
query_history=query_history,
api_calls=existing_api_calls + [curr_api_call],
method_call_count=method_call_count,
interchange_call_count=interchange_call_count,
)
raise e

Expand Down Expand Up @@ -371,6 +403,8 @@ def _telemetry_helper(
func_name=func_name,
query_history=query_history,
api_calls=existing_api_calls + [curr_api_call],
method_call_count=method_call_count,
interchange_call_count=interchange_call_count,
)
if need_to_restore_args0_api_calls:
args[0]._query_compiler.snowpark_pandas_api_calls = existing_api_calls
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import json
import logging
import re
from collections import Counter
import typing
import uuid
from collections.abc import Hashable, Iterable, Mapping, Sequence
Expand Down Expand Up @@ -533,6 +534,7 @@ def __init__(self, frame: InternalFrame) -> None:
# Copying and modifying self.snowpark_pandas_api_calls is taken care of in telemetry decorators
sfc-gh-mvashishtha marked this conversation as resolved.
Show resolved Hide resolved
self.snowpark_pandas_api_calls: list = []
self._attrs: dict[Any, Any] = {}
self._method_call_counts: Counter[str] = Counter[str]()

def _raise_not_implemented_error_for_timedelta(
self, frame: InternalFrame = None
Expand Down
85 changes: 85 additions & 0 deletions tests/integ/modin/test_telemetry.py
sfc-gh-lmukhopadhyay marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ def test_snowpark_pandas_telemetry_method_decorator(test_table_name):
"sfqids",
"func_name",
"error_msg",
"call_count",
"interchange_call_count",
}
assert data["category"] == "snowpark_pandas"
assert data["api_calls"] == df1_expected_api_calls + [
Expand Down Expand Up @@ -178,6 +180,8 @@ def test_send_snowpark_pandas_telemetry_helper(send_mock):
func_name="test_send_func",
query_history=None,
api_calls=[],
method_call_count=None,
interchange_call_count=None,
)
send_mock.assert_called_with(
{
Expand Down Expand Up @@ -559,6 +563,87 @@ def test_telemetry_repr():
]


@sql_count_checker(query_count=6, join_count=4)
def test_telemetry_interchange_call_count():
s = pd.DataFrame([1, 2, 3, 4])
t = pd.DataFrame([5])
s.__dataframe__()
s.__dataframe__()
t.__dataframe__()

s.iloc[0, 0] = 7
s.__dataframe__()
s.__dataframe__()
t.__dataframe__()

def _get_data(call):
try:
return call.to_dict()["message"][TelemetryField.KEY_DATA.value]
except Exception:
return None

telemetry_data = [
_get_data(call)
for call in pd.session._conn._telemetry_client.telemetry._log_batch
if _get_data(call) is not None
and "func_name" in _get_data(call)
and _get_data(call)["func_name"] == "DataFrame.__dataframe__"
]
assert len(telemetry_data) == 6
# s calls __dataframe__() for the first time.
assert telemetry_data[0]["call_count"] == 1
assert telemetry_data[0]["interchange_call_count"] == 1
# s calls __dataframe__() for the second time.
sfc-gh-lmukhopadhyay marked this conversation as resolved.
Show resolved Hide resolved
assert telemetry_data[1]["call_count"] == 2
assert telemetry_data[1]["interchange_call_count"] == 2
# t calls __dataframe__() for the first time.
assert telemetry_data[2]["call_count"] == 1
assert telemetry_data[2]["interchange_call_count"] == 1
# the new version of s calls __dataframe__() for the first time.
assert telemetry_data[3]["call_count"] == 1
assert telemetry_data[3]["interchange_call_count"] == 1
# the new version of s calls __dataframe__() for the second time.
assert telemetry_data[4]["call_count"] == 2
assert telemetry_data[4]["interchange_call_count"] == 2
# t calls __dataframe__() for the second time.
assert telemetry_data[5]["call_count"] == 2
assert telemetry_data[5]["interchange_call_count"] == 2


@sql_count_checker(query_count=4)
def test_telemetry_func_call_count():
s = pd.DataFrame([1, 2, np.nan, 4])
t = pd.DataFrame([5])

s.__repr__()
s.__repr__()
s.__repr__()

t.__repr__()

def _get_data(call):
try:
return call.to_dict()["message"][TelemetryField.KEY_DATA.value]
except Exception:
return None

telemetry_data = [
_get_data(call)
for call in pd.session._conn._telemetry_client.telemetry._log_batch
if _get_data(call) is not None
and "func_name" in _get_data(call)
and _get_data(call)["func_name"] == "DataFrame.__repr__"
]

# second to last call from telemetry data
sfc-gh-mvashishtha marked this conversation as resolved.
Show resolved Hide resolved
assert telemetry_data[-2]["call_count"] == 3
assert telemetry_data[-2]["interchange_call_count"] == 0

# last call from telemetry data
sfc-gh-mvashishtha marked this conversation as resolved.
Show resolved Hide resolved
assert telemetry_data[-1]["call_count"] == 1
assert telemetry_data[-1]["interchange_call_count"] == 0


@sql_count_checker(query_count=0)
def test_telemetry_copy():
# copy() is defined in upstream Modin's BasePandasDataset class, and not overridden by any
Expand Down
6 changes: 6 additions & 0 deletions tests/unit/modin/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ def snowpark_pandas_error_test_helper(
query_history=ANY,
telemetry_type=telemetry_type,
error_msg=error_msg,
method_call_count=ANY,
interchange_call_count=ANY,
)


Expand Down Expand Up @@ -115,6 +117,8 @@ def raise_real_type_error(_):
query_history=ANY,
telemetry_type="snowpark_pandas_type_error",
error_msg=None,
method_call_count=ANY,
interchange_call_count=ANY,
)
assert len(mock_arg2._query_compiler.snowpark_pandas_api_calls) == 0

Expand All @@ -133,6 +137,8 @@ def raise_real_type_error(_):
query_history=ANY,
telemetry_type="snowpark_pandas_type_error",
error_msg=None,
method_call_count=ANY,
interchange_call_count=ANY,
)


Expand Down
Loading