KhaosResearch/drama

drama is an asynchronous workflow executor engine supported by a command line tool and RESTful API.

Proudly based on Dramatiq. This project requires the following dependencies to work:

You can set up a minimal development environment using Docker Compose.

docker-compose [-f docker-compose-kafka.yml] up -d

🚀 Setup

Installation

Via source code using Poetry:

git clone https://github.com/KhaosResearch/drama.git
cd drama
poetry install

Before running drama, save a copy of .env.template as .env and insert your own values. drama looks for a valid .env file in the current working directory. If no such file exists, it attempts to read the values from environment variables (with the prefix DRAMA_).

After that you can run:

poetry run drama -h
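
The lookup order described above (a .env file in the working directory first, then DRAMA_-prefixed environment variables) can be illustrated with a minimal standard-library sketch. This is not drama's actual loader: the function names `load_settings` and `strip_prefix` and the setting names `API_HOST` and `API_PORT` are hypothetical, and the sketch assumes the .env file holds simple KEY=VALUE lines.

```python
import os
from pathlib import Path


def strip_prefix(key, prefix):
    """Drop the prefix from a key if present; otherwise return the key unchanged."""
    return key[len(prefix):] if key.startswith(prefix) else key


def load_settings(env_file=".env", prefix="DRAMA_", environ=None):
    """Hypothetical illustration of the documented lookup order.

    1. If env_file exists, read KEY=VALUE pairs from it.
    2. Otherwise, collect environment variables that start with prefix.
    """
    environ = os.environ if environ is None else environ
    path = Path(env_file)
    if path.is_file():
        settings = {}
        for raw in path.read_text().splitlines():
            line = raw.strip()
            # Skip blank lines, comments, and lines without an assignment.
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            settings[strip_prefix(key.strip(), prefix)] = value.strip().strip("\"'")
        return settings
    # No .env file: fall back to prefixed environment variables only.
    return {
        key[len(prefix):]: value
        for key, value in environ.items()
        if key.startswith(prefix)
    }


if __name__ == "__main__":
    # API_PORT is a made-up setting name used only for this example.
    print(load_settings(env_file="no_such_file.env", environ={"DRAMA_API_PORT": "8004"}))
```

Consult .env.template for the setting names that drama actually reads.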

Spawn workers

Spawns multiple concurrent worker processes to execute tasks.

poetry run drama worker --processes 4

For a full list of valid command-line arguments that can be passed to drama worker, check out dramatiq -h.

Deploy server (optional)

The server can be deployed with uvicorn, a lightning-fast ASGI server, using the command-line client.

poetry run drama server

Alternatively, use the provided Dockerfile:

sudo docker build . -t drama-server
sudo docker run -p 8004:8004 drama-server

Once the server is running, online documentation is available at /api/docs.

✨ Getting started

Defining components

Take a look at the catalog for examples of how to develop your own components.

Executing workflows

Using Python

To run a workflow from Python, define a list of Task objects, wrap them in a Workflow, and pass it to a Scheduler.

from drama.models.task import Task
from drama.models.workflow import Workflow
from drama.worker.scheduler import Scheduler

task_import_file = Task(
    name="LoadIrisDataset",
    module="drama.core.catalog.load.ImportFile",
    params={
        "url": "https://gist.githubusercontent.com/netj/8836201/raw/6f9306ad21398ea43cba4f7d537619d0e07d5ae3/iris.csv"
    }
)

workflow_request = Workflow(tasks=[task_import_file])

with Scheduler() as scheduler:
    scheduler.run(workflow_request)

(This script is complete; it should run "as is".)

Using requests

Alternatively, a workflow can be submitted over HTTP to the built-in server.

import requests

# Submit the workflow to a running drama server.
# Adjust the host and port to match your deployment.
r = requests.post(
    url="http://localhost:8001/api/v2/workflow/run",
    json={
        "tasks": [
            {
                "name": "LoadIrisDataset",
                "module": "drama.core.catalog.load.ImportFile",
                "params": {
                    "url": "https://raw.githubusercontent.com/cs109/2014_data/master/countries.csv"
                },
            }
        ]
    },
)
print(r.text)

(This script is complete; it should run "as is" against a running server.)
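
When a workflow contains several tasks, it can help to build the request body programmatically. The following is a minimal sketch that produces the same JSON shape as the request above. The helpers `make_task` and `make_workflow_payload` are hypothetical and not part of drama, and the key check is only a local convenience, not a statement about what the server validates.

```python
import json


def make_task(name, module, params=None):
    """Build one task entry with the keys used in the request above."""
    return {"name": name, "module": module, "params": params or {}}


def make_workflow_payload(tasks):
    """Wrap task entries in the {"tasks": [...]} body sent to the server."""
    tasks = list(tasks)
    if not tasks:
        raise ValueError("a workflow needs at least one task")
    for task in tasks:
        # Local sanity check only: every entry must carry a name and a module.
        missing = {"name", "module"} - task.keys()
        if missing:
            raise ValueError(f"task is missing keys: {sorted(missing)}")
    return {"tasks": tasks}


payload = make_workflow_payload(
    [
        make_task(
            name="LoadIrisDataset",
            module="drama.core.catalog.load.ImportFile",
            params={
                "url": "https://raw.githubusercontent.com/cs109/2014_data/master/countries.csv"
            },
        )
    ]
)

# The payload is plain JSON-serializable data.
print(json.dumps(payload, indent=2))
```

The resulting dictionary can be passed as the json argument of requests.post, exactly as in the example above.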
