Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: reset user agent for each requests in local pfs #2284

Merged
merged 5 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/promptflow/promptflow/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
PROMPTFLOW_CONNECTIONS = "PROMPTFLOW_CONNECTIONS"
PROMPTFLOW_SECRETS_FILE = "PROMPTFLOW_SECRETS_FILE"
PF_NO_INTERACTIVE_LOGIN = "PF_NO_INTERACTIVE_LOGIN"
PF_RUN_AS_BUILT_BINARY = "PF_RUN_AS_BUILT_BINARY"
PF_LOGGING_LEVEL = "PF_LOGGING_LEVEL"
OPENAI_API_KEY = "openai-api-key"
BING_API_KEY = "bing-api-key"
Expand All @@ -17,6 +18,7 @@
ERROR_RESPONSE_COMPONENT_NAME = "promptflow"
EXTENSION_UA = "prompt-flow-extension"
LANGUAGE_KEY = "language"
USER_AGENT_OVERRIDE_KEY = "user_agent_override"

# Tool meta info
ICON_DARK = "icon_dark"
Expand Down
3 changes: 3 additions & 0 deletions src/promptflow/promptflow/_core/operation_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ def append_user_agent(self, user_agent: str):
user_agent (str): The user agent information to append.
"""
if OperationContext.USER_AGENT_KEY in self:
# TODO: this judgement can be wrong when an user agent is a substring of another,
# e.g. "Mozilla/5.0" and "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
# however, changing this code may impact existing logic, so won't change it now
if user_agent not in self.user_agent:
self.user_agent = f"{self.user_agent.strip()} {user_agent.strip()}"
else:
Expand Down
51 changes: 45 additions & 6 deletions src/promptflow/promptflow/_sdk/_pf_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
from pathlib import Path
from typing import Any, Dict, List, Union

from .._constants import USER_AGENT_OVERRIDE_KEY
from .._utils.logger_utils import get_cli_sdk_logger
from ..exceptions import ErrorTarget, UserErrorException
from ._configuration import Configuration
from ._constants import MAX_SHOW_DETAILS_RESULTS
from ._constants import MAX_SHOW_DETAILS_RESULTS, ConnectionProvider
from ._load_functions import load_flow
from ._user_agent import USER_AGENT
from ._utils import ClientUserAgentUtil, get_connection_operation, setup_user_agent_to_operation_context
from ._utils import ClientUserAgentUtil, setup_user_agent_to_operation_context
from .entities import Run
from .entities._eager_flow import EagerFlow
from .operations import RunOperations
Expand All @@ -34,20 +36,25 @@ class PFClient:

def __init__(self, **kwargs):
logger.debug("PFClient init with kwargs: %s", kwargs)
self._runs = RunOperations(self)
# when this is set, telemetry from this client will use this user agent and ignore the one from OperationContext
self._user_agent_override = kwargs.pop(USER_AGENT_OVERRIDE_KEY, None)
self._connection_provider = kwargs.pop("connection_provider", None)
self._config = kwargs.get("config", None) or {}
# The credential is used as an option to override
# DefaultAzureCredential when using workspace connection provider
self._credential = kwargs.get("credential", None)

# user_agent_override will be applied to all TelemetryMixin operations
self._runs = RunOperations(self, user_agent_override=self._user_agent_override)
self._flows = FlowOperations(client=self, user_agent_override=self._user_agent_override)
self._experiments = ExperimentOperations(self, user_agent_override=self._user_agent_override)
# Lazy init to avoid azure credential requires too early
self._connections = None
self._flows = FlowOperations(client=self)

self._tools = ToolOperations()
# add user agent from kwargs if any
if isinstance(kwargs.get("user_agent"), str):
ClientUserAgentUtil.append_user_agent(kwargs["user_agent"])
self._experiments = ExperimentOperations(self)
self._traces = TraceOperations()
setup_user_agent_to_operation_context(USER_AGENT)

Expand Down Expand Up @@ -243,9 +250,41 @@ def connections(self) -> ConnectionOperations:
"""Connection operations that can manage connections."""
if not self._connections:
self._ensure_connection_provider()
self._connections = get_connection_operation(self._connection_provider, self._credential)
self._connections = PFClient._build_connection_operation(
self._connection_provider,
self._credential,
user_agent_override=self._user_agent_override,
)
return self._connections

@staticmethod
def _build_connection_operation(connection_provider: str, credential=None, **kwargs):
"""
Build a ConnectionOperation object based on connection provider.

