Skip to content

Commit

Permalink
Add kubernetes-test-framework
Browse files Browse the repository at this point in the history
Signed-off-by: MOZGIII <[email protected]>
  • Loading branch information
MOZGIII committed May 28, 2020
1 parent 96a06e0 commit 7b9a6eb
Show file tree
Hide file tree
Showing 13 changed files with 613 additions and 0 deletions.
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ members = [
"lib/codec",
"lib/file-source",
"lib/tracing-limit",
"lib/kubernetes-test-framework",
]

[dependencies]
Expand Down
11 changes: 11 additions & 0 deletions lib/kubernetes-test-framework/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "kubernetes-test-framework"
version = "0.1.0"
authors = ["MOZGIII <[email protected]>"]
edition = "2018"
description = "Kubernetes Test Framework used to test Vector in Kubernetes"

[dependencies]
k8s-openapi = { version = "0.7", default-features = false, features = ["v1_15"] }
serde_json = "1"
tempfile = "3"
81 changes: 81 additions & 0 deletions lib/kubernetes-test-framework/src/framework.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
//! The test framework main entry point.
use super::{
log_lookup, namespace, test_pod, vector, wait_for_resource, wait_for_rollout, Interface, Result,
};

pub struct Framework {
interface: Interface,
}

impl Framework {
/// Create a new [`Framework`].
pub fn new(interface: Interface) -> Self {
Self { interface }
}

pub fn vector(&self, namespace: &str, custom_resource: &str) -> Result<vector::Manager> {
let manager = vector::Manager::new(
self.interface.deploy_vector_command.as_str(),
namespace,
custom_resource,
)?;
manager.up()?;
Ok(manager)
}

pub fn namespace(&self, namespace: &str) -> Result<namespace::Manager> {
let manager = namespace::Manager::new(&self.interface.kubectl_command, namespace)?;
manager.up()?;
Ok(manager)
}

pub fn test_pod(&self, config: test_pod::Config) -> Result<test_pod::Manager> {
let manager = test_pod::Manager::new(&self.interface.kubectl_command, config)?;
manager.up()?;
Ok(manager)
}

pub fn logs(&self, namespace: &str, resource: &str) -> Result<log_lookup::Reader> {
log_lookup::logs(&self.interface.kubectl_command, namespace, resource)
}

pub fn wait<'a>(
&self,
namespace: &str,
resources: impl IntoIterator<Item = &'a str>,
wait_for: wait_for_resource::WaitFor<&'_ str>,
extra: impl IntoIterator<Item = &'a str>,
) -> Result<()> {
wait_for_resource::namespace(
&self.interface.kubectl_command,
namespace,
resources,
wait_for,
extra,
)
}

pub fn wait_all_namespaces<'a>(
&self,
resources: impl IntoIterator<Item = &'a str>,
wait_for: wait_for_resource::WaitFor<&'_ str>,
extra: impl IntoIterator<Item = &'a str>,
) -> Result<()> {
wait_for_resource::all_namespaces(
&self.interface.kubectl_command,
resources,
wait_for,
extra,
)
}

pub fn wait_for_rollout<'a>(
&self,
namespace: &str,
resource: &str,
extra: impl IntoIterator<Item = &'a str>,
) -> Result<()> {
wait_for_rollout::run(&self.interface.kubectl_command, namespace, resource, extra)
}
}
17 changes: 17 additions & 0 deletions lib/kubernetes-test-framework/src/interface.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use std::env;

#[derive(Debug)]
pub struct Interface {
pub deploy_vector_command: String,
pub kubectl_command: String,
}

impl Interface {
pub fn from_env() -> Option<Self> {
Some(Self {
deploy_vector_command: env::var("KUBE_TEST_DEPLOY_COMMAND").ok()?,
kubectl_command: env::var("VECTOR_TEST_KUBECTL")
.unwrap_or_else(|_| "kubectl".to_owned()),
})
}
}
34 changes: 34 additions & 0 deletions lib/kubernetes-test-framework/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//! Kubernetes test framework.
//!
//! The main goal of the design of this test framework is to wire kubernetes
//! components testing through the same tools that are available to the
//! developer as executable commands, rather than using a rust interface to talk
//! to k8s cluster directly.
//! This enables very trivial troubleshooting and allows us to use the same
//! deployemnt mechanisms that we use for prodcution - effectively giving us
//! the opportunity to test e2e - not just the code layer, but also the
//! deployment configuration.
// TODO: deny
#![allow(
missing_debug_implementations,
missing_copy_implementations,
missing_docs
)]

pub mod framework;
pub mod interface;
pub mod log_lookup;
pub mod namespace;
mod resource_file;
pub mod test_pod;
pub mod vector;
pub mod wait_for_resource;
pub mod wait_for_rollout;

// Re-export some unit for trivial accessability.

pub use framework::Framework;
pub use interface::Interface;

type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
124 changes: 124 additions & 0 deletions lib/kubernetes-test-framework/src/log_lookup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use super::Result;
use std::io::{BufRead, BufReader};
use std::process::{Child, ChildStdout, Command, ExitStatus, Stdio};

