Skip to content

Commit

Permalink
feat: start all services in a workspace for local runs (#772)
Browse files Browse the repository at this point in the history
* feat: mvp of multiple runtimes local run

* fix: runtime breakages elsewhere

* refactor: portpicker for runtimes, task abortion

* feat: get service name from shuttle.toml

* fix: clippy

* fix: broken test

* refactor: shuttle.toml not required in workspace test
  • Loading branch information
oddgrd authored Mar 31, 2023
1 parent b37c9ef commit 515bd3f
Show file tree
Hide file tree
Showing 9 changed files with 298 additions and 195 deletions.
336 changes: 178 additions & 158 deletions cargo-shuttle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use shuttle_common::models::resource::get_resources_table;
use shuttle_common::project::ProjectName;
use shuttle_common::resource;
use shuttle_proto::runtime::{self, LoadRequest, StartRequest, SubscribeLogsRequest};
use tokio::task::JoinSet;

use std::collections::HashMap;
use std::ffi::OsString;
Expand All @@ -37,7 +38,7 @@ use git2::{Repository, StatusOptions};
use ignore::overrides::OverrideBuilder;
use ignore::WalkBuilder;
use shuttle_common::models::{project, secret};
use shuttle_service::builder::{build_workspace, Runtime};
use shuttle_service::builder::{build_workspace, BuiltService};
use std::fmt::Write;
use strum::IntoEnumIterator;
use tar::Builder;
Expand Down Expand Up @@ -435,197 +436,216 @@ impl Shuttle {
working_directory.display()
);

let runtimes = build_workspace(working_directory, run_args.release, tx).await?;
let provisioner = LocalProvisioner::new()?;
let provisioner_server = provisioner.start(SocketAddr::new(
Ipv4Addr::LOCALHOST.into(),
run_args.port + 1,
));

trace!("loading secrets");
// Compile all the alpha or shuttle-next services in the workspace.
let services = build_workspace(working_directory, run_args.release, tx).await?;

let secrets_path = if working_directory.join("Secrets.dev.toml").exists() {
working_directory.join("Secrets.dev.toml")
} else {
working_directory.join("Secrets.toml")
};
let mut runtime_handles = JoinSet::new();

// Start all the services.
for service in services {
let BuiltService {
executable_path,
is_wasm,
working_directory,
..
} = service.clone();

trace!("loading secrets");
let secrets_path = if working_directory.join("Secrets.dev.toml").exists() {
working_directory.join("Secrets.dev.toml")
} else {
working_directory.join("Secrets.toml")
};

let secrets: HashMap<String, String> = if let Ok(secrets_str) = read_to_string(secrets_path)
{
let secrets: HashMap<String, String> =
secrets_str.parse::<toml::Value>()?.try_into()?;
if let Ok(secrets_str) = read_to_string(secrets_path) {
let secrets: HashMap<String, String> =
secrets_str.parse::<toml::Value>()?.try_into()?;

trace!(keys = ?secrets.keys(), "available secrets");
trace!(keys = ?secrets.keys(), "available secrets");

secrets
} else {
trace!("no Secrets.toml was found");
Default::default()
};
secrets
} else {
trace!("no Secrets.toml was found");
Default::default()
};

let service_name = self.ctx.project_name().to_string();
let runtime_path = || {
if is_wasm {
let runtime_path = home::cargo_home()
.expect("failed to find cargo home dir")
.join("bin/shuttle-next");

let (is_wasm, executable_path) = match runtimes[0].clone() {
Runtime::Next(path) => (true, path),
Runtime::Alpha(path) => (false, path),
};
println!("Installing shuttle-next runtime. This can take a while...");

let provisioner = LocalProvisioner::new()?;
let provisioner_server = provisioner.start(SocketAddr::new(
Ipv4Addr::LOCALHOST.into(),
run_args.port + 1,
));
if cfg!(debug_assertions) {
// Canonicalized path to shuttle-runtime for dev to work on windows
let path = std::fs::canonicalize(format!("{MANIFEST_DIR}/../runtime"))
.expect("path to shuttle-runtime does not exist or is invalid");

let runtime_path = || {
if is_wasm {
let runtime_path = home::cargo_home()
.expect("failed to find cargo home dir")
.join("bin/shuttle-next");

println!("Installing shuttle runtime. This can take a while...");

if cfg!(debug_assertions) {
// Canonicalized path to shuttle-runtime for dev to work on windows
let path = std::fs::canonicalize(format!("{MANIFEST_DIR}/../runtime"))
.expect("path to shuttle-runtime does not exist or is invalid");

trace!(?path, "installing runtime from local filesystem");

std::process::Command::new("cargo")
.arg("install")
.arg("shuttle-runtime")
.arg("--path")
.arg(path)
.arg("--bin")
.arg("shuttle-next")
.arg("--features")
.arg("next")
.output()
.expect("failed to install the shuttle runtime");
} else {
// If the version of cargo-shuttle is different from shuttle-runtime,
// or it isn't installed, try to install shuttle-runtime from crates.io.
if let Err(err) = check_version(&runtime_path) {
warn!("{}", err);
trace!(?path, "installing runtime from local filesystem");

trace!("installing shuttle-runtime");
std::process::Command::new("cargo")
.arg("install")
.arg("shuttle-runtime")
.arg("--path")
.arg(path)
.arg("--bin")
.arg("shuttle-next")
.arg("--features")
.arg("next")
.output()
.expect("failed to install the shuttle runtime");
} else {
// If the version of cargo-shuttle is different from shuttle-runtime,
// or it isn't installed, try to install shuttle-runtime from crates.io.
if let Err(err) = check_version(&runtime_path) {
warn!("{}", err);

trace!("installing shuttle-runtime");
std::process::Command::new("cargo")
.arg("install")
.arg("shuttle-runtime")
.arg("--bin")
.arg("shuttle-next")
.arg("--features")
.arg("next")
.output()
.expect("failed to install the shuttle runtime");
};
};
};

runtime_path
} else {
trace!(path = ?executable_path, "using alpha runtime");
executable_path.clone()
}
};

let (mut runtime, mut runtime_client) = runtime::start(
is_wasm,
runtime::StorageManagerType::WorkingDir(working_directory.to_path_buf()),
&format!("http://localhost:{}", run_args.port + 1),
None,
run_args.port + 2,
runtime_path,
)
.await
.map_err(|err| {
provisioner_server.abort();

err
})?;

let load_request = tonic::Request::new(LoadRequest {
path: executable_path
.into_os_string()
.into_string()
.expect("to convert path to string"),
service_name: service_name.clone(),
resources: Default::default(),
secrets,
});
trace!("loading service");
let response = runtime_client
.load(load_request)
.or_else(|err| async {
provisioner_server.abort();
runtime.kill().await?;

Err(err)
})
.await?
.into_inner();

if !response.success {
error!(error = response.message, "failed to load your service");
exit(1);
}

let resources = response
.resources
.into_iter()
.map(resource::Response::from_bytes)
.collect();

let resources = get_resources_table(&resources, self.ctx.project_name().as_str());
runtime_path
} else {
trace!(path = ?executable_path, "using alpha runtime");
executable_path.clone()
}
};

let mut stream = runtime_client
.subscribe_logs(tonic::Request::new(SubscribeLogsRequest {}))
.or_else(|err| async {
let (mut runtime, mut runtime_client) = runtime::start(
is_wasm,
runtime::StorageManagerType::WorkingDir(working_directory.to_path_buf()),
&format!("http://localhost:{}", run_args.port + 1),
None,
portpicker::pick_unused_port().unwrap(),
runtime_path,
)
.await
.map_err(|err| {
provisioner_server.abort();
runtime.kill().await?;

Err(err)
})
.await?
.into_inner();

tokio::spawn(async move {
while let Ok(Some(log)) = stream.message().await {
let log: shuttle_common::LogItem = log.try_into().expect("to convert log");
println!("{log}");
err
})?;

let service_name = service.service_name()?;

let load_request = tonic::Request::new(LoadRequest {
path: executable_path
.into_os_string()
.into_string()
.expect("to convert path to string"),
service_name: service_name.to_string(),
resources: Default::default(),
secrets,
});
trace!("loading service");
let response = runtime_client
.load(load_request)
.or_else(|err| async {
provisioner_server.abort();
runtime.kill().await?;

Err(err)
})
.await?
.into_inner();

if !response.success {
error!(error = response.message, "failed to load your service");
exit(1);
}
});

println!("{resources}");

let addr = if run_args.external {
std::net::IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))
} else {
Ipv4Addr::LOCALHOST.into()
};
let resources = response
.resources
.into_iter()
.map(resource::Response::from_bytes)
.collect();

let resources = get_resources_table(&resources, service_name.as_str());

let mut stream = runtime_client
.subscribe_logs(tonic::Request::new(SubscribeLogsRequest {}))
.or_else(|err| async {
provisioner_server.abort();
runtime.kill().await?;

Err(err)
})
.await?
.into_inner();

tokio::spawn(async move {
while let Ok(Some(log)) = stream.message().await {
let log: shuttle_common::LogItem = log.try_into().expect("to convert log");
println!("{log}");
}
});

let addr = SocketAddr::new(addr, run_args.port);
println!("{resources}");

let start_request = StartRequest {
ip: addr.to_string(),
};
let addr = if run_args.external {
std::net::IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))
} else {
Ipv4Addr::LOCALHOST.into()
};

