
fix(compute_ctl): Allow usage of DB names with whitespaces #9919

Merged
merged 5 commits on Nov 28, 2024
39 changes: 34 additions & 5 deletions compute_tools/src/catalog.rs
@@ -1,4 +1,3 @@
use compute_api::responses::CatalogObjects;
use futures::Stream;
use postgres::NoTls;
use std::{path::Path, process::Stdio, result::Result, sync::Arc};
@@ -13,7 +12,8 @@ use tokio_util::codec::{BytesCodec, FramedRead};
use tracing::warn;

use crate::compute::ComputeNode;
use crate::pg_helpers::{get_existing_dbs_async, get_existing_roles_async};
use crate::pg_helpers::{get_existing_dbs_async, get_existing_roles_async, postgres_conf_for_db};
use compute_api::responses::CatalogObjects;

pub async fn get_dbs_and_roles(compute: &Arc<ComputeNode>) -> anyhow::Result<CatalogObjects> {
let connstr = compute.connstr.clone();
@@ -43,6 +43,8 @@ pub enum SchemaDumpError {
DatabaseDoesNotExist,
#[error("Failed to execute pg_dump.")]
IO(#[from] std::io::Error),
#[error("Unexpected error.")]
Unexpected,
}

// It uses the pg_dump utility to dump the schema of the specified database.
@@ -60,11 +62,38 @@ pub async fn get_database_schema(
let pgbin = &compute.pgbin;
let basepath = Path::new(pgbin).parent().unwrap();
let pgdump = basepath.join("pg_dump");
let mut connstr = compute.connstr.clone();
connstr.set_path(dbname);

// Replace the DB in the connection string and split it into its parts.
// This is the only option to handle DBs with special characters.
let conf =
postgres_conf_for_db(&compute.connstr, dbname).map_err(|_| SchemaDumpError::Unexpected)?;
let host = conf
.get_hosts()
.first()
.ok_or(SchemaDumpError::Unexpected)?;
let host = match host {
tokio_postgres::config::Host::Tcp(ip) => ip.to_string(),
#[cfg(unix)]
tokio_postgres::config::Host::Unix(path) => path.to_string_lossy().to_string(),
};
let port = conf
.get_ports()
.first()
.ok_or(SchemaDumpError::Unexpected)?;
let user = conf.get_user().ok_or(SchemaDumpError::Unexpected)?;
let dbname = conf.get_dbname().ok_or(SchemaDumpError::Unexpected)?;

let mut cmd = Command::new(pgdump)
// XXX: this seems to be the only option to deal with DBs with `=` in the name
// See <https://www.postgresql.org/message-id/flat/20151023003445.931.91267%40wrigleys.postgresql.org>
.env("PGDATABASE", dbname)
.arg("--host")
.arg(host)
.arg("--port")
.arg(port.to_string())
.arg("--username")
.arg(user)
.arg("--schema-only")
.arg(connstr.as_str())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true)
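For context on the catalog.rs change: the database name is no longer spliced into the URI path, because names containing spaces or `=` do not survive that round-trip cleanly. Instead the base connection string is parsed into a structured `tokio_postgres::Config` and the name is set as a discrete field (and handed to `pg_dump` via `PGDATABASE`). The sketch below illustrates the idea with a hypothetical `conf_for_db` helper; the real `postgres_conf_for_db` in `pg_helpers` may differ in signature and error handling.

```rust
use std::str::FromStr;
use tokio_postgres::Config;

// Hypothetical stand-in for `postgres_conf_for_db`: parse the base connection
// string into a structured config and override only the database name.
// Because the name is a discrete field rather than part of the URI path,
// spaces, `=`, and other special characters need no escaping.
fn conf_for_db(base_connstr: &str, dbname: &str) -> Result<Config, tokio_postgres::Error> {
    let mut conf = Config::from_str(base_connstr)?;
    conf.dbname(dbname);
    Ok(conf)
}

fn main() -> Result<(), tokio_postgres::Error> {
    let conf = conf_for_db(
        "postgresql://cloud_admin@localhost:5432/postgres",
        "my database=weird name",
    )?;
    assert_eq!(conf.get_dbname(), Some("my database=weird name"));
    Ok(())
}
```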
153 changes: 86 additions & 67 deletions compute_tools/src/compute.rs
@@ -34,9 +34,8 @@ use utils::measured_stream::MeasuredReader;
use nix::sys::signal::{kill, Signal};
use remote_storage::{DownloadError, RemotePath};
use tokio::spawn;
use url::Url;

use crate::installed_extensions::get_installed_extensions_sync;
use crate::installed_extensions::get_installed_extensions;
use crate::local_proxy;
use crate::pg_helpers::*;
use crate::spec::*;
@@ -816,30 +815,32 @@ impl ComputeNode {
Ok(())
}

async fn get_maintenance_client(url: &Url) -> Result<tokio_postgres::Client> {
let mut connstr = url.clone();
async fn get_maintenance_client(
conf: &tokio_postgres::Config,
) -> Result<tokio_postgres::Client> {
let mut conf = conf.clone();

connstr
.query_pairs_mut()
.append_pair("application_name", "apply_config");
conf.application_name("apply_config");

let (client, conn) = match tokio_postgres::connect(connstr.as_str(), NoTls).await {
let (client, conn) = match conf.connect(NoTls).await {
// If connection fails, it may be the old node with `zenith_admin` superuser.
//
// In this case we need to connect with old `zenith_admin` name
// and create new user. We cannot simply rename connected user,
// but we can create a new one and grant it all privileges.
Err(e) => match e.code() {
Some(&SqlState::INVALID_PASSWORD)
| Some(&SqlState::INVALID_AUTHORIZATION_SPECIFICATION) => {
// connect with zenith_admin if cloud_admin could not authenticate
// Connect with zenith_admin if cloud_admin could not authenticate
info!(
"cannot connect to postgres: {}, retrying with `zenith_admin` username",
e
);
let mut zenith_admin_connstr = connstr.clone();

zenith_admin_connstr
.set_username("zenith_admin")
.map_err(|_| anyhow::anyhow!("invalid connstr"))?;
let mut zenith_admin_conf = postgres::config::Config::from(conf.clone());
zenith_admin_conf.user("zenith_admin");

let mut client =
Client::connect(zenith_admin_connstr.as_str(), NoTls)
zenith_admin_conf.connect(NoTls)
.context("broken cloud_admin credential: tried connecting with cloud_admin but could not authenticate, and zenith_admin does not work either")?;

// Disable forwarding so that users don't get a cloud_admin role
@@ -853,8 +854,8 @@ impl ComputeNode {

drop(client);

// reconnect with connstring with expected name
tokio_postgres::connect(connstr.as_str(), NoTls).await?
// Reconnect with connstring with expected name
conf.connect(NoTls).await?
}
_ => return Err(e.into()),
},
@@ -885,7 +886,7 @@ impl ComputeNode {
pub fn apply_spec_sql(
&self,
spec: Arc<ComputeSpec>,
url: Arc<Url>,
conf: Arc<tokio_postgres::Config>,
concurrency: usize,
) -> Result<()> {
let rt = tokio::runtime::Builder::new_multi_thread()
@@ -897,7 +898,7 @@

rt.block_on(async {
// Proceed with post-startup configuration. Note, that order of operations is important.
let client = Self::get_maintenance_client(&url).await?;
let client = Self::get_maintenance_client(&conf).await?;
let spec = spec.clone();

let databases = get_existing_dbs_async(&client).await?;
@@ -931,7 +932,7 @@ impl ComputeNode {
RenameAndDeleteDatabases,
CreateAndAlterDatabases,
] {
debug!("Applying phase {:?}", &phase);
info!("Applying phase {:?}", &phase);
apply_operations(
spec.clone(),
ctx.clone(),
@@ -942,6 +943,7 @@
.await?;
}

info!("Applying RunInEachDatabase phase");
let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));

let db_processes = spec
@@ -955,7 +957,7 @@
let spec = spec.clone();
let ctx = ctx.clone();
let jwks_roles = jwks_roles.clone();
let mut url = url.as_ref().clone();
let mut conf = conf.as_ref().clone();
let concurrency_token = concurrency_token.clone();
let db = db.clone();

@@ -964,14 +966,14 @@
match &db {
DB::SystemDB => {}
DB::UserDB(db) => {
url.set_path(db.name.as_str());
conf.dbname(db.name.as_str());
}
}

let url = Arc::new(url);
let conf = Arc::new(conf);
let fut = Self::apply_spec_sql_db(
spec.clone(),
url,
conf,
ctx.clone(),
jwks_roles.clone(),
concurrency_token.clone(),
@@ -1017,7 +1019,7 @@ impl ComputeNode {
/// semaphore. The caller has to make sure the semaphore isn't exhausted.
async fn apply_spec_sql_db(
spec: Arc<ComputeSpec>,
url: Arc<Url>,
conf: Arc<tokio_postgres::Config>,
ctx: Arc<tokio::sync::RwLock<MutableApplyContext>>,
jwks_roles: Arc<HashSet<String>>,
concurrency_token: Arc<tokio::sync::Semaphore>,
@@ -1046,7 +1048,7 @@ impl ComputeNode {
// that database.
|| async {
if client_conn.is_none() {
let db_client = Self::get_maintenance_client(&url).await?;
let db_client = Self::get_maintenance_client(&conf).await?;
client_conn.replace(db_client);
}
let client = client_conn.as_ref().unwrap();
@@ -1061,34 +1063,16 @@ impl ComputeNode {
Ok::<(), anyhow::Error>(())
}

/// Do initial configuration of the already started Postgres.
#[instrument(skip_all)]
pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> {
// If connection fails,
// it may be the old node with `zenith_admin` superuser.
//
// In this case we need to connect with old `zenith_admin` name
// and create new user. We cannot simply rename connected user,
// but we can create a new one and grant it all privileges.
let mut url = self.connstr.clone();
url.query_pairs_mut()
.append_pair("application_name", "apply_config");

let url = Arc::new(url);
let spec = Arc::new(
compute_state
.pspec
.as_ref()
.expect("spec must be set")
.spec
.clone(),
);

// Choose how many concurrent connections to use for applying the spec changes.
// If the cluster is not currently Running we don't have to deal with user connections,
/// Choose how many concurrent connections to use for applying the spec changes.
pub fn max_service_connections(
&self,
compute_state: &ComputeState,
spec: &ComputeSpec,
) -> usize {
// If the cluster is in Init state we don't have to deal with user connections,
// and can thus use all `max_connections` connection slots. However, that's generally not
// very efficient, so we generally still limit it to a smaller number.
let max_concurrent_connections = if compute_state.status != ComputeStatus::Running {
if compute_state.status == ComputeStatus::Init {
// If the settings contain 'max_connections', use that as template
if let Some(config) = spec.cluster.settings.find("max_connections") {
config.parse::<usize>().ok()
@@ -1144,10 +1128,29 @@ impl ComputeNode {
.map(|val| if val > 1 { val - 1 } else { 1 })
.last()
.unwrap_or(3)
};
}
}

/// Do initial configuration of the already started Postgres.
#[instrument(skip_all)]
pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> {
let mut conf = tokio_postgres::Config::from_str(self.connstr.as_str()).unwrap();
conf.application_name("apply_config");

let conf = Arc::new(conf);
let spec = Arc::new(
compute_state
.pspec
.as_ref()
.expect("spec must be set")
.spec
.clone(),
);

let max_concurrent_connections = self.max_service_connections(compute_state, &spec);

// Merge-apply spec & changes to PostgreSQL state.
self.apply_spec_sql(spec.clone(), url.clone(), max_concurrent_connections)?;
self.apply_spec_sql(spec.clone(), conf.clone(), max_concurrent_connections)?;

if let Some(ref local_proxy) = &spec.clone().local_proxy_config {
info!("configuring local_proxy");
@@ -1156,12 +1159,11 @@ impl ComputeNode {

// Run migrations separately to not hold up cold starts
thread::spawn(move || {
let mut connstr = url.as_ref().clone();
connstr
.query_pairs_mut()
.append_pair("application_name", "migrations");
let conf = conf.as_ref().clone();
let mut conf = postgres::config::Config::from(conf);
conf.application_name("migrations");

let mut client = Client::connect(connstr.as_str(), NoTls)?;
let mut client = conf.connect(NoTls)?;
handle_migrations(&mut client).context("apply_config handle_migrations")
});

@@ -1222,21 +1224,28 @@ impl ComputeNode {
let pgdata_path = Path::new(&self.pgdata);
let postgresql_conf_path = pgdata_path.join("postgresql.conf");
config::write_postgres_conf(&postgresql_conf_path, &spec, None)?;
// temporarily reset max_cluster_size in config

// TODO(ololobus): We need concurrency during reconfiguration as well,
// but DB is already running and used by user. We can easily get out of
// `max_connections` limit, and the current code won't handle that.
// let compute_state = self.state.lock().unwrap().clone();
// let max_concurrent_connections = self.max_service_connections(&compute_state, &spec);
let max_concurrent_connections = 1;

// Temporarily reset max_cluster_size in config
// to avoid the possibility of hitting the limit, while we are reconfiguring:
// creating new extensions, roles, etc...
// creating new extensions, roles, etc.
config::with_compute_ctl_tmp_override(pgdata_path, "neon.max_cluster_size=-1", || {
self.pg_reload_conf()?;

if spec.mode == ComputeMode::Primary {
let mut url = self.connstr.clone();
url.query_pairs_mut()
.append_pair("application_name", "apply_config");
let url = Arc::new(url);
let mut conf = tokio_postgres::Config::from_str(self.connstr.as_str()).unwrap();
conf.application_name("apply_config");
let conf = Arc::new(conf);

let spec = Arc::new(spec.clone());

self.apply_spec_sql(spec, url, 1)?;
self.apply_spec_sql(spec, conf, max_concurrent_connections)?;
}

Ok(())
@@ -1362,7 +1371,17 @@ impl ComputeNode {

let connstr = self.connstr.clone();
thread::spawn(move || {
get_installed_extensions_sync(connstr).context("get_installed_extensions")
let res = get_installed_extensions(&connstr);
match res {
Ok(extensions) => {
info!(
"[NEON_EXT_STAT] {}",
serde_json::to_string(&extensions)
.expect("failed to serialize extensions list")
);
}
Err(err) => error!("could not get installed extensions: {err:?}"),
}
});
}

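The compute.rs changes above all follow one pattern: instead of rewriting `url::Url` query strings and paths, a connection string is parsed once into `tokio_postgres::Config`, cloned, adjusted per purpose (application name, target database), and, where a blocking client is needed, converted to the sync `postgres` config via `From`. A minimal sketch of that pattern, with placeholder connection parameters and assuming a reachable local server:

```rust
use std::str::FromStr;
use postgres::NoTls;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Parse once; clone and tweak per use instead of rewriting URL strings.
    let base = tokio_postgres::Config::from_str(
        "host=localhost port=5432 user=cloud_admin dbname=postgres",
    )?;

    // Per-purpose adjustments happen on a clone of the shared config.
    let mut migrations_conf = base.clone();
    migrations_conf.application_name("migrations");

    // The blocking client's config is built from the async one via `From`,
    // the same conversion the diff uses for the migrations thread.
    let mut sync_conf = postgres::config::Config::from(migrations_conf);
    let mut client = sync_conf.connect(NoTls)?;
    let one: i32 = client.query_one("SELECT 1", &[])?.get(0);
    assert_eq!(one, 1);
    Ok(())
}
```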
7 changes: 6 additions & 1 deletion compute_tools/src/http/api.rs
@@ -296,7 +296,12 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}

let connstr = compute.connstr.clone();
let res = crate::installed_extensions::get_installed_extensions(connstr).await;
let res = task::spawn_blocking(move || {
installed_extensions::get_installed_extensions(&connstr)
})
.await
.unwrap();

match res {
Ok(res) => render_json(Body::from(serde_json::to_string(&res).unwrap())),
Err(e) => render_json_error(
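The api.rs change moves the now-synchronous `get_installed_extensions` call onto tokio's blocking thread pool so the async request handler is not stalled. The general shape of that pattern is sketched below; `slow_sync_work` is a hypothetical stand-in for the real blocking call.

```rust
use tokio::task;

// Hypothetical stand-in for a blocking call such as `get_installed_extensions`.
fn slow_sync_work(input: String) -> Result<usize, String> {
    Ok(input.len())
}

#[tokio::main]
async fn main() {
    let connstr = String::from("postgresql://localhost/postgres");
    // Run the blocking work off the async executor; `.await` yields until it
    // finishes, and the outer Result reports whether the task panicked.
    let res = task::spawn_blocking(move || slow_sync_work(connstr))
        .await
        .expect("blocking task panicked");
    match res {
        Ok(n) => println!("ok: {n}"),
        Err(e) => eprintln!("error: {e}"),
    }
}
```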