Skip to content

Commit

Permalink
Update version to 0.2b1, require Python 3.9+, and enhance GitHub Acti…
Browse files Browse the repository at this point in the history
…ons workflow (#1) (#35)

- Bump version in `pyproject.toml` to 0.2b1 and update Python requirement to >=3.9.
- Add `protobuf` dependency in `requirements.txt`.
- Update GitHub Actions workflow to support Python versions 3.9 to 3.13 and upgrade action versions.
- Refactor type hints in various files to use `Optional` and `list` instead of `Union` and `List`.
- Improve handling of custom status in orchestration context and related functions.
- Fix purge implementation to pass required parameters.
  • Loading branch information
cgillum authored Jan 6, 2025
1 parent c17a3e8 commit ccb8a63
Show file tree
Hide file tree
Showing 14 changed files with 118 additions and 97 deletions.
19 changes: 16 additions & 3 deletions .github/workflows/pr-validation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11"]
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v3
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
Expand All @@ -35,3 +35,16 @@ jobs:
- name: Pytest unit tests
run: |
pytest -m "not e2e" --verbose
# Sidecar for running e2e tests requires Go SDK
- name: Install Go SDK
uses: actions/setup-go@v5
with:
go-version: 'stable'

# Install and run the durabletask-go sidecar for running e2e tests
- name: Pytest e2e tests
run: |
go install github.com/microsoft/durabletask-go@main
durabletask-go --port 4001 &
pytest -m "e2e" --verbose
5 changes: 3 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"editor.defaultFormatter": "ms-python.autopep8",
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": true,
"source.organizeImports": "explicit"
},
"editor.rulers": [
119
Expand All @@ -29,5 +29,6 @@
"coverage.xml",
"jacoco.xml",
"coverage.cobertura.xml"
]
],
"makefile.configureOnOpen": false
}
40 changes: 20 additions & 20 deletions durabletask/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, List, Tuple, TypeVar, Union
from typing import Any, Optional, TypeVar, Union

import grpc
from google.protobuf import wrappers_pb2
Expand Down Expand Up @@ -42,10 +42,10 @@ class OrchestrationState:
runtime_status: OrchestrationStatus
created_at: datetime
last_updated_at: datetime
serialized_input: Union[str, None]
serialized_output: Union[str, None]
serialized_custom_status: Union[str, None]
failure_details: Union[task.FailureDetails, None]
serialized_input: Optional[str]
serialized_output: Optional[str]
serialized_custom_status: Optional[str]
failure_details: Optional[task.FailureDetails]

def raise_if_failed(self):
if self.failure_details is not None:
Expand All @@ -64,7 +64,7 @@ def failure_details(self):
return self._failure_details


def new_orchestration_state(instance_id: str, res: pb.GetInstanceResponse) -> Union[OrchestrationState, None]:
def new_orchestration_state(instance_id: str, res: pb.GetInstanceResponse) -> Optional[OrchestrationState]:
if not res.exists:
return None

Expand Down Expand Up @@ -92,20 +92,20 @@ def new_orchestration_state(instance_id: str, res: pb.GetInstanceResponse) -> Un
class TaskHubGrpcClient:

def __init__(self, *,
host_address: Union[str, None] = None,
metadata: Union[List[Tuple[str, str]], None] = None,
log_handler = None,
log_formatter: Union[logging.Formatter, None] = None,
host_address: Optional[str] = None,
metadata: Optional[list[tuple[str, str]]] = None,
log_handler: Optional[logging.Handler] = None,
log_formatter: Optional[logging.Formatter] = None,
secure_channel: bool = False):
channel = shared.get_grpc_channel(host_address, metadata, secure_channel=secure_channel)
self._stub = stubs.TaskHubSidecarServiceStub(channel)
self._logger = shared.get_logger("client", log_handler, log_formatter)

def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInput, TOutput], str], *,
input: Union[TInput, None] = None,
instance_id: Union[str, None] = None,
start_at: Union[datetime, None] = None,
reuse_id_policy: Union[pb.OrchestrationIdReusePolicy, None] = None) -> str:
input: Optional[TInput] = None,
instance_id: Optional[str] = None,
start_at: Optional[datetime] = None,
reuse_id_policy: Optional[pb.OrchestrationIdReusePolicy] = None) -> str:

name = orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator)

Expand All @@ -122,14 +122,14 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
res: pb.CreateInstanceResponse = self._stub.StartInstance(req)
return res.instanceId

def get_orchestration_state(self, instance_id: str, *, fetch_payloads: bool = True) -> Union[OrchestrationState, None]:
def get_orchestration_state(self, instance_id: str, *, fetch_payloads: bool = True) -> Optional[OrchestrationState]:
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
res: pb.GetInstanceResponse = self._stub.GetInstance(req)
return new_orchestration_state(req.instanceId, res)