trace!(?start_request, "starting service");
let response = runtime_client
.start(tonic::Request::new(start_request))
.or_else(|err| async {
provisioner_server.abort();
runtime.kill().await?;
let addr = SocketAddr::new(
addr,
portpicker::pick_unused_port().expect("unable to find available port"),
);

Err(err)
})
.await?
.into_inner();
let start_request = StartRequest {
ip: addr.to_string(),
};

trace!(response = ?response, "client response: ");
trace!(?start_request, "starting service");
let response = runtime_client
.start(tonic::Request::new(start_request))
.or_else(|err| async {
provisioner_server.abort();
runtime.kill().await?;

Err(err)
})
.await?
.into_inner();

trace!(response = ?response, "client response: ");

println!(
"\n{:>12} {} on http://{}",
"Starting".bold().green(),
&service_name,
addr
);

println!(
"\n{:>12} {} on http://{}",
"Starting".bold().green(),
self.ctx.project_name(),
addr
);
runtime_handles.spawn(async move { runtime.wait().await });
}

runtime.wait().await?;
// TODO: figure out how best to handle the runtime handles, and what to do if
// one completes.
while let Some(res) = runtime_handles.join_next().await {
println!(
"a service future completed with exit status: {:?}",
res.unwrap().unwrap().code()
);
}

Ok(())
}
Expand Down
Loading

0 comments on commit 515bd3f

Please sign in to comment.