Skip to content

Commit

Permalink
Merge pull request #99 from r0fls/rd/pegged-order-type
Browse files Browse the repository at this point in the history
pegged order style
  • Loading branch information
r0fls authored Dec 10, 2024
2 parents 461b694 + 4d8c8bf commit 167a106
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 16 deletions.
21 changes: 15 additions & 6 deletions brokers/base_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def _get_account_info(self):
pass

@abstractmethod
def _place_order(self, symbol, quantity, side, price=None, order_type='limit'):
def _place_order(self, symbol, quantity, side, price=None, order_type='limit', execution_style=''):
pass

def _place_future_option_order(
Expand Down Expand Up @@ -342,7 +342,9 @@ async def place_future_option_order(
side,
strategy,
price=None,
order_type='limit'):
order_type='limit',
execution_style=''
):
multiplier = futures_contract_size(symbol)
return await self._place_order_generic(
symbol, quantity, side, strategy, price, multiplier, self._place_future_option_order, order_type
Expand All @@ -355,7 +357,9 @@ async def place_option_order(
side,
strategy,
price=None,
order_type='limit'):
order_type='limit',
execution_style=''
):
multiplier = OPTION_MULTIPLIER
return await self._place_order_generic(
symbol, quantity, side, strategy, price, multiplier, self._place_option_order, order_type
Expand All @@ -368,7 +372,9 @@ async def place_order(
side,
strategy,
price=None,
order_type='limit'):
order_type='limit',
execution_style=''
):
multiplier = 1 # Regular stock orders don't have a multiplier
return await self._place_order_generic(
symbol, quantity, side, strategy, price, multiplier, self._place_order, order_type
Expand All @@ -383,7 +389,9 @@ async def _place_order_generic(
price,
multiplier,
broker_order_func,
order_type='limit'):
order_type='limit',
execution_style=''
):
'''Generic method to place an order and update database'''
logger.info(
'Placing order',
Expand Down Expand Up @@ -434,7 +442,8 @@ async def _place_order_generic(
broker=self.broker_name,
strategy=strategy,
profit_loss=0,
success='yes'
success='yes',
execution_style=execution_style
)

# Update the trade and positions in the database
Expand Down
4 changes: 4 additions & 0 deletions brokers/tastytrade_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,10 @@ def _get_options_chain(self, symbol, expiration_date):
extra={'error': str(e)})

async def get_current_price(self, symbol):
# TODO: get last instead of mid
return self.get_mid_price(symbol)

async def get_mid_price(self, symbol):
if ':' in symbol:
# Looks like this is already a streamer symbol
pass
Expand Down
15 changes: 15 additions & 0 deletions brokers/tradier_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,21 @@ def _get_options_chain(self, symbol, expiration_date):
logger.error('Failed to retrieve options chain',
extra={'error': str(e)})

async def get_mid_price(self, symbol):
logger.info('Retrieving mid price', extra={'symbol': symbol})
try:
async with aiohttp.ClientSession() as session:
async with session.get(f"{self.base_url}/markets/quotes?symbols={symbol}", headers=self.headers) as response:
response.raise_for_status()
data = await response.json()
bid = data.get('quotes', {}).get('quote', {}).get('bid')
ask = data.get('quotes', {}).get('quote', {}).get('ask')
mid_price = round((bid + ask) / 2, 2)
logger.info('Mid price retrieved', extra={'symbol': symbol, 'mid_price': mid_price})
return mid_price
except aiohttp.ClientError as e:
logger.error('Failed to retrieve mid price', extra={'error': str(e)})

async def get_current_price(self, symbol):
logger.info('Retrieving current price', extra={'symbol': symbol})
try:
Expand Down
1 change: 1 addition & 0 deletions database/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class Trade(Base):
strategy = Column(String, nullable=True)
profit_loss = Column(Float, nullable=True)
success = Column(String, nullable=True)
execution_style = Column(String, nullable=True)

class AccountInfo(Base):
__tablename__ = 'account_info'
Expand Down
22 changes: 21 additions & 1 deletion order_manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from database.models import Position, Trade

MARK_ORDER_STALE_AFTER = 60 * 60 * 24 * 2 # 2 days
PEGGED_ORDER_CANCEL_AFTER = 15 # 15 seconds

class OrderManager:
def __init__(self, engine, brokers):
Expand Down Expand Up @@ -35,7 +36,7 @@ async def reconcile_order(self, order):
stale_threshold = datetime.utcnow() - timedelta(seconds=MARK_ORDER_STALE_AFTER)