def wait_for_orchestration_start(self, instance_id: str, *,
fetch_payloads: bool = False,
timeout: int = 60) -> Union[OrchestrationState, None]:
timeout: int = 60) -> Optional[OrchestrationState]:
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
try:
self._logger.info(f"Waiting up to {timeout}s for instance '{instance_id}' to start.")
Expand All @@ -144,7 +144,7 @@ def wait_for_orchestration_start(self, instance_id: str, *,

def wait_for_orchestration_completion(self, instance_id: str, *,
fetch_payloads: bool = True,
timeout: int = 60) -> Union[OrchestrationState, None]:
timeout: int = 60) -> Optional[OrchestrationState]:
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
try:
self._logger.info(f"Waiting {timeout}s for instance '{instance_id}' to complete.")
Expand All @@ -170,7 +170,7 @@ def wait_for_orchestration_completion(self, instance_id: str, *,
raise

def raise_orchestration_event(self, instance_id: str, event_name: str, *,
data: Union[Any, None] = None):
data: Optional[Any] = None):
req = pb.RaiseEventRequest(
instanceId=instance_id,
name=event_name,
Expand All @@ -180,7 +180,7 @@ def raise_orchestration_event(self, instance_id: str, event_name: str, *,
self._stub.RaiseEvent(req)

def terminate_orchestration(self, instance_id: str, *,
output: Union[Any, None] = None,
output: Optional[Any] = None,
recursive: bool = True):
req = pb.TerminateRequest(
instanceId=instance_id,
Expand All @@ -203,4 +203,4 @@ def resume_orchestration(self, instance_id: str):
def purge_orchestration(self, instance_id: str, recursive: bool = True):
req = pb.PurgeInstancesRequest(instanceId=instance_id, recursive=recursive)
self._logger.info(f"Purging instance '{instance_id}'.")
self._stub.PurgeInstances()
self._stub.PurgeInstances(req)
3 changes: 1 addition & 2 deletions durabletask/internal/grpc_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# Licensed under the MIT License.

from collections import namedtuple
from typing import List, Tuple

import grpc

Expand All @@ -26,7 +25,7 @@ class DefaultClientInterceptorImpl (
StreamUnaryClientInterceptor and StreamStreamClientInterceptor from grpc to add an
interceptor to add additional headers to all calls as needed."""

def __init__(self, metadata: List[Tuple[str, str]]):
def __init__(self, metadata: list[tuple[str, str]]):
super().__init__()
self._metadata = metadata

Expand Down
32 changes: 16 additions & 16 deletions durabletask/internal/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import traceback
from datetime import datetime
from typing import List, Union
from typing import Optional

from google.protobuf import timestamp_pb2, wrappers_pb2

Expand All @@ -12,14 +12,14 @@
# TODO: The new_xxx_event methods are only used by test code and should be moved elsewhere


def new_orchestrator_started_event(timestamp: Union[datetime, None] = None) -> pb.HistoryEvent:
def new_orchestrator_started_event(timestamp: Optional[datetime] = None) -> pb.HistoryEvent:
ts = timestamp_pb2.Timestamp()
if timestamp is not None:
ts.FromDatetime(timestamp)
return pb.HistoryEvent(eventId=-1, timestamp=ts, orchestratorStarted=pb.OrchestratorStartedEvent())


def new_execution_started_event(name: str, instance_id: str, encoded_input: Union[str, None] = None) -> pb.HistoryEvent:
def new_execution_started_event(name: str, instance_id: str, encoded_input: Optional[str] = None) -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=-1,
timestamp=timestamp_pb2.Timestamp(),
Expand Down Expand Up @@ -49,15 +49,15 @@ def new_timer_fired_event(timer_id: int, fire_at: datetime) -> pb.HistoryEvent:
)


def new_task_scheduled_event(event_id: int, name: str, encoded_input: Union[str, None] = None) -> pb.HistoryEvent:
def new_task_scheduled_event(event_id: int, name: str, encoded_input: Optional[str] = None) -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=event_id,
timestamp=timestamp_pb2.Timestamp(),
taskScheduled=pb.TaskScheduledEvent(name=name, input=get_string_value(encoded_input))
)


def new_task_completed_event(event_id: int, encoded_output: Union[str, None] = None) -> pb.HistoryEvent:
def new_task_completed_event(event_id: int, encoded_output: Optional[str] = None) -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=-1,
timestamp=timestamp_pb2.Timestamp(),
Expand All @@ -77,7 +77,7 @@ def new_sub_orchestration_created_event(
event_id: int,
name: str,
instance_id: str,
encoded_input: Union[str, None] = None) -> pb.HistoryEvent:
encoded_input: Optional[str] = None) -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=event_id,
timestamp=timestamp_pb2.Timestamp(),
Expand All @@ -88,7 +88,7 @@ def new_sub_orchestration_created_event(
)


def new_sub_orchestration_completed_event(event_id: int, encoded_output: Union[str, None] = None) -> pb.HistoryEvent:
def new_sub_orchestration_completed_event(event_id: int, encoded_output: Optional[str] = None) -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=-1,
timestamp=timestamp_pb2.Timestamp(),
Expand Down Expand Up @@ -116,7 +116,7 @@ def new_failure_details(ex: Exception) -> pb.TaskFailureDetails:
)


def new_event_raised_event(name: str, encoded_input: Union[str, None] = None) -> pb.HistoryEvent:
def new_event_raised_event(name: str, encoded_input: Optional[str] = None) -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=-1,
timestamp=timestamp_pb2.Timestamp(),
Expand All @@ -140,7 +140,7 @@ def new_resume_event() -> pb.HistoryEvent:
)


def new_terminated_event(*, encoded_output: Union[str, None] = None) -> pb.HistoryEvent:
def new_terminated_event(*, encoded_output: Optional[str] = None) -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=-1,
timestamp=timestamp_pb2.Timestamp(),
Expand All @@ -150,7 +150,7 @@ def new_terminated_event(*, encoded_output: Union[str, None] = None) -> pb.Histo
)


def get_string_value(val: Union[str, None]) -> Union[wrappers_pb2.StringValue, None]:
def get_string_value(val: Optional[str]) -> Optional[wrappers_pb2.StringValue]:
if val is None:
return None
else:
Expand All @@ -160,9 +160,9 @@ def get_string_value(val: Union[str, None]) -> Union[wrappers_pb2.StringValue, N
def new_complete_orchestration_action(
id: int,
status: pb.OrchestrationStatus,
result: Union[str, None] = None,
failure_details: Union[pb.TaskFailureDetails, None] = None,
carryover_events: Union[List[pb.HistoryEvent], None] = None) -> pb.OrchestratorAction:
result: Optional[str] = None,
failure_details: Optional[pb.TaskFailureDetails] = None,
carryover_events: Optional[list[pb.HistoryEvent]] = None) -> pb.OrchestratorAction:
completeOrchestrationAction = pb.CompleteOrchestrationAction(
orchestrationStatus=status,
result=get_string_value(result),
Expand All @@ -178,7 +178,7 @@ def new_create_timer_action(id: int, fire_at: datetime) -> pb.OrchestratorAction
return pb.OrchestratorAction(id=id, createTimer=pb.CreateTimerAction(fireAt=timestamp))


def new_schedule_task_action(id: int, name: str, encoded_input: Union[str, None]) -> pb.OrchestratorAction:
def new_schedule_task_action(id: int, name: str, encoded_input: Optional[str]) -> pb.OrchestratorAction:
return pb.OrchestratorAction(id=id, scheduleTask=pb.ScheduleTaskAction(
name=name,
input=get_string_value(encoded_input)
Expand All @@ -194,8 +194,8 @@ def new_timestamp(dt: datetime) -> timestamp_pb2.Timestamp:
def new_create_sub_orchestration_action(
id: int,
name: str,
instance_id: Union[str, None],
encoded_input: Union[str, None]) -> pb.OrchestratorAction:
instance_id: Optional[str],
encoded_input: Optional[str]) -> pb.OrchestratorAction:
return pb.OrchestratorAction(id=id, createSubOrchestration=pb.CreateSubOrchestrationAction(
name=name,
instanceId=instance_id,
Expand Down
15 changes: 9 additions & 6 deletions durabletask/internal/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import json
import logging
from types import SimpleNamespace
from typing import Any, Dict, List, Tuple, Union
from typing import Any, Optional

import grpc

Expand All @@ -20,7 +20,10 @@ def get_default_host_address() -> str:
return "localhost:4001"


def get_grpc_channel(host_address: Union[str, None], metadata: Union[List[Tuple[str, str]], None], secure_channel: bool = False) -> grpc.Channel:
def get_grpc_channel(
host_address: Optional[str],
metadata: Optional[list[tuple[str, str]]],
secure_channel: bool = False) -> grpc.Channel:
if host_address is None:
host_address = get_default_host_address()

Expand All @@ -36,8 +39,8 @@ def get_grpc_channel(host_address: Union[str, None], metadata: Union[List[Tuple[

def get_logger(
name_suffix: str,
log_handler: Union[logging.Handler, None] = None,
log_formatter: Union[logging.Formatter, None] = None) -> logging.Logger:
log_handler: Optional[logging.Handler] = None,
log_formatter: Optional[logging.Formatter] = None) -> logging.Logger:
logger = logging.Logger(f"durabletask-{name_suffix}")

# Add a default log handler if none is provided
Expand Down Expand Up @@ -78,7 +81,7 @@ def default(self, obj):
if dataclasses.is_dataclass(obj):
# Dataclasses are not serializable by default, so we convert them to a dict and mark them for
# automatic deserialization by the receiver
d = dataclasses.asdict(obj)
d = dataclasses.asdict(obj) # type: ignore
d[AUTO_SERIALIZED] = True
return d
elif isinstance(obj, SimpleNamespace):
Expand All @@ -94,7 +97,7 @@ class InternalJSONDecoder(json.JSONDecoder):
def __init__(self, *args, **kwargs):
super().__init__(object_hook=self.dict_to_object, *args, **kwargs)

def dict_to_object(self, d: Dict[str, Any]):
def dict_to_object(self, d: dict[str, Any]):
# If the object was serialized by the InternalJSONEncoder, deserialize it as a SimpleNamespace
if d.pop(AUTO_SERIALIZED, False):
return SimpleNamespace(**d)
Expand Down
Loading

0 comments on commit ccb8a63

Please sign in to comment.