Skip to content

Commit

Permalink
Merge pull request #27 from Danktronics/dev
Browse files Browse the repository at this point in the history
Auto Restart - 0.3
  • Loading branch information
Sei4or authored Nov 15, 2020
2 parents 766cd06 + a559105 commit 80a1232
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 21 deletions.
5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "influo"
version = "0.2.1"
version = "0.3.0"
authors = ["Sei4or <[email protected]>", "sithsiri <[email protected]>"]
description = "CI / CD the right way"
license = "MIT"
Expand All @@ -12,8 +12,7 @@ anyhow = "1.0"
regex = "1"
lazy_static = "1.4.0"
crossbeam-channel = "0.5"
tokio = { version = "0.2", features = ["process", "rt-threaded", "sync", "io-util"] }
tokio = { version = "0.3.3", features = ["process", "rt-multi-thread", "sync", "io-util"] }
futures = "0.3.4"
bytes = "0.5.4"
shell-words = "1.0.0"
chrono = "0.4"
1 change: 1 addition & 0 deletions examples/example-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"environment": "production",
"condition": "automatic",
"deploy_path": "./projects",
"auto_restart": {"not": [0]},
"branches": [
"master"
],
Expand Down
71 changes: 71 additions & 0 deletions src/model/project/procedure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,19 @@ pub struct Procedure {
pub environment: String,
pub condition: String,
pub deploy_path: String,
pub auto_restart: AutoRestartPolicy,
pub branches: Vec<String>,
pub log: Option<String>,
}

#[derive(Debug, Clone)]
pub enum AutoRestartPolicy {
Always, // If the command was unsuccessful, restart
Never, // If the command was unsuccessful, don't restart
ExclusionCodes(Vec<i32>), // If the command was unsuccessful and if it is NOT one of the exclusion codes restart
InclusionCodes(Vec<i32>), // If the command was unsuccessful and if it is one of the inclusion codes restart
}

