Skip to content

Commit

Permalink
Merge branch 'main' into task/changeAdvSimInit
Browse files (browse the repository at this point in the history)
  • Loading branch information
nagkumar91 authored Jun 27, 2024
2 parents 5c91d3b + 9a8c10d commit 861581a
Show file tree
Hide file tree
Showing 21 changed files with 529 additions and 163 deletions.
3 changes: 2 additions & 1 deletion src/promptflow-devkit/promptflow/_cli/_pf/_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,13 +522,14 @@ def _test_flow_multi_modal(args, pf_client, environment_variables):
else:
from promptflow._sdk._tracing import _invoke_pf_svc

pfs_port = _invoke_pf_svc()
pfs_port, service_host = _invoke_pf_svc()
serve_app_port = args.port or find_available_port()
enable_internal_features = Configuration.get_instance().is_internal_features_enabled()
start_chat_ui_service_monitor(
flow=args.flow,
serve_app_port=serve_app_port,
pfs_port=pfs_port,
service_host=service_host,
url_params=list_of_dict_to_dict(args.url_params),
init=list_of_dict_to_dict(args.init),
enable_internal_features=enable_internal_features,
Expand Down
66 changes: 34 additions & 32 deletions src/promptflow-devkit/promptflow/_cli/_pf/_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from promptflow._sdk._constants import (
HOME_PROMPT_FLOW_DIR,
PF_SERVICE_DEBUG,
PF_SERVICE_HOST,
PF_SERVICE_LOG_FILE,
PF_SERVICE_WORKER_NUM,
)
Expand All @@ -28,6 +27,7 @@
check_pfs_service_status,
dump_port_to_config,
get_current_env_pfs_file,
get_pfs_host,
get_pfs_version,
get_port_from_config,
get_started_service_info,
Expand Down Expand Up @@ -169,64 +169,65 @@ def start_service(args):
# User Agent will be set based on header in request, so not set globally here.
os.environ[PF_NO_INTERACTIVE_LOGIN] = "true"
port = args.port
service_host = get_pfs_host()
if args.debug:
os.environ[PF_SERVICE_DEBUG] = "true"
if not is_run_from_built_binary():
add_executable_script_to_env_path()
port = _prepare_app_for_foreground_service(port, args.force)
waitress.serve(app, host=PF_SERVICE_HOST, port=port, threads=PF_SERVICE_WORKER_NUM)
port = _prepare_app_for_foreground_service(port, args.force, service_host)
waitress.serve(app, host=service_host, port=port, threads=PF_SERVICE_WORKER_NUM)
else:
if is_run_from_built_binary():
# For msi installer/executable, use sdk api to start pfs since it's not supported to invoke waitress by cli
# directly after packaged by Pyinstaller.
parent_dir = os.path.dirname(sys.executable)
output_path = os.path.join(parent_dir, "output.txt")
with redirect_stdout_to_file(output_path):
port = _prepare_app_for_foreground_service(port, args.force)
waitress.serve(app, host=PF_SERVICE_HOST, port=port, threads=PF_SERVICE_WORKER_NUM)
port = _prepare_app_for_foreground_service(port, args.force, service_host)
waitress.serve(app, host=service_host, port=port, threads=PF_SERVICE_WORKER_NUM)
else:
port = validate_port(port, args.force)
port = validate_port(port, args.force, service_host)
add_executable_script_to_env_path()
# Start a pfs process using detach mode. It will start a new process and create a new app. So we use
# environment variable to pass the debug mode, since it will inherit parent process environment variable.
if platform.system() == "Windows":
_start_background_service_on_windows(port)
_start_background_service_on_windows(port, service_host)
else:
_start_background_service_on_unix(port)
is_healthy = check_pfs_service_status(port)
_start_background_service_on_unix(port, service_host)
is_healthy = check_pfs_service_status(port, service_host)
if is_healthy:
message = f"Start prompt flow service on port {port}, version: {get_pfs_version()}."
message = f"Start prompt flow service on {service_host}:{port}, version: {get_pfs_version()}."
print(message)
logger.info(message)
else:
logger.warning(f"Prompt flow service start failed in {port}. {hint_stop_before_upgrade}")


def validate_port(port, force_start):
def validate_port(port, force_start, service_host):
if port:
_validate_port(port, force_start)
_validate_port(port, force_start, service_host)
# dump port to config file only when port is valid or force_start is True.
dump_port_to_config(port)
else:
port = get_port_from_config(create_if_not_exists=True)
_validate_port(port, force_start)
port = get_port_from_config(service_host, create_if_not_exists=True)
_validate_port(port, force_start, service_host)
return port


def _validate_port(port, force_start):
if is_port_in_use(port):
def _validate_port(port, force_start, service_host):
if is_port_in_use(port, service_host):
if force_start:
message = f"Force restart the service on the port {port}."
message = f"Force restart the service on the {service_host}:{port}."
if is_run_from_built_binary():
print(message)
logger.warning(message)
kill_exist_service(port)
else:
message = f"Service port {port} is used."
message = f"Service port {service_host}:{port} is used."
if is_run_from_built_binary():
print(message)
logger.warning(message)
raise UserErrorException(f"Service port {port} is used.")
raise UserErrorException(f"Service port {service_host}:{port} is used.")


@contextlib.contextmanager
Expand All @@ -246,22 +247,22 @@ def redirect_stdout_to_file(path):
sys.stderr = old_stderr


def _prepare_app_for_foreground_service(port, force_start):
port = validate_port(port, force_start)
def _prepare_app_for_foreground_service(port, force_start, service_host):
port = validate_port(port, force_start, service_host)
global app
if app is None:
app, _ = create_app()
if os.environ.get(PF_SERVICE_DEBUG) == "true":
app.logger.setLevel(logging.DEBUG)
else:
app.logger.setLevel(logging.INFO)
message = f"Starting prompt flow Service on port {port}, version: {get_pfs_version()}."
message = f"Starting prompt flow Service on {service_host}:{port}, version: {get_pfs_version()}."
app.logger.info(message)
print(message)
return port


def _start_background_service_on_windows(port):
def _start_background_service_on_windows(port, service_host):
try:
import win32api
import win32con
Expand All @@ -272,7 +273,7 @@ def _start_background_service_on_windows(port):
f"service start depends on pywin32.. {ex}"
)
command = (
f"waitress-serve --listen={PF_SERVICE_HOST}:{port} --threads={PF_SERVICE_WORKER_NUM} "
f"waitress-serve --listen={service_host}:{port} --threads={PF_SERVICE_WORKER_NUM} "
"promptflow._cli._pf._service:get_app"
)
logger.debug(f"Start prompt flow service in Windows: {command}")
Expand Down Expand Up @@ -301,11 +302,10 @@ def _start_background_service_on_windows(port):
win32api.CloseHandle(thread_handle)


