From 8a38a126e95a6c08f7b782a3503f30351e7d2a84 Mon Sep 17 00:00:00 2001 From: jonaro00 <54029719+jonaro00@users.noreply.github.com> Date: Tue, 2 Apr 2024 14:04:07 +0200 Subject: [PATCH] feat: gateway command to sync permit (#1705) * nit: name * refactor: args * feat: sync projects loop * fix: permit client error model * fix: auth user & tier sync * nit: unused runtime deps * fix * fix: improve test * ci: add permit key (TODO) * ci: use correct staging permit key * feat: permit health check * todo * fix: ignore project create 409s * feat: sync projects by user * fix: hashmap inserts --- .circleci/config.yml | 6 ++ Cargo.lock | 3 - Makefile | 17 ++-- auth/src/error.rs | 2 + auth/src/lib.rs | 37 ++++--- backends/src/client/permit.rs | 82 ++++++++++++++- backends/src/test_utils/gateway.rs | 3 +- backends/tests/integration/permit_tests.rs | 14 ++- common/src/claims.rs | 19 +++- docker-compose.yml | 6 ++ gateway/src/api/latest.rs | 2 +- gateway/src/args.rs | 26 +++-- gateway/src/lib.rs | 14 +-- gateway/src/main.rs | 113 ++++++++++++++------- gateway/src/service.rs | 18 ++-- runtime/Cargo.toml | 3 - 16 files changed, 265 insertions(+), 100 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index a9bed31f9..ebd9dc5e0 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -356,6 +356,9 @@ jobs: gateway-admin-key: description: "Admin API key that authorizes gateway requests to auth service, for key to jwt conversion." type: string + permit-api-key: + description: "Permit.io API key for the Permit environment that matches the current ${SHUTTLE_ENV}." + type: string steps: - checkout - set-git-tag @@ -383,6 +386,7 @@ jobs: AUTH_JWTSIGNING_PRIVATE_KEY=${<< parameters.jwt-signing-private-key >>} \ CONTROL_DB_POSTGRES_URI=${<< parameters.control-db-postgres-uri >>} \ GATEWAY_ADMIN_KEY=${<< parameters.gateway-admin-key >>} \ + PERMIT_API_KEY=${<< parameters.permit-api-key >>} \ make deploy - when: condition: @@ -748,6 +752,7 @@ workflows: jwt-signing-private-key: DEV_AUTH_JWTSIGNING_PRIVATE_KEY control-db-postgres-uri: DEV_CONTROL_DB_POSTGRES_URI gateway-admin-key: DEV_GATEWAY_ADMIN_KEY + permit-api-key: STAGING_PERMIT_API_KEY requires: - build-and-push-unstable - approve-deploy-images-unstable @@ -832,6 +837,7 @@ workflows: jwt-signing-private-key: PROD_AUTH_JWTSIGNING_PRIVATE_KEY control-db-postgres-uri: PROD_CONTROL_DB_POSTGRES_URI gateway-admin-key: PROD_GATEWAY_ADMIN_KEY + permit-api-key: PROD_PERMIT_API_KEY ssh-fingerprint: 6a:c5:33:fe:5b:c9:06:df:99:64:ca:17:0d:32:18:2e ssh-config-script: production-ssh-config.sh ssh-host: shuttle.prod.internal diff --git a/Cargo.lock b/Cargo.lock index c8b9c1c7d..fd3315a0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5554,10 +5554,8 @@ version = "0.42.0" dependencies = [ "anyhow", "async-trait", - "chrono", "colored", "portpicker", - "prost-types", "serde", "serde_json", "shuttle-codegen", @@ -5565,7 +5563,6 @@ dependencies = [ "shuttle-proto", "shuttle-service", "strfmt", - "thiserror", "tokio", "tokio-stream", "tonic 0.10.2", diff --git a/Makefile b/Makefile index 788bd0380..e36fdb7be 100644 --- a/Makefile +++ b/Makefile @@ -43,6 +43,11 @@ STRIPE_SECRET_KEY?="" AUTH_JWTSIGNING_PRIVATE_KEY?="" PERMIT_API_KEY?="" +# log level set in all backends +RUST_LOG?=shuttle=debug,info + +# production/staging/dev +SHUTTLE_ENV?=dev DD_ENV=$(SHUTTLE_ENV) ifeq ($(SHUTTLE_ENV),production) DOCKER_COMPOSE_FILES=docker-compose.yml @@ -53,8 +58,8 @@ CONTAINER_REGISTRY=public.ecr.aws/shuttle # make sure we only ever go to production with `--tls=enable` USE_TLS=enable CARGO_PROFILE=release -RUST_LOG?=shuttle=debug,info else +# add local development overrides to compose DOCKER_COMPOSE_FILES=docker-compose.yml docker-compose.dev.yml STACK?=shuttle-dev APPS_FQDN=unstable.shuttleapp.rs @@ -63,7 +68,10 @@ CONTAINER_REGISTRY=public.ecr.aws/shuttle-dev USE_TLS?=disable # default for local run CARGO_PROFILE?=debug -RUST_LOG?=shuttle=debug,info +ifeq ($(CI),true) +# use release builds for staging deploys so that the DLC cache can be re-used for prod deploys +CARGO_PROFILE=release +endif DEV_SUFFIX=-dev DEPLOYS_API_KEY?=gateway4deployes GATEWAY_ADMIN_KEY?=dh9z58jttoes3qvt @@ -79,11 +87,6 @@ LOGGER_POSTGRES_PASSWORD?=postgres LOGGER_POSTGRES_URI?=postgres://postgres:${LOGGER_POSTGRES_PASSWORD}@logger-postgres:5432/postgres endif -ifeq ($(CI),true) -# default for staging -CARGO_PROFILE=release -endif - POSTGRES_EXTRA_PATH?=./extras/postgres POSTGRES_TAG?=14 diff --git a/auth/src/error.rs b/auth/src/error.rs index d1b0f7892..22d85fc54 100644 --- a/auth/src/error.rs +++ b/auth/src/error.rs @@ -26,6 +26,8 @@ pub enum Error { Stripe(#[from] StripeError), #[error("Failed to communicate with service API.")] ServiceApi(#[from] client::Error), + #[error("Failed to communicate with Permit API.")] + PermitApi(#[from] client::permit::Error), } impl Serialize for Error { diff --git a/auth/src/lib.rs b/auth/src/lib.rs index 0011f5da3..1edf13a79 100644 --- a/auth/src/lib.rs +++ b/auth/src/lib.rs @@ -6,7 +6,11 @@ mod user; use anyhow::Result; use args::{CopyPermitEnvArgs, StartArgs, SyncArgs}; -use shuttle_backends::client::{permit, PermissionsDal}; +use http::StatusCode; +use shuttle_backends::client::{ + permit::{self, Error, ResponseContent}, + PermissionsDal, +}; use shuttle_common::{claims::AccountTier, ApiKey}; use sqlx::{migrate::Migrator, query, PgPool}; use tracing::info; @@ -54,37 +58,38 @@ pub async fn sync(pool: PgPool, args: SyncArgs) -> Result<()> { match permit_client.get_user(&user.id).await { Ok(p_user) => { // Update tier if out of sync + let wanted_tier = user.account_tier.as_permit_account_tier(); if !p_user .roles - .is_some_and(|rs| rs.iter().any(|r| r.role == user.account_tier.to_string())) + .is_some_and(|rs| rs.iter().any(|r| r.role == wanted_tier.to_string())) { - match user.account_tier { - AccountTier::Basic - | AccountTier::PendingPaymentPro - | AccountTier::CancelledPro - | AccountTier::Team - | AccountTier::Admin - | AccountTier::Deployer => { + println!("updating tier for user: {}", user.id); + match wanted_tier { + AccountTier::Basic => { permit_client.make_basic(&user.id).await?; } AccountTier::Pro => { permit_client.make_pro(&user.id).await?; } + _ => unreachable!(), } } } - Err(_) => { - // FIXME: Make the error type better so that this is only done on 404s - + Err(Error::ResponseError(ResponseContent { + status: StatusCode::NOT_FOUND, + .. + })) => { + // Add users that are not in permit println!("creating user: {}", user.id); - // Add users that are not in permit permit_client.new_user(&user.id).await?; - - if user.account_tier == AccountTier::Pro { + if user.account_tier.as_permit_account_tier() == AccountTier::Pro { permit_client.make_pro(&user.id).await?; } } + Err(e) => { + println!("failed to fetch user {}. skipping. error: {e}", user.id); + } } } @@ -100,7 +105,7 @@ pub async fn copy_environment(args: CopyPermitEnvArgs) -> Result<()> { args.permit.permit_api_key, ); - client.copy_environment(&args.target).await + Ok(client.copy_environment(&args.target).await?) } pub async fn init(pool: PgPool, args: InitArgs, tier: AccountTier) -> Result<()> { diff --git a/backends/src/client/permit.rs b/backends/src/client/permit.rs index a3dde9e78..11f009c08 100644 --- a/backends/src/client/permit.rs +++ b/backends/src/client/permit.rs @@ -1,10 +1,13 @@ -use anyhow::Error; +use std::fmt::{Debug, Display}; + use async_trait::async_trait; +use http::StatusCode; use permit_client_rs::{ apis::{ resource_instances_api::{create_resource_instance, delete_resource_instance}, role_assignments_api::{assign_role, unassign_role}, users_api::{create_user, delete_user, get_user}, + Error as PermitClientError, }, models::{ ResourceInstanceCreate, RoleAssignmentCreate, RoleAssignmentRemove, UserCreate, UserRead, @@ -17,6 +20,7 @@ use permit_pdp_client_rs::{ }, data_updater_api::trigger_policy_data_update_data_updater_trigger_post, policy_updater_api::trigger_policy_update_policy_updater_trigger_post, + Error as PermitPDPClientError, }, models::{AuthorizationQuery, Resource, User, UserPermissionsQuery, UserPermissionsResult}, }; @@ -143,7 +147,7 @@ impl PermissionsDal for Client { } async fn create_project(&self, user_id: &str, project_id: &str) -> Result<(), Error> { - create_resource_instance( + if let Err(e) = create_resource_instance( &self.api, &self.proj_id, &self.env_id, @@ -154,7 +158,18 @@ impl PermissionsDal for Client { attributes: None, }, ) - .await?; + .await + { + // Early return all errors except 409's (project already exists) + let e: Error = e.into(); + if let Error::ResponseError(ref re) = e { + if re.status != StatusCode::CONFLICT { + return Err(e); + } + } else { + return Err(e); + } + } self.assign_resource_role(user_id, format!("Project:{project_id}"), "admin") .await?; @@ -492,7 +507,7 @@ impl Client { } } -// #[cfg(feature = "admin")] +/// Higher level management methods. Use with care. mod admin { use permit_client_rs::{ apis::environments_api::copy_environment, @@ -505,7 +520,8 @@ mod admin { use super::*; impl Client { - /// Copy and overwrite the policies of one env to another existing one + /// Copy and overwrite a permit env's policies to another env. + /// Requires a project level API key. pub async fn copy_environment(&self, target_env: &str) -> Result<(), Error> { copy_environment( &self.api, @@ -543,3 +559,59 @@ mod admin { } } } + +/// Dumbed down and unified version of the client's errors to get rid of the genereic +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("reqwest error: {0}")] + Reqwest(reqwest::Error), + #[error("serde error: {0}")] + Serde(serde_json::Error), + #[error("io error: {0}")] + Io(std::io::Error), + #[error("response error: {0}")] + ResponseError(ResponseContent), +} +#[derive(Debug)] +pub struct ResponseContent { + pub status: reqwest::StatusCode, + pub content: String, + pub entity: String, +} +impl Display for ResponseContent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "status: {}, content: {}, entity: {}", + self.status, self.content, self.entity + ) + } +} +impl From> for Error { + fn from(value: PermitClientError) -> Self { + match value { + PermitClientError::Reqwest(e) => Self::Reqwest(e), + PermitClientError::Serde(e) => Self::Serde(e), + PermitClientError::Io(e) => Self::Io(e), + PermitClientError::ResponseError(e) => Self::ResponseError(ResponseContent { + status: e.status, + content: e.content, + entity: format!("{:?}", e.entity), + }), + } + } +} +impl From> for Error { + fn from(value: PermitPDPClientError) -> Self { + match value { + PermitPDPClientError::Reqwest(e) => Self::Reqwest(e), + PermitPDPClientError::Serde(e) => Self::Serde(e), + PermitPDPClientError::Io(e) => Self::Io(e), + PermitPDPClientError::ResponseError(e) => Self::ResponseError(ResponseContent { + status: e.status, + content: e.content, + entity: format!("{:?}", e.entity), + }), + } + } +} diff --git a/backends/src/test_utils/gateway.rs b/backends/src/test_utils/gateway.rs index 540b14d67..fe026fece 100644 --- a/backends/src/test_utils/gateway.rs +++ b/backends/src/test_utils/gateway.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use anyhow::Error; use async_trait::async_trait; use permit_client_rs::models::UserRead; use permit_pdp_client_rs::models::UserPermissionsResult; @@ -12,7 +11,7 @@ use wiremock::{ Mock, MockServer, Request, ResponseTemplate, }; -use crate::client::PermissionsDal; +use crate::client::{permit::Error, PermissionsDal}; pub async fn get_mocked_gateway_server() -> MockServer { let mock_server = MockServer::start().await; diff --git a/backends/tests/integration/permit_tests.rs b/backends/tests/integration/permit_tests.rs index 37625b968..65731ef09 100644 --- a/backends/tests/integration/permit_tests.rs +++ b/backends/tests/integration/permit_tests.rs @@ -1,12 +1,16 @@ mod needs_docker { use std::sync::OnceLock; + use http::StatusCode; use permit_client_rs::apis::{ resource_instances_api::{delete_resource_instance, list_resource_instances}, users_api::list_users, }; use serial_test::serial; - use shuttle_backends::client::{permit::Client, PermissionsDal}; + use shuttle_backends::client::{ + permit::{Client, Error, ResponseContent}, + PermissionsDal, + }; use shuttle_common::claims::AccountTier; use shuttle_common_tests::permit_pdp::DockerInstance; use test_context::{test_context, AsyncTestContext}; @@ -116,7 +120,13 @@ mod needs_docker { client.delete_user(u).await.unwrap(); let res = client.get_user(u).await; - assert!(res.is_err()); + assert!(matches!( + res, + Err(Error::ResponseError(ResponseContent { + status: StatusCode::NOT_FOUND, + .. + })) + )); } #[test_context(Wrap)] diff --git a/common/src/claims.rs b/common/src/claims.rs index 64eb1be05..a27c6e4ec 100644 --- a/common/src/claims.rs +++ b/common/src/claims.rs @@ -169,9 +169,9 @@ impl ScopeBuilder { )] #[serde(rename_all = "lowercase")] #[cfg_attr(feature = "display", derive(strum::Display))] +#[cfg_attr(feature = "display", strum(serialize_all = "lowercase"))] #[cfg_attr(feature = "persist", derive(sqlx::Type))] #[cfg_attr(feature = "persist", sqlx(rename_all = "lowercase"))] -#[cfg_attr(feature = "display", strum(serialize_all = "lowercase"))] pub enum AccountTier { #[default] Basic, @@ -184,6 +184,23 @@ pub enum AccountTier { Deployer, } +impl AccountTier { + /// The tier that this user should have in Permit.io. + /// Permit should only store the tier that determines permissions, + /// with the exception of 'admin', which is an override and not checked against Permit. + pub fn as_permit_account_tier(&self) -> Self { + match self { + Self::Basic + | Self::PendingPaymentPro + | Self::CancelledPro + | Self::Team + | Self::Admin + | Self::Deployer => Self::Basic, + Self::Pro => Self::Pro, + } + } +} + impl From for Vec { fn from(tier: AccountTier) -> Self { let mut builder = ScopeBuilder::new(); diff --git a/docker-compose.yml b/docker-compose.yml index 72da2bc44..18f2aa654 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -290,3 +290,9 @@ services: placement: constraints: - node.hostname==controller + healthcheck: + test: curl -f -s http://localhost:7000 + interval: 1m + timeout: 10s + retries: 5 + diff --git a/gateway/src/api/latest.rs b/gateway/src/api/latest.rs index 4c3832370..1481bf10e 100644 --- a/gateway/src/api/latest.rs +++ b/gateway/src/api/latest.rs @@ -768,7 +768,7 @@ async fn renew_gateway_acme_certificate( .whole_days() <= RENEWAL_VALIDITY_THRESHOLD_IN_DAYS { - let tls_path = service.state_location.join("ssl.pem"); + let tls_path = service.state_dir.join("ssl.pem"); let certs = service .create_certificate(&acme_client, account.credentials()) .await; diff --git a/gateway/src/args.rs b/gateway/src/args.rs index 188598045..6cb5d09da 100644 --- a/gateway/src/args.rs +++ b/gateway/src/args.rs @@ -6,8 +6,8 @@ use http::Uri; #[derive(Parser, Debug)] pub struct Args { - /// Where to store gateway state (such as sqlite state, and certs) - #[arg(long, default_value = "./")] + /// Where to store gateway state (sqlite and certs) + #[arg(long, default_value = ".")] pub state: PathBuf, #[command(subcommand)] @@ -23,6 +23,7 @@ pub enum UseTls { #[derive(Subcommand, Debug)] pub enum Commands { Start(StartArgs), + Sync(SyncArgs), } #[derive(clap::Args, Debug, Clone)] @@ -40,11 +41,13 @@ pub struct StartArgs { #[arg(long, default_value = "enable")] pub use_tls: UseTls, #[command(flatten)] - pub context: ContextArgs, + pub context: ServiceArgs, + #[command(flatten)] + pub permit: PermitArgs, } #[derive(clap::Args, Debug, Clone)] -pub struct ContextArgs { +pub struct ServiceArgs { /// Default image to deploy user runtimes into #[arg(long, default_value = "public.ecr.aws/shuttle/deployer:latest")] pub image: String, @@ -88,6 +91,18 @@ pub struct ContextArgs { #[arg(long, default_value = "990")] pub hard_container_limit: u32, + /// Allow tests to set some extra /etc/hosts + pub extra_hosts: Vec, +} + +#[derive(clap::Args, Debug, Clone)] +pub struct SyncArgs { + #[command(flatten)] + pub permit: PermitArgs, +} + +#[derive(clap::Args, Debug, Clone)] +pub struct PermitArgs { /// Address to reach the permit.io API at #[arg(long, default_value = "https://api.eu-central-1.permit.io")] pub permit_api_uri: Uri, @@ -100,7 +115,4 @@ pub struct ContextArgs { /// Permit API key #[arg(long, default_value = "permit_")] pub permit_api_key: String, - - /// Allow tests to set some extra /etc/hosts - pub extra_hosts: Vec, } diff --git a/gateway/src/lib.rs b/gateway/src/lib.rs index 0b3c194aa..a483d38b3 100644 --- a/gateway/src/lib.rs +++ b/gateway/src/lib.rs @@ -282,7 +282,7 @@ pub mod tests { use crate::acme::AcmeClient; use crate::api::latest::ApiBuilder; - use crate::args::{ContextArgs, StartArgs, UseTls}; + use crate::args::{PermitArgs, ServiceArgs, StartArgs, UseTls}; use crate::project::Project; use crate::proxy::UserServiceBuilder; use crate::service::{ContainerSettings, GatewayService, MIGRATIONS}; @@ -540,7 +540,7 @@ pub mod tests { user, bouncer, use_tls: UseTls::Disable, - context: ContextArgs { + context: ServiceArgs { docker_host, image, prefix, @@ -571,13 +571,15 @@ pub mod tests { cch_container_limit: 1, soft_container_limit: 2, hard_container_limit: 3, + + // Allow access to the auth on the host + extra_hosts: vec!["host.docker.internal:host-gateway".to_string()], + }, + permit: PermitArgs { permit_api_uri: Default::default(), // TODO: will need mock? permit_pdp_uri: Default::default(), // TODO: will need mock? permit_env: Default::default(), // TODO: will need mock? permit_api_key: Default::default(), // TODO: will need mock? - - // Allow access to the auth on the host - extra_hosts: vec!["host.docker.internal:host-gateway".to_string()], }, }; @@ -608,7 +610,7 @@ pub mod tests { } } - pub fn args(&self) -> ContextArgs { + pub fn args(&self) -> ServiceArgs { self.args.context.clone() } diff --git a/gateway/src/main.rs b/gateway/src/main.rs index 31b562373..26e1896bf 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -1,59 +1,52 @@ +use std::collections::BTreeMap; +use std::io::Cursor; +use std::path::PathBuf; +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; + use async_posthog::ClientOptions; use clap::Parser; use futures::prelude::*; - -use shuttle_backends::client::permit; +use shuttle_backends::client::{permit, PermissionsDal}; use shuttle_backends::trace::setup_tracing; use shuttle_common::log::Backend; +use sqlx::migrate::MigrateDatabase; +use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqliteSynchronous}; +use sqlx::{Sqlite, SqlitePool}; +use tracing::{debug, error, info, info_span, trace, warn, Instrument}; + use shuttle_gateway::acme::{AcmeClient, CustomDomain}; use shuttle_gateway::api::latest::{ApiBuilder, SVC_DEGRADED_THRESHOLD}; -use shuttle_gateway::args::StartArgs; use shuttle_gateway::args::{Args, Commands, UseTls}; +use shuttle_gateway::args::{StartArgs, SyncArgs}; use shuttle_gateway::proxy::UserServiceBuilder; use shuttle_gateway::service::{GatewayService, MIGRATIONS}; use shuttle_gateway::tls::make_tls_acceptor; use shuttle_gateway::worker::{Worker, WORKER_QUEUE_SIZE}; -use sqlx::migrate::MigrateDatabase; -use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqliteSynchronous}; -use sqlx::{Sqlite, SqlitePool}; -use std::io::{self, Cursor}; - -use std::path::PathBuf; -use std::str::FromStr; -use std::sync::Arc; -use std::time::Duration; -use tracing::{debug, error, info, info_span, trace, warn, Instrument}; #[tokio::main(flavor = "multi_thread")] -async fn main() -> io::Result<()> { - let args = Args::parse(); +async fn main() { + setup_tracing(tracing_subscriber::registry(), Backend::Gateway); + let args = Args::parse(); trace!(args = ?args, "parsed args"); - let ph_client_options = ClientOptions::new( + let posthog_client = async_posthog::client(ClientOptions::new( "phc_cQMQqF5QmcEzXEaVlrhv3yBSNRyaabXYAyiCV7xKHUH".to_string(), "https://eu.posthog.com".to_string(), Duration::from_millis(800), - ); - - let posthog_client = async_posthog::client(ph_client_options); - - setup_tracing(tracing_subscriber::registry(), Backend::Gateway); + )); let db_path = args.state.join("gateway.sqlite"); let db_uri = db_path.to_str().unwrap(); + info!("Using state db: {}", db_uri); if !db_path.exists() { + info!("Creating new state db"); Sqlite::create_database(db_uri).await.unwrap(); } - info!( - "state db: {}", - std::fs::canonicalize(&args.state) - .unwrap() - .to_string_lossy() - ); - let sqlite_options = SqliteConnectOptions::from_str(db_uri) .unwrap() .journal_mode(SqliteJournalMode::Wal) @@ -63,34 +56,80 @@ async fn main() -> io::Result<()> { // LD_LIBRARY_PATH env set in build.rs. .extension("ulid0"); + info!("Connecting and migrating db..."); let db = SqlitePool::connect_with(sqlite_options).await.unwrap(); MIGRATIONS.run(&db).await.unwrap(); match args.command { Commands::Start(start_args) => start(db, args.state, posthog_client, start_args).await, + Commands::Sync(sync_args) => sync_permit_projects(db, sync_args).await, + } +} + +async fn sync_permit_projects(db: SqlitePool, args: SyncArgs) { + let client = permit::Client::new( + args.permit.permit_api_uri.to_string(), + args.permit.permit_pdp_uri.to_string(), + "default".to_owned(), + args.permit.permit_env, + args.permit.permit_api_key, + ); + + let projects: Vec<(String, String)> = + sqlx::query_as("SELECT user_id, project_id FROM projects") + .fetch_all(&db) + .await + .unwrap(); + let mut projects_by_user = BTreeMap::>::new(); + for (uid, pid) in projects { + let v = projects_by_user.entry(uid).or_default(); + v.push(pid); + } + + for (uid, pids) in projects_by_user { + println!("syncing {uid} projects"); + match client.get_user_projects(&uid).await { + Ok(projs) => { + for pid in pids { + if !projs + .iter() + .any(|p| p.resource.as_ref().unwrap().key == pid) + { + println!("creating project link {uid} <-> {pid}"); + client.create_project(&uid, &pid).await.unwrap(); + } else { + println!("project link exists {uid} <-> {pid}"); + } + } + } + Err(e) => { + println!("failed to get projects for {uid}. skipping. error: {e:?}"); + } + } } } async fn start( db: SqlitePool, - fs: PathBuf, + state_dir: PathBuf, posthog_client: async_posthog::Client, args: StartArgs, -) -> io::Result<()> { +) { let gateway = Arc::new( GatewayService::init( args.context.clone(), db, - fs, + state_dir, Box::new(permit::Client::new( - args.context.permit_api_uri.to_string(), - args.context.permit_pdp_uri.to_string(), + args.permit.permit_api_uri.to_string(), + args.permit.permit_pdp_uri.to_string(), "default".to_owned(), - args.context.permit_env, - args.context.permit_api_key, + args.permit.permit_env, + args.permit.permit_api_key, )), ) - .await?, + .await + .unwrap(), ); let worker = Worker::new(); @@ -223,6 +262,4 @@ async fn start( _ = user_handle => error!("user handle finished"), _ = ambulance_handle => error!("ambulance handle finished"), ); - - Ok(()) } diff --git a/gateway/src/service.rs b/gateway/src/service.rs index 1cde62f67..dfbf57eac 100644 --- a/gateway/src/service.rs +++ b/gateway/src/service.rs @@ -42,7 +42,7 @@ use tracing_opentelemetry::OpenTelemetrySpanExt; use ulid::Ulid; use crate::acme::{AcmeClient, CustomDomain}; -use crate::args::ContextArgs; +use crate::args::ServiceArgs; use crate::project::{Project, ProjectCreating, ProjectError, IS_HEALTHY_TIMEOUT}; use crate::task::{self, BoxedTask, TaskBuilder}; use crate::tls::ChainAndPrivateKey; @@ -80,8 +80,8 @@ impl ContainerSettingsBuilder { Self::default() } - pub async fn from_args(self, args: &ContextArgs) -> ContainerSettings { - let ContextArgs { + pub async fn from_args(self, args: &ServiceArgs) -> ContainerSettings { + let ServiceArgs { prefix, network_name, provisioner_host, @@ -190,7 +190,7 @@ pub struct GatewayService { context: GatewayContext, db: SqlitePool, task_router: TaskRouter, - pub state_location: PathBuf, + pub state_dir: PathBuf, pub permit_client: Box, /// Maximum number of containers the gateway can start before blocking cch projects @@ -211,9 +211,9 @@ impl GatewayService { /// * `args` - The [`Args`] with which the service was /// started. Will be passed as [`Context`] to workers and state. pub async fn init( - args: ContextArgs, + args: ServiceArgs, db: SqlitePool, - state_location: PathBuf, + state_dir: PathBuf, permit_client: Box, ) -> io::Result { let docker_stats_path_v1 = PathBuf::from_str(DOCKER_STATS_PATH_CGROUP_V1) @@ -262,7 +262,7 @@ impl GatewayService { context: provider, db, task_router, - state_location, + state_dir, permit_client, provisioner_host: Endpoint::new(format!("http://{}:8000", args.provisioner_host)) .expect("to have a valid provisioner endpoint"), @@ -841,7 +841,7 @@ impl GatewayService { acme: &AcmeClient, creds: AccountCredentials<'_>, ) -> ChainAndPrivateKey { - let tls_path = self.state_location.join("ssl.pem"); + let tls_path = self.state_dir.join("ssl.pem"); match ChainAndPrivateKey::load_pem(&tls_path) { Ok(valid) => valid, Err(_) => { @@ -915,7 +915,7 @@ impl GatewayService { } pub fn credentials(&self) -> AccountCredentials<'_> { - let creds_path = self.state_location.join("acme.json"); + let creds_path = self.state_dir.join("acme.json"); if !creds_path.exists() { panic!( "no ACME credentials found at {}, cannot continue with certificate creation", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 22a0f3765..521dc45ef 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -16,13 +16,10 @@ shuttle-service = { workspace = true } anyhow = { workspace = true } async-trait = { workspace = true } -chrono = { workspace = true } colored = { workspace = true, optional = true } -prost-types = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } strfmt = { workspace = true } -thiserror = { workspace = true } tokio = { workspace = true, features = ["full"] } tokio-stream = { workspace = true } tonic = { workspace = true }