impl Procedure {
pub fn new(raw_procedure: &Value, raw_default_deploy_path: Option<&Value>) -> Result<Procedure, Error> {
let name: &str = match raw_procedure.get("name") {
Expand Down Expand Up @@ -67,6 +76,67 @@ impl Procedure {
}
};

let auto_restart: AutoRestartPolicy = match raw_procedure.get("auto_restart") {
Some(raw_auto_restart) => match raw_auto_restart.as_bool() {
Some(raw_auto_restart_bool) => {
if raw_auto_restart_bool {
AutoRestartPolicy::Always
} else {
AutoRestartPolicy::Never
}
},
None => match raw_auto_restart.as_object() {
Some(raw_auto_restart_object) => {
if raw_auto_restart_object.contains_key("only") {
match raw_auto_restart_object.get("only").unwrap().as_array() {
Some(raw_auto_restart_inclusion_codes) => {
let mut inclusion_codes: Vec<i32> = Vec::new();
for raw_code in raw_auto_restart_inclusion_codes {
match raw_code.as_u64() {
Some(raw_code_u64) => {
if raw_code_u64 > std::i32::MAX as u64 {
return Err(anyhow!("An auto restart integer provided exceeded the i32 max"));
}

inclusion_codes.push(raw_code_u64 as i32);
},
None => return Err(anyhow!("An auto restart value is not a valid i32"))
}
}
AutoRestartPolicy::InclusionCodes(inclusion_codes)
},
None => return Err(anyhow!("Auto restart rule inclusion codes not an array"))
}
} else if raw_auto_restart_object.contains_key("not") {
match raw_auto_restart_object.get("not").unwrap().as_array() {
Some(raw_auto_restart_inclusion_codes) => {
let mut exclusion_codes: Vec<i32> = Vec::new();
for raw_code in raw_auto_restart_inclusion_codes {
match raw_code.as_u64() {
Some(raw_code_u64) => {
if raw_code_u64 > std::i32::MAX as u64 {
return Err(anyhow!("An auto restart integer provided exceeded the i32 max"));
}

exclusion_codes.push(raw_code_u64 as i32);
},
None => return Err(anyhow!("An auto restart value is not a valid i32"))
}
}
AutoRestartPolicy::ExclusionCodes(exclusion_codes)
},
None => return Err(anyhow!("Auto restart rule exclusion codes not an array"))
}
} else {
return Err(anyhow!("Auto restart rule object does not specify a valid rule"));
}
},
None => return Err(anyhow!("Auto restart rule is not an object or boolean"))
}
},
None => AutoRestartPolicy::Never
};

let raw_branches: &Vec<Value> = match raw_procedure.get("branches") {
Some(v) => match v.as_array() {
Some(v) => v,
Expand Down Expand Up @@ -98,6 +168,7 @@ impl Procedure {
environment: environment.to_string(),
condition: condition.to_string(),
deploy_path: deploy_path.to_string(),
auto_restart: auto_restart,
branches: branches,
log: log,
})
Expand Down
73 changes: 57 additions & 16 deletions src/procedure_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
project::{
Project,
branch::Branch,
procedure::Procedure,
procedure::{Procedure, AutoRestartPolicy},
},
channel::{
Channel,
Expand All @@ -34,13 +34,18 @@ pub fn run_project_procedure(project: &Project, branch: &Branch, procedure: &Pro
let commands: Vec<String> = procedure.commands.clone();
let procedure_name = procedure.name.clone();
let procedure_log = procedure.log.clone();
let procedure_restart_policy = procedure.auto_restart.clone();

thread::spawn(move || {
let mut success = true;
for command in commands {
let mut current_command_index = 0;
loop {
let command = &commands[current_command_index];

info!(format!("[{}] [{}] Running command: {}", procedure_name, path, command));
let mut runtime = Builder::new().threaded_scheduler().enable_all().build().unwrap();
let result_child_process = runtime.handle().enter(|| run_procedure_command(&command, &path));
let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
let _guard = runtime.enter();
let result_child_process = run_procedure_command(&command, &path);
if result_child_process.is_err() {
break;
}
Expand All @@ -63,13 +68,47 @@ pub fn run_project_procedure(project: &Project, branch: &Branch, procedure: &Pro

// Blocks the thread until the child process running the command has exited
let read_connection = procedure_thread_connection.read().unwrap();
if !runtime.block_on(manage_child(&mut child_process, &read_connection)) {
match child_process.kill() {
Ok(()) => (),
Err(_e) => warn!(format!("[{}] Unable to kill child process. It may already be dead.", procedure_name))
};
info!(format!("[{}] Skipping the remaining commands for project (URL: {}) on branch {} in procedure {}", procedure_name, read_connection.remote_url, read_connection.branch, read_connection.procedure_name));
success = false;
let child_result = runtime.block_on(manage_child(&mut child_process, &read_connection));
if !child_result.0 {
if let Some(exit_code) = child_result.1 {
let should_restart = match &procedure_restart_policy {
AutoRestartPolicy::Always => true,
AutoRestartPolicy::Never => false,
AutoRestartPolicy::ExclusionCodes(excluded_codes) => {
if !excluded_codes.contains(&exit_code) {
true
} else {
false
}
},
AutoRestartPolicy::InclusionCodes(included_codes) => {
if included_codes.contains(&exit_code) {
true
} else {
false
}
}
};

if !should_restart {
match runtime.block_on(child_process.kill()) {
Ok(()) => (),
Err(_e) => warn!(format!("[{}] Unable to kill child process. It may already be dead.", procedure_name))
};
info!(format!("[{}] Skipping the remaining commands for project (URL: {}) on branch {} in procedure {}", procedure_name, read_connection.remote_url, read_connection.branch, read_connection.procedure_name));
success = false;
break;
}
} else {
error!(format!("[{}] Encountered unsuccessful child response with missing exit code", procedure_name));
success = false;
break;
}
} else {
current_command_index += 1;
}

if commands.len() == current_command_index {
break;
}
}
Expand All @@ -83,8 +122,10 @@ pub fn run_project_procedure(project: &Project, branch: &Branch, procedure: &Pro
Ok(())
}

/// Manages a child and returns a future with a bool (true if command ran successfully)
async fn manage_child(child: &mut Child, connection: &ThreadProcedureConnection) -> bool {
/// Manages a child and returns a future with the result
/// Result.0 is if the command was successful
/// Result.1 is if the command should be rerun
async fn manage_child(child: &mut Child, connection: &ThreadProcedureConnection) -> (bool, Option<i32>) {
let child_completion_future = complete_child(child).fuse();
let command_exit = process_commands(&connection.owner_channel).fuse();

Expand All @@ -93,11 +134,11 @@ async fn manage_child(child: &mut Child, connection: &ThreadProcedureConnection)
select! {
(success, exit_code) = child_completion_future => {
debug!(format!("[{}]: Child exited with code {}", connection.procedure_name, exit_code));
return success;
return (success, Some(exit_code));
},
() = command_exit => {
debug!(format!("[{}]: Terminating due to Command::KillProcedure", connection.procedure_name));
return false;
return (false, None);
},
}
}
Expand All @@ -106,7 +147,7 @@ async fn manage_child(child: &mut Child, connection: &ThreadProcedureConnection)
/// Bool indicates whether it exited successfully
/// i32 is status code
async fn complete_child(child: &mut Child) -> (bool, i32) {
let status_result: Result<ExitStatus, std::io::Error> = child.await; // Blocking
let status_result: Result<ExitStatus, std::io::Error> = child.wait().await;
if status_result.is_err() {
return (false, 1);
}
Expand Down
4 changes: 2 additions & 2 deletions src/system_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ pub fn setup_git_repository(remote_url: &str, project_deploy_path: &str, branch:
// Make sure the deploy path is valid
fs::create_dir_all(&project_path)?;

let clone_attempt = run_system_command(&format!("git clone {} {}", remote_url, branch), &project_path);
let clone_attempt = run_system_command(&format!("git clone --single-branch --branch {} {} {}", branch, remote_url, branch), &project_path);
if clone_attempt.is_err() {
if let Err(e0) = clone_attempt {
debug!(format!("Git clone attempt failed for {} due to: {}", remote_url, e0));
let pull_attempt = run_system_command(&"git pull", &format!("{}/{}", project_path, branch));
let pull_attempt = run_system_command(&format!("git pull origin {}", branch), &format!("{}/{}", project_path, branch));
if pull_attempt.is_err() {
if let Err(e1) = pull_attempt {
debug!(format!("Git pull attempt failed for {} due to: {}", remote_url, e1));
Expand Down

0 comments on commit 80a1232

Please sign in to comment.