Skip to content

Rammearkitektur AMQP framework (FastAPI + RAMQP)

License

Notifications You must be signed in to change notification settings

OS2mo/fastrampqi

Repository files navigation

FastRAMQPI

FastRAMQPI is an opinionated library for OS2mo integrations.

Usage

from typing import Any

from fastapi import APIRouter
from fastapi import FastAPI
from pydantic import BaseSettings
from pydantic import Field

from fastramqpi import depends
from fastramqpi.config import Settings as FastRAMQPISettings
from fastramqpi.main import FastRAMQPI
from fastramqpi.ramqp.depends import Context
from fastramqpi.ramqp.depends import rate_limit
from fastramqpi.ramqp.mo import MORouter
from fastramqpi.ramqp.mo import PayloadUUID


class Settings(BaseSettings):
    class Config:
        frozen = True
        env_nested_delimiter = "__"

    fastramqpi: FastRAMQPISettings = Field(
        default_factory=FastRAMQPISettings, description="FastRAMQPI settings"
    )


fastapi_router = APIRouter()
amqp_router = MORouter()


@amqp_router.register("engagement", dependencies=[Depends(rate_limit(10))])
async def listen_to_engagements(
    context: Context,
    graphql_session: depends.LegacyGraphQLSession,
    uuid: PayloadUUID,
) -> None:
    print(uuid)


def create_fastramqpi(**kwargs: Any) -> FastRAMQPI:
    settings = Settings(**kwargs)
    fastramqpi = FastRAMQPI(
        application_name="os2mo-example-integration",
        settings=settings.fastramqpi,
        graphql_version=20,
    )
    fastramqpi.add_context(settings=settings)

    # Add our AMQP router(s)
    amqpsystem = fastramqpi.get_amqpsystem()
    amqpsystem.router.registry.update(amqp_router.registry)

    # Add our FastAPI router(s)
    app = fastramqpi.get_app()
    app.include_router(fastapi_router)

    return fastramqpi


def create_app(**kwargs: Any) -> FastAPI:
    fastramqpi = create_fastramqpi(**kwargs)
    return fastramqpi.get_app()

Metrics

FastRAMQPI Metrics are exported via prometheus/client_python on the FastAPI's /metrics.

Debugging

FastRAMQPI ships with support for debugging via DAP. To enable it set the DAP environmental variable to true, and expose the debugging port (5678).

For instance in a docker-compose.yaml file, by merging in:

version: "3"
services:
  mo_ldap_import_export:
    environment:
      DAP: "true"
    ports:
      - "127.0.0.0:5678:5678"

For ease of use, this should be the default for projects using FastRAMQPI.

Autogenerated GraphQL Client

FastRAMQPI exposes an authenticated httpx client through the dependency injection system. While it is possible to call the OS2mo API directly through it, the recommended approach is to define a properly-typed GraphQL client in the integration and configure it to make calls through the authenticated client. Instead of manually implementing such client, we strongly recommend to use the Ariadne Code Generator, which generates an integration-specific client based on the general OS2mo GraphQL schema and the exact queries and mutations the integration requires.

To integrate such client, first add and configure the codegen:

# pyproject.toml

[tool.poetry.group.dev.dependencies]
ariadne-codegen = "0.13.0"

[tool.ariadne-codegen]
# Ideally, the GraphQL client is generated as part of the build process and
# never committed to git. Unfortunately, most of our tools and CI analyses the
# project directly as it is in Git. In the future - when the CI templates
# operate on the built container image - only the definition of the schema and
# queries should be checked in.
#
# The default package name is `graphql_client`. Make it more obvious that the
# files are not to be modified manually.
target_package_name = "autogenerated_graphql_client"
target_package_path = "my_integration/"
client_name = "GraphQLClient"
schema_path = "schema.graphql"  # curl -O http://localhost:5000/graphql/v8/schema.graphql
queries_path = "queries.graphql"
include_all_inputs = false
include_all_enums = false
plugins = [
    # Return values directly when only a single top field is requested
    "ariadne_codegen.contrib.shorter_results.ShorterResultsPlugin",
]
[tool.ariadne-codegen.scalars.DateTime]
type = "datetime.datetime"
parse = "fastramqpi.ariadne.parse_graphql_datetime"
[tool.ariadne-codegen.scalars.UUID]
type = "uuid.UUID"

Where you replace "my_integration/" with the path to your integration.

Grab OS2mo's GraphQL schema:

curl -O http://localhost:5000/graphql/v20/schema.graphql

Define your queries:

# queries.graphql

# SPDX-FileCopyrightText: Magenta ApS <https://magenta.dk>
# SPDX-License-Identifier: MPL-2.0

query Version {
  version {
    mo_version
    mo_hash
  }
}

Generate the client - you may have to activate some virtual environment:

ariadne-codegen

The client class is passed to FastRAMQPI on startup as shown below. This will ensure it is automatically opened and closed and configured with authentication. NOTE: remember to update the graphql_version when you generate against a new schema version.

# app.py
from autogenerated_graphql_client import GraphQLClient


def create_app(**kwargs: Any) -> FastAPI:
    fastramqpi = FastRAMQPI(
        ...,
        graphql_version=20,
        graphql_client_cls=GraphQLClient,
    )

The FastRAMQPI framework cannot define the annotated type for the GraphQL client since its methods depend on the specific queries required by the integration. Therefore, each implementing integration needs to define their own:

# depends.py
from typing import Annotated

from fastapi import Depends
from fastramqpi.ramqp.depends import from_context

from my_integration.autogenerated_graphql_client import GraphQLClient as _GraphQLClient

GraphQLClient = Annotated[_GraphQLClient, Depends(from_context("graphql_client"))]

Finally, we can define our AMQP handler to use the GraphQL client:

# events.py
from . import depends


@router.register("*")
async def handler(mo: depends.GraphQLClient) -> None:
    version = await mo.version()
    print(version)

To get REUSE working, you might consider adding the following to .reuse/dep5:

Files: my_integration/autogenerated_graphql_client/*
Copyright: Magenta ApS <https://magenta.dk>
License: MPL-2.0

Database

If your integration requires access to a database, set it up as so: compose.yaml:

services:
  my_integration:
    environment:
      FASTRAMQPI__DATABASE__USER: "fastramqpi"
      FASTRAMQPI__DATABASE__PASSWORD: "fastramqpi"
      FASTRAMQPI__DATABASE__HOST: "db"
      FASTRAMQPI__DATABASE__NAME: "fastramqpi"
  db:
    image: postgres:16
    environment:
      POSTGRES_USER: "fastramqpi"
      POSTGRES_PASSWORD: "fastramqpi"
      POSTGRES_DB: "fastramqpi"

my_integration/database.py:

from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "user"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str]

Due to budgetary constraints FastRAMQPI does not yet facilitate database migrations through alembic. Instead, all defined tables are created on startup. For technical reasons, this requires us to pass the DeclarativeBase from above to the FastRAMQPI constructor in app.py as so:

from my_integration.database import Base


fastramqpi = FastRAMQPI(
    application_name="os2mo-example-integration",
    settings=set``tings.fastramqpi,
    ...
    database_metadata=Base.metadata,
)

The database can be used in a handler as follows:

from fastramqpi import depends
from my_integration.database import User


@amqp_router.register("person")
async def add(session: depends.Session) -> None:
    session.add(User(name="Alice"))

Multilayer exchanges

FastRAMQPI supports a multi-layer exchange setup, where all integration queues are bound to an intermediate exchange which itself is bound to an upstream exchange for instance OS2mo, the below Graph shows this setup with two integrations that each has their own intermediate exchange.

            ┌─────┐
         ┌──┤OS2mo├──┐
 R1+R2+R3│  └─────┘  │R4+R5
         │           │
      ┌──▼──┐     ┌──▼──┐
   ┌──┤LDAP ├──┐  │Omada├──┐
   │  └──┬──┘  │  └──┬──┘  │
 R1│   R2│   R3│   R4│   R5│
   │     │     │     │     │
┌──▼──┬──▼──┬──▼──┐ ┌▼─────▼┐
│ Q1  │ Q2  │ Q3  │ │  Q4   │
└─────┴─────┴─────┘ └───────┘

Here R1,...R5 are routing keys, and Q1,...Q4 are queues. Notice that it is still possible for a single queue to subscribe to multiple routing keys, and to have multiple queues subscribed to a single intermediate exchange.

Compared the above model using intermediate exchanges with the model below, that does not utilize intermediate exchanges.

            ┌─────┐
   ┌─────┬──┤OS2mo├──┬─────┐
   │     │  └──┬──┘  │     │
 R1│   R2│   R3│   R4│   R5│
   │     │     │     │     │
┌──▼──┬──▼──┬──▼──┐ ┌▼─────▼┐
│ Q1  │ Q2  │ Q3  │ │  Q4   │
└─────┴─────┴─────┘ └───────┘

Here it is no longer obvious which queues and routing keys belong to which integration, and as such it is no longer possible to use OS2mo's refresh_* mutators to request a specific integration to run its event handlers.

Requesting a specific integration to run its event handlers, using multi-layer exchanges with intermediate exchanges can be done using the exchange parameter on the refresh_* mutators, by passing the name of the intermediate exchange.

To start using intermediate exchanges in your FastRAMQPI integration, simply set the upstream_exchange value on the AMQPConnectionSettings to the upstream exchange you want to bind your intermediate exchange to, and set the exchange value to the name you want to allocate to your intermediate exchange.

It is generally recommended to use the upstream_exchange name as a prefix for the intermediate exchange set in the exchange value, such as:

from fastramqpi.config import Settings as _FastRAMQPISettings
from fastramqpi.ramqp.config import AMQPConnectionSettings as _AMQPConnectionSettings

class AMQPConnectionSettings(_AMQPConnectionSettings):
    upstream_exchange = "os2mo"
    exchange = "os2mo_ldap"
    queue_prefix = "os2mo_ldap"


class FastRAMQPISettings(_FastRAMQPISettings):
    amqp: AMQPConnectionSettings


class Settings(BaseSettings):
    fastramqpi: FastRAMQPISettings

Integration Testing

The goal of integration testing in our context is to minimise the amount of mocking and patching by testing the integration's behavior against a running OS2mo instance in a way that resembles a production environment as much as possible. To this end, the integration is configured for testing through its public interface -- environment variables -- in .gitlab-ci.yml wherever possible. The flow of each test follows the well-known "Arrange, Act and Assert" pattern, but with a sleight modification to avoid hooking into either OS2mo or the integration itself, and endure the nature of eventual consistency that is unavoidable with an event-based system.

To get started, add OS2mo's GitLab CI template to .gitlab-ci.yml:

include:
  - project: rammearkitektur/os2mo
    file:
      - gitlab-ci-templates/integration-test-meta.v1.yml

...

Test:
  variables:
    PYTEST_ADDOPTS: "-m 'not integration_test'"

Integration-test:
  extends:
    - .integration-test:mo
  variables:
    MY_INTEGRATION__FOO: "true"

The Integration-test CI job starts a full OS2mo stack, including all necessary services, and then runs all tests marked with integration_test. Auto-used fixtures automatically ensure test isolation in OS2mo's database, increase the AMQP messaging interval, and configure respx appropriately. By default, the Test job runs all tests in the project, including both unit- and integration-tests. We overwrite the Test CI job to exclude integration tests -- the Integration-test job already only runs integration tests by default.

Moving on, some boilerplate needs to be copied since a few types cannot be known a priori:

# tests/integration/conftest.py
from collections.abc import AsyncIterator

import pytest
from asgi_lifespan import LifespanManager
from asgi_lifespan._types import ASGIApp
from fastapi import FastAPI
from gql.client import AsyncClientSession
from httpx import AsyncClient
from httpx import ASGITransport
from pytest import MonkeyPatch

from my_integration.app import create_app


@pytest.fixture
async def _app(monkeypatch: MonkeyPatch) -> FastAPI:
    monkeypatch.setenv("SOME_CONFIG", "http://example.org")

    app = create_app()
    return app


@pytest.fixture
async def asgiapp(_app: FastAPI) -> AsyncIterator[ASGIApp]:
    """ASGI app with lifespan run."""
    async with LifespanManager(_app) as manager:
        yield manager.app


@pytest.fixture
async def app(_app: FastAPI, asgiapp: ASGIApp) -> FastAPI:
    """FastAPI app with lifespan run."""
    return _app


@pytest.fixture
async def test_client(asgiapp: ASGIApp) -> AsyncIterator[AsyncClient]:
    """Create test client with associated lifecycles."""
    transport = ASGITransport(app=asgiapp, client=("1.2.3.4", 123))  # type: ignore
    async with AsyncClient(
        transport=transport, base_url="http://example.com"
    ) as client:
        yield client


@pytest.fixture
async def graphql_client(app: FastAPI) -> AsyncClientSession:
    """Authenticated GraphQL codegen client for OS2mo."""
    return app.state.context["graphql_client"]

The test_client fixture refers to a running instance of the integration, and graphql_client to the integration's pre-authenticated instance of its autogenerated GraphQL client.

To make async tests and fixtures work, and avoid having to @pytest.mark.asyncio each test individually, the following must be added to pyproject.toml:

[tool.pytest.ini_options]
asyncio_mode="auto"

A sample integration test could be as follows:

# tests/integration/test_my_integration.py
import pytest
from fastapi.testclient import TestClient
from fastramqpi.pytest_util import retry
from more_itertools import one

from my_integration.autogenerated_graphql_client import GraphQLClient


@pytest.mark.integration_test
async def test_create_person(
        test_client: TestClient,
        graphql_client: GraphQLClient,
) -> None:
    # Precondition: The person does not already exist.
    # The auto-use fixtures should automatically ensure test isolation, but
    # sometimes, especially during local development, we might be a little too
    # fast on the ^C^C^C^C^C so pytest doesn't get a chance to clean up.
    cpr_number = "1234567890"
    employee = await graphql_client._testing__get_employee(cpr_number)
    assert employee.objects == []

    # The integration needs to be triggered to create the employee. How this is
    # done depends on the integration. We assume a /trigger/ endpoint here:
    test_client.post("/trigger")

    @retry()
    async def verify() -> None:
        employees = await graphql_client._testing__get_employee(cpr_number)
        employee_states = one(employees.objects)
        employee = one(employee_states.objects)
        assert employee.cpr_number == cpr_number
        assert employee.given_name == "Alice"
    
    await verify()

Through the use of the test_client fixture, the test begins by starting the integration, including initialising any associated lifecyles such as AMQP connections. We sanity-check, to ensure the test isn't trivially passing, and trigger the integration. Due to the nature of AMQP, we don't know how long the integration will need to reconcile the state of OS2mo or an external system. Therefore, all asserts which depend on eventual consistent state, are wrapped in a function with @retry from FastRAMQPI's pytest_util. This allows the assertions to fail and be retried up to 30 seconds, before the test fails.

To keep tests clear and concise, all GraphQL queries, which are required to arrange and assert in the test, are added to the autogenerated GraphQL client, using a _testing__ prefix by convention:

# queries.graphql
query _Testing_GetEmployee($cpr_number: CPR!) {
  employees(filter: {cpr_numbers: [$cpr_number]}) {
    objects {
      objects {
        cpr_number
        given_name
      }
    }
  }
}

Overriding OS2mo-init

It is possible to specific a custom OS2mo-init configuration by setting the OS2MO_INIT_CONFIG variable at the top of the project's .gitlab-ci.yml and declaring a init.config.yml:

# .gitlab-ci.yml
variables:
  OS2MO_INIT_CONFIG: "/builds/$CI_PROJECT_PATH/init.config.yml"

# init.config.yml
facets:
  org_unit_address_type: {}
  manager_address_type: {}
  address_property: {}
  engagement_job_function: {}
  org_unit_type: {}
  engagement_type:
    qa_engineer:
      title: "Software Integration Tester"
      scope: "TEXT"
  association_type: {}
  role_type: {}
  leave_type: {}
  manager_type: {}
  responsibility: {}
  manager_level: {}
  visibility: {}
  time_planning: {}
  org_unit_level: {}
  primary_type: {}
  org_unit_hierarchy: {}
  kle_number: {}
  kle_aspect: {}

it_systems:
  FOOBAR: "The Foo Bar"

Debugging

By default, only logs generated by the application are captured and output in the GitLab interface. Service logs (OS2mo, OS2mo-init, Keycloak, etc.) can be captured for debugging by adding the following to the project's .gitlab-ci.yml file:

variables:
  CI_DEBUG_SERVICES: "true"

This is especially useful if a service fails to start (Waiting for keycloak realm builder...). See the GitLab documentation for more information.

Development

Prerequisites

Getting Started

  1. Clone the repository:
git clone [email protected]:rammearkitektur/FastRAMQPI.git
  1. Install all dependencies:
poetry install
  1. Set up pre-commit:
poetry run pre-commit install

Running the tests

You use poetry and pytest to run the tests:

poetry run pytest

You can also run specific files

poetry run pytest tests/<test_folder>/<test_file.py>

and even use filtering with -k

poetry run pytest -k "Manager"

You can use the flags -vx where v prints the test & x makes the test stop if any tests fails (Verbose, X-fail)

Authors

Magenta ApS https://magenta.dk

License

This project uses: MPL-2.0

This project uses REUSE for licensing. All licenses can be found in the LICENSES folder of the project.