:param connection_provider: Connection provider, e.g. local, azureml, azureml://subscriptions..., etc.
:type connection_provider: str
:param credential: Credential when remote provider, default to chained credential DefaultAzureCredential.
:type credential: object
"""
if connection_provider == ConnectionProvider.LOCAL.value:
from promptflow._sdk.operations._connection_operations import ConnectionOperations

logger.debug("PFClient using local connection operations.")
connection_operation = ConnectionOperations(**kwargs)
elif connection_provider.startswith(ConnectionProvider.AZUREML.value):
Stephen1993 marked this conversation as resolved.
Show resolved Hide resolved
from promptflow._sdk.operations._local_azure_connection_operations import LocalAzureConnectionOperations

logger.debug(f"PFClient using local azure connection operations with credential {credential}.")
connection_operation = LocalAzureConnectionOperations(connection_provider, credential=credential, **kwargs)
else:
raise UserErrorException(
target=ErrorTarget.CONTROL_PLANE_SDK,
message_format="Unsupported connection provider: {connection_provider}",
connection_provider=connection_provider,
)
return connection_operation

@property
def flows(self) -> FlowOperations:
"""Operations on the flow that can manage flows."""
Expand Down
8 changes: 2 additions & 6 deletions src/promptflow/promptflow/_sdk/_service/apis/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import promptflow._sdk.schemas._connection as connection
from promptflow._sdk._configuration import Configuration
from promptflow._sdk._service import Namespace, Resource, fields
from promptflow._sdk._service.utils.utils import build_pfs_user_agent, local_user_only, make_response_no_content
from promptflow._sdk._service.utils.utils import get_client_from_request, local_user_only, make_response_no_content
from promptflow._sdk.entities._connection import _Connection

api = Namespace("Connections", description="Connections Management")
Expand Down Expand Up @@ -66,14 +66,10 @@ def validate_working_directory(value):


def _get_connection_operation(working_directory=None):
from promptflow._sdk._pf_client import PFClient

connection_provider = Configuration().get_connection_provider(path=working_directory)
# get_connection_operation is a shared function, so we build user agent based on request first and
# then pass it to the function
connection_operation = PFClient(
connection_provider=connection_provider, user_agent=build_pfs_user_agent()
).connections
connection_operation = get_client_from_request(connection_provider=connection_provider).connections
return connection_operation


Expand Down
6 changes: 3 additions & 3 deletions src/promptflow/promptflow/_sdk/_service/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import logging
import sys
import threading
import time
from datetime import datetime, timedelta
Expand Down Expand Up @@ -30,6 +29,7 @@
FormattedException,
get_current_env_pfs_file,
get_port_from_config,
is_run_from_built_binary,
kill_exist_service,
)
from promptflow._sdk._utils import get_promptflow_sdk_version, overwrite_null_std_logger, read_write_by_user
Expand Down Expand Up @@ -74,7 +74,7 @@ def create_app():
# Enable log
app.logger.setLevel(logging.INFO)
# each env will have its own log file
if sys.executable.endswith("pfcli.exe"):
if is_run_from_built_binary():
log_file = HOME_PROMPT_FLOW_DIR / PF_SERVICE_LOG_FILE
log_file.touch(mode=read_write_by_user(), exist_ok=True)
else:
Expand Down Expand Up @@ -140,7 +140,7 @@ def monitor_request():
kill_exist_service(port)
break

if not sys.executable.endswith("pfcli.exe"):
if not is_run_from_built_binary():
monitor_thread = ThreadWithContextVars(target=monitor_request, daemon=True)
monitor_thread.start()
return app, api
Expand Down
3 changes: 2 additions & 1 deletion src/promptflow/promptflow/_sdk/_service/entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
get_port_from_config,
get_started_service_info,
is_port_in_use,
is_run_from_built_binary,
kill_exist_service,
)
from promptflow._sdk._utils import get_promptflow_sdk_version, print_pf_version
Expand Down Expand Up @@ -104,7 +105,7 @@ def validate_port(port, force_start):
port = get_port_from_config(create_if_not_exists=True)
validate_port(port, args.force)

if sys.executable.endswith("pfcli.exe"):
if is_run_from_built_binary():
# For msi installer, use sdk api to start pfs since it's not supported to invoke waitress by cli directly
# after packaged by Pyinstaller.
app, _ = create_app()
Expand Down
38 changes: 30 additions & 8 deletions src/promptflow/promptflow/_sdk/_service/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import requests
from flask import abort, make_response, request

from promptflow._constants import PF_RUN_AS_BUILT_BINARY
from promptflow._sdk._constants import (
DEFAULT_ENCODING,
HOME_PROMPT_FLOW_DIR,
Expand Down Expand Up @@ -60,7 +61,7 @@ def get_current_env_pfs_file(file_name):


def get_port_from_config(create_if_not_exists=False):
if sys.executable.endswith("pfcli.exe"):
if is_run_from_built_binary():
port_file_path = HOME_PROMPT_FLOW_DIR / PF_SERVICE_PORT_FILE
port_file_path.touch(mode=read_write_by_user(), exist_ok=True)
else:
Expand All @@ -79,7 +80,7 @@ def get_port_from_config(create_if_not_exists=False):


def dump_port_to_config(port):
if sys.executable.endswith("pfcli.exe"):
if is_run_from_built_binary():
port_file_path = HOME_PROMPT_FLOW_DIR / PF_SERVICE_PORT_FILE
port_file_path.touch(mode=read_write_by_user(), exist_ok=True)
else:
Expand Down Expand Up @@ -231,13 +232,34 @@ def __post_init__(self, exception, status_code):


def build_pfs_user_agent():
extra_agent = f"local_pfs/{VERSION}"
if request.user_agent.string:
return f"{request.user_agent.string} {extra_agent}"
return extra_agent
user_agent = request.user_agent.string
user_agent_for_local_pfs = f"local_pfs/{VERSION}"
if user_agent:
return f"{user_agent} {user_agent_for_local_pfs}"
return user_agent_for_local_pfs


def get_client_from_request() -> "PFClient":
def get_client_from_request(*, connection_provider=None) -> "PFClient":
"""
Build a PFClient instance based on current request in local PFS.

