Skip to content

Commit

Permalink
Refactor modules
Browse files Browse the repository at this point in the history
  • Loading branch information
Martin-Molinero committed Feb 23, 2024
1 parent a031b02 commit 0d1a7d5
Show file tree
Hide file tree
Showing 26 changed files with 367 additions and 604 deletions.
11 changes: 6 additions & 5 deletions lean/commands/backtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from lean.models.api import QCMinimalOrganization
from lean.models.utils import DebuggingMethod
from lean.models.logger import Option
from lean.models.data_providers import QuantConnectDataProvider, all_data_providers, DataProvider
from lean.models.cli import cli_data_downloaders
from lean.components.util.json_modules_handler import build_and_configure_modules, get_and_build_module
from lean.models.click_options import options_from_json, get_configs_for_options

Expand Down Expand Up @@ -252,7 +252,7 @@ def _select_organization() -> QCMinimalOrganization:
type=Choice(["pycharm", "ptvsd", "vsdbg", "rider", "local-platform"], case_sensitive=False),
help="Enable a certain debugging method (see --help for more information)")
@option("--data-provider-historical",
type=Choice([dp.get_name() for dp in all_data_providers], case_sensitive=False),
type=Choice([dp.get_name() for dp in cli_data_downloaders], case_sensitive=False),
default="Local",
help="Update the Lean configuration file to retrieve data from the given historical provider")
@options_from_json(get_configs_for_options("backtest"))
Expand Down Expand Up @@ -363,14 +363,15 @@ def backtest(project: Path,
lean_config = lean_config_manager.get_complete_lean_config("backtesting", algorithm_file, debugging_method)

if download_data:
data_provider_historical = QuantConnectDataProvider.get_name()
data_provider_historical = "QuantConnect"

organization_id = container.organization_manager.try_get_working_organization_id()

if data_provider_historical is not None:
data_provider_configurer: DataProvider = get_and_build_module(data_provider_historical, all_data_providers, kwargs, logger)
data_provider_configurer = get_and_build_module(data_provider_historical, cli_data_downloaders, kwargs, logger)
data_provider_configurer.ensure_module_installed(organization_id)
data_provider_configurer.configure(lean_config, "backtesting")
lean_config.update(data_provider_configurer.get_settings())
#data_provider_configurer.configure(lean_config, "backtesting")

lean_config_manager.configure_data_purchase_limit(lean_config, data_purchase_limit)

Expand Down
113 changes: 64 additions & 49 deletions lean/commands/cloud/live/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from pathlib import Path
from typing import Any, Dict, List, Tuple, Optional
from click import prompt, option, argument, Choice, confirm
from lean.click import LeanCommand, ensure_options
from lean.components.api.api_client import APIClient
from lean.components.util.json_modules_handler import get_and_build_module
from lean.components.util.logger import Logger
from lean.container import container
from lean.models.api import (QCEmailNotificationMethod, QCNode, QCNotificationMethod, QCSMSNotificationMethod,
QCWebhookNotificationMethod, QCTelegramNotificationMethod, QCProject)
from lean.models.json_module import LiveInitialStateInput
from lean.models.json_module import LiveInitialStateInput, JsonModule
from lean.models.logger import Option
from lean.models.brokerages.cloud.cloud_brokerage import CloudBrokerage
from lean.models.configuration import InternalInputUserInput
from lean.models.click_options import options_from_json, get_configs_for_options
from lean.models.brokerages.cloud import all_cloud_brokerages, cloud_brokerage_data_feeds
from lean.models.cloud import cloud_brokerages, cloud_history_provider, cloud_data_queue_handlers
from lean.commands.cloud.live.live import live
from lean.components.util.live_utils import get_last_portfolio_cash_holdings, configure_initial_cash_balance, configure_initial_holdings,\
_configure_initial_cash_interactively, _configure_initial_holdings_interactively


def _log_notification_methods(methods: List[QCNotificationMethod]) -> None:
"""Logs a list of notification methods."""
logger = container.logger
Expand Down Expand Up @@ -98,7 +98,7 @@ def _prompt_notification_method() -> QCNotificationMethod:
return QCSMSNotificationMethod(phoneNumber=phone_number)


def _configure_brokerage(lean_config: Dict[str, Any], logger: Logger, user_provided_options: Dict[str, Any], show_secrets: bool) -> CloudBrokerage:
def _configure_brokerage(lean_config: Dict[str, Any], logger: Logger, user_provided_options: Dict[str, Any], show_secrets: bool) -> JsonModule:
"""Interactively configures the brokerage to use.
:param lean_config: the LEAN configuration that should be used
Expand All @@ -107,28 +107,31 @@ def _configure_brokerage(lean_config: Dict[str, Any], logger: Logger, user_provi
:param show_secrets: whether to show secrets on input
:return: the cloud brokerage the user configured
"""
brokerage_options = [Option(id=b, label=b.get_name()) for b in all_cloud_brokerages]
brokerage_options = [Option(id=b, label=b.get_name()) for b in cloud_brokerages]
return logger.prompt_list("Select a brokerage", brokerage_options).build(lean_config,
logger,
user_provided_options,
hide_input=not show_secrets)

def _configure_data_feed(brokerage: CloudBrokerage, logger: Logger) -> None:
"""Configures the live data provider to use based on the brokerage given.

:param brokerage: the cloud brokerage
def _configure_data_feed(lean_config: Dict[str, Any], logger: Logger, user_provided_options: Dict[str, Any], show_secrets: bool) -> [JsonModule]:
"""Configures the live data provider to use.
:param lean_config: the LEAN configuration that should be used
:param logger: the logger to use
:param user_provided_options: the dictionary containing user provided options
:param show_secrets: whether to show secrets on input
:return: the cloud live data provider the user configured
"""
if len(cloud_brokerage_data_feeds[brokerage]) != 0:
data_feed_selected = logger.prompt_list("Select a live data provider", [
Option(id=data_feed, label=data_feed) for data_feed in cloud_brokerage_data_feeds[brokerage]
], multiple=False)
data_feed_property_name = [name for name in brokerage.get_required_properties([InternalInputUserInput]) if ("data-feed" in name)]
data_feed_property_name = data_feed_property_name[0] if len(data_feed_property_name) != 0 else ""
brokerage.update_value_for_given_config(data_feed_property_name, data_feed_selected)
data_feeds_options = [Option(id=b, label=b.get_name()) for b in cloud_data_queue_handlers]

data_feeds = logger.prompt_list("Select a data feed", data_feeds_options, multiple=True)
for data_feed in data_feeds:
data_feed.build(lean_config, logger, user_provided_options, hide_input=not show_secrets)
return data_feeds


def _configure_live_node(logger: Logger, api_client: APIClient, cloud_project: QCProject) -> QCNode:
def _configure_live_node(node: str, logger: Logger, api_client: APIClient, cloud_project: QCProject) -> QCNode:
"""Interactively configures the live node to use.
:param logger: the logger to use
Expand All @@ -137,6 +140,15 @@ def _configure_live_node(logger: Logger, api_client: APIClient, cloud_project: Q
:return: the live node the user wants to start live trading on
"""
nodes = api_client.nodes.get_all(cloud_project.organizationId)
if node is not None:
live_node = next((n for n in nodes.live if n.id == node or n.name == node), None)

if live_node is None:
raise RuntimeError(f"You have no live node with name or id '{node}'")

if live_node.busy:
raise RuntimeError(f"The live node named '{live_node.name}' is already in use by '{live_node.usedBy}'")
return live_node

live_nodes = [node for node in nodes.live if not node.busy]
if len(live_nodes) == 0:
Expand Down Expand Up @@ -187,8 +199,12 @@ def _configure_auto_restart(logger: Logger) -> bool:
@live.command(cls=LeanCommand, default_command=True, name="deploy")
@argument("project", type=str)
@option("--brokerage",
type=Choice([b.get_name() for b in all_cloud_brokerages], case_sensitive=False),
type=Choice([b.get_name() for b in cloud_brokerages], case_sensitive=False),
help="The brokerage to use")
@option("--data-provider-live",
type=Choice([d.get_name() for d in cloud_data_queue_handlers], case_sensitive=False),
multiple=True,
help="The live data provider to use")
@options_from_json(get_configs_for_options("live-cloud"))
@option("--node", type=str, help="The name or id of the live node to run on")
@option("--auto-restart", type=bool, help="Whether automatic algorithm restarting must be enabled")
Expand All @@ -213,14 +229,15 @@ def _configure_auto_restart(logger: Logger) -> bool:
@option("--push",
is_flag=True,
default=False,
help="Push local modifications to the cloud before starting live trading")
help="Push cli modifications to the cloud before starting live trading")
@option("--open", "open_browser",
is_flag=True,
default=False,
help="Automatically open the live results in the browser once the deployment starts")
@option("--show-secrets", is_flag=True, show_default=True, default=False, help="Show secrets as they are input")
def deploy(project: str,
brokerage: str,
data_provider_live: Optional[str],
node: str,
auto_restart: bool,
notify_order_events: Optional[bool],
Expand Down Expand Up @@ -254,32 +271,12 @@ def deploy(project: str,
cloud_runner = container.cloud_runner
finished_compile = cloud_runner.compile_project(cloud_project)

live_data_provider_settings = {}

if brokerage is not None:
ensure_options(["brokerage", "node", "auto_restart", "notify_order_events", "notify_insights"])

brokerage_instance = None
[brokerage_instance] = [cloud_brokerage for cloud_brokerage in all_cloud_brokerages if cloud_brokerage.get_name() == brokerage]
# update essential properties from brokerage to datafeed
# needs to be updated before fetching required properties
essential_properties = [brokerage_instance.convert_lean_key_to_variable(prop) for prop in brokerage_instance.get_essential_properties()]
ensure_options(essential_properties)
essential_properties_value = {brokerage_instance.convert_variable_to_lean_key(prop) : kwargs[prop] for prop in essential_properties}
brokerage_instance.update_configs(essential_properties_value)
# now required properties can be fetched as per historical data provider from essential properties
required_properties = [brokerage_instance.convert_lean_key_to_variable(prop) for prop in brokerage_instance.get_required_properties([InternalInputUserInput])]
ensure_options(required_properties)
required_properties_value = {brokerage_instance.convert_variable_to_lean_key(prop) : kwargs[prop] for prop in required_properties}
brokerage_instance.update_configs(required_properties_value)

all_nodes = api_client.nodes.get_all(cloud_project.organizationId)
live_node = next((n for n in all_nodes.live if n.id == node or n.name == node), None)

if live_node is None:
raise RuntimeError(f"You have no live node with name or id '{node}'")

if live_node.busy:
raise RuntimeError(f"The live node named '{live_node.name}' is already in use by '{live_node.usedBy}'")

brokerage_instance = get_and_build_module(brokerage, cloud_brokerages, kwargs, logger)
notify_methods = []

if notify_emails is not None:
Expand Down Expand Up @@ -320,10 +317,10 @@ def deploy(project: str,
raise RuntimeError(f"Custom portfolio holdings setting is not available for {brokerage_instance.get_name()}")

else:
logger.debug(f'Deploy(): interactive configuration start')
lean_config = container.lean_config_manager.get_lean_config()
brokerage_instance = _configure_brokerage(lean_config, logger, kwargs, show_secrets=show_secrets)
_configure_data_feed(brokerage_instance, logger)
live_node = _configure_live_node(logger, api_client, cloud_project)
logger.debug(f'Deploy(): finished brokerage configuration, settings: {brokerage_instance.get_settings()}')
notify_order_events, notify_insights, notify_methods = _configure_notifications(logger)
auto_restart = _configure_auto_restart(logger)
cash_balance_option, holdings_option, last_cash, last_holdings = get_last_portfolio_cash_holdings(api_client, brokerage_instance, cloud_project.projectId, project)
Expand All @@ -332,15 +329,33 @@ def deploy(project: str,
if holdings_option != LiveInitialStateInput.NotSupported:
live_holdings = _configure_initial_holdings_interactively(logger, holdings_option, last_holdings)

live_node = _configure_live_node(node, logger, api_client, cloud_project)

if data_provider_live is not None and len(data_provider_live) > 0:
# the user sent the live data provider to use
for data_provider in data_provider_live:
data_provider_instance = get_and_build_module(data_provider, cloud_data_queue_handlers, kwargs, logger)

live_data_provider_settings.update({data_provider_instance.get_id(): data_provider_instance.get_settings()})

logger.debug(f'Configuration(\'{data_provider}\'): Settings: {str(live_data_provider_settings)}')
else:
# let's ask the user which live data providers to use
lean_config = container.lean_config_manager.get_lean_config()
data_feed_instances = _configure_data_feed(lean_config, logger, kwargs, show_secrets=show_secrets)
for data_feed in data_feed_instances:
settings = data_feed.get_settings()
logger.debug(f'Deploy(): finished data feed configuration, settings: {settings}')

live_data_provider_settings.update({data_feed.get_id(): settings})

brokerage_settings = brokerage_instance.get_settings()
price_data_handler = brokerage_instance.get_price_data_handler()

logger.info(f"Brokerage: {brokerage_instance.get_name()}")
logger.info(f"Brokerage: {brokerage_settings}")
logger.info(f"Project id: {cloud_project.projectId}")
logger.info(f"Environment: {brokerage_settings['environment'].title()}")
logger.info(f"Server name: {live_node.name}")
logger.info(f"Server type: {live_node.sku}")
logger.info(f"Live data provider: {price_data_handler.replace('Handler', '')}")
logger.info(f"Live data providers: {live_data_provider_settings}")
logger.info(f"LEAN version: {cloud_project.leanVersionId}")
logger.info(f"Order event notifications: {'Yes' if notify_order_events else 'No'}")
logger.info(f"Insight notifications: {'Yes' if notify_insights else 'No'}")
Expand All @@ -361,7 +376,7 @@ def deploy(project: str,
finished_compile.compileId,
live_node.id,
brokerage_settings,
price_data_handler,
live_data_provider_settings,
auto_restart,
cloud_project.leanVersionId,
notify_order_events,
Expand Down
8 changes: 2 additions & 6 deletions lean/commands/cloud/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
from lean.click import LeanCommand
from lean.container import container
from lean.models.api import QCLiveAlgorithmStatus
from lean.models.brokerages.cloud import all_cloud_brokerages, PaperTradingBrokerage

from lean.models.brokerage import cloud_brokerages

@command(cls=LeanCommand)
@argument("project", type=str)
Expand Down Expand Up @@ -49,12 +48,9 @@ def status(project: str) -> None:
QCLiveAlgorithmStatus.LoggingIn: "Logging in"
}.get(live_algorithm.status, live_algorithm.status.value)

brokerage_name = next((b.get_name() for b in all_cloud_brokerages if b.get_id() == live_algorithm.brokerage),
brokerage_name = next((b.get_name() for b in cloud_brokerages if b.get_id() == live_algorithm.brokerage),
live_algorithm.brokerage)

if brokerage_name == "PaperBrokerage":
brokerage_name = PaperTradingBrokerage.get_name()

logger.info(f"Live status: {live_status}")
logger.info(f"Live id: {live_algorithm.deployId}")
logger.info(f"Live url: {live_algorithm.get_url()}")
Expand Down
Loading

0 comments on commit 0d1a7d5

Please sign in to comment.