diff --git a/lean/commands/__init__.py b/lean/commands/__init__.py index d26e0a4b..dfa40a41 100644 --- a/lean/commands/__init__.py +++ b/lean/commands/__init__.py @@ -33,6 +33,7 @@ from lean.commands.whoami import whoami from lean.commands.gui import gui from lean.commands.object_store import object_store +from lean.commands.private_cloud import private_cloud lean.add_command(config) lean.add_command(cloud) @@ -55,3 +56,4 @@ lean.add_command(logs) lean.add_command(gui) lean.add_command(object_store) +lean.add_command(private_cloud) diff --git a/lean/commands/private_cloud/__init__.py b/lean/commands/private_cloud/__init__.py new file mode 100644 index 00000000..2f042bfd --- /dev/null +++ b/lean/commands/private_cloud/__init__.py @@ -0,0 +1,29 @@ +# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. +# Lean CLI v1.0. Copyright 2021 QuantConnect Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from click import group + +from lean.commands.private_cloud.start import start +from lean.commands.private_cloud.stop import stop + + +@group() +def private_cloud() -> None: + """Interact with a QuantConnect private cloud.""" + # This method is intentionally empty + # It is used as the command group for all `lean private-cloud ` commands + pass + + +private_cloud.add_command(start) +private_cloud.add_command(stop) diff --git a/lean/commands/private_cloud/start.py b/lean/commands/private_cloud/start.py new file mode 100644 index 00000000..171dcb7a --- /dev/null +++ b/lean/commands/private_cloud/start.py @@ -0,0 +1,180 @@ +# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. +# Lean CLI v1.0. Copyright 2021 QuantConnect Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pathlib import Path +from typing import Optional +from json import loads + +from click import command, option +from docker.errors import APIError +from docker.types import Mount + +from lean.click import LeanCommand +from lean.commands.private_cloud.stop import get_private_cloud_containers +from lean.container import container +from lean.models.cli import cli_compute +from lean.models.docker import DockerImage +from lean.constants import COMPUTE_MASTER, COMPUTE_SLAVE, COMPUTE_MESSAGING + + +def get_free_port(): + from socket import socket + for i in range(0, 3): + try: + port = 32787 + i + with socket() as s: + s.bind(('', port)) + return port + except: + pass + return 0 + + +def deploy(ip: str, port: int, token: str, slave: bool, update: bool, no_update: bool, + image: str, lean_config: dict, extra_docker_config: str, counter: int = 0): + logger = container.logger + + compute_node_name = f"{COMPUTE_SLAVE}{counter}" if slave else COMPUTE_MASTER + logger.info(f"Starting {compute_node_name}...") + compute_directory = Path(f"~/.lean/compute/{compute_node_name}").expanduser() + lean_config["node-name"] = compute_node_name + run_options = container.lean_runner.get_basic_docker_config_without_algo(lean_config, None, True, None, None, + None, compute_directory) + run_options["mounts"].append(Mount(target="/QuantConnect/platform-services/airlock", + source=str(compute_directory), type="bind")) + run_options["mounts"].append(Mount(target="/var/run/docker.sock", source="/var/run/docker.sock", + type="bind", read_only=True)) + docker_config_source = Path("~/.docker/config.json").expanduser() + run_options["mounts"].append(Mount(target="/root/.docker/config.json", source=str(docker_config_source), + type="bind", read_only=True)) + container.lean_runner.parse_extra_docker_config(run_options, loads(extra_docker_config)) + + if not slave: + run_options["ports"]["9696"] = str(port) + run_options["ports"]["9697"] = str(get_free_port()) + + root_directory = container.lean_config_manager.get_cli_root_directory() + run_options["volumes"][str(root_directory)] = {"bind": "/LeanCLIWorkspace", "mode": "rw"} + + run_options["remove"] = False + run_options["name"] = compute_node_name + run_options["environment"]["MODE"] = str('slave') if slave else str('master') + run_options["environment"]["IP"] = str(ip) + run_options["environment"]["PORT"] = str(port) + run_options["environment"]["TOKEN"] = str(token) + run_options["user"] = "root" + run_options["restart_policy"] = {"Name": "always"} + + if not image: + image = "quantconnect/platform-services:latest" + docker_image = DockerImage.parse(image) + container.update_manager.pull_docker_image_if_necessary(docker_image, update, no_update) + try: + container.docker_manager.run_image(image, **run_options) + except APIError as error: + msg = error.explanation + if isinstance(msg, str) and any(m in msg.lower() for m in [ + "port is already allocated", + "ports are not available" + "an attempt was made to access a socket in a way forbidden by its access permissions" + ]): + f"Port {port} is already in use, please specify a different port using --port " + raise error + + +def get_ip_address(): + from socket import gethostname, gethostbyname + hostname = gethostname() + return gethostbyname(hostname + ".local") + + +@command(cls=LeanCommand, requires_lean_config=True, requires_docker=True, help="Start a new private cloud") +@option("--master", is_flag=True, default=False, help="Run in master mode") +@option("--slave", is_flag=True, default=False, help="Run in slave mode") +@option("--token", type=str, required=False, help="The master server token") +@option("--ip", type=str, required=False, help="The master server address") +@option("--port", type=int, required=False, default=0, help="The master server port") +@option("--update", is_flag=True, default=False, help="Pull the latest image before starting") +@option("--no-update", is_flag=True, default=False, help="Do not update to the latest version") +@option("--compute", type=str, required=False, help="Compute configuration to use") +@option("--extra-docker-config", type=str, default="{}", help="Extra docker configuration as a JSON string.") +@option("--image", type=str, hidden=True) +def start(master: bool, + slave: bool, + token: str, + ip: str, + port: int, + update: bool, + no_update: bool, + compute: Optional[str], + extra_docker_config: Optional[str], + image: Optional[str]) -> None: + logger = container.logger + + if slave and master: + raise RuntimeError(f"Can only provide one of '--master' or '--slave'") + if not slave and not master: + # just default to slave if none given + slave = True + + if not ip: + ip = get_ip_address() + logger.info(f"IP address was not provided using '{ip}'") + + str_mode = 'slave' if slave else 'master' + logger.info(f'Start running in {str_mode} mode') + + if not compute: + # configure + compute = [] + for module in cli_compute: + module.config_build({}, logger, True) + compute_config = module.get_settings() + compute.append(compute_config) + else: + compute = loads(compute) + + if slave: + if not token: + raise RuntimeError(f"Master token is required when running as slave") + if port == 0: + raise RuntimeError(f"Master port is required when running as slave") + else: + if not token: + from uuid import uuid4 + token = uuid4().hex + + docker_container = get_private_cloud_containers() + if any(docker_container): + names = [node.name for node in docker_container if node.status == 'running'] + if master and (COMPUTE_MASTER in names or COMPUTE_MESSAGING in names): + raise RuntimeError(f"Private cloud nodes already running detected: {names}") + logger.info(f"Running nodes: {names}") + + container.temp_manager.delete_temporary_directories_when_done = False + lean_config = container.lean_config_manager.get_complete_lean_config(None, None, None) + + if master: + deploy(ip, port, token, False, update, no_update, image, lean_config, extra_docker_config) + if port == 0: + port = container.docker_manager.get_container_port(COMPUTE_MASTER, "9696/tcp") + logger.info(f"Slaves can be added running: " + f"lean private-cloud start --slave --ip {ip} --token \"{token}\" --port {port}") + + compute_index = len(get_private_cloud_containers([COMPUTE_SLAVE])) + if compute: + logger.debug(f"Starting given compute configuration: {compute}") + for configuration in compute: + lean_config["compute"] = configuration + for i in range(compute_index, int(configuration["count"]) + compute_index): + deploy(ip, port, token, True, update, no_update, image, lean_config, extra_docker_config, i) diff --git a/lean/commands/private_cloud/stop.py b/lean/commands/private_cloud/stop.py new file mode 100644 index 00000000..6eddc9c4 --- /dev/null +++ b/lean/commands/private_cloud/stop.py @@ -0,0 +1,46 @@ +# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. +# Lean CLI v1.0. Copyright 2021 QuantConnect Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from click import command + +from lean.click import LeanCommand +from lean.constants import PRIVATE_CLOUD +from lean.container import container + + +def get_private_cloud_containers(container_filter: [] = None): + result = [] + if not container_filter: + container_filter = [PRIVATE_CLOUD] + for name in container_filter: + for docker_container in container.docker_manager.get_containers_by_name(name, starts_with=True): + result.append(docker_container) + return result + + +@command(cls=LeanCommand, requires_docker=True, help="Stops a running private cloud") +def stop() -> None: + logger = container.logger + for docker_container in get_private_cloud_containers(): + logger.info(f'Stopping: {docker_container.name.lstrip("/")}') + if docker_container: + try: + docker_container.kill() + except: + # might be restarting or not running + pass + try: + docker_container.remove() + except: + # might be running with autoremove + pass diff --git a/lean/components/docker/docker_manager.py b/lean/components/docker/docker_manager.py index 87a142f3..6901d880 100644 --- a/lean/components/docker/docker_manager.py +++ b/lean/components/docker/docker_manager.py @@ -109,7 +109,7 @@ def run_image(self, image: DockerImage, **kwargs) -> bool: format_output = kwargs.pop("format_output", lambda chunk: None) commands = kwargs.pop("commands", None) - if commands is not None: + if commands: shell_script_commands = ["#!/usr/bin/env bash", "set -e"] if self._logger.debug_logging_enabled: shell_script_commands.append("set -x") @@ -400,11 +400,23 @@ def get_container_by_name(self, container_name: str): :param container_name: the name of the container to find :return: the container with the given name, or None if it does not exist """ + containers = self.get_containers_by_name(container_name, starts_with=False) + return None if len(containers) == 0 else containers[0] + + def get_containers_by_name(self, container_name: str, starts_with: bool = False): + """Finds a container with a given name. + + :param container_name: the name of the container to find + :param starts_with: optionally match by starts_with + :return: the container with the given name, or None if it does not exist + """ + result = [] for container in self._get_docker_client().containers.list(all=True): if container.name.lstrip("/") == container_name: - return container - - return None + result.append(container) + elif starts_with and container.name.lstrip("/").startswith(container_name): + result.append(container) + return result def show_logs(self, container_name: str, follow: bool = False) -> None: """Shows the logs of a Docker container in the terminal. diff --git a/lean/components/docker/lean_runner.py b/lean/components/docker/lean_runner.py index 7c08ba1d..4c2260ff 100644 --- a/lean/components/docker/lean_runner.py +++ b/lean/components/docker/lean_runner.py @@ -250,7 +250,8 @@ def get_basic_docker_config_without_algo(self, detach: bool, image: DockerImage, target_path: str, - paths_to_mount: Optional[Dict[str, str]] = None) -> Dict[str, Any]: + paths_to_mount: Optional[Dict[str, str]] = None, + config_local_path: Path = None) -> Dict[str, Any]: """Creates a basic Docker config to run the engine with. This method constructs the parts of the Docker config that is the same for both the engine and the optimizer. @@ -261,6 +262,7 @@ def get_basic_docker_config_without_algo(self, :param image: The docker image that will be used :param target_path: The target path inside the Docker container where the C# project should be located. :param paths_to_mount: additional paths to mount to the container + :param config_local_path: optional config local path :return: the Docker configuration containing basic configuration to run Lean """ @@ -276,7 +278,6 @@ def get_basic_docker_config_without_algo(self, lean_config.update({ "debug-mode": self._logger.debug_logging_enabled, "data-folder": "/Lean/Data", - "results-destination-folder": "/Results", "object-store-root": "/Storage" }) @@ -296,18 +297,23 @@ def get_basic_docker_config_without_algo(self, # Set up modules self._setup_installed_packages(run_options, image, target_path) - self._mount_lean_config_and_finalize(run_options, lean_config, None) + self._mount_lean_config_and_finalize(run_options, lean_config, None, config_local_path) return run_options - def _mount_lean_config_and_finalize(self, run_options: Dict[str, Any], lean_config: Dict[str, Any], output_dir: Optional[Path]): + def _mount_lean_config_and_finalize(self, run_options: Dict[str, Any], lean_config: Dict[str, Any], + output_dir: Optional[Path], config_local_path: Path = None): """Mounts Lean config and finalizes.""" from docker.types import Mount from uuid import uuid4 from json import dumps + from os import makedirs # Save the final Lean config to a temporary file so we can mount it into the container - config_path = self._temp_manager.create_temporary_directory() / "config.json" + if not config_local_path: + config_local_path = self._temp_manager.create_temporary_directory() + makedirs(config_local_path, exist_ok=True) + config_path = config_local_path / "config.json" with config_path.open("w+", encoding="utf-8") as file: file.write(dumps(lean_config, indent=4)) @@ -914,16 +920,29 @@ def mount_paths(self, paths_to_mount, lean_config, run_options): @staticmethod def parse_extra_docker_config(run_options: Dict[str, Any], extra_docker_config: Optional[Dict[str, Any]]) -> None: - from docker.types import DeviceRequest # Add known additional run options from the extra docker config. # For now, only device_requests is supported if extra_docker_config is not None: if "device_requests" in extra_docker_config: + from docker.types import DeviceRequest run_options["device_requests"] = [DeviceRequest(**device_request) for device_request in extra_docker_config["device_requests"]] if "volumes" in extra_docker_config: - volumes = run_options.get("volumes") - if not volumes: - volumes = run_options["volumes"] = {} - volumes.update(extra_docker_config["volumes"]) + target = run_options.get("volumes") + if not target: + target = run_options["volumes"] = {} + target.update(extra_docker_config["volumes"]) + + if "mounts" in extra_docker_config: + from docker.types import Mount + + target = run_options.get("mounts") + if not target: + target = run_options["mounts"] = [] + + for mount in extra_docker_config["mounts"]: + read_only = True + if "read_only" in mount: + read_only = mount["read_only"] + target.append(Mount(target=mount["target"], source=mount["source"], type="bind", read_only=read_only)) diff --git a/lean/constants.py b/lean/constants.py index ada2dbc1..3e24b66f 100644 --- a/lean/constants.py +++ b/lean/constants.py @@ -96,11 +96,17 @@ # The name of the Docker network which all Lean CLI containers are ran on DOCKER_NETWORK = "lean_cli" +PRIVATE_CLOUD = "private-cloud-" +COMPUTE_MASTER = PRIVATE_CLOUD + "master" +COMPUTE_MESSAGING = PRIVATE_CLOUD + "messaging" +COMPUTE_SLAVE = PRIVATE_CLOUD + "compute-" + # Module constants MODULE_TYPE = "type" MODULE_PLATFORM = "platform" # types +MODULE_COMPUTE = "compute" MODULE_ADDON = "addon-module" MODULE_BROKERAGE = "brokerage" MODULE_DATA_DOWNLOADER = "data-downloader" diff --git a/lean/models/cli/__init__.py b/lean/models/cli/__init__.py index 576f6456..033c2b81 100644 --- a/lean/models/cli/__init__.py +++ b/lean/models/cli/__init__.py @@ -13,7 +13,7 @@ from typing import List from lean.constants import MODULE_BROKERAGE, MODULE_TYPE, MODULE_PLATFORM, MODULE_CLI_PLATFORM, \ - MODULE_DATA_DOWNLOADER, MODULE_HISTORY_PROVIDER, MODULE_DATA_QUEUE_HANDLER, MODULE_ADDON + MODULE_DATA_DOWNLOADER, MODULE_HISTORY_PROVIDER, MODULE_DATA_QUEUE_HANDLER, MODULE_ADDON, MODULE_COMPUTE from lean.models import json_modules from lean.models.json_module import JsonModule @@ -23,6 +23,7 @@ cli_data_downloaders: List[JsonModule] = [] cli_history_provider: List[JsonModule] = [] cli_data_queue_handlers: List[JsonModule] = [] +cli_compute: List[JsonModule] = [] for json_module in json_modules: module_type = json_module[MODULE_TYPE] @@ -39,5 +40,5 @@ cli_data_queue_handlers.append(JsonModule(json_module, MODULE_DATA_QUEUE_HANDLER, MODULE_CLI_PLATFORM)) if MODULE_ADDON in module_type: cli_addon_modules.append(JsonModule(json_module, MODULE_ADDON, MODULE_CLI_PLATFORM)) - - + if MODULE_COMPUTE in module_type: + cli_compute.append(JsonModule(json_module, MODULE_COMPUTE, MODULE_CLI_PLATFORM))