Skip to content

Commit

Permalink
Fix MessageBus pattern resolving
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Apr 13, 2024
1 parent e05f0c6 commit d28da76
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 9 deletions.
1 change: 1 addition & 0 deletions RELEASES.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Released on TBD (UTC).
- Renamed `register_serializable_object` to `register_serializable_type` (also renames first param from `obj` to `cls`)

### Fixes
- Fixed `MessageBus` pattern resolving (fixes a performance regression where topics published with no subscribers would always re-resolve)
- Fixed `BacktestNode` streaming data management (was not clearing between chunks), thanks for the report @dpmabo
- Fixed `RiskEngine` cumulative notional calculations for margin accounts (was incorrectly using base currency when selling)
- Fixed selling `Equity` instruments with `CASH` account and `NETTING` OMS incorrectly rejecting (should be able to reduce position)
Expand Down
2 changes: 1 addition & 1 deletion nautilus_trader/common/component.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ cdef class MessageBus:
cdef dict[UUID4, object] _correlation_index
cdef tuple[type] _publishable_types
cdef bint _has_backing
cdef set _unresolved_topics
cdef bint _resolved

cdef readonly TraderId trader_id
"""The trader ID associated with the bus.\n\n:returns: `TraderId`"""
Expand Down
11 changes: 5 additions & 6 deletions nautilus_trader/common/component.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2074,7 +2074,7 @@ cdef class MessageBus:
if types_filter is not None:
self._publishable_types = tuple(o for o in _EXTERNAL_PUBLISHABLE_TYPES if o not in types_filter)
self._has_backing = config.database is not None
self._unresolved_topics = set()
self._resolved = False

# Counters
self.sent_count = 0
Expand Down Expand Up @@ -2417,7 +2417,7 @@ cdef class MessageBus:

self._subscriptions[sub] = sorted(matches)

self._unresolved_topics.add(topic)
self._resolved = False

self._log.debug(f"Added {sub}")

Expand Down Expand Up @@ -2462,7 +2462,7 @@ cdef class MessageBus:

del self._subscriptions[sub]

self._unresolved_topics.add(topic)
self._resolved = False

self._log.debug(f"Removed {sub}")

Expand Down Expand Up @@ -2492,10 +2492,10 @@ cdef class MessageBus:
# Get all subscriptions matching topic pattern
# Note: cannot use truthiness on array
cdef Subscription[:] subs = self._patterns.get(topic)
cdef str u_topic
if subs is None or any(is_matching(topic, u_topic) for u_topic in self._unresolved_topics):
if subs is None or (len(subs) == 0 and not self._resolved):
# Add the topic pattern and get matching subscribers
subs = self._resolve_subscriptions(topic)
self._resolved = True

# Send message to all matched subscribers
cdef:
Expand Down Expand Up @@ -2528,7 +2528,6 @@ cdef class MessageBus:
for existing_sub in self._subscriptions.copy():
if is_matching(topic, existing_sub.topic):
subs_list.append(existing_sub)
self._unresolved_topics.discard(existing_sub.topic)

subs_list = sorted(subs_list, reverse=True)
cdef Subscription[:] subs_array = np.ascontiguousarray(subs_list, dtype=Subscription)
Expand Down
4 changes: 2 additions & 2 deletions tests/unit_tests/persistence/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def test_feather_writer(self, catalog_betfair: ParquetDataCatalog) -> None:
expected = {
"AccountState": 400,
"BettingInstrument": 1,
"ComponentStateChanged": 21,
"ComponentStateChanged": 27,
"OrderAccepted": 189,
"OrderBookDelta": 1307,
"OrderDenied": 3,
Expand Down Expand Up @@ -298,7 +298,7 @@ def test_read_backtest(
expected = {
"AccountState": 400,
"BettingInstrument": 1,
"ComponentStateChanged": 21,
"ComponentStateChanged": 27,
"OrderAccepted": 189,
"OrderBookDelta": 1307,
"OrderDenied": 3,
Expand Down

0 comments on commit d28da76

Please sign in to comment.