pub fn logs(kubectl_command: &str, namespace: &str, resource: &str) -> Result<Reader> {
let mut command = Command::new(kubectl_command);

command.stdin(Stdio::null()).stderr(Stdio::inherit());

command.arg("logs");
command.arg("-f");
command.arg("-n").arg(namespace);
command.arg(resource);

let reader = Reader::spawn(command)?;
Ok(reader)
}

pub struct Reader {
child: Child,
reader: BufReader<ChildStdout>,
}

impl Reader {
pub fn spawn(mut command: Command) -> std::io::Result<Self> {
Self::prepare_stdout(&mut command);
let child = command.spawn()?;
Ok(Self::new(child))
}

fn prepare_stdout(command: &mut Command) {
command.stdout(Stdio::piped());
}

fn new(mut child: Child) -> Self {
let stdout = child.stdout.take().unwrap();
let reader = BufReader::new(stdout);
Reader { child, reader }
}

pub fn wait(&mut self) -> std::io::Result<ExitStatus> {
self.child.wait()
}

pub fn kill(&mut self) -> std::io::Result<()> {
self.child.kill()
}
}

impl Iterator for Reader {
type Item = String;

fn next(&mut self) -> Option<Self::Item> {
let mut s = String::new();
let result = self.reader.read_line(&mut s);
match result {
Ok(0) => None,
Ok(_) => Some(s),
Err(err) => panic!(err),
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_reader_finite() {
let mut command = Command::new("echo");
command.arg("test");

let mut reader = Reader::spawn(command).expect("unable to spawn");

// Collect all line, expect stream to finish.
let lines: Vec<_> = (&mut reader).collect();
// Assert we got all the lines we expected.
assert_eq!(lines, vec!["test\n".to_owned()]);

// Ensure wait doesn't fail, and that we exit status is success.
let exit_status = reader.wait().expect("wait failed");
assert!(exit_status.success());
}

#[test]
fn test_reader_inifinite() {
let mut command = Command::new("bash");
command.arg("-c");
command.arg(r#"NUM=0; while true; do echo "Line $NUM"; NUM=$((NUM+=1)); sleep 0.01; done"#);

let mut reader = Reader::spawn(command).expect("unable to spawn");

// Read the lines and at some point ask the command we're reading from
// to stop.
let mut expected_num = 0;
while let Some(line) = (&mut reader).next() {
// Assert we're getting expected lines.
assert_eq!(line, format!("Line {}\n", expected_num));

// On line 100 issue a `kill` to stop the infinite stream.
if expected_num == 100 {
reader.kill().expect("process already stopped")
}

// If we are past 200 it means we issued `kill` at 100 and it wasn't
// effective. This is problem, fail the test.
// We don't to this immediately after `kill` to allow for some
// potential race condition. That kind of race is not just ok, but
// is desirable in the real-life usage to read-up the whole stdout
// buffer.
if expected_num > 200 {
panic!("went too far without stop being effective");
}

// Bump the expected num for the next iteration.
expected_num += 1;
}

// Ensure wait doesn't fail. We killed the process, so expect
// a non-success exit code.
let exit_status = reader.wait().expect("wait failed");
assert!(!exit_status.success());
}
}
45 changes: 45 additions & 0 deletions lib/kubernetes-test-framework/src/namespace.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use super::Result;
use std::process::{Command, Stdio};

pub struct Manager {
kubectl_command: String,
namespace: String,
}

impl Manager {
pub fn new(kubectl_command: &str, namespace: &str) -> Result<Self> {
Ok(Self {
kubectl_command: kubectl_command.to_owned(),
namespace: namespace.to_owned(),
})
}

pub fn up(&self) -> Result<()> {
self.exec("create")
}

pub fn down(&self) -> Result<()> {
self.exec("delete")
}

fn exec(&self, subcommand: &str) -> Result<()> {
if !Command::new(&self.kubectl_command)
.arg(subcommand)
.arg("namespace")
.arg(&self.namespace)
.stdin(Stdio::null())
.spawn()?
.wait()?
.success()
{
Err(format!("failed to exec: {}", subcommand))?;
}
Ok(())
}
}

impl Drop for Manager {
fn drop(&mut self) {
self.down().expect("namespace turndown failed");
}
}
27 changes: 27 additions & 0 deletions lib/kubernetes-test-framework/src/resource_file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use std::path::{Path, PathBuf};
use tempfile::{tempdir, TempDir};

#[derive(Debug)]
pub struct ResourceFile {
dir: TempDir,
path: PathBuf,
}

impl ResourceFile {
pub fn new(data: &str) -> std::io::Result<Self> {
let dir = tempdir()?;
let path = dir.path().join("custom.yaml");
std::fs::write(&path, data)?;
Ok(Self { dir, path })
}

pub fn path(&self) -> &Path {
self.path.as_path()
}
}

impl Drop for ResourceFile {
fn drop(&mut self) {
std::fs::remove_file(&self.path).expect("unable to clean up custom resource file");
}
}
Loading

0 comments on commit 7b9a6eb

Please sign in to comment.