Skip to content

Commit

Permalink
feat: release changes for version 0.4.10
Browse files Browse the repository at this point in the history
  • Loading branch information
231y committed Dec 6, 2024
1 parent 1c60be9 commit 6bbe27a
Show file tree
Hide file tree
Showing 17 changed files with 828 additions and 106 deletions.
9 changes: 7 additions & 2 deletions galaxy/api/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import copy
import importlib
import logging
from datetime import datetime
Expand All @@ -12,7 +13,7 @@
from fastapi import Depends, FastAPI, HTTPException

from galaxy.core.galaxy import Integration, run_integration
from galaxy.core.logging import get_log_format
from galaxy.core.logging import get_log_format, get_magneto_logs
from galaxy.core.magneto import Magneto
from galaxy.core.mapper import Mapper
from galaxy.core.models import SchedulerJobStates
Expand Down Expand Up @@ -56,8 +57,12 @@ def job_listener(event):
scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_SUBMITTED | EVENT_JOB_REMOVED)

async def _run_integration():
if (log_handler := get_magneto_logs(logger)) is not None:
log_handler.logs.clear()

fresh_instance = copy.deepcopy(instance)
async with Magneto(instance.config.rely.url, instance.config.rely.token, logger=logger) as magneto_client:
success = await run_integration(instance, magneto_client=magneto_client, logger=app.state.logger)
success = await run_integration(fresh_instance, magneto_client=magneto_client, logger=logger)
if success:
logger.info("Integration %r run completed successfully: %r", instance.type_, instance.id_)
else:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
from logging import Logger

from fastapi import APIRouter, Depends

from galaxy.core.magneto import Magneto
from galaxy.core.mapper import Mapper
from galaxy.core.utils import get_mapper, get_logger, get_magneto_client

import logging

router = APIRouter(prefix="/{{cookiecutter.integration_name}}", tags=["{{cookiecutter.integration_name}}"])


@router.post("/webhook")
async def {{cookiecutter.integration_name}}_webhook(
event: dict,
mapper: Mapper = Depends(get_mapper),
logger: logging = Depends(get_logger),
logger: Logger = Depends(get_logger),
magneto_client: Magneto = Depends(get_magneto_client),
) -> dict:
entity = await mapper.process("entities", [event])
logger.info(f"Received entity: {entity}")
return {"message": "received gitlab webhook"}
) -> None:
try:
entity, *_ = await mapper.process("entities", [event])
logger.info("Received entity: %s", entity)
except Exception:
...
return

try:
await magneto_client.upsert_entity(entity)
except Exception:
...
123 changes: 72 additions & 51 deletions galaxy/core/mapper.py
Original file line number Diff line number Diff line change
@@ -1,55 +1,66 @@
import asyncio
import re
from typing import Any

import jq
import yaml

from galaxy.core.resources import load_integration_resource
from galaxy.utils.concurrency import run_in_thread

__all__ = ["Mapper"]
__all__ = ["Mapper", "MapperError", "MapperNotFoundError", "MapperCompilationError"]


class Mapper:
MAPPINGS_FILE_PATH: str = ".rely/mappings.yaml"

def __init__(self, integration_name: str):
self.integration_name = integration_name
self.id_allowed_chars = "[^a-zA-Z0-9-]"

async def _load_mapping(self, mapping_kind: str) -> list[dict]:
mappings = yaml.safe_load(load_integration_resource(self.integration_name, ".rely/mappings.yaml"))
return [mapping for mapping in mappings.get("resources") if mapping["kind"] == mapping_kind]

def _compile_mappings(self, mapping: dict) -> dict:
compiled_mapping = {}
for key, value in mapping.items():
if isinstance(value, dict):
compiled_mapping[key] = self._compile_mappings(value)
elif isinstance(value, list):
compiled_mapping[key] = [
self._compile_mappings(item) if isinstance(item, dict) else item for item in value
]
else:
try:
compiled_mapping[key] = jq.compile(value) if isinstance(value, str) else value
except Exception as e:
raise Exception(f"Error compiling maps for key {key} with expression {value}: {e}")
self._mappings: dict[str, dict[str, Any]] | None = None
self._compiled_mappings: dict[str, dict[str, Any]] = {}

