Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance private cloud deployment support #520

Merged
merged 1 commit into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1831,9 +1831,9 @@ Options:
--master Run in master mode
--slave Run in slave mode
--token TEXT The master server token
--master-ip TEXT The master server ip address
--master-domain TEXT The master server domain
--master-port INTEGER The master server port
--slave-ip TEXT The slave server ip address
--slave-domain TEXT The slave server domain
--update Pull the latest image before starting
--no-update Do not update to the latest version
--compute TEXT Compute configuration to use
Expand Down
87 changes: 48 additions & 39 deletions lean/commands/private_cloud/start.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,18 @@
from lean.constants import COMPUTE_MASTER, COMPUTE_SLAVE, COMPUTE_MESSAGING


def get_free_port(first_port: int = 32787, attempts: int = 3) -> int:
    """Returns the first bindable port out of a small range of consecutive ports.

    The candidates ``first_port``, ``first_port + 1``, ... are probed in order by binding a
    throwaway socket to them. The probe socket is closed again before returning, so the
    result only tells that the port could be bound at the time of the check.

    :param first_port: the first port to probe
    :param attempts: the number of consecutive ports to probe
    :return: the first port that could be bound, or 0 if none of the candidates could
    """
    from socket import socket

    for candidate in range(first_port, first_port + attempts):
        try:
            with socket() as probe:
                probe.bind(('', candidate))
        except OSError:
            # The candidate could not be bound (e.g. already in use), try the next one.
            # Only OSError is caught so that Ctrl+C and SystemExit are not swallowed.
            continue
        return candidate

    # None of the candidates could be bound
    return 0


