Enable users to register Processing Services & Pipelines #632

Merged Jan 26, 2025 (62 commits)

Commits
caedaaa
Create backend model
vanessavmac Nov 6, 2024
49bea0b
Create backend status endpoint
vanessavmac Nov 7, 2024
a7116d5
Return server status and available pipelines
vanessavmac Nov 7, 2024
27d4792
Use pipeline slug
vanessavmac Nov 7, 2024
a8bb218
Fix .gitignore
vanessavmac Nov 7, 2024
a1dfcc0
Update backend status endpoint, test pipeline process images
vanessavmac Nov 11, 2024
a66ab33
fix: missing import in ml models
mihow Nov 20, 2024
c0ca4a3
Add Backend to admin, update pipeline/backend model, register_pipelin…
vanessavmac Nov 24, 2024
d5f43f5
Fix type checking
vanessavmac Nov 25, 2024
74825fe
Add backend id to test pipeline processing
vanessavmac Nov 25, 2024
fa6579a
Constant and Random pipeline processing
vanessavmac Nov 25, 2024
8ad8b57
Add test fixture
vanessavmac Nov 25, 2024
18f9b9e
Don't use same project id for all tests
vanessavmac Nov 26, 2024
d2a9acd
Added Backend created_at and updated_at serializer fields
vanessavmac Dec 16, 2024
6fe4267
Update models and display backends last checked
vanessavmac Dec 16, 2024
81c415d
Resolve merge conflicts
vanessavmac Dec 16, 2024
149f4cf
Merge branches 'feat/ml-pipeline-registry' and 'main' of https://gith…
vanessavmac Dec 16, 2024
a70a325
Remove unused variables
vanessavmac Dec 17, 2024
81d92fe
Remove unused file
vanessavmac Dec 17, 2024
20ca912
Register pipelines via frontend
vanessavmac Dec 17, 2024
6909f33
Add missing fields to backend, fix migration error after merging with…
vanessavmac Dec 18, 2024
53d08f0
Add backend details dialog
vanessavmac Dec 18, 2024
6fbdc3e
Display backend details
vanessavmac Dec 18, 2024
780c6df
Fix backend details displayed values
vanessavmac Dec 18, 2024
08eb418
Select first backend associated with pipeline
vanessavmac Dec 18, 2024
565e107
Fix linting errors
vanessavmac Dec 18, 2024
8f44493
Remove backend_id
vanessavmac Dec 18, 2024
b54e91f
Remove version/version name, fix adding project, make endpoint required
vanessavmac Dec 19, 2024
3a3cb74
Merge branch 'main' of https://github.com/RolnickLab/ami-platform int…
vanessavmac Dec 19, 2024
f09c04a
Use ErrorState component
vanessavmac Dec 19, 2024
bbc8b53
Add serializer details
vanessavmac Dec 19, 2024
661d6b1
API test to check that pipelines are created
vanessavmac Dec 19, 2024
7375e3a
Add edit backend default values
vanessavmac Dec 19, 2024
c759104
Process images using backend with lowest latency
vanessavmac Dec 19, 2024
5c2d90d
Remove projects from ML schemas
vanessavmac Dec 19, 2024
662fb1c
Resolve todos
vanessavmac Dec 19, 2024
6813219
Raise exception if no backends online
vanessavmac Dec 19, 2024
8f72fd3
Fail the job if no backends are online
vanessavmac Dec 19, 2024
ecccb79
Merge branch 'main' of https://github.com/RolnickLab/ami-platform int…
vanessavmac Jan 11, 2025
e65d210
Change MLBackend to ProcessingService
vanessavmac Jan 12, 2025
325a7c7
Change all instances of backend to processing service
vanessavmac Jan 12, 2025
78c9dec
Fix ui formatting, fix tests, and add migrations
vanessavmac Jan 12, 2025
abe6ea7
Update comment to processing service
vanessavmac Jan 12, 2025
f130bee
Update process_images error handling
vanessavmac Jan 14, 2025
f9ec5b8
Fix last_checked_live and processing services online
vanessavmac Jan 15, 2025
56a2773
Change Sync to Add Pipelines
vanessavmac Jan 15, 2025
3da4ac5
Remove updated at column for processing services
vanessavmac Jan 15, 2025
e292afa
Display column of num pipelines added
vanessavmac Jan 15, 2025
dc2e4be
Change status label of pipelines online to pipelines available
vanessavmac Jan 15, 2025
6a98913
Use slugify to add processing service
vanessavmac Jan 15, 2025
c8183a3
Merge branch 'main' of github.com:RolnickLab/antenna into feat/ml-pip…
mihow Jan 17, 2025
f2a0484
fix: clean up some logging, type warnings and extra code
mihow Jan 17, 2025
dc0ac42
feat: remove slug field, update naming
mihow Jan 17, 2025
d1740b7
fix: update phrasing
mihow Jan 17, 2025
5a26508
Merge branch 'main' into feat/ml-pipeline-registry
vanessavmac Jan 18, 2025
7dc58ef
Remove print statements
vanessavmac Jan 18, 2025
4436299
Fix log formatting
vanessavmac Jan 18, 2025
4c092d1
Squash migrations
vanessavmac Jan 18, 2025
114718e
Filter processing services by project ID
vanessavmac Jan 19, 2025
fb0539a
Button indicates pipeline registration error
vanessavmac Jan 19, 2025
75911d9
Fix processing service error handling
vanessavmac Jan 25, 2025
2edcb89
Merge branch 'main' into feat/ml-pipeline-registry
mihow Jan 26, 2025
23 changes: 23 additions & 0 deletions ami/jobs/migrations/0011_alter_job_limit.py
@@ -0,0 +1,23 @@
# Generated by Django 4.2.10 on 2024-11-03 23:50

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("jobs", "0010_job_limit_job_shuffle"),
]

operations = [
migrations.AlterField(
model_name="job",
name="limit",
field=models.IntegerField(
blank=True,
default=None,
help_text="Limit the number of images to process",
null=True,
verbose_name="Limit",
),
),
]
@@ -0,0 +1,12 @@
# Generated by Django 4.2.10 on 2024-12-17 22:28

from django.db import migrations


class Migration(migrations.Migration):
dependencies = [
("jobs", "0011_alter_job_limit"),
("jobs", "0012_alter_job_limit"),
]

operations = []
2 changes: 1 addition & 1 deletion ami/main/models.py
@@ -1868,7 +1868,7 @@ class Detection(BaseModel):
null=True,
blank=True,
)
# Time that the detection was created by the algorithm in the ML backend
# Time that the detection was created by the algorithm in the processing service
detection_time = models.DateTimeField(null=True, blank=True)
# @TODO not sure if this detection score is ever used
# I think it was intended to be the score of the detection algorithm (bbox score)
11 changes: 11 additions & 0 deletions ami/ml/admin.py
@@ -4,6 +4,7 @@

from .models.algorithm import Algorithm
from .models.pipeline import Pipeline
from .models.processing_service import ProcessingService


@admin.register(Algorithm)
@@ -57,3 +58,13 @@ class PipelineAdmin(AdminBase):
# See https://pypi.org/project/django-json-widget/
# models.JSONField: {"widget": JSONInput},
}


@admin.register(ProcessingService)
class ProcessingServiceAdmin(AdminBase):
list_display = [
"id",
"name",
"endpoint_url",
"created_at",
]
69 changes: 69 additions & 0 deletions ami/ml/migrations/0007_add_processing_service.py
@@ -0,0 +1,69 @@
# Generated by Django 4.2.10 on 2025-01-17 19:40

import ami.base.schemas
from django.db import migrations, models
import django_pydantic_field.fields


class Migration(migrations.Migration):
replaces = [
("ml", "0007_backend"),
("ml", "0008_remove_pipeline_endpoint_url_pipeline_backend"),
("ml", "0009_remove_pipeline_backend_backend_pipelines"),
("ml", "0010_backend_created_at_backend_updated_at"),
("ml", "0011_alter_pipeline_stages"),
("ml", "0012_backend_last_checked_backend_last_checked_live"),
("ml", "0013_backend_description_backend_name_backend_slug_and_more"),
("ml", "0014_remove_backend_version_remove_backend_version_name_and_more"),
("ml", "0015_processingservice_delete_backend"),
("ml", "0016_alter_processingservice_options"),
("ml", "0017_remove_processingservice_slug_and_more"),
]

dependencies = [
("main", "0038_alter_detection_path_alter_sourceimage_event_and_more"),
("ml", "0006_alter_pipeline_endpoint_url_alter_pipeline_projects"),
]

operations = [
migrations.RemoveField(
model_name="pipeline",
name="endpoint_url",
),
migrations.AlterField(
model_name="pipeline",
name="stages",
field=django_pydantic_field.fields.PydanticSchemaField(
config=None,
default=ami.base.schemas.default_stages,
help_text="The stages of the pipeline. This is mainly for display. The backend implementation of the pipeline may process data in any way.",
schema="list[PipelineStage]",
),
),
migrations.CreateModel(
name="ProcessingService",
fields=[
("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")),
("created_at", models.DateTimeField(auto_now_add=True)),
("updated_at", models.DateTimeField(auto_now=True)),
("name", models.CharField(max_length=255)),
("description", models.TextField(blank=True)),
("endpoint_url", models.CharField(max_length=1024)),
("last_checked", models.DateTimeField(null=True)),
("last_checked_live", models.BooleanField(null=True)),
(
"pipelines",
models.ManyToManyField(blank=True, related_name="processing_services", to="ml.pipeline"),
),
(
"projects",
models.ManyToManyField(blank=True, related_name="processing_services", to="main.project"),
),
("last_checked_latency", models.FloatField(null=True)),
],
options={
"verbose_name": "Processing Service",
"verbose_name_plural": "Processing Services",
},
),
]
6 changes: 4 additions & 2 deletions ami/ml/models/__init__.py
@@ -1,7 +1,9 @@
from .algorithm import Algorithm
from .pipeline import Pipeline
from ami.ml.models.algorithm import Algorithm
from ami.ml.models.pipeline import Pipeline
from ami.ml.models.processing_service import ProcessingService

__all__ = [
"Algorithm",
"Pipeline",
"ProcessingService",
]
3 changes: 3 additions & 0 deletions ami/ml/models/algorithm.py
@@ -38,6 +38,9 @@ class Meta:
["name", "version"],
]

def __str__(self):
return f'#{self.pk} "{self.name}" ({self.key}) v{self.version}'

def save(self, *args, **kwargs):
if not self.key:
self.key = slugify(self.name)
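The `save()` override above derives `key` from the algorithm's name with Django's `slugify`. A rough pure-Python approximation of that helper (the real one lives in `django.utils.text` and handles more Unicode edge cases):

```python
import re
import unicodedata


def slugify(value: str) -> str:
    """Rough approximation of django.utils.text.slugify."""
    # Normalize to ASCII, drop anything that is not a word char, space, or hyphen,
    # then collapse whitespace/hyphen runs into single hyphens.
    value = unicodedata.normalize("NFKD", value).encode("ascii", "ignore").decode("ascii")
    value = re.sub(r"[^\w\s-]", "", value).strip().lower()
    return re.sub(r"[-\s]+", "-", value)


print(slugify("Moth Classifier v2"))  # -> moth-classifier-v2
```

This is why `Algorithm.key` (and previously the pipeline slug) is safe to use in URLs and API payloads even when the display name contains spaces or punctuation.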
114 changes: 90 additions & 24 deletions ami/ml/models/pipeline.py
@@ -1,5 +1,13 @@
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from ami.ml.models import ProcessingService

import logging
import typing
from urllib.parse import urljoin

import requests
from django.db import models, transaction
@@ -22,17 +30,16 @@
TaxonRank,
update_calculated_fields_for_events,
)
from ami.ml.models.algorithm import Algorithm
from ami.ml.schemas import PipelineRequest, PipelineResponse, SourceImageRequest, SourceImageResponse
from ami.ml.tasks import celery_app, create_detection_images

from ..schemas import PipelineRequest, PipelineResponse, SourceImageRequest
from .algorithm import Algorithm

logger = logging.getLogger(__name__)


def filter_processed_images(
images: typing.Iterable[SourceImage],
pipeline: "Pipeline",
pipeline: Pipeline,
) -> typing.Iterable[SourceImage]:
"""
Return only images that need to be processed by a given pipeline for the first time (have no detections)
@@ -78,7 +85,7 @@ def collect_images(
source_images: list[SourceImage] | None = None,
deployment: Deployment | None = None,
job_id: int | None = None,
pipeline: "Pipeline | None" = None,
pipeline: Pipeline | None = None,
skip_processed: bool = True,
) -> typing.Iterable[SourceImage]:
"""
@@ -123,7 +130,7 @@ def collect_images(


def process_images(
pipeline: "Pipeline",
pipeline: Pipeline,
endpoint_url: str,
images: typing.Iterable[SourceImage],
job_id: int | None = None,
@@ -157,37 +164,47 @@ def process_images(
detections=[],
total_time=0,
)
task_logger.info(f"Sending {len(images)} images to ML backend {pipeline.slug}")
task_logger.info(f"Sending {len(images)} images to Pipeline {pipeline}")
urls = [source_image.public_url() for source_image in images if source_image.public_url()]

source_images = [
SourceImageRequest(
id=str(source_image.pk),
url=url,
)
for source_image, url in zip(images, urls)
if url
]

request_data = PipelineRequest(
pipeline=pipeline.slug,
source_images=[
SourceImageRequest(
id=str(source_image.pk),
url=url,
)
for source_image, url in zip(images, urls)
if url
],
source_images=source_images,
)

resp = requests.post(endpoint_url, json=request_data.dict())
if not resp.ok:
try:
msg = resp.json()["detail"]
except Exception:
msg = resp.content
msg = str(resp.content)
if job:
job.logger.error(msg)
else:
logger.error(msg)

resp.raise_for_status()
results = PipelineResponse(
pipeline=pipeline.slug,
total_time=0,
source_images=[
SourceImageResponse(id=source_image.id, url=source_image.url) for source_image in source_images
],
detections=[],
errors=msg,
)
return results

results = resp.json()
results = PipelineResponse(**results)

if job:
job.logger.debug(f"Results: {results}")
detections = results.detections
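The updated error path above first tries the structured `detail` field of the JSON error body, then falls back to the raw response content (now coerced to `str` so it serializes cleanly into `PipelineResponse.errors`). A standalone sketch of that fallback, with a stand-in response object since this sketch does not make a real HTTP call:

```python
import json


class FakeResponse:
    """Stand-in for requests.Response, for illustration only."""

    def __init__(self, content: bytes):
        self.content = content

    def json(self):
        return json.loads(self.content)


def extract_error_detail(resp) -> str:
    # Prefer a FastAPI-style {"detail": ...} error body; fall back to the
    # raw content if the body is not JSON or has no "detail" key.
    try:
        return resp.json()["detail"]
    except Exception:
        return str(resp.content)


print(extract_error_detail(FakeResponse(b'{"detail": "pipeline not found"}')))  # pipeline not found
print(extract_error_detail(FakeResponse(b"<html>502 Bad Gateway</html>")))
```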
@@ -217,7 +234,7 @@ def save_results(results: PipelineResponse | None = None, results_json: str | No

pipeline, _created = Pipeline.objects.get_or_create(slug=results.pipeline, defaults={"name": results.pipeline})
if _created:
logger.warning(f"Pipeline choice returned by the ML backend was not recognized! {pipeline}")
logger.warning(f"Pipeline choice returned by the Processing Service was not recognized! {pipeline}")
created_objects.append(pipeline)
algorithms_used = set()

@@ -396,7 +413,7 @@ class Pipeline(BaseModel):
version = models.IntegerField(default=1)
version_name = models.CharField(max_length=255, blank=True)
# @TODO the algorithms list be retrieved by querying the pipeline endpoint
algorithms = models.ManyToManyField(Algorithm, related_name="pipelines")
algorithms = models.ManyToManyField("ml.Algorithm", related_name="pipelines")
stages: list[PipelineStage] = SchemaField(
default=default_stages,
help_text=(
@@ -405,7 +422,7 @@
),
)
projects = models.ManyToManyField("main.Project", related_name="pipelines", blank=True)
endpoint_url = models.CharField(max_length=1024, null=True, blank=True)
processing_services: models.QuerySet[ProcessingService]

class Meta:
ordering = ["name", "version"]
Expand All @@ -414,6 +431,9 @@ class Meta:
["name", "version"],
]

def __str__(self):
return f'#{self.pk} "{self.name}" ({self.slug}) v{self.version}'

def collect_images(
self,
collection: SourceImageCollection | None = None,
@@ -431,11 +451,57 @@ def collect_images(
skip_processed=skip_processed,
)

def choose_processing_service_for_pipeline(self, job_id, pipeline_name) -> ProcessingService:
# @TODO use the cached `last_checked_latency` and a max age to avoid checking every time

job = None
task_logger = logger
if job_id:
from ami.jobs.models import Job

job = Job.objects.get(pk=job_id)
task_logger = job.logger

processing_services = self.processing_services.all()

# check the status of all processing services
timeout = 5 * 60.0 # 5 minutes
lowest_latency = timeout
processing_services_online = False

for processing_service in processing_services:
status_response = processing_service.get_status() # @TODO pass timeout to get_status()
if status_response.server_live:
processing_services_online = True
if status_response.latency < lowest_latency:
lowest_latency = status_response.latency
# pick the processing service that has lowest latency
processing_service_lowest_latency = processing_service

# if all offline then throw error
if not processing_services_online:
msg = f'No processing services are online for the pipeline "{pipeline_name}".'
task_logger.error(msg)

raise Exception(msg)
else:
task_logger.info(
f"Using processing service with latency {round(lowest_latency, 4)}: "
f"{processing_service_lowest_latency}"
)

return processing_service_lowest_latency

def process_images(self, images: typing.Iterable[SourceImage], job_id: int | None = None):
if not self.endpoint_url:
raise ValueError("No endpoint URL configured for this pipeline")
processing_service = self.choose_processing_service_for_pipeline(job_id, self.name)

if not processing_service.endpoint_url:
raise ValueError(
f"No endpoint URL configured for this pipeline's processing service ({processing_service})"
)

return process_images(
endpoint_url=self.endpoint_url,
endpoint_url=urljoin(processing_service.endpoint_url, "/process_images"),
pipeline=self,
images=images,
job_id=job_id,
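The selection logic above polls every registered service's status endpoint, keeps the live one with the lowest reported latency, and then joins its base URL with the `/process_images` path. A standalone sketch of both steps (the service dicts and names are hypothetical; the sketch also folds the live-but-slower-than-timeout case into the same error, whereas in the diff the chosen-service variable is only assigned when a latency beats the five-minute ceiling). Note that `urljoin` with a leading slash replaces the base URL's path entirely:

```python
from urllib.parse import urljoin

TIMEOUT = 5 * 60.0  # treat anything slower than 5 minutes as unusable


def choose_lowest_latency(statuses: list[dict]) -> dict:
    """Pick the live service with the lowest latency; raise if none qualify."""
    best = None
    lowest = TIMEOUT
    for status in statuses:
        if status["server_live"] and status["latency"] < lowest:
            lowest = status["latency"]
            best = status
    if best is None:
        raise Exception("No processing services are online for this pipeline.")
    return best


statuses = [
    {"name": "svc-a", "server_live": True, "latency": 0.31, "endpoint_url": "http://svc-a.example/api/"},
    {"name": "svc-b", "server_live": True, "latency": 0.12, "endpoint_url": "http://svc-b.example/api/"},
    {"name": "svc-c", "server_live": False, "latency": 0.01, "endpoint_url": "http://svc-c.example/api/"},
]
best = choose_lowest_latency(statuses)
print(best["name"])  # svc-b (svc-c reports lower latency but is offline)

# A leading "/" makes urljoin discard the base path, not append to it:
print(urljoin(best["endpoint_url"], "/process_images"))  # http://svc-b.example/process_images
```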