Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(error code when shutdown fails): set exit flag to non-zero when shutdown times out #17676

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 37 additions & 6 deletions src/app.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#![allow(missing_docs)]
use std::{collections::HashMap, num::NonZeroUsize, path::PathBuf, time::Duration};
use std::{
collections::HashMap, num::NonZeroUsize, path::PathBuf, process::ExitStatus, time::Duration,
};

use exitcode::ExitCode;
use futures::StreamExt;
Expand Down Expand Up @@ -32,6 +34,11 @@ use crate::{
trace,
};

#[cfg(unix)]
use std::os::unix::process::ExitStatusExt;
#[cfg(windows)]
use std::os::windows::process::ExitStatusExt;

pub static WORKER_THREADS: OnceNonZeroUsize = OnceNonZeroUsize::new();

use crate::internal_events::{VectorQuit, VectorStarted, VectorStopped};
Expand Down Expand Up @@ -145,10 +152,10 @@ impl ApplicationConfig {
}

impl Application {
/// Prepares the runtime and application, then drives the application to completion.
///
/// If `Self::prepare_start` fails, the process exits immediately with the code it returned.
/// Otherwise the runtime blocks on `app.run()`; in the `-> ExitStatus` variant the value
/// produced by `app.run()` is returned to the caller.
///
/// NOTE(review): this span is rendered diff output, so two variants of the signature and
/// of the final statement appear below (presumably pre-change first, then post-change).
pub fn run() {
pub fn run() -> ExitStatus {
let (runtime, app) = Self::prepare_start().unwrap_or_else(|code| std::process::exit(code));

runtime.block_on(app.run());
// No trailing semicolon: the result of `app.run()` becomes the function's return value.
runtime.block_on(app.run())
}

pub fn prepare_start() -> Result<(Runtime, StartedApplication), ExitCode> {
Expand Down Expand Up @@ -242,7 +249,7 @@ pub struct StartedApplication {
}

impl StartedApplication {
/// Runs the application's main phase and then shuts it down.
///
/// Awaits `self.main()`, then awaits `shutdown()` on its result. In the `-> ExitStatus`
/// variant of the signature, the value produced by `shutdown()` is returned.
pub async fn run(self) {
pub async fn run(self) -> ExitStatus {
self.main().await.shutdown().await
}

Expand Down Expand Up @@ -317,7 +324,7 @@ pub struct FinishedApplication {
}

impl FinishedApplication {
pub async fn shutdown(self) {
pub async fn shutdown(self) -> ExitStatus {
let FinishedApplication {
signal,
mut signal_rx,
Expand All @@ -335,18 +342,42 @@ impl FinishedApplication {
SignalTo::Shutdown => {
emit!(VectorStopped);
tokio::select! {
_ = topology_controller.stop() => (), // Graceful shutdown finished
_ = topology_controller.stop() => ExitStatus::from_raw({
#[cfg(windows)]
{
exitcode::OK as u32
}
#[cfg(unix)]
exitcode::OK
}), // Graceful shutdown finished
_ = signal_rx.recv() => {
// It is highly unlikely that this event will exit from topology.
emit!(VectorQuit);
// Dropping the shutdown future will immediately shut the server down
ExitStatus::from_raw({
#[cfg(windows)]
{
exitcode::UNAVAILABLE as u32
}
#[cfg(unix)]
exitcode::OK
})
}

}
}
SignalTo::Quit => {
// It is highly unlikely that this event will exit from topology.
emit!(VectorQuit);
drop(topology_controller);
ExitStatus::from_raw({
#[cfg(windows)]
{
exitcode::UNAVAILABLE as u32
}
#[cfg(unix)]
exitcode::OK
})
}
_ => unreachable!(),
}
Expand Down
13 changes: 9 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
extern crate vector;
use vector::app::Application;

use std::process::ExitCode;

#[cfg(unix)]
fn main() {
fn main() -> ExitCode {
#[cfg(feature = "allocation-tracing")]
{
use crate::vector::internal_telemetry::allocations::{
Expand Down Expand Up @@ -35,14 +37,17 @@ fn main() {
}
}

Application::run();
let exit_code = Application::run().code().unwrap_or(exitcode::UNAVAILABLE) as u8;
ExitCode::from(exit_code)
}

/// Windows entry point.
///
/// Tries `vector::vector_windows::run()` first and falls back to `Application::run()` if
/// that returns an error. The `-> ExitCode` variant converts the resulting integer code
/// into a process `ExitCode`.
#[cfg(windows)]
pub fn main() {
pub fn main() -> ExitCode {
// We need to be able to run vector in User Interactive mode. We first try
// to run vector as a service. If we fail, we consider that we are in
// interactive mode and then fallback to console mode. See
// https://docs.microsoft.com/en-us/dotnet/api/system.environment.userinteractive?redirectedfrom=MSDN&view=netcore-3.1#System_Environment_UserInteractive
vector::vector_windows::run().unwrap_or_else(|_| Application::run());
// In the fallback path, a missing exit code from `Application::run()` is replaced by
// `exitcode::UNAVAILABLE`.
let exit_code = vector::vector_windows::run()
.unwrap_or_else(|_| Application::run().code().unwrap_or(exitcode::UNAVAILABLE));
// NOTE(review): `as u8` keeps only the low 8 bits of `exit_code` — confirm that
// truncation is acceptable for the codes that can reach this point.
ExitCode::from(exit_code as u8)
}
17 changes: 13 additions & 4 deletions src/vector_windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const SERVICE_NAME: &str = "vector";
const SERVICE_TYPE: ServiceType = ServiceType::OWN_PROCESS;

// Win32 service exit code; reported in the final `Stopped` service status when the
// application's completion status is successful.
const NO_ERROR: u32 = 0;
// Win32 service exit code reported in the final `Stopped` service status when the
// application's completion status is not successful.
// NOTE(review): 121 appears to correspond to Win32 `ERROR_SEM_TIMEOUT` — confirm this is
// the intended code.
const ERROR: u32 = 121;

pub mod service_control {
use std::{ffi::OsString, fmt, fmt::Formatter, time::Duration};
Expand Down Expand Up @@ -361,8 +362,9 @@ fn win_main(arguments: Vec<OsString>) {
if let Err(_e) = run_service(arguments) {}
}

/// Starts the Windows service dispatcher for `SERVICE_NAME`, passing `ffi_service_main`.
///
/// In the `Result<i32>` variant, a successful dispatcher start is mapped to exit code `0`;
/// an error from `service_dispatcher::start` is returned to the caller unchanged.
pub fn run() -> Result<()> {
service_dispatcher::start(SERVICE_NAME, ffi_service_main)
pub fn run() -> Result<i32> {
service_dispatcher::start(SERVICE_NAME, ffi_service_main).map(|()| 0_i32)
// Always returns 0 exit code as errors are handled by the service dispatcher.
}

fn run_service(_arguments: Vec<OsString>) -> Result<()> {
Expand Down Expand Up @@ -398,14 +400,21 @@ fn run_service(_arguments: Vec<OsString>) -> Result<()> {
process_id: None,
})?;

runtime.block_on(app.run());
let program_completion_status = runtime.block_on(app.run());

// Tell the system that service has stopped.
status_handle.set_service_status(ServiceStatus {
service_type: SERVICE_TYPE,
current_state: ServiceState::Stopped,
controls_accepted: ServiceControlAccept::empty(),
exit_code: ServiceExitCode::Win32(NO_ERROR),
exit_code: {
if program_completion_status.success() {
ServiceExitCode::Win32(NO_ERROR)
} else {
// we didn't gracefully shutdown within grace period.
ServiceExitCode::Win32(ERROR)
}
},
checkpoint: 0,
wait_hint: Duration::default(),
process_id: None,
Expand Down