Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement live reconfiguration in the compute_ctl, rebased #3964

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df6
heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending

## Local libraries
compute_api = { version = "0.1", path = "./libs/compute_api/" }
consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }
metrics = { version = "0.1", path = "./libs/metrics/" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
Expand Down
22 changes: 11 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,15 @@ Created an initial timeline 'de200bd42b49cc1814412c7e592dd6e9' at Lsn 0/16B5A50
Setting tenant 9ef87a5bf0d92544f6fafeeb3239695c as a default one

# start postgres compute node
> ./target/debug/neon_local pg start main
Starting new postgres (v14) main on timeline de200bd42b49cc1814412c7e592dd6e9 ...
> ./target/debug/neon_local endpoint start main
Starting new endpoint main (PostgreSQL v14) on timeline de200bd42b49cc1814412c7e592dd6e9 ...
Extracting base backup to create postgres instance: path=.neon/pgdatadirs/tenants/9ef87a5bf0d92544f6fafeeb3239695c/main port=55432
Starting postgres node at 'host=127.0.0.1 port=55432 user=cloud_admin dbname=postgres'
Starting postgres at 'host=127.0.0.1 port=55432 user=cloud_admin dbname=postgres'

# check list of running postgres instances
> ./target/debug/neon_local pg list
NODE ADDRESS TIMELINE BRANCH NAME LSN STATUS
main 127.0.0.1:55432 de200bd42b49cc1814412c7e592dd6e9 main 0/16B5BA8 running
> ./target/debug/neon_local endpoint list
ENDPOINT ADDRESS TIMELINE BRANCH NAME LSN STATUS
main 127.0.0.1:55432 de200bd42b49cc1814412c7e592dd6e9 main 0/16B5BA8 running
```

2. Now, it is possible to connect to postgres and run some queries:
Expand Down Expand Up @@ -184,14 +184,14 @@ Created timeline 'b3b863fa45fa9e57e615f9f2d944e601' at Lsn 0/16F9A00 for tenant:
(L) ┗━ @0/16F9A00: migration_check [b3b863fa45fa9e57e615f9f2d944e601]

# start postgres on that branch
> ./target/debug/neon_local pg start migration_check --branch-name migration_check
Starting new postgres migration_check on timeline b3b863fa45fa9e57e615f9f2d944e601 ...
> ./target/debug/neon_local endpoint start migration_check --branch-name migration_check
Starting new endpoint migration_check (PostgreSQL v14) on timeline b3b863fa45fa9e57e615f9f2d944e601 ...
Extracting base backup to create postgres instance: path=.neon/pgdatadirs/tenants/9ef87a5bf0d92544f6fafeeb3239695c/migration_check port=55433
Starting postgres node at 'host=127.0.0.1 port=55433 user=cloud_admin dbname=postgres'
Starting postgres at 'host=127.0.0.1 port=55433 user=cloud_admin dbname=postgres'

# check the new list of running postgres instances
> ./target/debug/neon_local pg list
NODE ADDRESS TIMELINE BRANCH NAME LSN STATUS
> ./target/debug/neon_local endpoint list
ENDPOINT ADDRESS TIMELINE BRANCH NAME LSN STATUS
main 127.0.0.1:55432 de200bd42b49cc1814412c7e592dd6e9 main 0/16F9A38 running
migration_check 127.0.0.1:55433 b3b863fa45fa9e57e615f9f2d944e601 migration_check 0/16F9A70 running

Expand Down
1 change: 1 addition & 0 deletions compute_tools/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ tracing-subscriber.workspace = true
tracing-utils.workspace = true
url.workspace = true

compute_api.workspace = true
workspace_hack.workspace = true
161 changes: 94 additions & 67 deletions compute_tools/src/bin/compute_ctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,24 @@ use std::fs::File;
use std::panic;
use std::path::Path;
use std::process::exit;
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Condvar, Mutex};
use std::{thread, time::Duration};

use anyhow::{Context, Result};
use anyhow::{anyhow, Context, Result};
use chrono::Utc;
use clap::Arg;
use tracing::{error, info};
use url::Url;

use compute_api::models::{ComputeMetrics, ComputeState, ComputeStatus};

use compute_tools::compute::{ComputeMetrics, ComputeNode, ComputeState, ComputeStatus};
use compute_tools::compute::{ComputeNode, ComputeNodeInner, ParsedSpec};
use compute_tools::configurator::launch_configurator;
use compute_tools::http::api::launch_http_server;
use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::pg_helpers::*;
use compute_tools::spec::*;
use url::Url;
use compute_tools::spec::get_spec_from_control_plane;

fn main() -> Result<()> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
Expand All @@ -62,7 +64,7 @@ fn main() -> Result<()> {
let connstr = matches
.get_one::<String>("connstr")
.expect("Postgres connection string is required");
let spec = matches.get_one::<String>("spec");
let spec_json = matches.get_one::<String>("spec");
let spec_path = matches.get_one::<String>("spec-path");

let compute_id = matches.get_one::<String>("compute-id");
Expand All @@ -71,40 +73,97 @@ fn main() -> Result<()> {
// Try to use just 'postgres' if no path is provided
let pgbin = matches.get_one::<String>("pgbin").unwrap();

let spec: ComputeSpec = match spec {
let mut spec = None;
let mut live_config_allowed = false;
match spec_json {
// First, try to get cluster spec from the cli argument
Some(json) => serde_json::from_str(json)?,
Some(json) => {
spec = Some(serde_json::from_str(json)?);
}
None => {
// Second, try to read it from the file if path is provided
if let Some(sp) = spec_path {
let path = Path::new(sp);
let file = File::open(path)?;
serde_json::from_reader(file)?
spec = Some(serde_json::from_reader(file)?);
} else if let Some(id) = compute_id {
if let Some(cp_base) = control_plane_uri {
let cp_uri = format!("{cp_base}/management/api/v1/{id}/spec");
let jwt: String = match std::env::var("NEON_CONSOLE_JWT") {
Ok(v) => v,
Err(_) => "".to_string(),
};

reqwest::blocking::Client::new()
.get(cp_uri)
.header("Authorization", jwt)
.send()?
.json()?
live_config_allowed = true;
if let Ok(s) = get_spec_from_control_plane(cp_base, id) {
spec = Some(s);
}
} else {
panic!(
"must specify --control-plane-uri \"{:#?}\" and --compute-id \"{:#?}\"",
control_plane_uri, compute_id
);
panic!("must specify both --control-plane-uri and --compute-id or none");
}
} else {
panic!("compute spec should be provided via --spec or --spec-path argument");
panic!(
"compute spec should be provided by one of the following ways: \
--spec OR --spec-path OR --control-plane-uri and --compute-id"
);
}
}
};

// Volatile compute state under mutex and condition variable to notify everyone
// who is interested in the state changes.
let compute_node = ComputeNode {
start_time: Utc::now(),
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
pgdata: pgdata.to_string(),
pgbin: pgbin.to_string(),
live_config_allowed,
inner: Mutex::new(ComputeNodeInner {
state: ComputeState {
status: ComputeStatus::Empty,
last_active: Utc::now(),
error: None,
},
spec: None,
metrics: ComputeMetrics::default(),
}),
state_changed: Condvar::new()
};

// If we have a spec already, go immediately into Init state.
let spec_set = spec.is_some();
if let Some(spec) = spec {
let mut inner = compute_node.inner.lock().unwrap();

let parsed_spec = ParsedSpec::try_from(spec)
.map_err(|msg| anyhow!("error parsing compute spec: {msg}"))?;
inner.spec = Some(parsed_spec);
inner.state.status = ComputeStatus::Init;
}

let compute = Arc::new(compute_node);

// Launch http service first, so we were able to serve control-plane
// requests, while configuration is still in progress.
let _http_handle = launch_http_server(&compute).expect("cannot launch http endpoint thread");

if !spec_set {
// No spec was provided earlier, hang waiting for it.
info!("no compute spec provided, waiting");

let mut inner = compute.inner.lock().unwrap();
while inner.state.status != ComputeStatus::ConfigurationPending {
inner = compute.state_changed.wait(inner).unwrap();

if inner.state.status == ComputeStatus::ConfigurationPending {
info!("got spec, continue configuration");
// Spec is already set by the http server handler.
inner.state.status = ComputeStatus::Init;
break;
}
}
};

// We got the spec. Start up
let startup_tracing_context = {
let inner = compute.inner.lock().unwrap();
inner.spec.as_ref().unwrap().spec.startup_tracing_context.clone()
};

// Extract OpenTelemetry context for the startup actions from the spec, and
// attach it to the current tracing context.
//
Expand All @@ -120,50 +179,18 @@ fn main() -> Result<()> {
// postgres is configured and up-and-running, we exit this span. Any other
// actions that are performed on incoming HTTP requests, for example, are
// performed in separate spans.
let startup_context_guard = if let Some(ref carrier) = spec.startup_tracing_context {
let startup_context_guard = if let Some(ref carrier) = startup_tracing_context {
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry::sdk::propagation::TraceContextPropagator;
Some(TraceContextPropagator::new().extract(carrier).attach())
} else {
None
};

let pageserver_connstr = spec
.cluster
.settings
.find("neon.pageserver_connstring")
.expect("pageserver connstr should be provided");
let storage_auth_token = spec.storage_auth_token.clone();
let tenant = spec
.cluster
.settings
.find("neon.tenant_id")
.expect("tenant id should be provided");
let timeline = spec
.cluster
.settings
.find("neon.timeline_id")
.expect("tenant id should be provided");

let compute_state = ComputeNode {
start_time: Utc::now(),
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
pgdata: pgdata.to_string(),
pgbin: pgbin.to_string(),
spec,
tenant,
timeline,
pageserver_connstr,
storage_auth_token,
metrics: ComputeMetrics::default(),
state: RwLock::new(ComputeState::new()),
};
let compute = Arc::new(compute_state);

// Launch service threads first, so we were able to serve availability
// requests, while configuration is still in progress.
let _http_handle = launch_http_server(&compute).expect("cannot launch http endpoint thread");
// Launch remaining service threads
let _monitor_handle = launch_monitor(&compute).expect("cannot launch compute monitor thread");
let _configurator_handle =
launch_configurator(&compute).expect("cannot launch configurator thread");

// Start Postgres
let mut delay_exit = false;
Expand All @@ -172,10 +199,10 @@ fn main() -> Result<()> {
Ok(pg) => Some(pg),
Err(err) => {
error!("could not start the compute node: {:?}", err);
let mut state = compute.state.write().unwrap();
state.error = Some(format!("{:?}", err));
state.status = ComputeStatus::Failed;
drop(state);
let mut inner = compute.inner.lock().unwrap();
inner.state.error = Some(format!("{:?}", err));
inner.state.status = ComputeStatus::Failed;
drop(inner);
delay_exit = true;
None
}
Expand Down Expand Up @@ -262,7 +289,7 @@ fn cli() -> clap::Command {
Arg::new("control-plane-uri")
.short('p')
.long("control-plane-uri")
.value_name("CONTROL_PLANE"),
.value_name("CONTROL_PLANE_API_BASE_URI"),
)
}

Expand Down
Loading