# Check if the order is stale
if order.timestamp < stale_threshold and order.status not in ['filled', 'canceled']:
if order.timestamp < stale_threshold and order.status not in ['filled', 'cancelled']:
try:
logger.info(f'Marking order {order.id} as stale', extra={'order_id': order.id})
await self.db_manager.update_trade_status(order.id, 'stale')
Expand All @@ -59,6 +60,25 @@ async def reconcile_order(self, order):
await broker.update_positions(order.id, session)
except Exception as e:
logger.error(f'Error reconciling order {order.id}', extra={'error': str(e)})
elif order.execution_style == 'pegged':
cancel_threshold = datetime.utcnow() - timedelta(seconds=PEGGED_ORDER_CANCEL_AFTER)
if order.timestamp < cancel_threshold:
try:
logger.info(f'Cancelling pegged order {order.id}', extra={'order_id': order.id})
await broker.cancel_order(order.broker_id)
await self.db_manager.update_trade_status(order.id, 'cancelled')
mid_price = await broker.get_mid_price(order.symbol)
await broker.place_order(
symbol=order.symbol,
quantity=order.quantity,
side=order.side,
strategy=order.strategy,
price=round(mid_price, 2),
order_type='limit',
execution_style=order.execution_style
)
except Exception as e:
logger.error(f'Error cancelling pegged order {order.id}', extra={'error': str(e)})

async def run(self):
logger.info('Running OrderManager')
Expand Down
21 changes: 14 additions & 7 deletions strategies/base_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@


class BaseStrategy(ABC):
def __init__(self, broker, strategy_name, starting_capital, rebalance_interval_minutes=5):
def __init__(self, broker, strategy_name, starting_capital, rebalance_interval_minutes=5, execution_style=''):
self.broker = broker
self.strategy_name = strategy_name
self.starting_capital = starting_capital
self.rebalance_interval_minutes = rebalance_interval_minutes
self.initialized = False
self.execution_style = execution_style

@abstractmethod
async def rebalance(self):
Expand Down Expand Up @@ -259,27 +260,33 @@ async def fetch_current_db_positions(self):
'strategy_name': self.strategy_name})
return current_db_positions_dict

async def place_future_option_order(self, symbol, quantity, side, price, wait_till_open=True, order_type='limit'):
async def place_future_option_order(self, symbol, quantity, side, price, wait_till_open=True, order_type='limit', execution_style=''):
if execution_style == '':
execution_style = self.execution_style
if is_futures_market_open() or not wait_till_open:
await self.broker.place_future_option_order(symbol, quantity, side, self.strategy_name, price, order_type)
await self.broker.place_future_option_order(symbol, quantity, side, self.strategy_name, price, order_type, execution_style=execution_style)
logger.info(f"Placed {side} order for {symbol}: {quantity} shares", extra={
'strategy_name': self.strategy_name, 'symbol': symbol, 'quantity': quantity, 'side': side, 'price': price, 'order_type': order_type})
else:
logger.info(f"Market is closed, not placing {side} order for {symbol}: {quantity} shares", extra={
'strategy_name': self.strategy_name, 'symbol': symbol, 'quantity': quantity, 'side': side, 'price': price, 'order_type': order_type})

async def place_option_order(self, symbol, quantity, side, price, wait_till_open=True, order_type='limit'):
async def place_option_order(self, symbol, quantity, side, price, wait_till_open=True, order_type='limit', execution_style=''):
if execution_style == '':
execution_style = self.execution_style
if is_market_open() or not wait_till_open:
await self.broker.place_option_order(symbol, quantity, side, self.strategy_name, price, order_type)
await self.broker.place_option_order(symbol, quantity, side, self.strategy_name, price, order_type, execution_style=execution_style)
logger.info(f"Placed {side} order for {symbol}: {quantity} shares", extra={
'strategy_name': self.strategy_name, 'symbol': symbol, 'quantity': quantity, 'side': side, 'price': price, 'order_type': order_type})
else:
logger.info(f"Market is closed, not placing {side} order for {symbol}: {quantity} shares", extra={
'strategy_name': self.strategy_name, 'symbol': symbol, 'quantity': quantity, 'side': side, 'price': price, 'order_type': order_type})

async def place_order(self, stock, quantity, side, price, wait_till_open=True, order_type='limit'):
async def place_order(self, stock, quantity, side, price, wait_till_open=True, order_type='limit', execution_style=''):
if execution_style == '':
execution_style = self.execution_style
if is_market_open() or not wait_till_open:
await self.broker.place_order(stock, quantity, side, self.strategy_name, price, order_type)
await self.broker.place_order(stock, quantity, side, self.strategy_name, price, order_type, execution_style=execution_style)
logger.info(f"Placed {side} order for {stock}: {quantity} shares", extra={
'strategy_name': self.strategy_name, 'stock': stock, 'quantity': quantity, 'side': side, 'price': price, 'order_type': order_type})
else:
Expand Down
104 changes: 104 additions & 0 deletions tests/test_order_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from database.models import Trade
from order_manager.manager import OrderManager

PEGGED_ORDER_CANCEL_AFTER = 15


@pytest_asyncio.fixture
def mock_db_manager():
Expand Down Expand Up @@ -123,3 +125,105 @@ async def test_run(order_manager, mock_db_manager):
# Verify that open trades are fetched and reconciled
mock_db_manager.get_open_trades.assert_called_once()
order_manager.reconcile_orders.assert_called_once_with(trades)

