Skip to content

Commit

Permalink
Refactor modules
Browse files Browse the repository at this point in the history
  • Loading branch information
Martin-Molinero committed Feb 26, 2024
1 parent a031b02 commit 9143472
Show file tree
Hide file tree
Showing 27 changed files with 423 additions and 790 deletions.
13 changes: 7 additions & 6 deletions lean/commands/backtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
from lean.models.api import QCMinimalOrganization
from lean.models.utils import DebuggingMethod
from lean.models.logger import Option
from lean.models.data_providers import QuantConnectDataProvider, all_data_providers, DataProvider
from lean.components.util.json_modules_handler import build_and_configure_modules, get_and_build_module
from lean.models.cli import cli_data_downloaders
from lean.components.util.json_modules_handler import build_and_configure_modules, non_interactive_config_build
from lean.models.click_options import options_from_json, get_configs_for_options

# The _migrate_* methods automatically update launch configurations for a given debugging method.
Expand Down Expand Up @@ -252,7 +252,7 @@ def _select_organization() -> QCMinimalOrganization:
type=Choice(["pycharm", "ptvsd", "vsdbg", "rider", "local-platform"], case_sensitive=False),
help="Enable a certain debugging method (see --help for more information)")
@option("--data-provider-historical",
type=Choice([dp.get_name() for dp in all_data_providers], case_sensitive=False),
type=Choice([dp.get_name() for dp in cli_data_downloaders], case_sensitive=False),
default="Local",
help="Update the Lean configuration file to retrieve data from the given historical provider")
@options_from_json(get_configs_for_options("backtest"))
Expand Down Expand Up @@ -363,14 +363,15 @@ def backtest(project: Path,
lean_config = lean_config_manager.get_complete_lean_config("backtesting", algorithm_file, debugging_method)

if download_data:
data_provider_historical = QuantConnectDataProvider.get_name()
data_provider_historical = "QuantConnect"

organization_id = container.organization_manager.try_get_working_organization_id()

if data_provider_historical is not None:
data_provider_configurer: DataProvider = get_and_build_module(data_provider_historical, all_data_providers, kwargs, logger)
data_provider_configurer = non_interactive_config_build(data_provider_historical, cli_data_downloaders, kwargs, logger)
data_provider_configurer.ensure_module_installed(organization_id)
data_provider_configurer.configure(lean_config, "backtesting")
lean_config.update(data_provider_configurer.get_settings())
#data_provider_configurer.configure(lean_config, "backtesting")

lean_config_manager.configure_data_purchase_limit(lean_config, data_purchase_limit)

Expand Down
123 changes: 53 additions & 70 deletions lean/commands/cloud/live/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,26 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from pathlib import Path
from typing import Any, Dict, List, Tuple, Optional
from typing import List, Tuple, Optional
from click import prompt, option, argument, Choice, confirm
from lean.click import LeanCommand, ensure_options
from lean.components.api.api_client import APIClient
from lean.components.util.json_modules_handler import non_interactive_config_build, \
non_interactive_config_build_for_name, save_settings
from lean.components.util.logger import Logger
from lean.container import container
from lean.models.api import (QCEmailNotificationMethod, QCNode, QCNotificationMethod, QCSMSNotificationMethod,
QCWebhookNotificationMethod, QCTelegramNotificationMethod, QCProject)
from lean.models.json_module import LiveInitialStateInput
from lean.models.logger import Option
from lean.models.brokerages.cloud.cloud_brokerage import CloudBrokerage
from lean.models.configuration import InternalInputUserInput
from lean.models.click_options import options_from_json, get_configs_for_options
from lean.models.brokerages.cloud import all_cloud_brokerages, cloud_brokerage_data_feeds
from lean.models.cloud import cloud_brokerages, cloud_history_provider, cloud_data_queue_handlers
from lean.commands.cloud.live.live import live
from lean.commands.deploy import interactive_data_queue_handlers_config, interactive_brokerage_config
from lean.components.util.live_utils import get_last_portfolio_cash_holdings, configure_initial_cash_balance, configure_initial_holdings,\
_configure_initial_cash_interactively, _configure_initial_holdings_interactively


def _log_notification_methods(methods: List[QCNotificationMethod]) -> None:
"""Logs a list of notification methods."""
logger = container.logger
Expand Down Expand Up @@ -98,37 +99,7 @@ def _prompt_notification_method() -> QCNotificationMethod:
return QCSMSNotificationMethod(phoneNumber=phone_number)


def _configure_brokerage(lean_config: Dict[str, Any], logger: Logger, user_provided_options: Dict[str, Any], show_secrets: bool) -> CloudBrokerage:
"""Interactively configures the brokerage to use.
:param lean_config: the LEAN configuration that should be used
:param logger: the logger to use
:param user_provided_options: the dictionary containing user provided options
:param show_secrets: whether to show secrets on input
:return: the cloud brokerage the user configured
"""
brokerage_options = [Option(id=b, label=b.get_name()) for b in all_cloud_brokerages]
return logger.prompt_list("Select a brokerage", brokerage_options).build(lean_config,
logger,
user_provided_options,
hide_input=not show_secrets)

def _configure_data_feed(brokerage: CloudBrokerage, logger: Logger) -> None:
"""Configures the live data provider to use based on the brokerage given.
:param brokerage: the cloud brokerage
:param logger: the logger to use
"""
if len(cloud_brokerage_data_feeds[brokerage]) != 0:
data_feed_selected = logger.prompt_list("Select a live data provider", [
Option(id=data_feed, label=data_feed) for data_feed in cloud_brokerage_data_feeds[brokerage]
], multiple=False)
data_feed_property_name = [name for name in brokerage.get_required_properties([InternalInputUserInput]) if ("data-feed" in name)]
data_feed_property_name = data_feed_property_name[0] if len(data_feed_property_name) != 0 else ""
brokerage.update_value_for_given_config(data_feed_property_name, data_feed_selected)


def _configure_live_node(logger: Logger, api_client: APIClient, cloud_project: QCProject) -> QCNode:
def _configure_live_node(node: str, logger: Logger, api_client: APIClient, cloud_project: QCProject) -> QCNode:
"""Interactively configures the live node to use.
:param logger: the logger to use
Expand All @@ -137,6 +108,15 @@ def _configure_live_node(logger: Logger, api_client: APIClient, cloud_project: Q
:return: the live node the user wants to start live trading on
"""
nodes = api_client.nodes.get_all(cloud_project.organizationId)
if node is not None:
live_node = next((n for n in nodes.live if n.id == node or n.name == node), None)

if live_node is None:
raise RuntimeError(f"You have no live node with name or id '{node}'")

if live_node.busy:
raise RuntimeError(f"The live node named '{live_node.name}' is already in use by '{live_node.usedBy}'")
return live_node

live_nodes = [node for node in nodes.live if not node.busy]
if len(live_nodes) == 0:
Expand Down Expand Up @@ -187,8 +167,12 @@ def _configure_auto_restart(logger: Logger) -> bool:
@live.command(cls=LeanCommand, default_command=True, name="deploy")
@argument("project", type=str)
@option("--brokerage",
type=Choice([b.get_name() for b in all_cloud_brokerages], case_sensitive=False),
type=Choice([b.get_name() for b in cloud_brokerages], case_sensitive=False),
help="The brokerage to use")
@option("--data-provider-live",
type=Choice([d.get_name() for d in cloud_data_queue_handlers], case_sensitive=False),
multiple=True,
help="The live data provider to use")
@options_from_json(get_configs_for_options("live-cloud"))
@option("--node", type=str, help="The name or id of the live node to run on")
@option("--auto-restart", type=bool, help="Whether automatic algorithm restarting must be enabled")
Expand All @@ -213,14 +197,15 @@ def _configure_auto_restart(logger: Logger) -> bool:
@option("--push",
is_flag=True,
default=False,
help="Push local modifications to the cloud before starting live trading")
help="Push cli modifications to the cloud before starting live trading")
@option("--open", "open_browser",
is_flag=True,
default=False,
help="Automatically open the live results in the browser once the deployment starts")
@option("--show-secrets", is_flag=True, show_default=True, default=False, help="Show secrets as they are input")
def deploy(project: str,
brokerage: str,
data_provider_live: Optional[str],
node: str,
auto_restart: bool,
notify_order_events: Optional[bool],
Expand Down Expand Up @@ -254,32 +239,13 @@ def deploy(project: str,
cloud_runner = container.cloud_runner
finished_compile = cloud_runner.compile_project(cloud_project)

live_data_provider_settings = {}
lean_config = container.lean_config_manager.get_lean_config()

if brokerage is not None:
ensure_options(["brokerage", "node", "auto_restart", "notify_order_events", "notify_insights"])

brokerage_instance = None
[brokerage_instance] = [cloud_brokerage for cloud_brokerage in all_cloud_brokerages if cloud_brokerage.get_name() == brokerage]
# update essential properties from brokerage to datafeed
# needs to be updated before fetching required properties
essential_properties = [brokerage_instance.convert_lean_key_to_variable(prop) for prop in brokerage_instance.get_essential_properties()]
ensure_options(essential_properties)
essential_properties_value = {brokerage_instance.convert_variable_to_lean_key(prop) : kwargs[prop] for prop in essential_properties}
brokerage_instance.update_configs(essential_properties_value)
# now required properties can be fetched as per historical data provider from essential properties
required_properties = [brokerage_instance.convert_lean_key_to_variable(prop) for prop in brokerage_instance.get_required_properties([InternalInputUserInput])]
ensure_options(required_properties)
required_properties_value = {brokerage_instance.convert_variable_to_lean_key(prop) : kwargs[prop] for prop in required_properties}
brokerage_instance.update_configs(required_properties_value)

all_nodes = api_client.nodes.get_all(cloud_project.organizationId)
live_node = next((n for n in all_nodes.live if n.id == node or n.name == node), None)

if live_node is None:
raise RuntimeError(f"You have no live node with name or id '{node}'")

if live_node.busy:
raise RuntimeError(f"The live node named '{live_node.name}' is already in use by '{live_node.usedBy}'")

brokerage_instance = non_interactive_config_build_for_name(brokerage, cloud_brokerages, kwargs, logger)
notify_methods = []

if notify_emails is not None:
Expand Down Expand Up @@ -320,10 +286,9 @@ def deploy(project: str,
raise RuntimeError(f"Custom portfolio holdings setting is not available for {brokerage_instance.get_name()}")

else:
lean_config = container.lean_config_manager.get_lean_config()
brokerage_instance = _configure_brokerage(lean_config, logger, kwargs, show_secrets=show_secrets)
_configure_data_feed(brokerage_instance, logger)
live_node = _configure_live_node(logger, api_client, cloud_project)
brokerage_instance = interactive_brokerage_config(lean_config, cloud_brokerages,
logger, kwargs, show_secrets=show_secrets)

notify_order_events, notify_insights, notify_methods = _configure_notifications(logger)
auto_restart = _configure_auto_restart(logger)
cash_balance_option, holdings_option, last_cash, last_holdings = get_last_portfolio_cash_holdings(api_client, brokerage_instance, cloud_project.projectId, project)
Expand All @@ -332,15 +297,33 @@ def deploy(project: str,
if holdings_option != LiveInitialStateInput.NotSupported:
live_holdings = _configure_initial_holdings_interactively(logger, holdings_option, last_holdings)

live_node = _configure_live_node(node, logger, api_client, cloud_project)

if data_provider_live is not None and len(data_provider_live) > 0:
# the user sent the live data provider to use
for data_provider in data_provider_live:
data_provider_instance = non_interactive_config_build(data_provider, cloud_data_queue_handlers, kwargs, logger)

live_data_provider_settings.update({data_provider_instance.get_id(): data_provider_instance.get_settings()})
else:
# let's ask the user which live data providers to use
data_feed_instances = interactive_data_queue_handlers_config(lean_config, cloud_data_queue_handlers, logger, kwargs, show_secrets=show_secrets)
for data_feed in data_feed_instances:
settings = data_feed.get_settings()

live_data_provider_settings.update({data_feed.get_id(): settings})

brokerage_settings = brokerage_instance.get_settings()
price_data_handler = brokerage_instance.get_price_data_handler()

logger.info(f"Brokerage: {brokerage_instance.get_name()}")
# save them for next time
save_settings(live_data_provider_settings)
save_settings(brokerage_settings)

logger.info(f"Brokerage: {brokerage_settings}")
logger.info(f"Project id: {cloud_project.projectId}")
logger.info(f"Environment: {brokerage_settings['environment'].title()}")
logger.info(f"Server name: {live_node.name}")
logger.info(f"Server type: {live_node.sku}")
logger.info(f"Live data provider: {price_data_handler.replace('Handler', '')}")
logger.info(f"Live data providers: {live_data_provider_settings}")
logger.info(f"LEAN version: {cloud_project.leanVersionId}")
logger.info(f"Order event notifications: {'Yes' if notify_order_events else 'No'}")
logger.info(f"Insight notifications: {'Yes' if notify_insights else 'No'}")
Expand All @@ -361,7 +344,7 @@ def deploy(project: str,
finished_compile.compileId,
live_node.id,
brokerage_settings,
price_data_handler,
live_data_provider_settings,
auto_restart,
cloud_project.leanVersionId,
notify_order_events,
Expand Down
8 changes: 2 additions & 6 deletions lean/commands/cloud/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
from lean.click import LeanCommand
from lean.container import container
from lean.models.api import QCLiveAlgorithmStatus
from lean.models.brokerages.cloud import all_cloud_brokerages, PaperTradingBrokerage

from lean.models.brokerage import cloud_brokerages

@command(cls=LeanCommand)
@argument("project", type=str)
Expand Down Expand Up @@ -49,12 +48,9 @@ def status(project: str) -> None:
QCLiveAlgorithmStatus.LoggingIn: "Logging in"
}.get(live_algorithm.status, live_algorithm.status.value)

brokerage_name = next((b.get_name() for b in all_cloud_brokerages if b.get_id() == live_algorithm.brokerage),
brokerage_name = next((b.get_name() for b in cloud_brokerages if b.get_id() == live_algorithm.brokerage),
live_algorithm.brokerage)

if brokerage_name == "PaperBrokerage":
brokerage_name = PaperTradingBrokerage.get_name()

logger.info(f"Live status: {live_status}")
logger.info(f"Live id: {live_algorithm.deployId}")
logger.info(f"Live url: {live_algorithm.get_url()}")
Expand Down
60 changes: 60 additions & 0 deletions lean/commands/deploy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean CLI v1.0. Copyright 2021 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict, Any

from lean.models.logger import Option
from lean.models.json_module import JsonModule
from lean.components.util.logger import Logger


def interactive_data_queue_handlers_config(lean_config: Dict[str, Any], models: [JsonModule], logger: Logger,
user_provided_options: Dict[str, Any], show_secrets: bool) -> [JsonModule]:
"""Configures the live data provider to use.
:param lean_config: the LEAN configuration that should be used
:param models: the modules to choose from
:param logger: the logger to use
:param user_provided_options: the dictionary containing user provided options
:param show_secrets: whether to show secrets on input
:return: the live data providers the user configured
"""
data_feeds_options = [Option(id=b, label=b.get_name()) for b in models]

data_feeds: [JsonModule] = logger.prompt_list("Select a live data feed", data_feeds_options, multiple=True)
for data_feed in data_feeds:
data_feed.interactive_config_build(lean_config, logger, user_provided_options, hide_input=not show_secrets)

logger.debug(f'interactive_data_queue_handlers_config(\'{data_feed}\'): Settings: {data_feed.get_settings()}')

return data_feeds


def interactive_brokerage_config(lean_config: Dict[str, Any], models: [JsonModule], logger: Logger,
user_provided_options: Dict[str, Any], show_secrets: bool) -> JsonModule:
"""Interactively configures the brokerage to use.
:param lean_config: the LEAN configuration that should be used
:param models: the modules to choose from
:param logger: the logger to use
:param user_provided_options: the dictionary containing user provided options
:param show_secrets: whether to show secrets on input
:return: the brokerage the user configured
"""
brokerage_options = [Option(id=b, label=b.get_name()) for b in models]
brokerage: JsonModule = logger.prompt_list("Select a brokerage", brokerage_options)
brokerage.interactive_config_build(lean_config, logger, user_provided_options, hide_input=not show_secrets)

logger.debug(f'interactive_brokerage_config(\'{brokerage}\'): Settings: {brokerage.get_settings()}')

return brokerage
Loading

0 comments on commit 9143472

Please sign in to comment.