Skip to content

Commit

Permalink
Merge pull request #8 from hatchet-dev/feat--examples
Browse files Browse the repository at this point in the history
Feat: Fix examples, rework Context classes
  • Loading branch information
mrkaye97 authored Feb 6, 2025
2 parents 934f6fe + eff6138 commit 5e69585
Show file tree
Hide file tree
Showing 71 changed files with 951 additions and 1,589 deletions.
57 changes: 51 additions & 6 deletions conftest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import logging
import os
import subprocess
Expand All @@ -10,17 +11,58 @@
import pytest
import pytest_asyncio

from hatchet_sdk import Hatchet
from hatchet_sdk import ClientConfig, Hatchet
from hatchet_sdk.loader import ClientTLSConfig


@pytest.fixture(scope="session", autouse=True)
def token() -> str:
    """Create a Hatchet API token for the test tenant and export it.

    Runs the `hatchet-admin token create` command inside the compose
    `setup-config` service, publishes the token via the
    ``HATCHET_CLIENT_TOKEN`` environment variable (so spawned worker
    subprocesses inherit it), and returns it for fixtures that take it
    as an argument.

    Raises:
        RuntimeError: if the docker command fails or yields no token,
            so the failure surfaces here instead of as opaque auth
            errors in every downstream test.
    """
    result = subprocess.run(
        [
            "docker",
            "compose",
            "run",
            "--no-deps",
            "setup-config",
            "/hatchet/hatchet-admin",
            "token",
            "create",
            "--config",
            "/hatchet/config",
            "--tenant-id",
            "707d0855-80ab-4e1f-a156-f1c4546cbf52",
        ],
        capture_output=True,
        text=True,
    )

    # Fail fast with the underlying stderr instead of silently exporting
    # an empty token that would break every test that needs auth.
    if result.returncode != 0:
        raise RuntimeError(
            f"token creation failed (exit {result.returncode}): "
            f"{result.stderr.strip()}"
        )

    token = result.stdout.strip()

    if not token:
        raise RuntimeError("token creation produced no output on stdout")

    os.environ["HATCHET_CLIENT_TOKEN"] = token

    return token


@pytest_asyncio.fixture(scope="session")
async def aiohatchet(token: str) -> AsyncGenerator[Hatchet, None]:
    """Session-scoped async Hatchet client for tests.

    Depends on the ``token`` fixture so the client authenticates against
    the local test tenant; TLS is disabled because the test server runs
    without certificates.
    """
    # NOTE(review): diff residue duplicated the old no-arg signature here;
    # only this post-commit version is kept.
    yield Hatchet(
        debug=True,
        config=ClientConfig(
            token=token,
            tls_config=ClientTLSConfig(strategy="none"),
        ),
    )


@pytest.fixture(scope="session")
def hatchet(token: str) -> Hatchet:
    """Session-scoped sync Hatchet client for tests.

    Mirrors the ``aiohatchet`` fixture's configuration: authenticated
    with the test-tenant token and TLS disabled for the local server.
    """
    # NOTE(review): diff residue duplicated the old no-arg signature here;
    # only this post-commit version is kept.
    return Hatchet(
        debug=True,
        config=ClientConfig(
            token=token,
            tls_config=ClientTLSConfig(strategy="none"),
        ),
    )


@pytest.fixture()
Expand All @@ -32,7 +74,10 @@ def worker(
command = ["poetry", "run", example]

logging.info(f"Starting background worker: {' '.join(command)}")
proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

proc = subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=os.environ.copy()
)

# Check if the process is still running
if proc.poll() is not None:
Expand Down
4 changes: 0 additions & 4 deletions examples/affinity-workers/event.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
from dotenv import load_dotenv

from hatchet_sdk.clients.events import PushEventOptions
from hatchet_sdk.hatchet import Hatchet

load_dotenv()

hatchet = Hatchet(debug=True)

