Skip to content

Commit

Permalink
Cache fsp ShmArrays where possible
Browse files Browse the repository at this point in the history
Minimize calling `.data._shmarray.attach_shm_array()` as much as is
possible to avoid the crash from #332. This is the suggested hack from
issue #359.

Resolves #359
  • Loading branch information
goodboy committed Jul 19, 2022
1 parent 90bc9b9 commit e0491cf
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 11 deletions.
25 changes: 20 additions & 5 deletions piker/fsp/_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ class Fsp:
# + the consuming fsp *to* the consumers output
# shm flow.
_flow_registry: dict[
tuple[_Token, str], _Token,
tuple[_Token, str],
tuple[_Token, Optional[ShmArray]],
] = {}

def __init__(
Expand Down Expand Up @@ -120,7 +121,6 @@ def __call__(
):
return self.func(*args, **kwargs)

# TODO: lru_cache this? prettty sure it'll work?
def get_shm(
self,
src_shm: ShmArray,
Expand All @@ -131,12 +131,27 @@ def get_shm(
for this "instance" of a signal processor for
the given ``key``.
The destination shm "token" and array are cached if possible to
minimize multiple stdlib/system calls.
'''
dst_token = self._flow_registry[
dst_token, maybe_array = self._flow_registry[
(src_shm._token, self.name)
]
shm = attach_shm_array(dst_token)
return shm
if maybe_array is None:
self._flow_registry[
(src_shm._token, self.name)
] = (
dst_token,
# "cache" the ``ShmArray`` such that
# we call the underlying "attach" code as few
# times as possible as per:
# - https://github.com/pikers/piker/issues/359
# - https://github.com/pikers/piker/issues/332
maybe_array := attach_shm_array(dst_token)
)

return maybe_array


def fsp(
Expand Down
7 changes: 4 additions & 3 deletions piker/fsp/_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,10 @@ async def cascade(
# TODO: ugh i hate this wind/unwind to list over the wire
# but not sure how else to do it.
for (token, fsp_name, dst_token) in shm_registry:
Fsp._flow_registry[
(_Token.from_msg(token), fsp_name)
] = _Token.from_msg(dst_token)
Fsp._flow_registry[(
_Token.from_msg(token),
fsp_name,
)] = _Token.from_msg(dst_token), None

fsp: Fsp = reg.get(
NamespacePath(ns_path)
Expand Down
7 changes: 4 additions & 3 deletions piker/ui/_fsp.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,9 +469,10 @@ async def start_engine_task(
target=target,
readonly=True,
)
self._flow_registry[
(self.src_shm._token, target.name)
] = dst_shm._token
self._flow_registry[(
self.src_shm._token,
target.name
)] = dst_shm._token

# if not opened:
# raise RuntimeError(
Expand Down

0 comments on commit e0491cf

Please sign in to comment.