def _start_background_service_on_unix(port):
# Set host to PF_SERVICE_HOST, only allow request from PF_SERVICE_HOST.
def _start_background_service_on_unix(port, service_host):
cmd = [
"waitress-serve",
f"--listen={PF_SERVICE_HOST}:{port}",
f"--listen={service_host}:{port}",
f"--threads={PF_SERVICE_WORKER_NUM}",
"promptflow._cli._pf._service:get_app",
]
Expand All @@ -314,25 +314,27 @@ def _start_background_service_on_unix(port):


def stop_service():
port = get_port_from_config()
if port is not None and is_port_in_use(port):
service_host = get_pfs_host()
port = get_port_from_config(service_host)
if port is not None and is_port_in_use(port, service_host):
kill_exist_service(port)
message = f"Prompt flow service stop in {port}."
message = f"Prompt flow service stop on {service_host}:{port}."
else:
message = "Prompt flow service is not started."
logger.debug(message)
print(message)


def show_service():
port = get_port_from_config()
service_host = get_pfs_host()
port = get_port_from_config(service_host)
status = get_started_service_info(port)
if is_run_from_built_binary():
log_file = HOME_PROMPT_FLOW_DIR / PF_SERVICE_LOG_FILE
else:
log_file = get_current_env_pfs_file(PF_SERVICE_LOG_FILE)
if status:
extra_info = {"log_file": log_file.as_posix(), "version": get_pfs_version()}
extra_info = {"service_host": service_host, "log_file": log_file.as_posix(), "version": get_pfs_version()}
status.update(extra_info)
dumped_status = json.dumps(status, ensure_ascii=False, indent=2, sort_keys=True, separators=(",", ": ")) + "\n"
print(dumped_status)
Expand Down
6 changes: 6 additions & 0 deletions src/promptflow-devkit/promptflow/_sdk/_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
DEFAULT_ENCODING,
FLOW_DIRECTORY_MACRO_IN_CONFIG,
HOME_PROMPT_FLOW_DIR,
PF_SERVICE_HOST,
REMOTE_URI_PREFIX,
SERVICE_CONFIG_FILE,
)
Expand Down Expand Up @@ -50,6 +51,7 @@ class Configuration(object):
USER_AGENT = "user_agent"
ENABLE_INTERNAL_FEATURES = "enable_internal_features"
TRACE_DESTINATION = "trace.destination"
PFS_HOST = "service.host"
_instance = None