@property
def mappings(self) -> dict[str, dict[str, Any]]:
if self._mappings is None:
mappings = yaml.safe_load(load_integration_resource(self.integration_name, self.MAPPINGS_FILE_PATH))
self._mappings = {mapping["kind"]: mapping["mappings"] for mapping in mappings.get("resources") or []}
return self._mappings

def get_compiled_mappings(self, mapping_kind: str) -> list[Any]:
if mapping_kind not in self._compiled_mappings:
try:
self._compiled_mappings[mapping_kind] = self._compile_mappings(self.mappings.get(mapping_kind) or {})
except Exception as e:
raise MapperCompilationError(mapping_kind) from e
return self._compiled_mappings[mapping_kind]

def _compile_mappings(self, item: Any) -> Any:
if isinstance(item, dict):
return {key: self._compile_mappings(value) for key, value in item.items()}
if isinstance(item, list | tuple | set):
return [self._compile_mappings(value) for value in item]
if isinstance(item, str):
try:
return jq.compile(item)
except Exception as e:
raise Exception(f"Error compiling maps with expression {item}: {e}") from e
return item

def _map_data(self, compiled_mapping: Any, context: dict[str, Any]) -> Any:
if isinstance(compiled_mapping, dict):
return {key: self._map_data(value, context) for key, value in compiled_mapping.items()}
if isinstance(compiled_mapping, list):
return [self._map_data(item, context) for item in compiled_mapping]
if isinstance(compiled_mapping, jq._Program):
try:
return compiled_mapping.input(context).first()
except Exception as e:
raise Exception(f"Error mapping with expression {compiled_mapping} and payload {compiled_mapping}: {e}")
return compiled_mapping

def _map_entity(self, compiled_mapping: dict, json_data: dict) -> dict:
entity = {}

for key, value in compiled_mapping.items():
if isinstance(value, dict):
entity[key] = self._map_entity(value, json_data)
elif isinstance(value, list):
entity[key] = [self._map_entity(item, json_data) if isinstance(item, dict) else item for item in value]
else:
try:
entity[key] = value.input(json_data).first() if isinstance(value, jq._Program) else value
except Exception as e:
raise Exception(f"Error mapping key {key} with expression {value} and payload {json_data}: {e}")

return self._sanitize(entity)
def _map_entity(self, compiled_mapping: dict, json_data: dict[str, Any]) -> dict:
return self._sanitize(self._map_data(compiled_mapping, json_data))

def _replace_non_matching_characters(self, input_string: str, regex_pattern: str) -> str:
res = re.sub(regex_pattern, ".", input_string)
Expand Down Expand Up @@ -77,21 +88,31 @@ def _sanitize(self, entity: dict) -> dict:

return entity

async def process(self, mapping_kind: str, json_data: list[dict], context=None) -> tuple[Any]:
try:
mappings = await self._load_mapping(mapping_kind)
if not mappings:
raise Exception(f"Unknown Mapper {mapping_kind}")
compiled_mappings = self._compile_mappings(mappings[0]["mappings"])
def process_sync(self, mapping_kind: str, json_data: list[dict], context: Any | None = None) -> list[Any]:
mappings = self.get_compiled_mappings(mapping_kind)
if not mappings:
raise MapperNotFoundError(mapping_kind)
return [self._map_entity(mappings, {**each, "context": context}) for each in json_data]

loop = asyncio.get_running_loop()
async def process(self, mapping_kind: str, json_data: list[dict], context: Any | None = None) -> tuple[Any]:
# There is no advantage in using async here as all the work is done in a thread.
# Keeping it as async for now to avoid breaking existing code that calls this as `await mapper.process(...)`.
return await run_in_thread(self.process_sync, mapping_kind, json_data, context)

entities = await asyncio.gather(
*[
loop.run_in_executor(None, self._map_entity, compiled_mappings, {**each, "context": context})
for each in json_data
]
)
return entities
except Exception as e:
raise e

class MapperError(Exception):
"""Base class for Mapper errors."""


class MapperNotFoundError(MapperError):
"""Mapper not found error."""

def __init__(self, mapping_kind: str):
super().__init__(f"Unknown Mapper {mapping_kind}")


class MapperCompilationError(MapperError):
"""Mapper compilation error."""

def __init__(self, mapping_kind: str):
super().__init__(f"Error compiling mappings for kind {mapping_kind}")
1 change: 1 addition & 0 deletions galaxy/integrations/flux/.rely/automations.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[]
Loading

0 comments on commit 6bbe27a

Please sign in to comment.