hatchet.event.push(
Expand Down
13 changes: 6 additions & 7 deletions examples/affinity-workers/worker.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
from dotenv import load_dotenv

from hatchet_sdk import Context, Hatchet, WorkerLabelComparator
from hatchet_sdk import BaseWorkflow, Context, Hatchet, WorkerLabelComparator
from hatchet_sdk.labels import DesiredWorkerLabel

load_dotenv()

hatchet = Hatchet(debug=True)

wf = hatchet.declare_workflow(on_events=["affinity:run"])


class AffinityWorkflow(BaseWorkflow):
config = wf.config

@hatchet.workflow(on_events=["affinity:run"])
class AffinityWorkflow:
@hatchet.step(
desired_worker_labels={
"model": DesiredWorkerLabel(value="fancy-ai-model-v2", weight=10),
Expand Down
4 changes: 0 additions & 4 deletions examples/api/api.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
from dotenv import load_dotenv

from hatchet_sdk import Hatchet, WorkflowList

load_dotenv()

hatchet = Hatchet(debug=True)


Expand Down
4 changes: 0 additions & 4 deletions examples/api/async_api.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
import asyncio
from typing import cast

from dotenv import load_dotenv

from hatchet_sdk import Hatchet, WorkflowList

load_dotenv()

hatchet = Hatchet(debug=True)


Expand Down
8 changes: 0 additions & 8 deletions examples/async/event.py

This file was deleted.

21 changes: 0 additions & 21 deletions examples/async/test_async.py

This file was deleted.

37 changes: 0 additions & 37 deletions examples/async/worker.py

This file was deleted.

4 changes: 0 additions & 4 deletions examples/blocked_async/event.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
from dotenv import load_dotenv

from hatchet_sdk import PushEventOptions, new_client

load_dotenv()

client = new_client()

client.event.push(
Expand Down
16 changes: 7 additions & 9 deletions examples/blocked_async/worker.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
import hashlib
import time

from dotenv import load_dotenv

from hatchet_sdk import Context, Hatchet

load_dotenv()
from hatchet_sdk import BaseWorkflow, Context, Hatchet

hatchet = Hatchet(debug=True)

Expand All @@ -15,9 +11,12 @@
#
# You do not want to run long sync functions in an async def function

wf = hatchet.declare_workflow(on_events=["user:create"])


class Blocked(BaseWorkflow):
config = wf.config

@hatchet.workflow(on_events=["user:create"])
class Blocked:
@hatchet.step(timeout="11s", retries=3)
async def step1(self, context: Context) -> dict[str, str | int | float]:
print("Executing step1")
Expand All @@ -43,9 +42,8 @@ async def step1(self, context: Context) -> dict[str, str | int | float]:


def main() -> None:
workflow = Blocked()
worker = hatchet.worker("blocked-worker", max_runs=3)
worker.register_workflow(workflow)
worker.register_workflow(Blocked())
worker.start()


Expand Down
4 changes: 1 addition & 3 deletions examples/bulk_fanout/bulk_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,14 @@
import os
from typing import Any

from dotenv import load_dotenv

from hatchet_sdk import new_client
from hatchet_sdk.clients.admin import TriggerWorkflowOptions, WorkflowRunDict
from hatchet_sdk.clients.rest.models.workflow_run import WorkflowRun
from hatchet_sdk.clients.run_event_listener import StepRunEventType


async def main() -> None:
load_dotenv()

hatchet = new_client()

workflow_runs = [
Expand Down
4 changes: 1 addition & 3 deletions examples/bulk_fanout/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@
import os
import random

from dotenv import load_dotenv

from hatchet_sdk import Hatchet, new_client
from hatchet_sdk.clients.admin import TriggerWorkflowOptions
from hatchet_sdk.clients.run_event_listener import StepRunEventType


async def main() -> None:
load_dotenv()

hatchet = Hatchet()

# Generate a random stream key to use to track all
Expand Down
4 changes: 1 addition & 3 deletions examples/bulk_fanout/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
import json
import os

from dotenv import load_dotenv

from hatchet_sdk import new_client
from hatchet_sdk.clients.admin import TriggerWorkflowOptions
from hatchet_sdk.clients.events import PushEventOptions
Expand All @@ -13,7 +11,7 @@


async def main() -> None:
load_dotenv()

hatchet = new_client()

hatchet.event.push(
Expand Down
45 changes: 30 additions & 15 deletions examples/bulk_fanout/worker.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,45 @@
import asyncio
from typing import Any
from typing import Any, cast

from dotenv import load_dotenv
from pydantic import BaseModel

from hatchet_sdk import Context, Hatchet
from hatchet_sdk import BaseWorkflow, Context, Hatchet
from hatchet_sdk.clients.admin import ChildTriggerWorkflowOptions, ChildWorkflowRunDict

load_dotenv()

hatchet = Hatchet(debug=True)


@hatchet.workflow(on_events=["parent:create"])
class BulkParent:
class ParentInput(BaseModel):
n: int = 100


class ChildInput(BaseModel):
a: str


bulk_parent_wf = hatchet.declare_workflow(
on_events=["parent:create"], input_validator=ParentInput
)
bulk_child_wf = hatchet.declare_workflow(
on_events=["child:create"], input_validator=ChildInput
)


class BulkParent(BaseWorkflow):
config = bulk_parent_wf.config

@hatchet.step(timeout="5m")
async def spawn(self, context: Context) -> dict[str, list[Any]]:
print("spawning child")

context.put_stream("spawning...")
results = []

n = context.workflow_input().get("n", 100)
n = bulk_parent_wf.get_workflow_input(context).n

child_workflow_runs = [
ChildWorkflowRunDict(
workflow_name="BulkChild",
input={"a": str(i)},
bulk_child_wf.construct_spawn_workflow_input(
input=ChildInput(a=str(i)),
key=f"child{i}",
options=ChildTriggerWorkflowOptions(
additional_metadata={"hello": "earth"}
Expand All @@ -37,7 +51,7 @@ async def spawn(self, context: Context) -> dict[str, list[Any]]:
if len(child_workflow_runs) == 0:
return {}

spawn_results = await context.aio.spawn_workflows(child_workflow_runs)
spawn_results = await bulk_child_wf.spawn_many(context, child_workflow_runs)

results = await asyncio.gather(
*[workflowRunRef.result() for workflowRunRef in spawn_results],
Expand All @@ -55,11 +69,12 @@ async def spawn(self, context: Context) -> dict[str, list[Any]]:
return {"results": results}


@hatchet.workflow(on_events=["child:create"])
class BulkChild:
class BulkChild(BaseWorkflow):
config = bulk_child_wf.config

@hatchet.step()
def process(self, context: Context) -> dict[str, str]:
a = context.workflow_input()["a"]
a = bulk_child_wf.get_workflow_input(context).a
print(f"child process {a}")
context.put_stream("child 1...")
return {"status": "success " + a}
Expand Down
Loading

0 comments on commit 5e69585

Please sign in to comment.