def __init__(self, overrides=None):
Expand Down Expand Up @@ -303,3 +305,7 @@ def set_temp_config_path(cls, temp_path: Union[str, Path]):
cls.CONFIG_PATH = temp_path / file_name
yield
cls.CONFIG_PATH = original_path

def get_pfs_host(self) -> Optional[str]:
"""Get the prompt flow service host."""
return self.get_config(key=self.PFS_HOST) or PF_SERVICE_HOST
2 changes: 0 additions & 2 deletions src/promptflow-devkit/promptflow/_sdk/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ def _prepare_home_dir() -> Path:
PF_SERVICE_LOG_FILE = "pfs.log"
PF_SERVICE_HOST = "127.0.0.1"
PF_SERVICE_DEFAULT_PORT = 23333
PF_SERVICE_HOUR_TIMEOUT = 1
PF_SERVICE_MONITOR_SECOND = 60
PF_SERVICE_WORKER_NUM = 16
PF_TRACE_CONTEXT = "PF_TRACE_CONTEXT"
PF_TRACE_CONTEXT_ATTR = "attributes"
Expand Down
45 changes: 2 additions & 43 deletions src/promptflow-devkit/promptflow/_sdk/_service/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,14 @@
import os
import threading
import time
from datetime import datetime, timedelta
from logging.handlers import RotatingFileHandler
from pathlib import PurePath

from flask import Blueprint, Flask, current_app, g, jsonify, redirect, request, url_for
from flask_cors import CORS
from werkzeug.exceptions import HTTPException

from promptflow._sdk._constants import (
PF_SERVICE_DEBUG,
PF_SERVICE_HOUR_TIMEOUT,
PF_SERVICE_MONITOR_SECOND,
CreatedByFieldName,
)
from promptflow._sdk._constants import PF_SERVICE_DEBUG, CreatedByFieldName
from promptflow._sdk._errors import MissingAzurePackage
from promptflow._sdk._service import Api
from promptflow._sdk._service.apis.collector import trace_collector
Expand All @@ -31,16 +25,8 @@
from promptflow._sdk._service.apis.telemetry import api as telemetry_api
from promptflow._sdk._service.apis.ui import api as ui_api
from promptflow._sdk._service.apis.ui import serve_chat_ui, serve_trace_ui
from promptflow._sdk._service.utils.utils import (
FormattedException,
get_log_file_location,
get_pfs_version,
get_port_from_config,
is_run_from_built_binary,
kill_exist_service,
)
from promptflow._sdk._service.utils.utils import FormattedException, get_log_file_location, get_pfs_version
from promptflow._sdk._utilities.general_utils import overwrite_null_std_logger
from promptflow._utils.thread_utils import ThreadWithContextVars

overwrite_null_std_logger()

Expand Down Expand Up @@ -138,7 +124,6 @@ def handle_path_object(obj):

@app.before_request
def log_before_request_info():
app.config["last_request_time"] = datetime.now()
g.start = time.perf_counter()
if "/v1.0/Connections" in request.url:
request_body = "Request body not recorded for Connections API"
Expand All @@ -160,32 +145,6 @@ def log_after_request_info(response):
)
return response

# Start a monitor process using detach mode. It will stop pfs service if no request to pfs service in 1h in
# python scenario. For C# scenario, pfs will live until the process is killed manually.
def monitor_request():
with app.app_context():
while True:
time.sleep(PF_SERVICE_MONITOR_SECOND)
if "last_request_time" in app.config and datetime.now() - app.config[
"last_request_time"
] > timedelta(hours=PF_SERVICE_HOUR_TIMEOUT):
# Todo: check if we have any not complete work? like persist all traces.
app.logger.warning(
f"Last http request time: {app.config['last_request_time']} was made "
f"{PF_SERVICE_HOUR_TIMEOUT}h ago"
)
port = get_port_from_config()
if port:
app.logger.info(
f"Try auto stop promptflow service in port {port} since no request to app within "
f"{PF_SERVICE_HOUR_TIMEOUT}h."
)
kill_exist_service(port)
break

if not is_run_from_built_binary():
monitor_thread = ThreadWithContextVars(target=monitor_request, daemon=True)
monitor_thread.start()
return app, api


Expand Down
Loading

0 comments on commit 861581a

Please sign in to comment.