User agent may be different for each request.
"""
from promptflow._sdk._pf_client import PFClient

return PFClient(user_agent=build_pfs_user_agent())
user_agent = build_pfs_user_agent()

if connection_provider:
pf_client = PFClient(connection_provider=connection_provider, user_agent_override=user_agent)
else:
pf_client = PFClient(user_agent_override=user_agent)
return pf_client


def is_run_from_built_binary():
"""
Use this function to trigger behavior difference between calling from promptflow sdk/cli and built binary.

Allow customer to use environment variable to control the triggering.
"""
return sys.executable.endswith("pfcli.exe") or os.environ.get(PF_RUN_AS_BUILT_BINARY, "").lower() == "true"
33 changes: 19 additions & 14 deletions src/promptflow/promptflow/_sdk/_telemetry/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ def log_activity(
activity_name,
activity_type=ActivityType.INTERNALCALL,
custom_dimensions=None,
user_agent=None,
):
"""Log an activity.

Expand All @@ -121,12 +122,16 @@ def log_activity(
:type activity_type: str
:param custom_dimensions: The custom properties of the activity.
:type custom_dimensions: dict
:param user_agent: Specify user agent. If not specified, the user agent will be got from OperationContext.
:type user_agent: str
:return: None
"""
if not custom_dimensions:
custom_dimensions = {}

user_agent = ClientUserAgentUtil.get_user_agent()
# provided user agent will be respected even if it's ""
if user_agent is None:
user_agent = ClientUserAgentUtil.get_user_agent()
request_id = request_id_context.get()
if not request_id:
# public function call
Expand Down Expand Up @@ -179,17 +184,6 @@ def log_activity(
raise exception


def extract_telemetry_info(self):
"""Extract pf telemetry info from given telemetry mix-in instance."""
result = {}
try:
if isinstance(self, TelemetryMixin):
return self._get_telemetry_values()
except Exception:
pass
return result


def update_activity_name(activity_name, kwargs=None, args=None):
"""Update activity name according to kwargs. For flow test, we want to know if it's node test."""
if activity_name == "pf.flows.test":
Expand Down Expand Up @@ -233,10 +227,21 @@ def wrapper(self, *args, **kwargs):

logger = get_telemetry_logger()

custom_dimensions.update(extract_telemetry_info(self))
if isinstance(self, TelemetryMixin):
custom_dimensions.update(self._get_telemetry_values())
user_agent = self._get_user_agent_override()
else:
user_agent = None

# update activity name according to kwargs.
_activity_name = update_activity_name(activity_name, kwargs=kwargs)
with log_activity(logger, _activity_name, activity_type, custom_dimensions):
with log_activity(
logger=logger,
activity_name=_activity_name,
activity_type=activity_type,
custom_dimensions=custom_dimensions,
user_agent=user_agent,
):
if _activity_name in HINT_ACTIVITY_NAME:
hint_for_update()
# set check_latest_version as deamon thread to avoid blocking main thread
Expand Down
17 changes: 15 additions & 2 deletions src/promptflow/promptflow/_sdk/_telemetry/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,38 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import logging
from typing import Optional

from promptflow._constants import USER_AGENT_OVERRIDE_KEY
from promptflow._sdk._configuration import Configuration

PROMPTFLOW_LOGGER_NAMESPACE = "promptflow._sdk._telemetry"


class TelemetryMixin(object):
def __init__(self, **kwargs):
self._user_agent_override = kwargs.pop(USER_AGENT_OVERRIDE_KEY, None)

# Need to call init for potential parent, otherwise it won't be initialized.
# TODO: however, object.__init__() takes exactly one argument (the instance to initialize), so this will fail
# if there are any kwargs left.
super().__init__(**kwargs)

def _get_telemetry_values(self, *args, **kwargs): # pylint: disable=unused-argument
"""Return the telemetry values of object.
"""Return the telemetry values of object, will be set as custom_dimensions in telemetry.

:return: The telemetry values
:rtype: Dict
"""
return {}

def _get_user_agent_override(self) -> Optional[str]:
"""If we have a bonded user agent passed in via the constructor, return it.

Telemetries from this object will use this user agent and ignore the one from OperationContext.
"""
return self._user_agent_override


class WorkspaceTelemetryMixin(TelemetryMixin):
def __init__(self, subscription_id, resource_group_name, workspace_name, **kwargs):
Expand All @@ -31,7 +44,7 @@ def __init__(self, subscription_id, resource_group_name, workspace_name, **kwarg
super().__init__(**kwargs)

def _get_telemetry_values(self, *args, **kwargs): # pylint: disable=unused-argument
"""Return the telemetry values of run operations.
"""Return the telemetry values of object, will be set as custom_dimensions in telemetry.

:return: The telemetry values
:rtype: Dict
Expand Down
Loading
Loading