@pytest.mark.asyncio
async def test_reconcile_order_pegged_expired(order_manager, mock_db_manager, mock_broker):
"""
Test that a pegged order older than PEGGED_ORDER_CANCEL_AFTER is canceled
and a new limit order is placed at the mid price.
"""
old_timestamp = datetime.utcnow() - timedelta(seconds=PEGGED_ORDER_CANCEL_AFTER + 1)
pegged_order = Trade(
id=1,
broker="dummy_broker",
broker_id="123",
symbol="AAPL",
quantity=10,
side="buy",
strategy="test_strategy",
timestamp=old_timestamp,
status="open",
execution_style="pegged"
)

# Mock placing a new order after cancellation
# Assume place_order returns a Trade object or something similar
order_manager.brokers['dummy_broker'].place_order = AsyncMock()

await order_manager.reconcile_order(pegged_order)

# The pegged order should be canceled
mock_broker.cancel_order.assert_called_once_with("123")
# The status should be updated to 'cancelled'
mock_db_manager.update_trade_status.assert_called_once_with(1, "cancelled")

# A new order should be placed using the mid_price (mocked as 100.00)
order_manager.brokers['dummy_broker'].place_order.assert_called_once()
args, kwargs = order_manager.brokers['dummy_broker'].place_order.call_args
assert kwargs['symbol'] == 'AAPL'
assert kwargs['quantity'] == 10
assert kwargs['side'] == 'buy'
# TODO: Check that the price is the mid price
# (need to mock the mid price func return)
# assert kwargs['price'] == 100.00
assert kwargs['order_type'] == 'limit'
assert kwargs['execution_style'] == 'pegged'


@pytest.mark.asyncio
async def test_reconcile_order_pegged_not_expired(order_manager, mock_db_manager, mock_broker):
"""
Test that a pegged order that is not yet expired does not get cancelled
and no new order is placed.
"""
recent_timestamp = datetime.utcnow() - timedelta(seconds=PEGGED_ORDER_CANCEL_AFTER - 5)
pegged_order = Trade(
id=1,
broker="dummy_broker",
broker_id="123",
symbol="AAPL",
quantity=10,
side="buy",
strategy="test_strategy",
timestamp=recent_timestamp,
status="open",
execution_style="pegged"
)

order_manager.place_order = AsyncMock()

await order_manager.reconcile_order(pegged_order)

# The pegged order should not be cancelled or replaced because it's not old enough
mock_broker.cancel_order.assert_not_called()
mock_db_manager.update_trade_status.assert_not_called()
order_manager.place_order.assert_not_called()


@pytest.mark.asyncio
async def test_reconcile_order_with_execution_style(order_manager, mock_db_manager, mock_broker):
"""
Test that when an order with a specific execution_style (other than pegged)
is reconciled and not stale, not filled, nothing else changes.
This ensures execution_style does not break existing logic.
"""
recent_order = Trade(
id=2,
broker="dummy_broker",
broker_id="456",
symbol="TSLA",
quantity=5,
side="sell",
strategy="test_strategy",
timestamp=datetime.utcnow(),
status="open",
execution_style="some_custom_style"
)
mock_broker.is_order_filled.return_value = False

await order_manager.reconcile_order(recent_order)

# Verify that no changes are made for an open order that is not stale and not filled
mock_db_manager.set_trade_filled.assert_not_called()
mock_broker.update_positions.assert_not_called()
mock_broker.cancel_order.assert_not_called()
2 changes: 1 addition & 1 deletion tests/test_strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,4 @@ async def skip_test_fetch_current_db_positions(strategy):
async def test_place_order(mock_iscoroutinefunction, mock_is_market_open, strategy):
strategy.broker.place_order = AsyncMock()
await strategy.place_order('AAPL', 10, 'buy', 150)
strategy.broker.place_order.assert_called_once_with('AAPL', 10, 'buy', strategy.strategy_name, 150, 'limit')
strategy.broker.place_order.assert_called_once_with('AAPL', 10, 'buy', strategy.strategy_name, 150, 'limit', execution_style='')
3 changes: 2 additions & 1 deletion utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,10 @@ def load_custom_strategy(broker, strategy_name, config):
class_name = config['class_name']
starting_capital = config['starting_capital']
rebalance_interval_minutes = config['rebalance_interval_minutes']
execution_style = config.get('execution_style', '')
strategy_class = load_strategy_class(file_path, class_name)
logger.info(f"Initializing custom strategy '{class_name}' with config: {config}")
return strategy_class(broker, strategy_name, starting_capital, rebalance_interval_minutes, **config.get('strategy_params', {}))
return strategy_class(broker, strategy_name, starting_capital, rebalance_interval_minutes, execution_style, **config.get('strategy_params', {}))
except Exception as e:
logger.error(f"Error initializing custom strategy '{config['class_name']}': {e}")
raise
Expand Down

0 comments on commit 167a106

Please sign in to comment.