Skip to content

Commit

Permalink
Merge pull request #32 from hatchet-dev/belanger/fix-admin-aio
Browse files Browse the repository at this point in the history
fix: move heartbeats back to separate thread
  • Loading branch information
abelanger5 authored May 31, 2024
2 parents e3c2f96 + 94426f5 commit ddbf0d5
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 9 deletions.
33 changes: 25 additions & 8 deletions hatchet_sdk/clients/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,20 @@ def unregister(self):
START_GET_GROUP_KEY = 2


async def read_action(listener: Any, interrupt: asyncio.Event):
class Event_ts(asyncio.Event):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self._loop is None:
self._loop = asyncio.get_event_loop()

def set(self):
self._loop.call_soon_threadsafe(super().set)

def clear(self):
self._loop.call_soon_threadsafe(super().clear)


async def read_action(listener: Any, interrupt: Event_ts):
assigned_action = await listener.read()
interrupt.set()
return assigned_action
Expand All @@ -130,25 +143,26 @@ def __init__(
worker_id,
):
self.config = config
self.client = DispatcherStub(new_conn(config))
self.aio_client = DispatcherStub(new_conn(config, True))
self.token = config.token
self.worker_id = worker_id
self.retries = 0
self.last_connection_attempt = 0
self.heartbeat_task: asyncio.Task = None
self.heartbeat_thread: threading.Thread = None
self.run_heartbeat = True
self.listen_strategy = "v2"
self.stop_signal = False
self.logger = logger.bind(worker_id=worker_id)

async def heartbeat(self):
def heartbeat(self):
# send a heartbeat every 4 seconds
while True:
if not self.run_heartbeat:
break

try:
await self.aio_client.Heartbeat(
self.client.Heartbeat(
HeartbeatRequest(
workerId=self.worker_id,
heartbeatAt=proto_timestamp_now(),
Expand All @@ -166,14 +180,17 @@ async def heartbeat(self):
if e.code() == grpc.StatusCode.UNIMPLEMENTED:
break

await asyncio.sleep(4)
time.sleep(4)

def start_heartbeater(self):
if self.heartbeat_task is not None:
if self.heartbeat_thread is not None:
return

# create a new thread to send heartbeats
self.heartbeat_task = asyncio.create_task(self.heartbeat())
heartbeat_thread = threading.Thread(target=self.heartbeat)
heartbeat_thread.start()

self.heartbeat_thread = heartbeat_thread

def __aiter__(self):
return self._generator()
Expand All @@ -192,7 +209,7 @@ async def _generator(self) -> AsyncGenerator[Action, None]:

try:
while True:
self.interrupt = asyncio.Event()
self.interrupt = Event_ts()
t = asyncio.create_task(read_action(listener, self.interrupt))
await self.interrupt.wait()

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "hatchet-sdk"
version = "0.25.2"
version = "0.25.3"
description = ""
authors = ["Alexander Belanger <[email protected]>"]
readme = "README.md"
Expand Down

0 comments on commit ddbf0d5

Please sign in to comment.