Integration fixes
jpbruinsslot committed Jan 8, 2025
1 parent a141b2a commit 346adf3
Showing 5 changed files with 34 additions and 36 deletions.
40 changes: 11 additions & 29 deletions mula/logging.json
@@ -9,72 +9,54 @@
   "handlers": {
     "console": {
       "class": "logging.StreamHandler",
-      "level": "INFO",
+      "level": "DEBUG",
       "formatter": "default",
       "stream": "ext://sys.stdout"
     }
   },
   "root": {
-    "level": "INFO",
-    "handlers": [
-      "console"
-    ]
+    "level": "DEBUG",
+    "handlers": ["console"]
   },
   "loggers": {
     "alembic.runtime.migration": {
       "level": "CRITICAL",
-      "handlers": [
-        "console"
-      ],
+      "handlers": ["console"],
       "propagate": 0
     },
     "urllib3.connectionpool": {
       "level": "CRITICAL",
-      "handlers": [
-        "console"
-      ],
+      "handlers": ["console"],
       "propagate": 0
     },
     "uvicorn.error": {
       "level": "CRITICAL",
-      "handlers": [
-        "console"
-      ],
+      "handlers": ["console"],
       "propagate": 0
     },
     "uvicorn.access": {
       "level": "CRITICAL",
-      "handlers": [
-        "console"
-      ],
+      "handlers": ["console"],
       "propagate": 0
     },
     "pika": {
       "level": "CRITICAL",
-      "handlers": [
-        "console"
-      ],
+      "handlers": ["console"],
       "propagate": 0
     },
     "sqlalchemy.engine": {
       "level": "CRITICAL",
-      "handlers": [
-        "console"
-      ],
+      "handlers": ["console"],
       "propagate": 0
     },
     "httpx": {
       "level": "CRITICAL",
-      "handlers": [
-        "console"
-      ],
+      "handlers": ["console"],
       "propagate": 0
     },
     "httpcore": {
       "level": "CRITICAL",
-      "handlers": [
-        "console"
-      ],
+      "handlers": ["console"],
       "propagate": 0
     }
   }
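Note on this change: the console handler and the root logger drop from INFO to DEBUG, the noisy third-party loggers stay pinned at CRITICAL, and the multi-line "handlers" arrays are collapsed onto one line. The file follows Python's logging.config.dictConfig schema, so the effect can be checked with a minimal loader sketch like the one below (it assumes mula/logging.json is a complete dictConfig document, i.e. the "version" and "formatters" keys sit above this hunk, and it is not necessarily how the scheduler itself loads the file):

    # Minimal sketch: load a dictConfig-style JSON such as mula/logging.json.
    import json
    import logging.config

    with open("mula/logging.json") as f:
        logging.config.dictConfig(json.load(f))

    logging.getLogger().debug("root is now at DEBUG, so this reaches stdout")
    logging.getLogger("pika").warning("suppressed: pika only emits CRITICAL")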
2 changes: 1 addition & 1 deletion mula/scheduler/models/ooi.py
@@ -27,4 +27,4 @@ class ScanProfileMutation(BaseModel):
     operation: MutationOperationType
     primary_key: str
     value: OOI | None
-    organisation: str
+    client_id: str
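This rename is the model-side half of the integration fix; the boefje scheduler below reads the same field. As a standalone illustration, the sketch rebuilds the model with simplified stand-ins for OOI and MutationOperationType (the stand-ins and the message payload are assumptions; only the field list comes from the diff):

    # Sketch of the renamed field, runnable without the scheduler package.
    from enum import Enum
    from pydantic import BaseModel

    class MutationOperationType(str, Enum):  # stand-in for the real enum
        CREATE = "create"
        UPDATE = "update"
        DELETE = "delete"

    class ScanProfileMutation(BaseModel):
        operation: MutationOperationType
        primary_key: str
        value: dict | None  # the real model uses OOI | None
        client_id: str      # renamed from `organisation`

    body = b'{"operation": "create", "primary_key": "Hostname|internet|example.com", "value": null, "client_id": "org-1"}'
    print(ScanProfileMutation.model_validate_json(body).client_id)  # org-1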
9 changes: 5 additions & 4 deletions mula/scheduler/schedulers/schedulers/boefje.py
@@ -78,6 +78,7 @@ def process_mutations(self, body: bytes) -> None:
         """
         try:
             # Convert body into a ScanProfileMutation
+            self.logger.info(body)
             mutation = models.ScanProfileMutation.model_validate_json(body)
             self.logger.debug(
                 "Received scan level mutation %s for: %s",
@@ -115,7 +116,7 @@ def process_mutations(self, body: bytes) -> None:
             return

         # What available boefjes do we have for this ooi?
-        boefjes = self.get_boefjes_for_ooi(ooi, mutation.organisation)
+        boefjes = self.get_boefjes_for_ooi(ooi, mutation.client_id)
         if not boefjes:
             self.logger.debug("No boefjes available for %s", ooi.primary_key, scheduler_id=self.scheduler_id)
             return
@@ -151,7 +152,7 @@ def process_mutations(self, body: bytes) -> None:
                     "Based on boefje run on type, skipping",
                     boefje_id=boefje.id,
                     ooi_primary_key=ooi.primary_key,
-                    organisation_id=mutation.organisation,
+                    organisation_id=mutation.client_id,
                     scheduler_id=self.scheduler_id,
                 )
                 continue
@@ -160,7 +161,7 @@ def process_mutations(self, body: bytes) -> None:
                 models.BoefjeTask(
                     boefje=models.Boefje.model_validate(boefje.model_dump()),
                     input_ooi=ooi.primary_key if ooi else None,
-                    organization=mutation.organisation,
+                    organization=mutation.client_id,
                 )
             )

@@ -171,7 +172,7 @@ def process_mutations(self, body: bytes) -> None:
                 executor.submit(
                     self.push_boefje_task,
                     boefje_task,
-                    mutation.organisation,
+                    mutation.client_id,
                     create_schedule,
                     self.process_mutations.__name__,
                 )
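Every read of the mutation in process_mutations now uses mutation.client_id, while the downstream keyword names (organisation_id=, organization=) are left as they were, so only the incoming message model changes, not the task schema. One consequence: a producer that still publishes the old "organisation" key will fail validation at model_validate_json. A sketch of that failure mode, reusing the ScanProfileMutation stand-in from the ooi.py note above (the payload is hypothetical):

    # Old-style message still carrying "organisation" instead of "client_id".
    from pydantic import ValidationError

    old_body = b'{"operation": "delete", "primary_key": "Hostname|internet|example.com", "value": null, "organisation": "org-1"}'
    try:
        ScanProfileMutation.model_validate_json(old_body)
    except ValidationError as exc:
        print(exc)  # reports client_id as a missing required field

The added self.logger.info(body) logs the raw message bytes at INFO for every mutation, which looks like a temporary debugging aid for exactly this kind of mismatch.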
14 changes: 13 additions & 1 deletion mula/scheduler/schedulers/schedulers/normalizer.py
@@ -57,6 +57,7 @@ def run(self) -> None:
             "Normalizer scheduler started", scheduler_id=self.scheduler_id, item_type=self.queue.item_type.__name__
         )

+    # TODO: exception handling
     @tracer.start_as_current_span("process_raw_data")
     def process_raw_data(self, body: bytes) -> None:
         """Create tasks for the received raw data.
@@ -85,12 +86,23 @@ def process_raw_data(self, body: bytes) -> None:
             )
             return

+        # TODO: deduplication
         # Get all normalizers for the mime types of the raw data
         normalizers = []
         for mime_type in latest_raw_data.raw_data.mime_types:
-            normalizers_by_mime_type = self.get_normalizers_for_mime_type(mime_type, latest_raw_data.organization)
+            normalizers_by_mime_type = self.get_normalizers_for_mime_type(
+                mime_type.get("value"), latest_raw_data.organization
+            )
             normalizers.extend(normalizers_by_mime_type)

+        self.logger.debug(
+            "Found normalizers for raw data",
+            raw_data_id=latest_raw_data.raw_data.id,
+            mime_types=[mime_type.get("value") for mime_type in latest_raw_data.raw_data.mime_types],
+            normalizers=[normalizer.id for normalizer in normalizers],
+            scheduler_id=self.scheduler_id,
+        )
+
         # Create tasks for the normalizers
         normalizer_tasks = []
         for normalizer in normalizers:
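This change treats each entry of raw_data.mime_types as a mapping with a "value" key (e.g. {"value": "text/html"}) instead of a plain value, and adds a debug line summarizing which normalizers matched. A minimal sketch of the new lookup shape, where the registry and payload are hypothetical stand-ins for get_normalizers_for_mime_type and the queue message:

    # Hypothetical registry standing in for get_normalizers_for_mime_type.
    registry = {"text/html": ["kat_html_normalizer"], "application/json": []}

    mime_types = [{"value": "text/html"}, {"value": "application/json"}]

    normalizers: list[str] = []
    for mime_type in mime_types:
        normalizers.extend(registry.get(mime_type.get("value"), []))
    print(normalizers)  # ['kat_html_normalizer']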
5 changes: 4 additions & 1 deletion mula/scheduler/server/handlers/schedulers.py
@@ -5,6 +5,7 @@
 from fastapi import status

 from scheduler import context, models, schedulers, storage
+from scheduler.models.scheduler import SchedulerType
 from scheduler.schedulers.queue import NotAllowedError, QueueFullError
 from scheduler.server import serializers, utils
 from scheduler.server.errors import BadRequestError, ConflictError, NotFoundError, TooManyRequestsError
@@ -71,7 +72,9 @@ def pop(
         limit: int = 100,
         filters: storage.filters.FilterRequest | None = None,
     ) -> utils.PaginatedResponse:
-        results, count = self.ctx.datastores.pq_store.pop(offset=offset, limit=limit, filters=filters)
+        results, count = self.ctx.datastores.pq_store.pop(
+            scheduler_id=scheduler_id, offset=offset, limit=limit, filters=filters
+        )

         # Update status for popped items
         self.ctx.datastores.pq_store.bulk_update_status(
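The handler now passes scheduler_id through to pq_store.pop, scoping the pop to the scheduler addressed in the URL; the unscoped call before this fix could presumably return and drain items belonging to other schedulers. (The new SchedulerType import is used outside the shown hunks.) A minimal in-memory sketch of why the scoping matters; the list-backed store is a stand-in, not the real pq_store API:

    # Without the scheduler_id filter, pop() would drain other schedulers' items.
    items = [
        {"scheduler_id": "boefje-org1", "data": "task-a"},
        {"scheduler_id": "normalizer-org1", "data": "task-b"},
    ]

    def pop(scheduler_id: str, offset: int = 0, limit: int = 100) -> tuple[list[dict], int]:
        matched = [item for item in items if item["scheduler_id"] == scheduler_id]
        page = matched[offset : offset + limit]
        for item in page:  # popped items leave the queue
            items.remove(item)
        return page, len(matched)

    results, count = pop("boefje-org1")
    print(count, [r["data"] for r in results])  # 1 ['task-a']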
