Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Internal][Executor] Extract some common constants to promptflow._constants #2303

Merged
merged 5 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/promptflow/promptflow/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,11 @@ class MessageFormatType:


DEFAULT_OUTPUT_NAME = "output"

OUTPUT_FILE_NAME = "output.jsonl"
PeiwenGaoMS marked this conversation as resolved.
Show resolved Hide resolved


class OutputsFolderName:
    """Folder names used beneath a run's output directory.

    Each attribute is a plain string intended to be joined onto a base
    output path (for example ``base / OutputsFolderName.FLOW_OUTPUTS``).
    """

    # Folder for flow-level artifacts.
    FLOW_ARTIFACTS = "flow_artifacts"
    # Folder for flow outputs.
    FLOW_OUTPUTS = "flow_outputs"
    # Folder for node-level artifacts.
    NODE_ARTIFACTS = "node_artifacts"
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from filelock import FileLock

from promptflow._constants import OUTPUT_FILE_NAME, OutputsFolderName
from promptflow._sdk._constants import (
HOME_PROMPT_FLOW_DIR,
LINE_NUMBER,
Expand Down Expand Up @@ -45,6 +46,7 @@
load_multimedia_data_recursively,
resolve_multimedia_data_recursively,
)
from promptflow._utils.utils import prepare_folder
from promptflow._utils.yaml_utils import load_yaml
from promptflow.batch._result import BatchResult
from promptflow.contracts.multimedia import Image
Expand Down Expand Up @@ -191,13 +193,13 @@ class LocalStorageOperations(AbstractBatchRunStorage):

def __init__(self, run: Run, stream=False, run_mode=RunMode.Test):
self._run = run
self.path = self._prepare_folder(self._run._output_path)
self.path = prepare_folder(self._run._output_path)

