From b3658849f57b563d89118df7eceda81fd6521f5f Mon Sep 17 00:00:00 2001 From: Raphael Deem Date: Fri, 25 Oct 2024 23:48:13 -0700 Subject: [PATCH] stop reconciling --- data/sync_worker.py | 9 +++++++-- tests/test_sync_worker.py | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/data/sync_worker.py b/data/sync_worker.py index 72ce83e5..e66dcffa 100644 --- a/data/sync_worker.py +++ b/data/sync_worker.py @@ -10,6 +10,8 @@ import sqlalchemy UPDATE_UNCATEGORIZED_POSITIONS = False +# TODO: harden/fix this (super buggy right now) +RECONCILE_POSITIONS = False TIMEOUT_DURATION = 120 class BrokerService: @@ -173,6 +175,7 @@ async def update_position_prices_and_volatility(self, session, positions, timest def _strip_timezone(self, timestamp): return timestamp.replace(tzinfo=None) + # TODO: fix or remove async def update_cost_basis(self, session, position): broker_instance = await self.broker_service.get_broker_instance(position.broker) await self.update_position_cost_basis(session, position, broker_instance) @@ -181,7 +184,8 @@ async def _update_prices_and_volatility(self, session, positions, now_naive): for position in positions: try: await self._update_position_price(session, position, now_naive) - await self.update_cost_basis(session, position) + if RECONCILE_POSITIONS: + await self.update_cost_basis(session, position) except Exception: logger.exception(f"Error processing position {position.symbol}") @@ -391,5 +395,6 @@ async def _fetch_and_update_positions(session, position_service, now): async def _reconcile_brokers_and_update_balances(session, position_service, balance_service, brokers, now): for broker in brokers: - await position_service.reconcile_positions(session, broker) + if RECONCILE_POSITIONS: + await position_service.reconcile_positions(session, broker) await balance_service.update_all_strategy_balances(session, broker, now) diff --git a/tests/test_sync_worker.py b/tests/test_sync_worker.py index 2c2831b9..6726b8f3 100644 --- a/tests/test_sync_worker.py +++ b/tests/test_sync_worker.py @@ -9,6 +9,8 @@ from data.sync_worker import PositionService, BalanceService, BrokerService, _get_async_engine, _run_sync_worker_iteration, _fetch_and_update_positions, _reconcile_brokers_and_update_balances from database.models import Position, Balance +import data.sync_worker + # Mock data for testing MOCK_POSITIONS = [ Position(symbol='AAPL', broker='tradier', latest_price=0, last_updated=datetime.now(), underlying_volatility=None), @@ -44,9 +46,42 @@ async def test_update_position_prices_and_volatility(): mock_broker_service.get_latest_price.assert_any_call('tradier', 'AAPL') mock_broker_service.get_latest_price.assert_any_call('tastytrade', 'GOOG') + # Assert that the session commit was called + assert mock_session.commit.called + +@pytest.mark.asyncio +async def test_update_position_prices_and_volatility_with_reconcile(): + # Mock the broker service + data.sync_worker.RECONCILE_POSITIONS = True + mock_broker_service = AsyncMock() + mock_broker_instance = AsyncMock() + mock_broker_instance.get_latest_price = AsyncMock(return_value=150.0) + mock_broker_instance.get_cost_basis = MagicMock(return_value=100.0) # Synchronous function + + # Mock get_broker_instance to return the mock broker instance + mock_broker_service.get_broker_instance = AsyncMock(return_value=mock_broker_instance) + + # Initialize PositionService with the mocked broker service + position_service = PositionService(mock_broker_service) + + # Mock session and positions + mock_session = AsyncMock(spec=AsyncSession) # Ensure we are using AsyncSession + mock_positions = MOCK_POSITIONS + + # Test the method + timestamp = datetime.now(timezone.utc) + await position_service.update_position_prices_and_volatility(mock_session, mock_positions, timestamp) + + # Assert that the broker service was called to get the latest price for each position + mock_broker_service.get_latest_price.assert_any_call('tradier', 'AAPL') + mock_broker_service.get_latest_price.assert_any_call('tastytrade', 'GOOG') + mock_broker_instance.get_cost_basis.assert_any_call('AAPL') mock_broker_instance.get_cost_basis.assert_any_call('GOOG') + # Reset the reconcile positions flag + data.sync_worker.RECONCILE_POSITIONS = False + # Assert that the session commit was called assert mock_session.commit.called @@ -225,6 +260,7 @@ async def test_reconcile_brokers_and_update_balances(mock_logger): mock_balance_service = AsyncMock() mock_brokers = ['broker1', 'broker2'] mock_now = datetime.now() # Capture datetime once + data.sync_worker.RECONCILE_POSITIONS = True await _reconcile_brokers_and_update_balances(mock_session, mock_position_service, mock_balance_service, mock_brokers, mock_now) @@ -233,6 +269,7 @@ async def test_reconcile_brokers_and_update_balances(mock_logger): mock_position_service.reconcile_positions.assert_any_await(mock_session, 'broker2') mock_balance_service.update_all_strategy_balances.assert_any_await(mock_session, 'broker1', mock_now) mock_balance_service.update_all_strategy_balances.assert_any_await(mock_session, 'broker2', mock_now) + data.sync_worker.RECONCILE_POSITIONS = False # TODO: Fix this test or refactor @pytest.mark.skip @@ -488,6 +525,7 @@ async def test_run_sync_worker_iteration_timeout(mock_logger): mock_position_service = AsyncMock() mock_balance_service = AsyncMock() mock_brokers = ['mock_broker'] + data.sync_worker.RECONCILE_POSITIONS = True # Set a very short timeout for testing short_timeout = 0.01 # 10 milliseconds @@ -507,3 +545,4 @@ async def slow_reconcile(*args, **kwargs): # Ensure reconcile and update methods were attempted mock_position_service.reconcile_positions.assert_called_once_with(mock_session, 'mock_broker') mock_balance_service.update_all_strategy_balances.assert_not_called() # Should not reach this due to timeout + data.sync_worker.RECONCILE_POSITIONS = False