def deploy(ip: str, port: int, token: str, slave: bool, update: bool, no_update: bool,
def deploy(target_master_domain: str, self_domain: str, port: int, token: str, slave: bool, update: bool, no_update: bool,
image: str, lean_config: dict, extra_docker_config: str, counter: int = 0):
logger = container.logger

compute_node_name = f"{COMPUTE_SLAVE}{counter}" if slave else COMPUTE_MASTER
logger.info(f"Starting {compute_node_name}...")
compute_directory = Path(f"~/.lean/compute/{compute_node_name}").expanduser()
lean_config["node-name"] = compute_node_name

run_options = container.lean_runner.get_basic_docker_config_without_algo(lean_config, None, True, None, None,
None, compute_directory)
run_options["environment"]["AIRLOCK"] = compute_directory
run_options["mounts"].append(Mount(target="/QuantConnect/platform-services/airlock",
source=str(compute_directory), type="bind"))
run_options["mounts"].append(Mount(target="/var/run/docker.sock", source="/var/run/docker.sock",
Expand All @@ -59,17 +48,31 @@ def deploy(ip: str, port: int, token: str, slave: bool, update: bool, no_update:
type="bind", read_only=True))
container.lean_runner.parse_extra_docker_config(run_options, loads(extra_docker_config))

if not image:
image = "quantconnect/platform-services:latest"

is_domain = not self_domain.replace('.', '').isnumeric()
if not slave:
run_options["ports"]["9696"] = str(port)
run_options["ports"]["9697"] = str(get_free_port())
if not is_domain:
run_options["ports"]["9696"] = str(port)
run_options["ports"]["9697"] = str(0)

root_directory = container.lean_config_manager.get_cli_root_directory()
run_options["volumes"][str(root_directory)] = {"bind": "/LeanCLIWorkspace", "mode": "rw"}

if is_domain:
labels = {}
for name, value in container.docker_manager.get_image_labels(image):
if slave and name == "slave" or not slave and name == "master":
for key, label in loads(value).items():
labels[key] = label.replace("{{domain}}", self_domain)
run_options["labels"] = labels

run_options["remove"] = False
run_options["name"] = compute_node_name
run_options["environment"]["MODE"] = str('slave') if slave else str('master')
run_options["environment"]["IP"] = str(ip)
run_options["environment"]["MASTER_DOMAIN"] = str(target_master_domain)
run_options["environment"]["SELF_DOMAIN"] = str(self_domain)
run_options["environment"]["PORT"] = str(port)
run_options["environment"]["TOKEN"] = str(token)
run_options["user"] = "root"
Expand Down Expand Up @@ -103,9 +106,9 @@ def get_ip_address():
@option("--master", is_flag=True, default=False, help="Run in master mode")
@option("--slave", is_flag=True, default=False, help="Run in slave mode")
@option("--token", type=str, required=False, help="The master server token")
@option("--master-ip", type=str, required=False, help="The master server ip address")
@option("--master-port", type=int, required=False, default=0, help="The master server port")
@option("--slave-ip", type=str, required=False, help="The slave server ip address")
@option("--master-domain", type=str, required=False, help="The master server domain")
@option("--master-port", type=int, required=False, default=443, help="The master server port")
@option("--slave-domain", type=str, required=False, help="The slave server domain")
@option("--update", is_flag=True, default=False, help="Pull the latest image before starting")
@option("--no-update", is_flag=True, default=False, help="Do not update to the latest version")
@option("--compute", type=str, required=False, help="Compute configuration to use")
Expand All @@ -115,8 +118,8 @@ def get_ip_address():
def start(master: bool,
slave: bool,
token: str,
master_ip: str,
slave_ip: str,
master_domain: str,
slave_domain: str,
master_port: int,
update: bool,
no_update: bool,
Expand All @@ -135,9 +138,9 @@ def start(master: bool,
# just default to slave if none given
slave = True

if not master_ip:
master_ip = get_ip_address()
logger.info(f"'--master-ip' was not provided using '{master_ip}'")
if not master_domain:
master_domain = get_ip_address()
logger.info(f"'--master-domain' was not provided using '{master_domain}'")

str_mode = 'slave' if slave else 'master'
logger.info(f'Start running in {str_mode} mode')
Expand All @@ -154,9 +157,7 @@ def start(master: bool,

if slave:
if not token:
raise RuntimeError(f"Master token is required when running as slave")
if master_port == 0:
raise RuntimeError(f"Master port is required when running as slave")
raise RuntimeError(f"Master token '--token' is required when running as slave")
else:
if not token:
from uuid import uuid4
Expand All @@ -166,41 +167,49 @@ def start(master: bool,
if any(docker_container):
names = [node.name for node in docker_container if node.status == 'running']
if master and (COMPUTE_MASTER in names or COMPUTE_MESSAGING in names):
raise RuntimeError(f"Private cloud nodes already running detected: {names}")
raise RuntimeError(f"Private cloud nodes already running, please use '--stop'. Detected: {names}")
logger.info(f"Running nodes: {names}")

container.temp_manager.delete_temporary_directories_when_done = False
lean_config = container.lean_config_manager.get_complete_lean_config(None, None, None)

master_is_domain = not master_domain.replace('.', '').isnumeric()
if master:
deploy(master_ip, master_port, token, False, update, no_update, image, lean_config, extra_docker_config)
master_port_option = f" --master-port {master_port}"
if master_is_domain:
slave_domain = master_domain
master_port_option = ''
deploy(master_domain, master_domain, master_port, token, False, update, no_update, image,
lean_config, extra_docker_config)
if master_port == 0:
master_port = container.docker_manager.get_container_port(COMPUTE_MASTER, "9696/tcp")
logger.info(f"Slaves can be added running: "
f"lean private-cloud start --slave --master-ip {master_ip} --token \"{token}\" --master-port {master_port}")

logger.info(f"Slaves can be added running: lean private-cloud start --slave --master-domain {master_domain}"
f" --slave-domain {{slave.domain}} --token \"{token}\"{master_port_option}")

compute_index = len(get_private_cloud_containers([COMPUTE_SLAVE]))
if compute:
logger.debug(f"Starting given compute configuration: {compute}")

if not slave_ip:
logger.debug(f"'slave-ip' was not given will try to figure it out...")
if not slave_domain:
logger.debug(f"'slave-domain' was not given will try to figure it out...")
retry_count = 0
while retry_count < 10:
retry_count += 1
try:
from requests import get
resp = get(f'http://{master_ip}:{master_port}', stream=True)
slave_ip = resp.raw._connection.sock.getsockname()[0]
resp = get(f'http://{master_domain}:{master_port}', stream=True)
slave_domain = resp.raw._connection.sock.getsockname()[0]
break
except Exception as e:
from time import sleep
sleep(1)
pass
lean_config["self-ip-address"] = slave_ip
logger.info(f"Using ip address '{slave_ip}' as own")
lean_config["self-ip-address"] = slave_domain
logger.debug(f"Using address '{slave_domain}' as own")

for configuration in compute:
lean_config["compute"] = configuration
for i in range(compute_index, int(configuration["count"]) + compute_index):
deploy(master_ip, master_port, token, True, update, no_update, image, lean_config, extra_docker_config, i)
deploy(master_domain, slave_domain, master_port, token, True, update, no_update, image,
lean_config, extra_docker_config, i)
10 changes: 6 additions & 4 deletions lean/components/docker/docker_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ def __init__(self, logger: Logger, temp_manager: TempManager, platform_manager:
self._temp_manager = temp_manager
self._platform_manager = platform_manager

def get_image_label(self, image: DockerImage, label: str, default: str) -> str:
docker_image = self._get_docker_client().images.get(str(image))
def get_image_labels(self, image: str) -> "Iterable[Tuple[str, str]]":
    """Returns the labels of a Docker image as (name, value) pairs.

    The image is looked up through the Docker client, the result is the items view of its labels.
    The return annotation is a string so it needs no additional import at definition time.

    :param image: the image to get the labels of, as accepted by the Docker client's images.get()
    :return: an iterable of (label name, label value) pairs
    """
    docker_image = self._get_docker_client().images.get(image)
    return docker_image.labels.items()

for name, value in docker_image.labels.items():
def get_image_label(self, image: DockerImage, label: str, default: str) -> str:
for name, value in self.get_image_labels(str(image)):
if name == label:
self._logger.debug(f"Label '{label}' found in image '{image.name}', value {value}")
return value
Expand Down Expand Up @@ -179,7 +181,7 @@ def run_image(self, image: DockerImage, **kwargs) -> bool:
from time import sleep
i = 0
self._logger.info(f'Verifying deployment \'{container.name}\' is stable...')
while i < 35:
while i < 60:
i += 1
container.reload()
if (container.status != "running" and container.attrs and "State" in container.attrs and "ExitCode"
Expand Down
Loading