self.logger = LoggerOperations(
file_path=self.path / LocalStorageFilenames.LOG, stream=stream, run_mode=run_mode
)
# snapshot
self._snapshot_folder_path = self._prepare_folder(self.path / LocalStorageFilenames.SNAPSHOT_FOLDER)
self._snapshot_folder_path = prepare_folder(self.path / LocalStorageFilenames.SNAPSHOT_FOLDER)
self._dag_path = self._snapshot_folder_path / LocalStorageFilenames.DAG
self._flow_tools_json_path = (
self._snapshot_folder_path / PROMPT_FLOW_DIR_NAME / LocalStorageFilenames.FLOW_TOOLS_JSON
Expand All @@ -214,10 +216,10 @@ def __init__(self, run: Run, stream=False, run_mode=RunMode.Test):
# for line run records, store per line
# for normal node run records, store per node per line;
# for reduce node run records, store centralized in 000000000.jsonl per node
self.outputs_folder = self._prepare_folder(self.path / "flow_outputs")
self._outputs_path = self.outputs_folder / "output.jsonl" # dumped by executor
self._node_infos_folder = self._prepare_folder(self.path / "node_artifacts")
self._run_infos_folder = self._prepare_folder(self.path / "flow_artifacts")
self.outputs_folder = prepare_folder(self.path / OutputsFolderName.FLOW_OUTPUTS)
self._outputs_path = self.outputs_folder / OUTPUT_FILE_NAME # dumped by executor
self._node_infos_folder = prepare_folder(self.path / OutputsFolderName.NODE_ARTIFACTS)
self._run_infos_folder = prepare_folder(self.path / OutputsFolderName.FLOW_ARTIFACTS)
self._data_path = Path(run.data) if run.data is not None else None

self._meta_path = self.path / LocalStorageFilenames.META
Expand Down Expand Up @@ -379,7 +381,7 @@ def load_metrics(self, *, parse_const_as_str: bool = False) -> Dict[str, Union[i

def persist_node_run(self, run_info: NodeRunInfo) -> None:
"""Persist node run record to local storage."""
node_folder = self._prepare_folder(self._node_infos_folder / run_info.node)
node_folder = prepare_folder(self._node_infos_folder / run_info.node)
self._persist_run_multimedia(run_info, node_folder)
node_run_record = NodeRunRecord.from_run_info(run_info)
# for reduce nodes, the line_number is None, store the info in the 000000000.jsonl
Expand Down Expand Up @@ -482,12 +484,6 @@ def _serialize_multimedia(self, value, folder_path: Path, relative_path: Path =
serialization_funcs = {Image: partial(Image.serialize, **{"encoder": pfbytes_file_reference_encoder})}
return serialize(value, serialization_funcs=serialization_funcs)

@staticmethod
def _prepare_folder(path: Union[str, Path]) -> Path:
path = Path(path)
path.mkdir(parents=True, exist_ok=True)
return path

@staticmethod
def _outputs_padding(df: "DataFrame", inputs_line_numbers: List[int]) -> "DataFrame":
import pandas as pd
Expand Down
7 changes: 7 additions & 0 deletions src/promptflow/promptflow/_utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,3 +366,10 @@ def copy_file_except(src_dir, dst_dir, exclude_file):
src_file_path = os.path.join(root, file)
dst_file_path = os.path.join(current_dst_dir, file)
shutil.copy2(src_file_path, dst_file_path)


def prepare_folder(path: Union[str, Path]) -> Path:
    """Ensure the folder at ``path`` exists and return it as a ``Path``.

    :param path: Location of the folder, given as a string or ``Path``.
    :return: The same location as a ``Path``; the directory (and any
        missing parents) exists once this function returns.
    """
    folder = Path(path)
    # parents=True builds intermediate directories; exist_ok=True makes the
    # call a no-op when the directory is already there.
    folder.mkdir(parents=True, exist_ok=True)
    return folder
7 changes: 3 additions & 4 deletions src/promptflow/promptflow/batch/_batch_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from pathlib import Path
from typing import Any, Dict, List, Mapping, Optional

from promptflow._constants import LANGUAGE_KEY, LINE_NUMBER_KEY, LINE_TIMEOUT_SEC, FlowLanguage
from promptflow._constants import LANGUAGE_KEY, LINE_NUMBER_KEY, LINE_TIMEOUT_SEC, OUTPUT_FILE_NAME, FlowLanguage
from promptflow._core._errors import ResumeCopyError, UnexpectedError
from promptflow._core.operation_context import OperationContext
from promptflow._utils.async_utils import async_run_allowing_running_loop
Expand Down Expand Up @@ -46,7 +46,6 @@
from promptflow.executor.flow_validator import FlowValidator
from promptflow.storage import AbstractBatchRunStorage, AbstractRunStorage

OUTPUT_FILE_NAME = "output.jsonl"
DEFAULT_CONCURRENCY = 10


Expand Down Expand Up @@ -239,13 +238,13 @@ def _copy_previous_run_result(
return the list of previous line results for the usage of aggregation and summarization.
"""
# Load the previous flow run output from output.jsonl
previous_run_output = load_list_from_jsonl(resume_from_run_output_dir / "output.jsonl")
previous_run_output = load_list_from_jsonl(resume_from_run_output_dir / OUTPUT_FILE_NAME)
previous_run_output_dict = {
each_line_output[LINE_NUMBER_KEY]: each_line_output for each_line_output in previous_run_output
}

# Copy other files from resume_from_run_output_dir to output_dir in case there are images
copy_file_except(resume_from_run_output_dir, output_dir, "output.jsonl")
copy_file_except(resume_from_run_output_dir, output_dir, OUTPUT_FILE_NAME)

try:
previous_run_results = []
Expand Down
3 changes: 2 additions & 1 deletion src/promptflow/tests/executor/e2etests/test_activate.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

import pytest

from promptflow._constants import OUTPUT_FILE_NAME
from promptflow._utils.logger_utils import LogContext
from promptflow.batch._batch_engine import OUTPUT_FILE_NAME, BatchEngine
from promptflow.batch._batch_engine import BatchEngine
from promptflow.batch._result import BatchResult
from promptflow.contracts._errors import FlowDefinitionError
from promptflow.contracts.run_info import FlowRunInfo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@

import pytest

from promptflow._constants import OUTPUT_FILE_NAME
from promptflow._sdk.entities._run import Run
from promptflow._sdk.operations._local_storage_operations import LocalStorageOperations
from promptflow._utils.utils import dump_list_to_jsonl
from promptflow.batch._batch_engine import OUTPUT_FILE_NAME, BatchEngine
from promptflow.batch._batch_engine import BatchEngine
from promptflow.batch._errors import EmptyInputsData
from promptflow.batch._result import BatchResult
from promptflow.contracts.run_info import Status
Expand Down
3 changes: 2 additions & 1 deletion src/promptflow/tests/executor/e2etests/test_eager_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

import pytest

from promptflow.batch._batch_engine import OUTPUT_FILE_NAME, BatchEngine
from promptflow._constants import OUTPUT_FILE_NAME
from promptflow.batch._batch_engine import BatchEngine
from promptflow.batch._result import BatchResult, LineResult
from promptflow.contracts.run_info import Status
from promptflow.executor._script_executor import ScriptExecutor
Expand Down
3 changes: 2 additions & 1 deletion src/promptflow/tests/executor/e2etests/test_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

import pytest

from promptflow._constants import OUTPUT_FILE_NAME
from promptflow._utils.multimedia_utils import MIME_PATTERN, _create_image_from_file, _is_url, is_multimedia_dict
from promptflow.batch._batch_engine import OUTPUT_FILE_NAME, BatchEngine
from promptflow.batch._batch_engine import BatchEngine
from promptflow.batch._result import BatchResult
from promptflow.contracts.multimedia import Image
from promptflow.contracts.run_info import FlowRunInfo, RunInfo, Status
Expand Down
2 changes: 1 addition & 1 deletion src/promptflow/tests/executor/e2etests/test_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import pytest

from promptflow._constants import OUTPUT_FILE_NAME
from promptflow._utils.logger_utils import LogContext
from promptflow.batch import BatchEngine
from promptflow.batch._result import BatchResult
Expand All @@ -21,7 +22,6 @@

TEST_LOGS_FLOW = ["print_input_flow"]
SAMPLE_FLOW_WITH_TEN_INPUTS = "simple_flow_with_ten_inputs"
OUTPUT_FILE_NAME = "output.jsonl"


def submit_batch_run(
Expand Down
3 changes: 2 additions & 1 deletion src/promptflow/tests/executor/e2etests/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@

import pytest

from promptflow._constants import OUTPUT_FILE_NAME
from promptflow._core.operation_context import OperationContext
from promptflow.batch._batch_engine import OUTPUT_FILE_NAME, BatchEngine
from promptflow.batch._batch_engine import BatchEngine
from promptflow.batch._result import BatchResult
from promptflow.contracts.run_mode import RunMode
from promptflow.executor import FlowExecutor
Expand Down
Loading