New pipeline deployment system #58

Merged
merged 23 commits into from
Jan 27, 2025
Changes from 16 commits
Commits
98f62a0
Moved YAML-deployment test files in test_files/yaml folder
mpangrazzi Jan 17, 2025
b599061
Add save_pipeline_files and load_pipeline_module deploy utils
mpangrazzi Jan 20, 2025
b200a18
Add test ; cleanup
mpangrazzi Jan 20, 2025
fd9ce6d
Add self.pipeline in ABC
mpangrazzi Jan 20, 2025
4307346
Update registry to support BasePipelineWrapper instances ; update dra…
mpangrazzi Jan 20, 2025
597fa5c
Fix for python 3.9
mpangrazzi Jan 20, 2025
5fb3e29
Add custom formatter to print out extra info when using .bind()
mpangrazzi Jan 23, 2025
d135c38
fixed registry tests ; remove .get() use_pipeline arg
mpangrazzi Jan 23, 2025
6c94999
add /deploy_files route and implement new deployment system methods ;…
mpangrazzi Jan 23, 2025
fddcc1f
Fix / update tests
mpangrazzi Jan 23, 2025
62a0639
add CLI for deploy-files command
mpangrazzi Jan 23, 2025
61bb50e
handle exceptions during pipeline execution on dynamically added API …
mpangrazzi Jan 23, 2025
8a677aa
Ignore default pipelines folder
mpangrazzi Jan 23, 2025
c448a38
Rewrote pipeline loading logic at startup
mpangrazzi Jan 23, 2025
d9916b6
Add tests for pipelines loading at app startup
mpangrazzi Jan 23, 2025
08b45de
Update logger format
mpangrazzi Jan 23, 2025
2078bfa
Fix filename
mpangrazzi Jan 24, 2025
5f6809d
Unified tests on pipeline deploy at startup ; add test for yaml + fil…
mpangrazzi Jan 24, 2025
32c0266
Fix run_api args ; Update docstrings
mpangrazzi Jan 24, 2025
5986cfa
At least one between run_chat and run_api should be implemented ; Add…
mpangrazzi Jan 27, 2025
49af062
Updated docstrings
mpangrazzi Jan 27, 2025
da427f2
Refactoring
mpangrazzi Jan 27, 2025
67801e0
Add comment reference
mpangrazzi Jan 27, 2025
4 changes: 4 additions & 0 deletions .gitignore
@@ -159,3 +159,7 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/


# Pipelines default folder
/pipelines
2 changes: 2 additions & 0 deletions src/hayhooks/cli/__init__.py
@@ -3,6 +3,7 @@
# SPDX-License-Identifier: Apache-2.0
import click

from hayhooks.cli.deploy_files import deploy_files
from hayhooks.cli.run import run
from hayhooks.cli.deploy import deploy
from hayhooks.cli.status import status
@@ -22,3 +23,4 @@ def hayhooks(ctx, server, disable_ssl):
hayhooks.add_command(deploy)
hayhooks.add_command(status)
hayhooks.add_command(undeploy)
hayhooks.add_command(deploy_files)
34 changes: 34 additions & 0 deletions src/hayhooks/cli/deploy_files/__init__.py
@@ -0,0 +1,34 @@
import click
import requests
from pathlib import Path
from urllib.parse import urljoin
from hayhooks.server.utils.deploy_utils import read_pipeline_files_from_folder


@click.command()
@click.pass_obj
@click.option('-n', '--name', required=True, help="Name of the pipeline to deploy")
@click.argument('folder', type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path))
def deploy_files(server_conf, name, folder):
    """Deploy all pipeline files from a folder to the Hayhooks server."""
    server, disable_ssl = server_conf

    files_dict = {}
    try:
        files_dict = read_pipeline_files_from_folder(folder)

        if not files_dict:
            click.echo("Error: No valid files found in the specified folder")
            return

        resp = requests.post(
            urljoin(server, "deploy_files"), json={"name": name, "files": files_dict}, verify=not disable_ssl
        )

        if resp.status_code >= 400:
            click.echo(f"Error deploying pipeline: {resp.json().get('detail')}")
        else:
            click.echo(f"Pipeline successfully deployed with name: {resp.json().get('name')}")

    except Exception as e:
        click.echo(f"Error processing folder: {str(e)}")
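The body of `read_pipeline_files_from_folder` is not part of this diff, so the following is only a minimal sketch of the request body the command sends, assuming the helper returns a mapping of file name to text content. `read_text_files`, `build_payload`, and the file name `pipeline_wrapper.py` are hypothetical names used for illustration.

```python
import tempfile
from pathlib import Path


def read_text_files(folder: Path) -> dict[str, str]:
    """Hypothetical stand-in for read_pipeline_files_from_folder (an assumption)."""
    files = {}
    for path in sorted(folder.iterdir()):
        # Only top-level, non-hidden files are collected in this sketch.
        if path.is_file() and not path.name.startswith("."):
            files[path.name] = path.read_text(encoding="utf-8")
    return files


def build_payload(name: str, folder: Path) -> dict:
    """Build a JSON body with the same shape as the one posted to deploy_files."""
    return {"name": name, "files": read_text_files(folder)}


with tempfile.TemporaryDirectory() as tmp:
    folder = Path(tmp)
    (folder / "pipeline_wrapper.py").write_text("# wrapper code\n", encoding="utf-8")
    payload = build_payload("my_pipeline", folder)

print(payload)
```

The `{"name": ..., "files": {...}}` shape matches the `PipelineFilesRequest` model defined in `routers/deploy.py` in this PR.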
99 changes: 85 additions & 14 deletions src/hayhooks/server/app.py
@@ -1,16 +1,72 @@
import logging
import os
import glob
from fastapi import FastAPI
from pathlib import Path
from hayhooks.server.utils.deploy_utils import deploy_pipeline_def, PipelineDefinition
from hayhooks.server.utils.deploy_utils import (
    deploy_pipeline_def,
    PipelineDefinition,
    deploy_pipeline_files,
    read_pipeline_files_from_folder,
)
from hayhooks.server.routers import status_router, draw_router, deploy_router, undeploy_router
from hayhooks.settings import settings
from hayhooks.server.logger import log

logger = logging.getLogger("uvicorn.info")

def deploy_yaml_pipeline(app: FastAPI, pipeline_file_path: Path) -> dict:
    """
    Deploy a pipeline from a YAML file.

    Args:
        app: FastAPI application instance
        pipeline_file_path: Path to the YAML pipeline definition

    Returns:
        dict: Deployment result containing pipeline name
    """
    name = pipeline_file_path.stem
    with open(pipeline_file_path, "r") as pipeline_file:
        source_code = pipeline_file.read()

    pipeline_definition = PipelineDefinition(name=name, source_code=source_code)
    deployed_pipeline = deploy_pipeline_def(app, pipeline_definition)
    log.info(f"Deployed pipeline from yaml: {deployed_pipeline['name']}")
    return deployed_pipeline


def deploy_files_pipeline(app: FastAPI, pipeline_dir: Path) -> dict:
    """
    Deploy a pipeline from a directory containing multiple files.

    Args:
        app: FastAPI application instance
        pipeline_dir: Path to the pipeline directory

    Returns:
        dict: Deployment result containing pipeline name
    """
    name = pipeline_dir.name
    files = read_pipeline_files_from_folder(pipeline_dir)

    if files:
        deployed_pipeline = deploy_pipeline_files(app, name, files)
        log.info(f"Deployed pipeline from directory: {deployed_pipeline['name']}")
        return deployed_pipeline
    return {"name": name}


def create_app() -> FastAPI:
    """
    Create and configure a FastAPI application.

    This function initializes a FastAPI application with the following features:
    - Configures root path from settings if provided
    - Includes all router endpoints (status, draw, deploy, undeploy)
    - Auto-deploys pipelines from the configured pipelines directory:
        - YAML pipeline definitions (*.yml, *.yaml)
        - Pipeline folders containing multiple files

    Returns:
        FastAPI: Configured FastAPI application instance
    """
    if root_path := settings.root_path:
        app = FastAPI(root_path=root_path)
    else:
@@ -26,15 +82,30 @@ def create_app() -> FastAPI:
    pipelines_dir = settings.pipelines_dir

    if pipelines_dir:
        logger.info(f"Pipelines dir set to: {pipelines_dir}")
        for pipeline_file_path in glob.glob(f"{pipelines_dir}/*.y*ml"):
            name = Path(pipeline_file_path).stem
            with open(pipeline_file_path, "r") as pipeline_file:
                source_code = pipeline_file.read()

            pipeline_defintion = PipelineDefinition(name=name, source_code=source_code)
            deployed_pipeline = deploy_pipeline_def(app, pipeline_defintion)
            logger.info(f"Deployed pipeline: {deployed_pipeline['name']}")
        log.info(f"Pipelines dir set to: {pipelines_dir}")
        pipelines_path = Path(pipelines_dir)

        yaml_files = list(pipelines_path.glob("*.y*ml"))
        pipeline_dirs = [d for d in pipelines_path.iterdir() if d.is_dir()]

        if yaml_files:
            log.info(f"Deploying {len(yaml_files)} pipeline(s) from YAML files")
            for pipeline_file_path in yaml_files:
                try:
                    deploy_yaml_pipeline(app, pipeline_file_path)
                except Exception as e:
                    log.warning(f"Skipping pipeline file {pipeline_file_path}: {str(e)}")
                    continue

        if pipeline_dirs:
            log.info(f"Deploying {len(pipeline_dirs)} pipeline(s) from folders")
            for pipeline_dir in pipeline_dirs:
                try:
                    deploy_files_pipeline(app, pipeline_dir)
                except Exception as e:
                    log.warning(f"Skipping pipeline folder {pipeline_dir}: {str(e)}")
                    continue

    return app
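The startup logic above separates YAML definitions from pipeline folders and skips any item that fails to deploy. A minimal, self-contained sketch of that pattern follows, assuming a throwaway directory layout; `discover`, `deploy_each`, and `fake_deploy` are hypothetical helpers, not Hayhooks functions.

```python
import tempfile
from pathlib import Path


def discover(pipelines_dir: Path) -> tuple[list[str], list[str]]:
    """Return (YAML file names, sub-folder names) found directly under pipelines_dir."""
    # "*.y*ml" matches both .yml and .yaml (and any other suffix of the form .y<anything>ml).
    yaml_files = sorted(p.name for p in pipelines_dir.glob("*.y*ml"))
    folders = sorted(p.name for p in pipelines_dir.iterdir() if p.is_dir())
    return yaml_files, folders


def deploy_each(items, deploy):
    """Deploy every item, recording failures instead of aborting the whole startup."""
    deployed, skipped = [], []
    for item in items:
        try:
            deployed.append(deploy(item))
        except Exception as exc:  # broad on purpose, mirroring the startup loop
            skipped.append((item, str(exc)))
    return deployed, skipped


def fake_deploy(name: str) -> str:
    """Hypothetical deployer that rejects one specific file."""
    if name == "b.yaml":
        raise ValueError("broken definition")
    return name


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "a.yml").write_text("", encoding="utf-8")
    (root / "b.yaml").write_text("", encoding="utf-8")
    (root / "notes.txt").write_text("", encoding="utf-8")
    (root / "chat").mkdir()
    yaml_files, folders = discover(root)

deployed, skipped = deploy_each(yaml_files, fake_deploy)
print(yaml_files, folders, deployed, skipped)
```

One broken pipeline is logged and skipped while the rest still deploy, which is the behavior the `try`/`except` blocks in `create_app` aim for.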


20 changes: 20 additions & 0 deletions src/hayhooks/server/exceptions.py
@@ -0,0 +1,20 @@
class PipelineFilesError(Exception):
    """Exception for errors saving pipeline files."""

    pass


class PipelineWrapperError(Exception):
    """Exception for errors loading pipeline wrapper."""

    pass


class PipelineModuleLoadError(Exception):
    """Exception for errors loading pipeline module."""


class PipelineAlreadyExistsError(Exception):
    """Exception for errors when a pipeline already exists."""

    pass
16 changes: 13 additions & 3 deletions src/hayhooks/server/logger.py
@@ -1,8 +1,18 @@
# logger.py

import os
import sys
from loguru import logger as log


def formatter(record):
    if record["extra"]:
        return "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> | <level>{message}</level> - <magenta>{extra}</magenta>\n"

    return "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> | <level>{message}</level>\n"


log.remove()
log.add(sys.stderr, level=os.getenv("LOG", "INFO").upper())
log.add(
    sys.stderr,
    level=os.getenv("LOG", "INFO").upper(),
    format=formatter
)
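loguru accepts a callable as `format`; it is called with the record dict and must return a format string. The sketch below reproduces only the selection logic with plain dicts, so it runs without loguru installed. The color markup is dropped for brevity, and `BASE` is a name introduced here for illustration.

```python
# Simplified format string (color tags omitted); loguru fills the fields later.
BASE = "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} | {message}"


def formatter(record: dict) -> str:
    """Pick a format string depending on whether the record carries bound extras."""
    if record["extra"]:
        # Records created through log.bind(...) have a non-empty "extra" dict.
        return BASE + " - {extra}\n"
    # With a callable format, loguru does not append the newline itself.
    return BASE + "\n"


with_extra = formatter({"extra": {"pipeline": "chat"}})
without_extra = formatter({"extra": {}})
print(repr(with_extra))
print(repr(without_extra))
```

In the real logger, a call such as `log.bind(pipeline="chat").info("deployed")` would produce a record whose `extra` is `{"pipeline": "chat"}` and therefore select the first format.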
27 changes: 17 additions & 10 deletions src/hayhooks/server/pipelines/registry.py
@@ -1,31 +1,38 @@
from typing import Optional
from typing import Optional, Union

from haystack import Pipeline
from haystack.core.errors import PipelineError
from hayhooks.server.utils.base_pipeline_wrapper import BasePipelineWrapper

PipelineType = Union[Pipeline, BasePipelineWrapper]


class _PipelineRegistry:
    def __init__(self) -> None:
        self._pipelines: dict[str, Pipeline] = {}
        self._pipelines: dict[str, PipelineType] = {}

    def add(self, name: str, source: str) -> Pipeline:
    def add(self, name: str, source_or_pipeline: Union[str, PipelineType]) -> PipelineType:
        if name in self._pipelines:
            msg = f"A pipeline with name {name} is already in the registry."
            raise ValueError(msg)

        try:
            self._pipelines[name] = Pipeline.loads(source)
        except PipelineError as e:
            msg = f"Unable to parse Haystack Pipeline {name}: {e}"
            raise ValueError(msg) from e
        if isinstance(source_or_pipeline, (Pipeline, BasePipelineWrapper)):
            pipeline = source_or_pipeline
        else:
            try:
                pipeline = Pipeline.loads(source_or_pipeline)
            except PipelineError as e:
                msg = f"Unable to parse Haystack Pipeline {name}: {e}"
                raise ValueError(msg) from e

        return self._pipelines[name]
        self._pipelines[name] = pipeline
        return pipeline

    def remove(self, name: str):
        if name in self._pipelines:
            del self._pipelines[name]

    def get(self, name: str) -> Optional[Pipeline]:
    def get(self, name: str) -> Optional[PipelineType]:
        return self._pipelines.get(name)

    def get_names(self) -> list[str]:
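After this change, `add` accepts either a serialized pipeline string or an already-built object. A minimal sketch of that dispatch is shown below, using stand-in classes so it runs without Haystack; `FakePipeline`, `FakeWrapper`, and `Registry` are hypothetical and only mirror the shape of the real types.

```python
from typing import Optional, Union


class FakePipeline:
    """Stand-in for haystack.Pipeline (an assumption for this sketch)."""

    def __init__(self, source: str = "") -> None:
        self.source = source

    @classmethod
    def loads(cls, source: str) -> "FakePipeline":
        if not source.strip():
            raise ValueError("empty pipeline definition")
        return cls(source)


class FakeWrapper:
    """Stand-in for a BasePipelineWrapper subclass instance."""

    def __init__(self) -> None:
        self.pipeline = None


Entry = Union[FakePipeline, FakeWrapper]


class Registry:
    def __init__(self) -> None:
        self._pipelines: dict[str, Entry] = {}

    def add(self, name: str, source_or_pipeline: Union[str, Entry]) -> Entry:
        if name in self._pipelines:
            raise ValueError(f"A pipeline with name {name} is already in the registry.")
        if isinstance(source_or_pipeline, (FakePipeline, FakeWrapper)):
            pipeline = source_or_pipeline  # already built: store as-is
        else:
            pipeline = FakePipeline.loads(source_or_pipeline)  # parse the source string
        self._pipelines[name] = pipeline
        return pipeline

    def get(self, name: str) -> Optional[Entry]:
        return self._pipelines.get(name)


registry = Registry()
from_yaml = registry.add("from_yaml", "components: {}")
wrapper = registry.add("wrapped", FakeWrapper())

try:
    registry.add("wrapped", FakeWrapper())
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True

print(type(from_yaml).__name__, type(wrapper).__name__, duplicate_rejected)
```

Because the name check runs before any parsing, a duplicate name is rejected regardless of which kind of value is passed.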
35 changes: 33 additions & 2 deletions src/hayhooks/server/routers/deploy.py
@@ -1,9 +1,40 @@
from fastapi import APIRouter, Request
from hayhooks.server.utils.deploy_utils import deploy_pipeline_def, PipelineDefinition
from fastapi import APIRouter, Request, HTTPException
from pydantic import BaseModel
from typing import Dict
from hayhooks.server.exceptions import PipelineAlreadyExistsError
from hayhooks.server.utils.deploy_utils import (
    deploy_pipeline_def,
    PipelineDefinition,
    deploy_pipeline_files,
    PipelineFilesError,
    PipelineModuleLoadError,
    PipelineWrapperError,
)

router = APIRouter()


class PipelineFilesRequest(BaseModel):
    name: str
    files: Dict[str, str]


@router.post("/deploy", tags=["config"])
async def deploy(pipeline_def: PipelineDefinition, request: Request):
    return deploy_pipeline_def(request.app, pipeline_def)


@router.post("/deploy_files", tags=["config"])
async def deploy_files(pipeline_files: PipelineFilesRequest, request: Request):
    try:
        return deploy_pipeline_files(request.app, pipeline_files.name, pipeline_files.files)
    except PipelineFilesError as e:
        raise HTTPException(status_code=500, detail=str(e))
    except PipelineModuleLoadError as e:
        raise HTTPException(status_code=422, detail=str(e))
    except PipelineWrapperError as e:
        raise HTTPException(status_code=422, detail=str(e))
    except PipelineAlreadyExistsError as e:
        raise HTTPException(status_code=409, detail=str(e))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Unexpected error deploying pipeline: {str(e)}")
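The `except` chain above amounts to a table from exception type to HTTP status code. The sketch below expresses the same mapping as data, with the exception classes redefined locally so it runs on its own; `STATUS_BY_ERROR` and `status_for` are hypothetical names and not part of the PR.

```python
class PipelineFilesError(Exception):
    """Local copy for this sketch: errors saving pipeline files."""


class PipelineModuleLoadError(Exception):
    """Local copy for this sketch: errors loading the pipeline module."""


class PipelineWrapperError(Exception):
    """Local copy for this sketch: errors loading the pipeline wrapper."""


class PipelineAlreadyExistsError(Exception):
    """Local copy for this sketch: the pipeline name is already taken."""


# Same status codes as the except chain in the /deploy_files route.
STATUS_BY_ERROR = {
    PipelineFilesError: 500,
    PipelineModuleLoadError: 422,
    PipelineWrapperError: 422,
    PipelineAlreadyExistsError: 409,
}


def status_for(exc: Exception) -> int:
    """Return the HTTP status for a deployment error, defaulting to 500."""
    for error_type, status in STATUS_BY_ERROR.items():
        if isinstance(exc, error_type):
            return status
    return 500


print(status_for(PipelineAlreadyExistsError("name taken")))
print(status_for(RuntimeError("anything else")))
```

The CLI command reads the `detail` field of these error responses, so whichever form the handler takes, the exception message is what the user ends up seeing.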
5 changes: 5 additions & 0 deletions src/hayhooks/server/routers/draw.py
@@ -3,13 +3,18 @@
from fastapi import APIRouter, HTTPException
from fastapi.responses import FileResponse
from hayhooks.server.pipelines import registry
from hayhooks.server.utils.base_pipeline_wrapper import BasePipelineWrapper

router = APIRouter()


@router.get("/draw/{pipeline_name}", tags=["config"])
async def draw(pipeline_name):
    pipeline = registry.get(pipeline_name)

    if isinstance(pipeline, BasePipelineWrapper):
        pipeline = pipeline.pipeline

    if not pipeline:
        raise HTTPException(status_code=404)
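The unwrapping step in `draw` has one consequence worth spelling out: a wrapper whose `pipeline` attribute is still `None` is treated the same as an unknown name. A small sketch with stand-in classes follows; `FakePipeline`, `FakeWrapper`, and `resolve_drawable` are hypothetical names.

```python
from typing import Optional


class FakePipeline:
    """Stand-in for a Haystack Pipeline (an assumption for this sketch)."""


class FakeWrapper:
    """Stand-in for a BasePipelineWrapper instance."""

    def __init__(self, pipeline: Optional[FakePipeline] = None) -> None:
        self.pipeline = pipeline


def resolve_drawable(entry) -> Optional[FakePipeline]:
    """Return the object to draw, or None when the route should answer 404."""
    if isinstance(entry, FakeWrapper):
        entry = entry.pipeline  # unwrap to the underlying pipeline
    return entry


pipeline = FakePipeline()
print(resolve_drawable(pipeline) is pipeline)
print(resolve_drawable(FakeWrapper(pipeline)) is pipeline)
print(resolve_drawable(FakeWrapper()))
```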

46 changes: 46 additions & 0 deletions src/hayhooks/server/utils/base_pipeline_wrapper.py
@@ -0,0 +1,46 @@
from abc import ABC, abstractmethod
from typing import List


class BasePipelineWrapper(ABC):
    def __init__(self):
        self.pipeline = None

    @abstractmethod
    def setup(self) -> None:
        """
        Setup the pipeline.

        This method should be called before using the pipeline.
        """
        pass

    @abstractmethod
    def run_api(self, urls: List[str], question: str) -> dict:
        """
        Run the pipeline in API mode.

        Args:
            urls: List of URLs to fetch content from
            question: Question to be answered

        Returns:
            dict: Pipeline execution results
        """
        pass

    @abstractmethod
    def run_chat(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> dict:

Member:
user_message and messages sound confusing to me...
user_message and history would be better, for example.
But maybe there is some reason I'm overlooking...

Let's also discuss on support/conversion of Haystack ChatMessage

Contributor Author:
The incoming request looks like this one, but I agree, user_message (which is the last message) and history will be less confusing.

Member:
As a dev, I can still return generator here? Or somewhere else?

        """
        Run the pipeline in chat mode.

        Args:

Member:
Watch out for Google pydoc

Contributor Author:
You're correct about run_api. I am about to rewrite base_pipeline_wrapper docstrings because they're not very precise.

            user_message: Message from the user
            model_id: ID of the model to use
            messages: List of previous messages
            body: Additional request body parameters

        Returns:
            dict: Pipeline execution results
        """
        pass
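To make the contract concrete, here is a minimal sketch of a subclass, written against a local copy of the abstract class as it appears in this revision, where all three methods are abstract. `EchoWrapper` and its return shapes are hypothetical, and the "pipeline" is a plain function rather than a Haystack `Pipeline`.

```python
from abc import ABC, abstractmethod
from typing import List


class BasePipelineWrapper(ABC):
    """Local copy of the ABC from this diff, so the sketch is self-contained."""

    def __init__(self):
        self.pipeline = None

    @abstractmethod
    def setup(self) -> None: ...

    @abstractmethod
    def run_api(self, urls: List[str], question: str) -> dict: ...

    @abstractmethod
    def run_chat(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> dict: ...


class EchoWrapper(BasePipelineWrapper):
    """Hypothetical wrapper whose 'pipeline' simply echoes its input."""

    def setup(self) -> None:
        # A real wrapper would build or load a Haystack Pipeline here.
        self.pipeline = lambda text: f"echo: {text}"

    def run_api(self, urls: List[str], question: str) -> dict:
        return {"answer": self.pipeline(question), "sources": urls}

    def run_chat(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> dict:
        return {"answer": self.pipeline(user_message), "model": model_id}


wrapper = EchoWrapper()
wrapper.setup()  # must run before run_api / run_chat, since pipeline starts as None
api_result = wrapper.run_api(["https://example.com"], "hi")
chat_result = wrapper.run_chat("hello", "demo-model", [], {})
print(api_result, chat_result)
```

Since every method is abstract here, instantiating `BasePipelineWrapper` directly raises `TypeError`, so a subclass has to provide `setup`, `run_api`, and `run_chat`.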