diff --git a/Cargo.lock b/Cargo.lock index 9bcc32a2..b649f679 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3544,6 +3544,7 @@ dependencies = [ "httpmock", "k8s-openapi", "kube", + "lazy_static", "mockall", "object_store", "parse_datetime_fork", @@ -3634,6 +3635,7 @@ dependencies = [ "serde_yaml", "sk-api", "sk-core", + "thiserror", "tokio", "tracing", "tracing-test", diff --git a/sk-cli/src/crd.rs b/sk-cli/src/crd.rs index f1d1d9dc..e32e642d 100644 --- a/sk-cli/src/crd.rs +++ b/sk-cli/src/crd.rs @@ -1,4 +1,3 @@ -use kube::CustomResourceExt; use sk_core::prelude::*; pub fn cmd() -> EmptyResult { diff --git a/sk-cli/src/validation/tests/mod.rs b/sk-cli/src/validation/tests/mod.rs index 67ef24c2..2f1e139f 100644 --- a/sk-cli/src/validation/tests/mod.rs +++ b/sk-cli/src/validation/tests/mod.rs @@ -4,7 +4,7 @@ mod validation_store_test; use std::collections::BTreeMap; use rstest::*; -use sk_core::k8s::testutils::test_deployment; +use sk_core::prelude::*; use sk_store::TraceEvent; use super::annotated_trace::AnnotatedTraceEvent; diff --git a/sk-cli/src/validation/tests/status_field_populated_test.rs b/sk-cli/src/validation/tests/status_field_populated_test.rs index 6be1f761..2dc18735 100644 --- a/sk-cli/src/validation/tests/status_field_populated_test.rs +++ b/sk-cli/src/validation/tests/status_field_populated_test.rs @@ -1,7 +1,6 @@ use assertables::*; use kube::api::DynamicObject; use serde_json::json; -use sk_core::k8s::testutils::test_deployment; use sk_store::TraceEvent; use super::*; diff --git a/sk-cli/src/xray/view/helpers.rs b/sk-cli/src/xray/view/helpers.rs index e087b432..e078d266 100644 --- a/sk-cli/src/xray/view/helpers.rs +++ b/sk-cli/src/xray/view/helpers.rs @@ -2,7 +2,7 @@ use chrono::TimeDelta; use kube::api::DynamicObject; use lazy_static::lazy_static; use ratatui::prelude::*; -use sk_core::k8s::KubeResourceExt; +use sk_core::prelude::*; use crate::validation::{ AnnotatedTraceEvent, diff --git a/sk-core/Cargo.toml b/sk-core/Cargo.toml index 06a75232..9796dde9 100644 --- a/sk-core/Cargo.toml +++ b/sk-core/Cargo.toml @@ -9,7 +9,7 @@ readme.workspace = true edition.workspace = true [features] -testutils = ["dep:http", "dep:httpmock", "dep:mockall", "dep:rstest"] +testutils = ["dep:http", "dep:httpmock", "dep:lazy_static", "dep:mockall", "dep:rstest"] [dependencies] anyhow = { workspace = true } @@ -37,6 +37,7 @@ url = { workspace = true } # testutils dependencies http = { workspace = true, optional = true } httpmock = { workspace = true, optional = true } +lazy_static = { workspace = true, optional = true } mockall = { workspace = true, optional = true } rstest = { workspace = true, optional = true } diff --git a/sk-core/src/constants.rs b/sk-core/src/constants.rs index 9b6201be..52204287 100644 --- a/sk-core/src/constants.rs +++ b/sk-core/src/constants.rs @@ -30,7 +30,10 @@ pub const ERROR_RETRY_DELAY_SECONDS: u64 = 30; #[cfg(feature = "testutils")] mod test_constants { - pub const EMPTY_OBJ_HASH: u64 = 15130871412783076140; + use lazy_static::lazy_static; + + use crate::k8s::GVK; + pub const EMPTY_POD_SPEC_HASH: u64 = 17506812802394981455; pub const TEST_DEPLOYMENT: &str = "the-deployment"; pub const TEST_NAMESPACE: &str = "test-namespace"; @@ -40,6 +43,11 @@ mod test_constants { pub const TEST_DRIVER_ROOT_NAME: &str = "sk-test-driver-12345-root"; pub const TEST_VIRT_NS_PREFIX: &str = "virt-test"; pub const TEST_CTRL_NAMESPACE: &str = "ctrl-ns"; + + lazy_static! { + pub static ref DEPL_GVK: GVK = GVK::new("apps", "v1", "Deployment"); + pub static ref DS_GVK: GVK = GVK::new("apps", "v1", "DaemonSet"); + } } #[cfg(feature = "testutils")] diff --git a/sk-core/src/hooks.rs b/sk-core/src/hooks.rs index d05910a3..d355bb62 100644 --- a/sk-core/src/hooks.rs +++ b/sk-core/src/hooks.rs @@ -9,6 +9,7 @@ use tokio::io::{ BufWriter, }; use tokio::process::Command; +use tracing::*; use crate::prelude::*; @@ -70,7 +71,6 @@ mod test { use tracing_test::*; use super::*; - use crate::k8s::testutils::test_sim; #[rstest] #[traced_test] diff --git a/sk-core/src/k8s/container_state.rs b/sk-core/src/k8s/container_state.rs index 769c4e5d..bae73d32 100644 --- a/sk-core/src/k8s/container_state.rs +++ b/sk-core/src/k8s/container_state.rs @@ -1,3 +1,5 @@ +use tracing::*; + use super::*; use crate::prelude::*; diff --git a/sk-core/src/k8s/gvk.rs b/sk-core/src/k8s/gvk.rs index 12c46863..d89b9119 100644 --- a/sk-core/src/k8s/gvk.rs +++ b/sk-core/src/k8s/gvk.rs @@ -5,6 +5,7 @@ use std::ops::Deref; use kube::api::{ DynamicObject, GroupVersionKind, + TypeMeta, }; use serde::{ de, @@ -49,6 +50,13 @@ impl GVK { bail!("invalid format for api_version: {}", rf.api_version); } } + + pub fn into_type_meta(&self) -> TypeMeta { + TypeMeta { + api_version: self.0.api_version(), + kind: self.0.kind.clone(), + } + } } // Impl Deref lets a GVK act like a GroupVersionKind anywhere one of those is expected @@ -60,18 +68,24 @@ impl Deref for GVK { } } -impl Serialize for GVK { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { +impl fmt::Display for GVK { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let mut group = Cow::from(&self.0.group); if !group.is_empty() { group.to_mut().push('/'); } - let skey = format!("{group}{}.{}", self.0.version, self.0.kind); - serializer.serialize_str(&skey) + write!(f, "{group}{}.{}", self.0.version, self.0.kind) + } +} + +impl Serialize for GVK { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + // reuse the display impl for serializing + serializer.serialize_str(&format!("{self}")) } } diff --git a/sk-core/src/k8s/lease.rs b/sk-core/src/k8s/lease.rs index d6def1cc..a379b96b 100644 --- a/sk-core/src/k8s/lease.rs +++ b/sk-core/src/k8s/lease.rs @@ -6,8 +6,8 @@ use clockabilly::{ }; use k8s_openapi::api::coordination::v1 as coordinationv1; use kube::api::Patch; -use kube::ResourceExt; use serde_json::json; +use tracing::*; use crate::k8s::{ build_object_meta, diff --git a/sk-core/src/k8s/owners.rs b/sk-core/src/k8s/owners.rs index 29480036..5793c3c3 100644 --- a/sk-core/src/k8s/owners.rs +++ b/sk-core/src/k8s/owners.rs @@ -6,10 +6,7 @@ use kube::discovery::{ ApiCapabilities, Scope, }; -use kube::{ - Resource, - ResourceExt, -}; +use kube::Resource; use tracing::*; use super::*; diff --git a/sk-core/src/k8s/pod_lifecycle.rs b/sk-core/src/k8s/pod_lifecycle.rs index ec527c55..4c4157b9 100644 --- a/sk-core/src/k8s/pod_lifecycle.rs +++ b/sk-core/src/k8s/pod_lifecycle.rs @@ -6,7 +6,7 @@ use std::cmp::{ }; use clockabilly::Clockable; -use kube::ResourceExt; +use tracing::*; use super::*; use crate::prelude::*; diff --git a/sk-core/src/k8s/sim.rs b/sk-core/src/k8s/sim.rs index f44c316f..f3ee996e 100644 --- a/sk-core/src/k8s/sim.rs +++ b/sk-core/src/k8s/sim.rs @@ -1,4 +1,3 @@ -use kube::ResourceExt; use sk_api::v1::{ Simulation, SimulationMetricsConfig, diff --git a/sk-core/src/k8s/tests/mod.rs b/sk-core/src/k8s/tests/mod.rs index db930807..f8867b8b 100644 --- a/sk-core/src/k8s/tests/mod.rs +++ b/sk-core/src/k8s/tests/mod.rs @@ -8,5 +8,4 @@ use rstest::*; use tracing_test::traced_test; use super::*; -use crate::k8s::testutils::*; use crate::macros::*; diff --git a/sk-core/src/k8s/tests/owners_test.rs b/sk-core/src/k8s/tests/owners_test.rs index 4b3ca9a2..599e5884 100644 --- a/sk-core/src/k8s/tests/owners_test.rs +++ b/sk-core/src/k8s/tests/owners_test.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; -use kube::ResourceExt; use serde_json::json; use super::*; diff --git a/sk-core/src/k8s/testutils/objs.rs b/sk-core/src/k8s/testutils/objs.rs index f9fa6e74..5aa60031 100644 --- a/sk-core/src/k8s/testutils/objs.rs +++ b/sk-core/src/k8s/testutils/objs.rs @@ -1,17 +1,24 @@ -use kube::api::{ - DynamicObject, - GroupVersionKind, -}; +use kube::api::DynamicObject; use kube::discovery::ApiResource; use rstest::*; +use serde_json::json; use crate::prelude::*; +// If the fixture objects below change, these hash values will need to be updated +pub const TEST_DEPL_HASH: u64 = 3664028200602729212; +pub const TEST_DS_HASH: u64 = 16161139027557399432; + #[fixture] pub fn test_deployment(#[default(TEST_DEPLOYMENT)] name: &str) -> DynamicObject { - DynamicObject::new( - &name, - &ApiResource::from_gvk(&GroupVersionKind::gvk("core".into(), "v1".into(), "deployment".into())), - ) - .within(TEST_NAMESPACE) + DynamicObject::new(&name, &ApiResource::from_gvk(&DEPL_GVK)) + .within(TEST_NAMESPACE) + .data(json!({"spec": {"replicas": 42}})) +} + +#[fixture] +pub fn test_daemonset(#[default(TEST_DEPLOYMENT)] name: &str) -> DynamicObject { + DynamicObject::new(&name, &ApiResource::from_gvk(&DS_GVK)) + .within(TEST_NAMESPACE) + .data(json!({"spec": {"updateStrategy": {"type": "onDelete"}}})) } diff --git a/sk-core/src/k8s/util.rs b/sk-core/src/k8s/util.rs index 79b7f06a..4d352039 100644 --- a/sk-core/src/k8s/util.rs +++ b/sk-core/src/k8s/util.rs @@ -3,7 +3,6 @@ use std::collections::BTreeMap; use kube::api::{ DynamicObject, Resource, - ResourceExt, TypeMeta, }; use serde_json as json; @@ -39,7 +38,7 @@ where }); } -pub fn build_deletable(ns_name: &str) -> DynamicObject { +pub fn build_deletable(gvk: &GVK, ns_name: &str) -> DynamicObject { let (ns, name) = split_namespaced_name(ns_name); DynamicObject { metadata: metav1::ObjectMeta { @@ -47,7 +46,7 @@ pub fn build_deletable(ns_name: &str) -> DynamicObject { name: Some(name), ..Default::default() }, - types: None, + types: Some(gvk.into_type_meta()), data: json::Value::Null, } } @@ -77,6 +76,11 @@ where build_object_meta_helper(Some(namespace.into()), name, sim_name, owner) } +pub fn format_gvk_name(gvk: &GVK, ns_name: &str) -> String { + format!("{gvk}:{ns_name}") +} + + pub fn sanitize_obj(obj: &mut DynamicObject, api_version: &str, kind: &str) { obj.metadata.creation_timestamp = None; obj.metadata.deletion_timestamp = None; diff --git a/sk-core/src/lib.rs b/sk-core/src/lib.rs index e603cbd5..65f5e2b9 100644 --- a/sk-core/src/lib.rs +++ b/sk-core/src/lib.rs @@ -11,12 +11,18 @@ pub mod time; pub mod prelude { pub use k8s_openapi::api::core::v1 as corev1; pub use k8s_openapi::apimachinery::pkg::apis::meta::v1 as metav1; + pub use kube::{ + CustomResourceExt, + ResourceExt, + }; pub use sk_api::v1::{ Simulation, SimulationRoot, }; - pub use tracing::*; pub use crate::constants::*; pub use crate::errors::EmptyResult; + #[cfg(feature = "testutils")] + pub use crate::k8s::testutils::*; + pub use crate::k8s::KubeResourceExt; } diff --git a/sk-ctrl/src/cert_manager.rs b/sk-ctrl/src/cert_manager.rs index 0f1cb01d..69b2b480 100644 --- a/sk-ctrl/src/cert_manager.rs +++ b/sk-ctrl/src/cert_manager.rs @@ -10,6 +10,7 @@ use serde::{ use sk_core::k8s::build_object_meta; use sk_core::macros::*; use sk_core::prelude::*; +use tracing::*; use crate::context::SimulationContext; diff --git a/sk-ctrl/src/context.rs b/sk-ctrl/src/context.rs index cb46e6bf..25f58c3f 100644 --- a/sk-ctrl/src/context.rs +++ b/sk-ctrl/src/context.rs @@ -1,7 +1,7 @@ use std::sync::Arc; -use kube::ResourceExt; use sk_api::v1::Simulation; +use sk_core::prelude::*; use crate::Options; diff --git a/sk-ctrl/src/controller.rs b/sk-ctrl/src/controller.rs index 95f91868..c67f5599 100644 --- a/sk-ctrl/src/controller.rs +++ b/sk-ctrl/src/controller.rs @@ -18,7 +18,6 @@ use kube::api::{ Patch, }; use kube::runtime::controller::Action; -use kube::ResourceExt; use serde_json::json; use sk_api::prometheus::*; use sk_api::v1::{ @@ -34,13 +33,13 @@ use sk_core::k8s::{ is_terminal, metrics_ns, try_claim_lease, - KubeResourceExt, LeaseState, }; use sk_core::prelude::*; use tokio::runtime::Handle; use tokio::task::block_in_place; use tokio::time::Duration; +use tracing::*; use crate::cert_manager; use crate::context::SimulationContext; diff --git a/sk-ctrl/src/main.rs b/sk-ctrl/src/main.rs index 064626d6..331261e7 100644 --- a/sk-ctrl/src/main.rs +++ b/sk-ctrl/src/main.rs @@ -21,6 +21,7 @@ use kube::runtime::{ }; use sk_core::logging; use sk_core::prelude::*; +use tracing::*; use crate::context::SimulationContext; use crate::controller::{ diff --git a/sk-ctrl/src/objects.rs b/sk-ctrl/src/objects.rs index 0d7f9c7c..56daa398 100644 --- a/sk-ctrl/src/objects.rs +++ b/sk-ctrl/src/objects.rs @@ -5,7 +5,6 @@ use anyhow::anyhow; use k8s_openapi::api::admissionregistration::v1 as admissionv1; use k8s_openapi::api::batch::v1 as batchv1; use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString; -use kube::ResourceExt; use object_store::ObjectStoreScheme; use reqwest::Url; use sk_api::prometheus::{ diff --git a/sk-driver/src/main.rs b/sk-driver/src/main.rs index c0b78582..9dbdf19a 100644 --- a/sk-driver/src/main.rs +++ b/sk-driver/src/main.rs @@ -18,7 +18,6 @@ use sk_core::external_storage::{ }; use sk_core::k8s::{ ApiSet, - KubeResourceExt, OwnersCache, }; use sk_core::prelude::*; @@ -32,6 +31,7 @@ use sk_store::{ }; use tokio::sync::Mutex; use tokio::time::sleep; +use tracing::*; use crate::mutation::MutationData; use crate::runner::run_trace; diff --git a/sk-driver/src/mutation.rs b/sk-driver/src/mutation.rs index 795cb6a2..5706097b 100644 --- a/sk-driver/src/mutation.rs +++ b/sk-driver/src/mutation.rs @@ -12,7 +12,6 @@ use kube::core::admission::{ AdmissionResponse, AdmissionReview, }; -use kube::ResourceExt; use rocket::serde::json::Json; use serde_json::{ json, @@ -20,11 +19,12 @@ use serde_json::{ }; use sk_core::jsonutils; use sk_core::k8s::{ - KubeResourceExt, PodExt, PodLifecycleData, + GVK, }; use sk_core::prelude::*; +use tracing::*; use crate::DriverContext; @@ -119,15 +119,16 @@ fn add_lifecycle_annotation( ) -> EmptyResult { if let Some(orig_ns) = pod.annotations().get(ORIG_NAMESPACE_ANNOTATION_KEY) { for owner in owners { + let owner_gvk = GVK::from_owner_ref(owner)?; let owner_ns_name = format!("{}/{}", orig_ns, owner.name); - if !ctx.store.has_obj(&owner_ns_name) { + if !ctx.store.has_obj(&owner_gvk, &owner_ns_name) { continue; } let hash = jsonutils::hash(&serde_json::to_value(&pod.stable_spec()?)?); let seq = mut_data.count(hash); - let lifecycle = ctx.store.lookup_pod_lifecycle(&owner_ns_name, hash, seq); + let lifecycle = ctx.store.lookup_pod_lifecycle(&owner_gvk, &owner_ns_name, hash, seq); if let Some(patch) = to_annotation_patch(&lifecycle) { info!("applying lifecycle annotations (hash={hash}, seq={seq})"); if pod.metadata.annotations.is_none() { diff --git a/sk-driver/src/runner.rs b/sk-driver/src/runner.rs index 4e19d861..00d3eac9 100644 --- a/sk-driver/src/runner.rs +++ b/sk-driver/src/runner.rs @@ -24,7 +24,6 @@ use kube::api::{ PatchParams, PropagationPolicy, }; -use kube::ResourceExt; use serde_json::json; use sk_core::errors::*; use sk_core::k8s::{ @@ -38,6 +37,7 @@ use sk_core::k8s::{ use sk_core::macros::*; use sk_core::prelude::*; use tokio::time::sleep; +use tracing::*; use super::*; diff --git a/sk-driver/src/tests/data/trace.json b/sk-driver/src/tests/data/trace.json index 7aab755f..45d5bdf8 100644 --- a/sk-driver/src/tests/data/trace.json +++ b/sk-driver/src/tests/data/trace.json @@ -1,12 +1,13 @@ -[ - { +{ + "version": 2, + "config": { "trackedObjects": { "apps/v1.Deployment": { "podSpecTemplatePath": "/spec/template" } } }, - [ + "events": [ { "ts": 1709241485, "applied_objs": [ @@ -106,8 +107,10 @@ ] } ], - { - "default/nginx-deployment": 2842228259284014139 + "index": { + "apps/v1.Deployment": { + "default/nginx-deployment": 2842228259284014139 + } }, - {} -] + "pod_lifecycles": {} +} diff --git a/sk-driver/src/tests/helpers.rs b/sk-driver/src/tests/helpers.rs index 6875db0e..a03bbbbd 100644 --- a/sk-driver/src/tests/helpers.rs +++ b/sk-driver/src/tests/helpers.rs @@ -1,15 +1,10 @@ -use std::collections::{ - HashMap, - VecDeque, -}; use std::fs::File; use std::io::BufReader; use std::sync::Arc; use sk_store::{ - PodLifecyclesMap, + ExportedTrace, TraceEvent, - TracerConfig, }; use tokio::sync::Mutex; @@ -20,22 +15,17 @@ pub fn build_trace_data(has_start_marker: bool) -> Vec { // then re-encode it into msgpack so we can pass the data to import let trace_data_file = File::open("./src/tests/data/trace.json").unwrap(); let reader = BufReader::new(trace_data_file); - let (config, mut events, index, lifecycle_data): ( - TracerConfig, - VecDeque, - HashMap, - HashMap, - ) = serde_json::from_reader(reader).unwrap(); + let mut exported_trace: ExportedTrace = serde_json::from_reader(reader).unwrap(); if has_start_marker { - events.push_front(TraceEvent { + exported_trace.prepend_event(TraceEvent { ts: 1709241485, applied_objs: vec![], deleted_objs: vec![], }); } - rmp_serde::to_vec_named(&(&config, &events, &index, &lifecycle_data)).unwrap() + rmp_serde::to_vec_named(&exported_trace).unwrap() } pub fn build_driver_context( diff --git a/sk-driver/src/tests/mod.rs b/sk-driver/src/tests/mod.rs index 6f05aa28..9be7fdce 100644 --- a/sk-driver/src/tests/mod.rs +++ b/sk-driver/src/tests/mod.rs @@ -3,7 +3,7 @@ mod mutation_test; mod runner_test; use rstest::*; -use sk_core::k8s::testutils::*; +use sk_core::prelude::*; use tracing_test::traced_test; use super::mutation::*; diff --git a/sk-driver/src/tests/mutation_test.rs b/sk-driver/src/tests/mutation_test.rs index 7351427c..3ab345dd 100644 --- a/sk-driver/src/tests/mutation_test.rs +++ b/sk-driver/src/tests/mutation_test.rs @@ -15,7 +15,6 @@ use kube::core::{ GroupVersionKind, GroupVersionResource, }; -use kube::ResourceExt; use mockall::predicate; use rocket::serde::json::Json; use sk_core::k8s::PodLifecycleData; @@ -131,10 +130,10 @@ async fn test_mutate_pod(mut test_pod: corev1::Pod, mut adm_resp: AdmissionRespo let mut store = MockTraceStore::new(); let _ = store .expect_lookup_pod_lifecycle() - .with(predicate::always(), predicate::eq(EMPTY_POD_SPEC_HASH), predicate::eq(0)) - .returning(|_, _, _| PodLifecycleData::Finished(1, 2)) + .with(predicate::always(), predicate::always(), predicate::eq(EMPTY_POD_SPEC_HASH), predicate::eq(0)) + .returning(|_, _, _, _| PodLifecycleData::Finished(1, 2)) .once(); - let _ = store.expect_has_obj().returning(move |o| o == owner_ns_name); + let _ = store.expect_has_obj().returning(move |_gvk, o| o == owner_ns_name); let ctx = ctx(test_pod.clone(), vec![root.clone(), depl.clone()], store); diff --git a/sk-store/Cargo.toml b/sk-store/Cargo.toml index 9f88fd0f..155244d9 100644 --- a/sk-store/Cargo.toml +++ b/sk-store/Cargo.toml @@ -23,6 +23,7 @@ serde_json = { workspace = true } serde_yaml = { workspace = true } sk-api = { workspace = true } sk-core = { workspace = true } +thiserror = { workspace = true } tracing = { workspace = true } # testutils dependencies diff --git a/sk-store/src/event_list.rs b/sk-store/src/event_list.rs new file mode 100644 index 00000000..ee572e57 --- /dev/null +++ b/sk-store/src/event_list.rs @@ -0,0 +1,86 @@ +use std::collections::VecDeque; +use std::ops::Index; + +use kube::api::DynamicObject; +use sk_core::prelude::*; +use tracing::*; + +use crate::{ + TraceAction, + TraceEvent, +}; + +#[derive(Default)] +pub struct TraceEventList(VecDeque); + +impl TraceEventList { + pub(crate) fn append(&mut self, ts: i64, obj: &DynamicObject, action: TraceAction) { + info!( + "{:?} @ {ts}: {} {}", + action, + obj.types + .clone() + .map(|tm| format!("{}.{}", tm.api_version, tm.kind)) + .unwrap_or("".into()), + obj.namespaced_name(), + ); + + let obj = obj.clone(); + match self.0.back_mut() { + Some(evt) if evt.ts == ts => match action { + TraceAction::ObjectApplied => evt.applied_objs.push(obj), + TraceAction::ObjectDeleted => evt.deleted_objs.push(obj), + }, + _ => { + let evt = match action { + TraceAction::ObjectApplied => TraceEvent { ts, applied_objs: vec![obj], ..Default::default() }, + TraceAction::ObjectDeleted => TraceEvent { ts, deleted_objs: vec![obj], ..Default::default() }, + }; + self.0.push_back(evt); + }, + } + } + + pub(crate) fn back(&self) -> Option<&TraceEvent> { + self.0.back() + } + + pub(crate) fn front(&self) -> Option<&TraceEvent> { + self.0.front() + } + + pub(crate) fn is_empty(&self) -> bool { + self.0.is_empty() + } + + pub(crate) fn len(&self) -> usize { + self.0.len() + } +} + +impl Index for TraceEventList { + type Output = TraceEvent; + + fn index(&self, i: usize) -> &Self::Output { + &self.0[i] + } +} + +impl From> for TraceEventList { + fn from(v: VecDeque) -> TraceEventList { + TraceEventList(v) + } +} + +impl From> for TraceEventList { + fn from(v: Vec) -> TraceEventList { + TraceEventList(v.into()) + } +} + +#[cfg(test)] +impl FromIterator for TraceEventList { + fn from_iter>(ii: T) -> Self { + TraceEventList(ii.into_iter().collect()) + } +} diff --git a/sk-store/src/trace_filter.rs b/sk-store/src/filter.rs similarity index 97% rename from sk-store/src/trace_filter.rs rename to sk-store/src/filter.rs index 14421e2b..d2e7b619 100644 --- a/sk-store/src/trace_filter.rs +++ b/sk-store/src/filter.rs @@ -1,6 +1,6 @@ use kube::api::DynamicObject; use sk_api::v1::ExportFilters; -use sk_core::k8s::KubeResourceExt; +use sk_core::prelude::*; use super::TraceEvent; diff --git a/sk-store/src/index.rs b/sk-store/src/index.rs new file mode 100644 index 00000000..8dfd893a --- /dev/null +++ b/sk-store/src/index.rs @@ -0,0 +1,60 @@ +use std::collections::HashMap; +use std::mem::take; + +use serde::{ + Deserialize, + Serialize, +}; +use sk_core::k8s::{ + format_gvk_name, + GVK, +}; + +#[derive(Default, Deserialize, Serialize)] +pub struct TraceIndex { + #[serde(flatten)] + index: HashMap>, +} + +impl TraceIndex { + pub fn new() -> TraceIndex { + TraceIndex::default() + } + + pub fn contains(&self, gvk: &GVK, ns_name: &str) -> bool { + self.index.get(gvk).is_some_and(|gvk_hash| gvk_hash.contains_key(ns_name)) + } + + pub fn flattened_keys(&self) -> Vec { + self.index + .iter() + .flat_map(|(gvk, gvk_hash)| gvk_hash.keys().map(move |ns_name| format_gvk_name(gvk, ns_name))) + .collect() + } + + pub fn get(&self, gvk: &GVK, ns_name: &str) -> Option { + self.index.get(gvk)?.get(ns_name).cloned() + } + + pub fn insert(&mut self, gvk: GVK, ns_name: String, hash: u64) { + self.index.entry(gvk).or_default().insert(ns_name, hash); + } + + pub fn is_empty(&self) -> bool { + self.index.values().all(|gvk_hash| gvk_hash.is_empty()) + } + + pub fn len(&self) -> usize { + self.index.values().map(|gvk_hash| gvk_hash.len()).sum() + } + + pub fn remove(&mut self, gvk: GVK, ns_name: &str) { + self.index.entry(gvk).and_modify(|gvk_hash| { + gvk_hash.remove(ns_name); + }); + } + + pub fn take_gvk_index(&mut self, gvk: &GVK) -> HashMap { + take(self.index.get_mut(gvk).unwrap_or(&mut HashMap::new())) + } +} diff --git a/sk-store/src/lib.rs b/sk-store/src/lib.rs index dbfe0077..b513273c 100644 --- a/sk-store/src/lib.rs +++ b/sk-store/src/lib.rs @@ -1,10 +1,12 @@ mod config; +mod event_list; +mod filter; +mod index; mod pod_owners_map; -mod trace_filter; -mod trace_store; +mod store; pub mod watchers; -use std::collections::VecDeque; +use std::collections::HashMap; use kube::api::DynamicObject; use serde::{ @@ -12,17 +14,21 @@ use serde::{ Serialize, }; use sk_core::errors::*; -use sk_core::k8s::PodLifecycleData; +use sk_core::k8s::{ + PodLifecycleData, + GVK, +}; use sk_core::prelude::*; pub use crate::config::{ TracerConfig, TrackedObjectConfig, }; -pub use crate::trace_store::TraceStore; +pub use crate::event_list::TraceEventList; +pub use crate::index::TraceIndex; +use crate::pod_owners_map::PodLifecyclesMap; +pub use crate::store::TraceStore; -#[cfg(test)] -mod tests; #[derive(Debug)] enum TraceAction { ObjectApplied, @@ -37,15 +43,24 @@ pub struct TraceEvent { } pub struct TraceIterator<'a> { - events: &'a VecDeque, + events: &'a TraceEventList, idx: usize, } +#[derive(Deserialize, Serialize)] +pub struct ExportedTrace { + version: u16, + config: TracerConfig, + events: Vec, + index: TraceIndex, + pod_lifecycles: HashMap<(GVK, String), PodLifecyclesMap>, +} + pub trait TraceStorable { - fn create_or_update_obj(&mut self, obj: &DynamicObject, ts: i64, maybe_old_hash: Option); - fn delete_obj(&mut self, obj: &DynamicObject, ts: i64); - fn update_all_objs(&mut self, objs: &[DynamicObject], ts: i64); - fn lookup_pod_lifecycle(&self, owner_ns_name: &str, pod_hash: u64, seq: usize) -> PodLifecycleData; + fn create_or_update_obj(&mut self, obj: &DynamicObject, ts: i64, maybe_old_hash: Option) -> EmptyResult; + fn delete_obj(&mut self, obj: &DynamicObject, ts: i64) -> EmptyResult; + fn update_all_objs_for_gvk(&mut self, gvk: &GVK, objs: &[DynamicObject], ts: i64) -> EmptyResult; + fn lookup_pod_lifecycle(&self, gvk: &GVK, owner_ns_name: &str, pod_hash: u64, seq: usize) -> PodLifecycleData; fn record_pod_lifecycle( &mut self, ns_name: &str, @@ -54,12 +69,15 @@ pub trait TraceStorable { lifecycle_data: &PodLifecycleData, ) -> EmptyResult; fn config(&self) -> &TracerConfig; - fn has_obj(&self, ns_name: &str) -> bool; + fn has_obj(&self, gvk: &GVK, ns_name: &str) -> bool; fn start_ts(&self) -> Option; fn end_ts(&self) -> Option; fn iter(&self) -> TraceIterator<'_>; } +#[cfg(test)] +mod tests; + #[cfg(feature = "testutils")] pub mod mock { use mockall::mock; @@ -70,10 +88,10 @@ pub mod mock { pub TraceStore {} impl TraceStorable for TraceStore { - fn create_or_update_obj(&mut self, obj: &DynamicObject, ts: i64, maybe_old_hash: Option); - fn delete_obj(&mut self, obj: &DynamicObject, ts: i64); - fn update_all_objs(&mut self, objs: &[DynamicObject], ts: i64); - fn lookup_pod_lifecycle(&self, owner_ns_name: &str, pod_hash: u64, seq: usize) -> PodLifecycleData; + fn create_or_update_obj(&mut self, obj: &DynamicObject, ts: i64, maybe_old_hash: Option) -> EmptyResult; + fn delete_obj(&mut self, obj: &DynamicObject, ts: i64) -> EmptyResult; + fn update_all_objs_for_gvk(&mut self, gvk: &GVK, objs: &[DynamicObject], ts: i64) -> EmptyResult; + fn lookup_pod_lifecycle(&self, owner_gvk: &GVK, owner_ns_name: &str, pod_hash: u64, seq: usize) -> PodLifecycleData; fn record_pod_lifecycle( &mut self, ns_name: &str, @@ -82,7 +100,7 @@ pub mod mock { lifecycle_data: &PodLifecycleData, ) -> EmptyResult; fn config(&self) -> &TracerConfig; - fn has_obj(&self, ns_name: &str) -> bool; + fn has_obj(&self, gvk: &GVK, ns_name: &str) -> bool; fn start_ts(&self) -> Option; fn end_ts(&self) -> Option; fn iter<'a>(&'a self) -> TraceIterator<'a>; @@ -91,4 +109,10 @@ pub mod mock { } #[cfg(feature = "testutils")] -pub use crate::pod_owners_map::PodLifecyclesMap; +impl ExportedTrace { + pub fn prepend_event(&mut self, event: TraceEvent) { + let mut tmp = vec![event]; + tmp.append(&mut self.events); + self.events = tmp; + } +} diff --git a/sk-store/src/pod_owners_map.rs b/sk-store/src/pod_owners_map.rs index 7fceb102..b78b7975 100644 --- a/sk-store/src/pod_owners_map.rs +++ b/sk-store/src/pod_owners_map.rs @@ -2,8 +2,14 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use sk_core::errors::*; -use sk_core::k8s::PodLifecycleData; -use sk_core::prelude::*; +use sk_core::k8s::{ + format_gvk_name, + PodLifecycleData, + GVK, +}; +use tracing::*; + +use crate::TraceIndex; // The PodOwnersMap tracks lifecycle data for all pods that are owned by some object that we care // about (e.g., if we are tracking Deployments, the owners map will track the lifecycle data for @@ -46,14 +52,14 @@ pub type PodLifecyclesMap = HashMap>; #[derive(Default)] pub(crate) struct PodOwnersMap { - m: HashMap, - index: HashMap, + m: HashMap<(GVK, String), PodLifecyclesMap>, + index: HashMap, } impl PodOwnersMap { pub(crate) fn new_from_parts( - m: HashMap, - index: HashMap, + m: HashMap<(GVK, String), PodLifecyclesMap>, + index: HashMap, ) -> PodOwnersMap { PodOwnersMap { m, index } } @@ -64,20 +70,23 @@ impl PodOwnersMap { pub(crate) fn lifecycle_data_for<'a>( &'a self, + owner_gvk: &GVK, owner_ns_name: &str, pod_hash: u64, ) -> Option<&'a Vec> { - self.m.get(owner_ns_name)?.get(&pod_hash) + self.m.get(&(owner_gvk.clone(), owner_ns_name.into()))?.get(&pod_hash) } pub(crate) fn store_new_pod_lifecycle( &mut self, - ns_name: &str, + pod_ns_name: &str, + owner_gvk: &GVK, owner_ns_name: &str, hash: u64, lifecycle_data: &PodLifecycleData, ) { - let idx = match self.m.entry(owner_ns_name.into()) { + let owner_gvk_name = format_gvk_name(owner_gvk, owner_ns_name); + let idx = match self.m.entry((owner_gvk.clone(), owner_ns_name.into())) { Entry::Vacant(e) => { e.insert([(hash, vec![lifecycle_data.clone()])].into()); 0 @@ -89,30 +98,33 @@ impl PodOwnersMap { }, }; - info!("inserting pod {ns_name} owned by {owner_ns_name} with hash {hash}: {lifecycle_data:?}"); - self.index.insert(ns_name.into(), (owner_ns_name.into(), hash, idx)); + info!("inserting pod {pod_ns_name} owned by {owner_gvk_name} with hash {hash}: {lifecycle_data:?}"); + self.index + .insert(pod_ns_name.into(), ((owner_gvk.clone(), owner_ns_name.into()), hash, idx)); } - pub(crate) fn update_pod_lifecycle(&mut self, ns_name: &str, lifecycle_data: &PodLifecycleData) -> EmptyResult { - match self.index.get(ns_name) { - None => bail!("pod {} not present in index", ns_name), - Some((owner_ns_name, hash, sequence_idx)) => { + pub(crate) fn update_pod_lifecycle(&mut self, pod_ns_name: &str, lifecycle_data: &PodLifecycleData) -> EmptyResult { + match self.index.get(pod_ns_name) { + None => bail!("pod {} not present in index", pod_ns_name), + Some(((owner_gvk, owner_ns_name), hash, sequence_idx)) => { let owner_entry = self .m - .get_mut(owner_ns_name) - .ok_or(anyhow!("no owner entry for pod {}", ns_name))?; - let pods = - owner_entry - .get_mut(hash) - .ok_or(anyhow!("no entry for pod {} matching hash {}", ns_name, hash))?; + .get_mut(&(owner_gvk.clone(), owner_ns_name.into())) + .ok_or(anyhow!("no owner entry for pod {}", pod_ns_name))?; + let pods = owner_entry.get_mut(hash).ok_or(anyhow!( + "no entry for pod {} matching hash {}", + pod_ns_name, + hash + ))?; let pod_entry = pods.get_mut(*sequence_idx).ok_or(anyhow!( "no sequence index {} for pod {} matching hash {}", sequence_idx, - ns_name, + pod_ns_name, hash ))?; - info!("updating pod {ns_name} owned by {owner_ns_name} with hash {hash}: {lifecycle_data:?}"); + let owner_gvk_name = format_gvk_name(owner_gvk, owner_ns_name); + info!("updating pod {pod_ns_name} owned by {owner_gvk_name} with hash {hash}: {lifecycle_data:?}"); *pod_entry = lifecycle_data.clone(); Ok(()) }, @@ -121,25 +133,29 @@ impl PodOwnersMap { // Given an index of "owning objects", get a list of all the pods between a given start and end // time that belong to one of those owning objects. + #[allow(dead_code)] pub(crate) fn filter( &self, start_ts: i64, end_ts: i64, - index: &HashMap, - ) -> HashMap { + index: &TraceIndex, + ) -> HashMap<(GVK, String), PodLifecyclesMap> { self.m .iter() // The filtering is a little complicated here; if the owning object isn't in the index, // we discard it. Also, if none of the pods belonging to the owning object land // within the given time window, we want to discard it. Otherwise, we want to filter // down the list of pods to the ones that fall between the given time window. - .filter_map(|(owner, lifecycles_map)| { - if !index.contains_key(owner) { + .filter_map(|((owner_gvk, owner_ns_name), lifecycles_map)| { + if !index.contains(owner_gvk, owner_ns_name) { return None; } // Note the question mark here, doing a bunch of heavy lifting - Some((owner.clone(), filter_lifecycles_map(start_ts, end_ts, lifecycles_map)?)) + Some(( + (owner_gvk.clone(), owner_ns_name.clone()), + filter_lifecycles_map(start_ts, end_ts, lifecycles_map)?, + )) }) .collect() } @@ -169,7 +185,7 @@ pub(crate) fn filter_lifecycles_map( #[cfg(test)] impl PodOwnersMap { - pub(crate) fn pod_owner_meta(&self, ns_name: &str) -> Option<&(String, u64, usize)> { - self.index.get(ns_name) + pub(crate) fn pod_owner_meta(&self, pod_ns_name: &str) -> Option<&((GVK, String), u64, usize)> { + self.index.get(pod_ns_name) } } diff --git a/sk-store/src/trace_store.rs b/sk-store/src/store.rs similarity index 71% rename from sk-store/src/trace_store.rs rename to sk-store/src/store.rs index dfa4a5ad..2e46db8d 100644 --- a/sk-store/src/trace_store.rs +++ b/sk-store/src/store.rs @@ -1,8 +1,4 @@ -use std::collections::{ - HashMap, - VecDeque, -}; -use std::mem::take; +use std::collections::HashMap; use anyhow::bail; use clockabilly::{ @@ -10,38 +6,49 @@ use clockabilly::{ UtcClock, }; use kube::api::DynamicObject; -use kube::ResourceExt; use sk_api::v1::ExportFilters; use sk_core::jsonutils; use sk_core::k8s::{ build_deletable, - KubeResourceExt, PodExt, PodLifecycleData, GVK, }; use sk_core::prelude::*; use sk_core::time::duration_to_ts_from; +use thiserror::Error; +use tracing::*; use crate::config::TracerConfig; -use crate::pod_owners_map::{ - PodLifecyclesMap, - PodOwnersMap, -}; -use crate::trace_filter::filter_event; +use crate::filter::filter_event; +use crate::pod_owners_map::PodOwnersMap; use crate::{ + ExportedTrace, TraceAction, TraceEvent, + TraceEventList, + TraceIndex, TraceIterator, TraceStorable, }; +const CURRENT_TRACE_VERSION: u16 = 2; + +#[derive(Debug, Error)] +pub enum TraceStoreError { + #[error( + "could not parse trace file\n\nIf this trace file is older than version 2, \ + it is only parseable by SimKube <= 1.1.1. Please see the release notes for details." + )] + ParseFailed(#[from] rmp_serde::decode::Error), +} + #[derive(Default)] pub struct TraceStore { pub(crate) config: TracerConfig, - pub(crate) events: VecDeque, + pub(crate) events: TraceEventList, pub(crate) pod_owners: PodOwnersMap, - pub(crate) index: HashMap, + pub(crate) index: TraceIndex, } // The TraceStore object is an in-memory store of a cluster trace. It keeps track of all the @@ -63,14 +70,21 @@ impl TraceStore { // will return an index of objects that we collected, and we set the keep_deleted flag = // true so that in the second step, we keep pod data around even if the owning object was // deleted before the trace ends. - let (events, index) = self.collect_events(start_ts, end_ts, filter, true); + let (events, index) = self.collect_events(start_ts, end_ts, filter, true)?; + let num_events = events.len(); // Collect all pod lifecycle data that is a) between the start and end times, and b) is // owned by some object contained in the trace - let lifecycle_data = self.pod_owners.filter(start_ts, end_ts, &index); - let data = rmp_serde::to_vec_named(&(&self.config, &events, &index, &lifecycle_data))?; + let pod_lifecycles = self.pod_owners.filter(start_ts, end_ts, &index); + let data = rmp_serde::to_vec_named(&ExportedTrace { + version: CURRENT_TRACE_VERSION, + config: self.config.clone(), + events, + index, + pod_lifecycles, + })?; - info!("Exported {} events", events.len()); + info!("Exported {} events", num_events); Ok(data) } @@ -78,35 +92,38 @@ impl TraceStore { // the metadata necessary to pick up a trace and continue. Instead, we just re-import enough // information to be able to run a simulation off the trace store. pub fn import(data: Vec, maybe_duration: &Option) -> anyhow::Result { - let (config, mut events, index, lifecycle_data): ( - TracerConfig, - VecDeque, - HashMap, - HashMap, - ) = rmp_serde::from_slice(&data)?; - - let trace_start_ts = events - .front() + let mut exported_trace = rmp_serde::from_slice::(&data).map_err(TraceStoreError::ParseFailed)?; + + if exported_trace.version != CURRENT_TRACE_VERSION { + bail!("unsupported trace version: {}", exported_trace.version); + } + + let trace_start_ts = exported_trace + .events + .first() .unwrap_or(&TraceEvent { ts: UtcClock.now_ts(), ..Default::default() }) .ts; - let mut trace_end_ts = events - .back() + let mut trace_end_ts = exported_trace + .events + .last() .unwrap_or(&TraceEvent { ts: UtcClock.now_ts(), ..Default::default() }) .ts; if let Some(trace_duration_str) = maybe_duration { trace_end_ts = duration_to_ts_from(trace_start_ts, trace_duration_str)?; - events.retain(|evt| evt.ts < trace_end_ts); + exported_trace.events.retain(|evt| evt.ts < trace_end_ts); // Add an empty event to the very end to make sure the driver doesn't shut down early - events.push_back(TraceEvent { ts: trace_end_ts, ..Default::default() }); + exported_trace + .events + .push(TraceEvent { ts: trace_end_ts, ..Default::default() }); } - info!("Imported {} events between {trace_start_ts} and {trace_end_ts}", events.len()); + info!("Imported {} events between {trace_start_ts} and {trace_end_ts}", exported_trace.events.len()); Ok(TraceStore { - config, - events, - index, - pod_owners: PodOwnersMap::new_from_parts(lifecycle_data, HashMap::new()), + config: exported_trace.config, + events: exported_trace.events.into(), + index: exported_trace.index, + pod_owners: PodOwnersMap::new_from_parts(exported_trace.pod_lifecycles, HashMap::new()), }) } @@ -116,7 +133,7 @@ impl TraceStore { end_ts: i64, filter: &ExportFilters, keep_deleted: bool, - ) -> (Vec, HashMap) { + ) -> anyhow::Result<(Vec, TraceIndex)> { // TODO this is not a huge inefficiency but it is a little annoying to have // an empty event at the start_ts if there aren't any events that happened // before the start_ts @@ -125,7 +142,7 @@ impl TraceStore { // flattened_objects is a list of everything that happened before start_ts but is // still present at start_ts -- i.e., it is our starting configuration. let mut flattened_objects = HashMap::new(); - let mut index = HashMap::new(); + let mut index = TraceIndex::new(); for (evt, _) in self.iter() { // trace should be end-exclusive, so we use >= here: anything that is at the @@ -137,21 +154,23 @@ impl TraceStore { if let Some(new_evt) = filter_event(evt, filter) { for obj in &new_evt.applied_objs { + let gvk = GVK::from_dynamic_obj(obj)?; let ns_name = obj.namespaced_name(); if new_evt.ts < start_ts { flattened_objects.insert(ns_name.clone(), obj.clone()); } let hash = jsonutils::hash_option(obj.data.get("spec")); - index.insert(ns_name, hash); + index.insert(gvk, ns_name, hash); } for obj in &evt.deleted_objs { + let gvk = GVK::from_dynamic_obj(obj)?; let ns_name = obj.namespaced_name(); if new_evt.ts < start_ts { flattened_objects.remove(&ns_name); } if !keep_deleted { - index.remove(&ns_name); + index.remove(gvk, &ns_name); } } @@ -164,34 +183,7 @@ impl TraceStore { // events[0] is the empty event we inserted at the beginning, so we're guaranteed not to // overwrite anything here. events[0].applied_objs = flattened_objects.into_values().collect(); - (events, index) - } - - fn append_event(&mut self, ts: i64, obj: &DynamicObject, action: TraceAction) { - info!( - "{:?} @ {ts}: {} {}", - action, - obj.types - .clone() - .map(|tm| format!("{}.{}", tm.api_version, tm.kind)) - .unwrap_or("".into()), - obj.namespaced_name(), - ); - - let obj = obj.clone(); - match self.events.back_mut() { - Some(evt) if evt.ts == ts => match action { - TraceAction::ObjectApplied => evt.applied_objs.push(obj), - TraceAction::ObjectDeleted => evt.deleted_objs.push(obj), - }, - _ => { - let evt = match action { - TraceAction::ObjectApplied => TraceEvent { ts, applied_objs: vec![obj], ..Default::default() }, - TraceAction::ObjectDeleted => TraceEvent { ts, deleted_objs: vec![obj], ..Default::default() }, - }; - self.events.push_back(evt); - }, - } + Ok((events, index)) } } @@ -201,39 +193,51 @@ impl TraceStorable for TraceStore { // available in it yet. So here we have to pass in a maybe_old_hash which is the value from // the swapped-out data structure. If this is called from an `Applied` event, we just pass in // `None` and look up the value in the current index (if the object didn't exist in the old - // index either, we'll do a second lookup in the new index, but that should be pretty fast).. - fn create_or_update_obj(&mut self, obj: &DynamicObject, ts: i64, maybe_old_hash: Option) { + // index either, we'll do a second lookup in the new index, but that should be pretty fast). + fn create_or_update_obj(&mut self, obj: &DynamicObject, ts: i64, maybe_old_hash: Option) -> EmptyResult { + let gvk = GVK::from_dynamic_obj(obj)?; + let ns_name = obj.namespaced_name(); let new_hash = jsonutils::hash_option(obj.data.get("spec")); - let old_hash = maybe_old_hash.or_else(|| self.index.get(&ns_name).cloned()); + let old_hash = maybe_old_hash.or_else(|| self.index.get(&gvk, &ns_name)); if Some(new_hash) != old_hash { - self.append_event(ts, obj, TraceAction::ObjectApplied); + self.events.append(ts, obj, TraceAction::ObjectApplied); } - self.index.insert(ns_name, new_hash); + self.index.insert(gvk, ns_name, new_hash); + Ok(()) } - fn delete_obj(&mut self, obj: &DynamicObject, ts: i64) { + fn delete_obj(&mut self, obj: &DynamicObject, ts: i64) -> EmptyResult { + let gvk = GVK::from_dynamic_obj(obj)?; let ns_name = obj.namespaced_name(); - self.append_event(ts, obj, TraceAction::ObjectDeleted); - self.index.remove(&ns_name); + self.events.append(ts, obj, TraceAction::ObjectDeleted); + self.index.remove(gvk, &ns_name); + Ok(()) } - fn update_all_objs(&mut self, objs: &[DynamicObject], ts: i64) { - let mut old_index = take(&mut self.index); + fn update_all_objs_for_gvk(&mut self, gvk: &GVK, objs: &[DynamicObject], ts: i64) -> EmptyResult { + let mut old_gvk_index = self.index.take_gvk_index(gvk); for obj in objs { let ns_name = obj.namespaced_name(); - let old_hash = old_index.remove(&ns_name); - self.create_or_update_obj(obj, ts, old_hash); + let old_hash = old_gvk_index.remove(&ns_name); + self.create_or_update_obj(obj, ts, old_hash)?; } - for ns_name in old_index.keys() { - self.delete_obj(&build_deletable(ns_name), ts); + for ns_name in old_gvk_index.keys() { + self.delete_obj(&build_deletable(gvk, ns_name), ts)?; } + Ok(()) } - fn lookup_pod_lifecycle(&self, owner_ns_name: &str, pod_hash: u64, seq: usize) -> PodLifecycleData { - let maybe_lifecycle_data = self.pod_owners.lifecycle_data_for(owner_ns_name, pod_hash); + fn lookup_pod_lifecycle( + &self, + owner_gvk: &GVK, + owner_ns_name: &str, + pod_hash: u64, + seq: usize, + ) -> PodLifecycleData { + let maybe_lifecycle_data = self.pod_owners.lifecycle_data_for(owner_gvk, owner_ns_name, pod_hash); match maybe_lifecycle_data { Some(data) => data[seq % data.len()].clone(), _ => PodLifecycleData::Empty, @@ -259,15 +263,15 @@ impl TraceStorable for TraceStore { self.pod_owners.update_pod_lifecycle(ns_name, lifecycle_data)?; } else if let Some(pod) = &maybe_pod { // Otherwise, we need to check if any of the pod's owners are tracked by us - for rf in &owners { + for owner in &owners { // Pods are guaranteed to have namespaces, so the unwrap is fine - let owner_ns_name = format!("{}/{}", pod.namespace().unwrap(), rf.name); - if !self.index.contains_key(&owner_ns_name) { + let owner_ns_name = format!("{}/{}", pod.namespace().unwrap(), owner.name); + let owner_gvk = GVK::from_owner_ref(owner)?; + if !self.has_obj(&owner_gvk, &owner_ns_name) { continue; } - let gvk = GVK::from_owner_ref(rf)?; - if !self.config.track_lifecycle_for(&gvk) { + if !self.config.track_lifecycle_for(&owner_gvk) { continue; } @@ -282,7 +286,7 @@ impl TraceStorable for TraceStore { // more things out from this and/or allow users to specify what is filtered out. let hash = jsonutils::hash(&serde_json::to_value(&pod.stable_spec()?)?); self.pod_owners - .store_new_pod_lifecycle(ns_name, &owner_ns_name, hash, lifecycle_data); + .store_new_pod_lifecycle(ns_name, &owner_gvk, &owner_ns_name, hash, lifecycle_data); break; } } else { @@ -296,8 +300,8 @@ impl TraceStorable for TraceStore { &self.config } - fn has_obj(&self, ns_name: &str) -> bool { - self.index.contains_key(ns_name) + fn has_obj(&self, gvk: &GVK, ns_name: &str) -> bool { + self.index.contains(gvk, ns_name) } fn start_ts(&self) -> Option { @@ -336,16 +340,16 @@ impl<'a> Iterator for TraceIterator<'a> { #[cfg(test)] mod test { - use std::collections::HashSet; - use super::*; impl TraceStore { - pub fn objs_at(&self, end_ts: i64, filter: &ExportFilters) -> HashSet { + pub fn sorted_objs_at(&self, end_ts: i64, filter: &ExportFilters) -> Vec { // To compute the list of tracked_objects at a particular timestamp, we _don't_ want to // keep the deleted objects around, so we set that parameter to `false`. - let (_, index) = self.collect_events(0, end_ts, filter, false); - index.into_keys().collect() + let (_, index) = self.collect_events(0, end_ts, filter, false).expect("testing code"); + let mut res = index.flattened_keys(); + res.sort(); + res } } } diff --git a/sk-store/src/tests/import_export_test.rs b/sk-store/src/tests/import_export_test.rs index c293bb13..55766d96 100644 --- a/sk-store/src/tests/import_export_test.rs +++ b/sk-store/src/tests/import_export_test.rs @@ -10,9 +10,9 @@ use k8s_openapi::apimachinery::pkg::apis::meta::v1 as metav1; use kube::api::DynamicObject; use kube::runtime::watcher::Event; use kube::ResourceExt; -use serde_json::json; use sk_api::v1::ExportFilters; use sk_core::macros::*; +use sk_core::prelude::*; use super::*; use crate::watchers::{ @@ -21,23 +21,12 @@ use crate::watchers::{ }; use crate::TraceStore; -fn test_pod(idx: i64) -> DynamicObject { - DynamicObject { - metadata: metav1::ObjectMeta { - namespace: Some(TEST_NAMESPACE.into()), - name: Some(format!("pod{idx}").into()), - ..Default::default() - }, - types: None, - data: json!({"spec": {}}), - } +fn d(idx: i64) -> DynamicObject { + test_deployment(&format!("depl{idx}")) } -fn test_daemonset_pod(idx: i64) -> DynamicObject { - let mut ds_pod = test_pod(idx + 100); - ds_pod.metadata.owner_references = - Some(vec![metav1::OwnerReference { kind: "DaemonSet".into(), ..Default::default() }]); - ds_pod +fn ds(idx: i64) -> DynamicObject { + test_daemonset(&format!("ds{idx}")) } // Set up a test stream to ensure that imports and exports work correctly. @@ -52,27 +41,34 @@ fn test_stream(clock: MockUtcClock) -> KubeObjectStream { let mut c = clock.clone(); async move { match state { - // Initial conditions: we create 10 regular pods and 5 daemonset pods + // Initial conditions: we create 10 deployments and 5 daemonsets + // These are handled by different watchers, so they appear as different events. + // I didn't really set up this test to handle multiple events happening at the + // same time so I'm just having the deployments created first and the daemonsets + // created at the next timestep. (-1, id) => { - let mut pods: Vec<_> = (0..10).map(|i| test_pod(i)).collect(); - let ds_pods: Vec<_> = (0..5).map(|i| test_daemonset_pod(i)).collect(); - pods.extend(ds_pods); - return Some((Ok(Event::Restarted(pods)), (0, id))); + let objs: Vec<_> = (0..10).map(|i| d(i)).collect(); + return Some((Ok((DEPL_GVK.clone(), Event::Restarted(objs))), (0, id))); + }, + (0, id) => { + let objs: Vec<_> = (0..5).map(|i| ds(i)).collect(); + let new_ts = c.advance(1); + return Some((Ok((DS_GVK.clone(), Event::Restarted(objs))), (new_ts, id))); }, - // We recreate one of the pods at time zero just to make sure there's + // We recreate one of the pods at time one just to make sure there's // no weird duplicate behaviours - (0, id) => { - let pod = test_pod(id); + (1, id) => { + let obj = d(id); let new_ts = c.advance(5); - return Some((Ok(Event::Applied(pod)), (new_ts, id))); + return Some((Ok((DEPL_GVK.clone(), Event::Applied(obj))), (new_ts, id))); }, // From times 10..20, we delete one of the regular pods (5..=19, id) => { - let pod = test_pod(id); + let obj = d(id); let new_ts = c.advance(5); - return Some((Ok(Event::Deleted(pod)), (new_ts, id + 1))); + return Some((Ok((DEPL_GVK.clone(), Event::Deleted(obj))), (new_ts, id + 1))); }, // In times 20..25, we test the various filter options: @@ -82,38 +78,38 @@ fn test_stream(clock: MockUtcClock) -> KubeObjectStream { // // In the test below, all of these events should be filtered out (20, id) => { - let pod = test_daemonset_pod(7); + let obj = ds(7); let new_ts = c.advance(1); - return Some((Ok(Event::Applied(pod)), (new_ts, id))); + return Some((Ok((DS_GVK.clone(), Event::Applied(obj))), (new_ts, id))); }, (21, id) => { - let pod = test_daemonset_pod(8); + let obj = ds(8); let new_ts = c.advance(1); - return Some((Ok(Event::Applied(pod)), (new_ts, id))); + return Some((Ok((DS_GVK.clone(), Event::Applied(obj))), (new_ts, id))); }, (22, id) => { - let mut pod = test_pod(30); - pod.metadata.namespace = Some("kube-system".into()); + let mut obj = d(30); + obj.metadata.namespace = Some("kube-system".into()); let new_ts = c.advance(1); - return Some((Ok(Event::Applied(pod)), (new_ts, id))); + return Some((Ok((DEPL_GVK.clone(), Event::Applied(obj))), (new_ts, id))); }, (23, id) => { - let pod = test_daemonset_pod(1); + let obj = ds(1); let new_ts = c.advance(1); - return Some((Ok(Event::Deleted(pod)), (new_ts, id))); + return Some((Ok((DS_GVK.clone(), Event::Deleted(obj))), (new_ts, id))); }, (24, id) => { - let mut pod = test_pod(31); - pod.labels_mut().insert("foo".into(), "bar".into()); + let mut obj = d(31); + obj.labels_mut().insert("foo".into(), "bar".into()); let new_ts = c.advance(1); - return Some((Ok(Event::Applied(pod)), (new_ts, id))); + return Some((Ok((DEPL_GVK.clone(), Event::Applied(obj))), (new_ts, id))); }, // Lastly we delete the remaining "regular" pods (25..=55, id) => { - let pod = test_pod(id); + let obj = d(id); let new_ts = c.advance(5); - return Some((Ok(Event::Deleted(pod)), (new_ts, id + 1))); + return Some((Ok((DEPL_GVK.clone(), Event::Deleted(obj))), (new_ts, id + 1))); }, _ => None, } @@ -155,8 +151,8 @@ async fn itest_export(#[case] duration: Option) { // Confirm that the results match what we expect let new_store = TraceStore::import(data, &duration).unwrap(); let import_end_ts = duration.map(|_| start_ts + 10).unwrap_or(end_ts); - let expected_pods = store.objs_at(import_end_ts, &filter); - let actual_pods = new_store.objs_at(end_ts, &filter); + let expected_pods = store.sorted_objs_at(import_end_ts, &filter); + let actual_pods = new_store.sorted_objs_at(end_ts, &filter); println!("Expected pods: {:?}", expected_pods); println!("Actual pods: {:?}", actual_pods); assert_eq!(actual_pods, expected_pods); diff --git a/sk-store/src/tests/pod_owners_map_test.rs b/sk-store/src/tests/pod_owners_map_test.rs index ca8b16f2..88bdb8a4 100644 --- a/sk-store/src/tests/pod_owners_map_test.rs +++ b/sk-store/src/tests/pod_owners_map_test.rs @@ -17,34 +17,48 @@ fn owners_map() -> PodOwnersMap { #[rstest] fn test_store_new_pod_lifecycle(mut owners_map: PodOwnersMap) { - owners_map.store_new_pod_lifecycle("podA", "deployment1", 1234, &PodLifecycleData::Running(5)); - owners_map.store_new_pod_lifecycle("podB", "deployment1", 1234, &PodLifecycleData::Running(7)); - owners_map.store_new_pod_lifecycle("podC", "deployment1", 5678, &PodLifecycleData::Running(9)); - owners_map.store_new_pod_lifecycle("podD", "deployment2", 5678, &PodLifecycleData::Running(13)); + owners_map.store_new_pod_lifecycle("podA", &DEPL_GVK, "deployment1", 1234, &PodLifecycleData::Running(5)); + owners_map.store_new_pod_lifecycle("podB", &DEPL_GVK, "deployment1", 1234, &PodLifecycleData::Running(7)); + owners_map.store_new_pod_lifecycle("podC", &DEPL_GVK, "deployment1", 5678, &PodLifecycleData::Running(9)); + owners_map.store_new_pod_lifecycle("podD", &DEPL_GVK, "deployment2", 5678, &PodLifecycleData::Running(13)); assert_eq!( - owners_map.lifecycle_data_for("deployment1", 1234).unwrap(), + owners_map.lifecycle_data_for(&DEPL_GVK, "deployment1", 1234).unwrap(), &vec![PodLifecycleData::Running(5), PodLifecycleData::Running(7)] ); - assert_eq!(owners_map.lifecycle_data_for("deployment1", 5678).unwrap(), &vec![PodLifecycleData::Running(9)]); - assert_eq!(owners_map.lifecycle_data_for("deployment2", 5678).unwrap(), &vec![PodLifecycleData::Running(13)]); + assert_eq!( + owners_map.lifecycle_data_for(&DEPL_GVK, "deployment1", 5678).unwrap(), + &vec![PodLifecycleData::Running(9)] + ); + assert_eq!( + owners_map.lifecycle_data_for(&DEPL_GVK, "deployment2", 5678).unwrap(), + &vec![PodLifecycleData::Running(13)] + ); - assert_eq!(*owners_map.pod_owner_meta("podA").unwrap(), ("deployment1".to_string(), 1234, 0)); - assert_eq!(*owners_map.pod_owner_meta("podB").unwrap(), ("deployment1".to_string(), 1234, 1)); - assert_eq!(*owners_map.pod_owner_meta("podC").unwrap(), ("deployment1".to_string(), 5678, 0)); - assert_eq!(*owners_map.pod_owner_meta("podD").unwrap(), ("deployment2".to_string(), 5678, 0)); + assert_eq!(*owners_map.pod_owner_meta("podA").unwrap(), ((DEPL_GVK.clone(), "deployment1".into()), 1234, 0)); + assert_eq!(*owners_map.pod_owner_meta("podB").unwrap(), ((DEPL_GVK.clone(), "deployment1".into()), 1234, 1)); + assert_eq!(*owners_map.pod_owner_meta("podC").unwrap(), ((DEPL_GVK.clone(), "deployment1".into()), 5678, 0)); + assert_eq!(*owners_map.pod_owner_meta("podD").unwrap(), ((DEPL_GVK.clone(), "deployment2".into()), 5678, 0)); } #[rstest] fn test_filter_owners_map() { - let index = HashMap::from([("test/deployment1".into(), 9876), ("test/deployment2".into(), 5432)]); + let mut index = TraceIndex::new(); + index.insert(DEPL_GVK.clone(), "test/deployment1".into(), 9876); + index.insert(DEPL_GVK.clone(), "test/deployment2".into(), 5432); let owners_map = PodOwnersMap::new_from_parts( HashMap::from([ - ("test/deployment1".into(), PodLifecyclesMap::from([(1234, vec![PodLifecycleData::Finished(1, 2)])])), ( - "test/deployment2".into(), + (DEPL_GVK.clone(), "test/deployment1".into()), + PodLifecyclesMap::from([(1234, vec![PodLifecycleData::Finished(1, 2)])]), + ), + ( + (DEPL_GVK.clone(), "test/deployment2".into()), PodLifecyclesMap::from([(5678, vec![PodLifecycleData::Running(6), PodLifecycleData::Running(11)])]), ), - ("test/deployment3".into(), PodLifecyclesMap::from([(9999, vec![PodLifecycleData::Finished(1, 2)])])), + ( + (DEPL_GVK.clone(), "test/deployment3".into()), + PodLifecyclesMap::from([(9999, vec![PodLifecycleData::Finished(1, 2)])]), + ), ]), HashMap::new(), ); @@ -53,7 +67,7 @@ fn test_filter_owners_map() { assert_eq!( res, HashMap::from([( - "test/deployment2".into(), + (DEPL_GVK.clone(), "test/deployment2".into()), PodLifecyclesMap::from([(5678, vec![PodLifecycleData::Running(6)])]), )]) ); diff --git a/sk-store/src/tests/trace_store_test.rs b/sk-store/src/tests/trace_store_test.rs index ad499f77..36eaf43f 100644 --- a/sk-store/src/tests/trace_store_test.rs +++ b/sk-store/src/tests/trace_store_test.rs @@ -2,12 +2,8 @@ use std::collections::HashMap; use assertables::*; use kube::api::DynamicObject; -use serde_json::json; use sk_api::v1::ExportFilters; -use sk_core::k8s::{ - KubeResourceExt, - GVK, -}; +use sk_core::k8s::GVK; use super::*; use crate::pod_owners_map::PodOwnersMap; @@ -16,7 +12,7 @@ use crate::pod_owners_map::PodOwnersMap; fn tracer() -> TraceStore { TraceStore::new(TracerConfig { tracked_objects: HashMap::from([( - GVK::new("apps", "v1", "Deployment"), + DEPL_GVK.clone(), TrackedObjectConfig { track_lifecycle: true, pod_spec_template_path: Some("/spec/template".into()), @@ -25,19 +21,6 @@ fn tracer() -> TraceStore { }) } -#[fixture] -fn test_obj(#[default("obj")] name: &str) -> DynamicObject { - DynamicObject { - metadata: metav1::ObjectMeta { - namespace: Some(TEST_NAMESPACE.into()), - name: Some(name.into()), - ..Default::default() - }, - types: None, - data: json!({"spec": {}}), - } -} - #[fixture] fn owner_ref() -> metav1::OwnerReference { metav1::OwnerReference { @@ -50,14 +33,14 @@ fn owner_ref() -> metav1::OwnerReference { #[rstest] fn test_lookup_pod_lifecycle_no_owner(tracer: TraceStore) { - let res = tracer.lookup_pod_lifecycle(TEST_DEPLOYMENT, EMPTY_POD_SPEC_HASH, 0); + let res = tracer.lookup_pod_lifecycle(&DEPL_GVK, TEST_DEPLOYMENT, EMPTY_POD_SPEC_HASH, 0); assert_eq!(res, PodLifecycleData::Empty); } #[rstest] fn test_lookup_pod_lifecycle_no_hash(mut tracer: TraceStore) { - tracer.index.insert(TEST_DEPLOYMENT.into(), 1234); - let res = tracer.lookup_pod_lifecycle(TEST_DEPLOYMENT, EMPTY_POD_SPEC_HASH, 0); + tracer.index.insert(DEPL_GVK.clone(), TEST_DEPLOYMENT.into(), 1234); + let res = tracer.lookup_pod_lifecycle(&DEPL_GVK, TEST_DEPLOYMENT, EMPTY_POD_SPEC_HASH, 0); assert_eq!(res, PodLifecycleData::Empty); } @@ -66,13 +49,16 @@ fn test_lookup_pod_lifecycle(mut tracer: TraceStore) { let owner_ns_name = format!("{TEST_NAMESPACE}/{TEST_DEPLOYMENT}"); let pod_lifecycle = PodLifecycleData::Finished(1, 2); - tracer.index.insert(owner_ns_name.clone(), 1234); + tracer.index.insert(DEPL_GVK.clone(), owner_ns_name.clone(), 1234); tracer.pod_owners = PodOwnersMap::new_from_parts( - HashMap::from([(owner_ns_name.clone(), HashMap::from([(EMPTY_POD_SPEC_HASH, vec![pod_lifecycle.clone()])]))]), + HashMap::from([( + (DEPL_GVK.clone(), owner_ns_name.clone()), + HashMap::from([(EMPTY_POD_SPEC_HASH, vec![pod_lifecycle.clone()])]), + )]), HashMap::new(), ); - let res = tracer.lookup_pod_lifecycle(&owner_ns_name, EMPTY_POD_SPEC_HASH, 0); + let res = tracer.lookup_pod_lifecycle(&DEPL_GVK, &owner_ns_name, EMPTY_POD_SPEC_HASH, 0); assert_eq!(res, pod_lifecycle); } @@ -82,20 +68,22 @@ fn test_collect_events_filtered(mut tracer: TraceStore) { .iter() .map(|(name, ts)| TraceEvent { ts: *ts, - applied_objs: vec![test_obj(name)], + applied_objs: vec![test_deployment(name)], deleted_objs: vec![], }) .collect(); - let (events, index) = tracer.collect_events( - 1, - 10, - &ExportFilters { - excluded_namespaces: vec![TEST_NAMESPACE.into()], - ..Default::default() - }, - false, - ); + let (events, index) = tracer + .collect_events( + 1, + 10, + &ExportFilters { + excluded_namespaces: vec![TEST_NAMESPACE.into()], + ..Default::default() + }, + false, + ) + .unwrap(); // Always an empty event at the beginning assert_eq!(events, vec![TraceEvent { ts: 1, ..Default::default() }]); @@ -108,7 +96,7 @@ fn test_collect_events(mut tracer: TraceStore) { .iter() .map(|(name, ts)| TraceEvent { ts: *ts, - applied_objs: vec![test_obj(name)], + applied_objs: vec![test_deployment(name)], deleted_objs: vec![], }) .collect(); @@ -117,39 +105,42 @@ fn test_collect_events(mut tracer: TraceStore) { TraceEvent { ts: 4, applied_objs: vec![], - deleted_objs: vec![test_obj("obj2")], + deleted_objs: vec![test_deployment("obj2")], }, ); all_events.push(TraceEvent { ts: 25, applied_objs: vec![], - deleted_objs: vec![test_obj("obj1")], + deleted_objs: vec![test_deployment("obj1")], }); tracer.events = all_events.clone().into(); - let (events, index) = tracer.collect_events(1, 10, &Default::default(), true); + let (events, index) = tracer.collect_events(1, 10, &Default::default(), true).unwrap(); // The first object was created before the collection started so the timestamp changes all_events[0].ts = 1; assert_eq!(events, all_events[0..4]); - let keys: Vec<_> = index.into_keys().collect(); assert_bag_eq!( - keys, - [format!("{TEST_NAMESPACE}/obj1"), format!("{TEST_NAMESPACE}/obj2"), format!("{TEST_NAMESPACE}/obj3")] - .map(|s| s.to_string()) + index.flattened_keys(), + [ + format!("{}:{TEST_NAMESPACE}/obj1", &*DEPL_GVK), + format!("{}:{TEST_NAMESPACE}/obj2", &*DEPL_GVK), + format!("{}:{TEST_NAMESPACE}/obj3", &*DEPL_GVK) + ] + .map(|s| s.to_string()) ); } #[rstest] -fn test_create_or_update_obj(mut tracer: TraceStore, test_obj: DynamicObject) { - let ns_name = test_obj.namespaced_name(); +fn test_create_or_update_obj(mut tracer: TraceStore, test_deployment: DynamicObject) { + let ns_name = test_deployment.namespaced_name(); let ts: i64 = 1234; // test idempotency, if we create the same obj twice nothing should change - tracer.create_or_update_obj(&test_obj, ts, None); - tracer.create_or_update_obj(&test_obj, 2445, None); + tracer.create_or_update_obj(&test_deployment, ts, None).unwrap(); + tracer.create_or_update_obj(&test_deployment, 2445, None).unwrap(); assert_eq!(tracer.index.len(), 1); - assert_eq!(tracer.index[&ns_name], EMPTY_OBJ_HASH); + assert_eq!(tracer.index.get(&DEPL_GVK, &ns_name).unwrap(), TEST_DEPL_HASH); assert_eq!(tracer.events.len(), 1); assert_eq!(tracer.events[0].applied_objs.len(), 1); assert_eq!(tracer.events[0].deleted_objs.len(), 0); @@ -160,16 +151,16 @@ fn test_create_or_update_obj(mut tracer: TraceStore, test_obj: DynamicObject) { fn test_create_or_update_objs(mut tracer: TraceStore) { let obj_names = vec!["obj1", "obj2"]; let ts = vec![1234, 3445]; - let objs: Vec<_> = obj_names.iter().map(|p| test_obj(p)).collect(); + let objs: Vec<_> = obj_names.iter().map(|p| test_deployment(p)).collect(); for i in 0..objs.len() { - tracer.create_or_update_obj(&objs[i], ts[i], None); + tracer.create_or_update_obj(&objs[i], ts[i], None).unwrap(); } assert_eq!(tracer.index.len(), objs.len()); for p in objs.iter() { let ns_name = p.namespaced_name(); - assert_eq!(tracer.index[&ns_name], EMPTY_OBJ_HASH); + assert_eq!(tracer.index.get(&DEPL_GVK, &ns_name).unwrap(), TEST_DEPL_HASH); } assert_eq!(tracer.events.len(), 2); @@ -181,13 +172,13 @@ fn test_create_or_update_objs(mut tracer: TraceStore) { } #[rstest] -fn test_delete_obj(mut tracer: TraceStore, test_obj: DynamicObject) { - let ns_name = test_obj.namespaced_name(); +fn test_delete_obj(mut tracer: TraceStore, test_deployment: DynamicObject) { + let ns_name = test_deployment.namespaced_name(); let ts: i64 = 1234; - tracer.index.insert(ns_name.clone(), EMPTY_OBJ_HASH); + tracer.index.insert(DEPL_GVK.clone(), ns_name.clone(), TEST_DEPL_HASH); - tracer.delete_obj(&test_obj, ts); + tracer.delete_obj(&test_deployment, ts).unwrap(); assert_eq!(tracer.index.len(), 0); assert_eq!(tracer.events.len(), 1); @@ -199,17 +190,17 @@ fn test_delete_obj(mut tracer: TraceStore, test_obj: DynamicObject) { #[rstest] fn test_recreate_index_all_new(mut tracer: TraceStore) { let obj_names = vec!["obj1", "obj2", "obj3"]; - let objs: Vec<_> = obj_names.iter().map(|p| test_obj(p)).collect(); + let objs: Vec<_> = obj_names.iter().map(|p| test_deployment(p)).collect(); let ts: i64 = 1234; // Calling it twice shouldn't change the tracked objs - tracer.update_all_objs(&objs, ts); - tracer.update_all_objs(&objs, 2445); + tracer.update_all_objs_for_gvk(&DEPL_GVK, &objs, ts).unwrap(); + tracer.update_all_objs_for_gvk(&DEPL_GVK, &objs, 2445).unwrap(); assert_eq!(tracer.index.len(), objs.len()); for p in objs.iter() { let ns_name = p.namespaced_name(); - assert_eq!(tracer.index[&ns_name], EMPTY_OBJ_HASH); + assert_eq!(tracer.index.get(&DEPL_GVK, &ns_name).unwrap(), TEST_DEPL_HASH); } assert_eq!(tracer.events.len(), 1); assert_eq!(tracer.events[0].applied_objs.len(), 3); @@ -220,19 +211,18 @@ fn test_recreate_index_all_new(mut tracer: TraceStore) { #[rstest] fn test_recreate_index_with_created_obj(mut tracer: TraceStore) { let obj_names = vec!["obj1", "obj2", "obj3", "obj4"]; - let objs: Vec<_> = obj_names.iter().map(|p| test_obj(p)).collect(); + let objs: Vec<_> = obj_names.iter().map(|p| test_deployment(p)).collect(); let ts = vec![1234, 2445]; - // Calling it twice shouldn't change the tracked objs let mut fewer_objs = objs.clone(); fewer_objs.pop(); - tracer.update_all_objs(&fewer_objs, ts[0]); - tracer.update_all_objs(&objs, ts[1]); + tracer.update_all_objs_for_gvk(&DEPL_GVK, &fewer_objs, ts[0]).unwrap(); + tracer.update_all_objs_for_gvk(&DEPL_GVK, &objs, ts[1]).unwrap(); assert_eq!(tracer.index.len(), objs.len()); for p in fewer_objs.iter() { let ns_name = p.namespaced_name(); - assert_eq!(tracer.index[&ns_name], EMPTY_OBJ_HASH); + assert_eq!(tracer.index.get(&DEPL_GVK, &ns_name).unwrap(), TEST_DEPL_HASH); } assert_eq!(tracer.events.len(), 2); assert_eq!(tracer.events[0].applied_objs.len(), 3); @@ -246,19 +236,18 @@ fn test_recreate_index_with_created_obj(mut tracer: TraceStore) { #[rstest] fn test_recreate_index_with_deleted_obj(mut tracer: TraceStore) { let obj_names = vec!["obj1", "obj2", "obj3"]; - let objs: Vec<_> = obj_names.iter().map(|p| test_obj(p)).collect(); + let objs: Vec<_> = obj_names.iter().map(|p| test_deployment(p)).collect(); let ts = vec![1234, 2445]; - // Calling it twice shouldn't change the tracked objs - tracer.update_all_objs(&objs, ts[0]); + tracer.update_all_objs_for_gvk(&DEPL_GVK, &objs, ts[0]).unwrap(); let mut fewer_objs = objs.clone(); fewer_objs.pop(); - tracer.update_all_objs(&fewer_objs, ts[1]); + tracer.update_all_objs_for_gvk(&DEPL_GVK, &fewer_objs, ts[1]).unwrap(); assert_eq!(tracer.index.len(), fewer_objs.len()); for p in fewer_objs.iter() { let ns_name = p.namespaced_name(); - assert_eq!(tracer.index[&ns_name], EMPTY_OBJ_HASH); + assert_eq!(tracer.index.get(&DEPL_GVK, &ns_name).unwrap(), TEST_DEPL_HASH); } assert_eq!(tracer.events.len(), 2); assert_eq!(tracer.events[0].applied_objs.len(), 3); @@ -269,6 +258,33 @@ fn test_recreate_index_with_deleted_obj(mut tracer: TraceStore) { assert_eq!(tracer.events[1].ts, ts[1]); } + +#[rstest] +fn test_recreate_index_two_obj_types(mut tracer: TraceStore) { + let obj_names_1 = vec!["obj1", "obj2", "obj3"]; + let objs1: Vec<_> = obj_names_1.iter().map(|p| test_deployment(p)).collect(); + let obj_names_2 = vec!["obj4", "obj5", "obj6"]; + let objs2: Vec<_> = obj_names_2.iter().map(|p| test_daemonset(p)).collect(); + let ts = 1234; + + tracer.update_all_objs_for_gvk(&DEPL_GVK, &objs1, ts).unwrap(); + tracer.update_all_objs_for_gvk(&DS_GVK, &objs2, ts).unwrap(); + + assert_eq!(tracer.index.len(), objs1.len() + objs2.len()); + for p in objs1.iter() { + let ns_name = p.namespaced_name(); + assert_eq!(tracer.index.get(&DEPL_GVK, &ns_name).unwrap(), TEST_DEPL_HASH); + } + for p in objs2.iter() { + let ns_name = p.namespaced_name(); + assert_eq!(tracer.index.get(&DS_GVK, &ns_name).unwrap(), TEST_DS_HASH); + } + assert_eq!(tracer.events.len(), 1); + assert_eq!(tracer.events[0].applied_objs.len(), 6); + assert_eq!(tracer.events[0].deleted_objs.len(), 0); + assert_eq!(tracer.events[0].ts, ts); +} + #[rstest] fn test_record_pod_lifecycle_already_stored_no_data(mut tracer: TraceStore, owner_ref: metav1::OwnerReference) { assert!(matches!( @@ -277,6 +293,22 @@ fn test_record_pod_lifecycle_already_stored_no_data(mut tracer: TraceStore, owne )); } +fn mock_pod_owners_map( + pod_ns_name: &str, + pod_spec_hash: u64, + owner_ns_name: &str, + init_lifecycle_data: Vec, + pod_seq_idx: usize, +) -> PodOwnersMap { + PodOwnersMap::new_from_parts( + HashMap::from([( + (DEPL_GVK.clone(), owner_ns_name.into()), + HashMap::from([(EMPTY_POD_SPEC_HASH, init_lifecycle_data)]), + )]), + HashMap::from([(pod_ns_name.into(), ((DEPL_GVK.clone(), owner_ns_name.into()), pod_spec_hash, pod_seq_idx))]), + ) +} + #[rstest] fn test_record_pod_lifecycle_already_stored_no_pod(mut tracer: TraceStore, owner_ref: metav1::OwnerReference) { let new_lifecycle_data = PodLifecycleData::Finished(5, 45); @@ -290,18 +322,18 @@ fn test_record_pod_lifecycle_already_stored_no_pod(mut tracer: TraceStore, owner let mut expected_lifecycle_data = init_lifecycle_data.clone(); expected_lifecycle_data[pod_seq_idx] = new_lifecycle_data.clone(); - let ns_name = format!("{}/{}", TEST_NAMESPACE, "the-pod"); + let pod_ns_name = format!("{}/{}", TEST_NAMESPACE, "the-pod"); let owner_ns_name = format!("{}/{}", TEST_NAMESPACE, owner_ref.name); - tracer.pod_owners = PodOwnersMap::new_from_parts( - HashMap::from([(owner_ns_name.clone(), HashMap::from([(EMPTY_POD_SPEC_HASH, init_lifecycle_data)]))]), - HashMap::from([(ns_name.clone(), (owner_ns_name.clone(), EMPTY_POD_SPEC_HASH, pod_seq_idx))]), - ); + tracer.pod_owners = + mock_pod_owners_map(&pod_ns_name, EMPTY_POD_SPEC_HASH, &owner_ns_name, init_lifecycle_data, pod_seq_idx); tracer - .record_pod_lifecycle(&ns_name, None, vec![owner_ref], &new_lifecycle_data) + .record_pod_lifecycle(&pod_ns_name, None, vec![owner_ref], &new_lifecycle_data) .unwrap(); assert_eq!( - tracer.pod_owners.lifecycle_data_for(&owner_ns_name, EMPTY_POD_SPEC_HASH), + tracer + .pod_owners + .lifecycle_data_for(&DEPL_GVK, &owner_ns_name, EMPTY_POD_SPEC_HASH), Some(&expected_lifecycle_data) ); } @@ -320,7 +352,7 @@ fn test_record_pod_lifecycle_with_new_pod_no_tracked_owner( .unwrap(); let unused_hash = 0; - assert_eq!(tracer.pod_owners.lifecycle_data_for(&owner_ns_name, unused_hash), None); + assert_eq!(tracer.pod_owners.lifecycle_data_for(&DEPL_GVK, &owner_ns_name, unused_hash), None); } #[rstest] @@ -337,12 +369,14 @@ fn test_record_pod_lifecycle_with_new_pod_hash( let new_lifecycle_data = PodLifecycleData::Finished(5, 45); let gvk = GVK::from_owner_ref(&owner_ref).unwrap(); tracer.config.tracked_objects.get_mut(&gvk).unwrap().track_lifecycle = track_lifecycle; - tracer.index.insert(owner_ns_name.clone(), EMPTY_OBJ_HASH); + tracer.index.insert(DEPL_GVK.clone(), owner_ns_name.clone(), TEST_DEPL_HASH); tracer .record_pod_lifecycle(&ns_name, Some(test_pod), vec![owner_ref], &new_lifecycle_data.clone()) .unwrap(); - let lifecycle_data = tracer.pod_owners.lifecycle_data_for(&owner_ns_name, EMPTY_POD_SPEC_HASH); + let lifecycle_data = tracer + .pod_owners + .lifecycle_data_for(&DEPL_GVK, &owner_ns_name, EMPTY_POD_SPEC_HASH); if track_lifecycle { assert_eq!(lifecycle_data, Some(&vec![new_lifecycle_data])); } else { @@ -361,21 +395,26 @@ fn test_record_pod_lifecycle_with_new_pod_existing_hash( let mut expected_lifecycle_data = init_lifecycle_data.clone(); expected_lifecycle_data.push(new_lifecycle_data.clone()); - let ns_name = test_pod.namespaced_name(); + let pod_ns_name = test_pod.namespaced_name(); let owner_ns_name = format!("{}/{}", TEST_NAMESPACE, owner_ref.name); - tracer.index.insert(owner_ns_name.clone(), EMPTY_OBJ_HASH); + tracer.index.insert(DEPL_GVK.clone(), owner_ns_name.clone(), TEST_DEPL_HASH); tracer.pod_owners = PodOwnersMap::new_from_parts( - HashMap::from([(owner_ns_name.clone(), HashMap::from([(EMPTY_POD_SPEC_HASH, init_lifecycle_data)]))]), - HashMap::from([("asdf".into(), (owner_ns_name.clone(), 1234, 0))]), + HashMap::from([( + (DEPL_GVK.clone(), owner_ns_name.clone()), + HashMap::from([(EMPTY_POD_SPEC_HASH, init_lifecycle_data)]), + )]), + HashMap::from([("asdf".into(), ((DEPL_GVK.clone(), owner_ns_name.clone()), 1234, 0))]), ); tracer - .record_pod_lifecycle(&ns_name, Some(test_pod), vec![owner_ref], &new_lifecycle_data) + .record_pod_lifecycle(&pod_ns_name, Some(test_pod), vec![owner_ref], &new_lifecycle_data) .unwrap(); assert_eq!( - tracer.pod_owners.lifecycle_data_for(&owner_ns_name, EMPTY_POD_SPEC_HASH), + tracer + .pod_owners + .lifecycle_data_for(&DEPL_GVK, &owner_ns_name, EMPTY_POD_SPEC_HASH), Some(&expected_lifecycle_data) ); } @@ -390,21 +429,20 @@ fn test_record_pod_lifecycle_with_existing_pod( let init_lifecycle_data = vec![PodLifecycleData::Running(5)]; let expected_lifecycle_data = vec![new_lifecycle_data.clone()]; - let ns_name = test_pod.namespaced_name(); + let pod_ns_name = test_pod.namespaced_name(); let owner_ns_name = format!("{}/{}", TEST_NAMESPACE, owner_ref.name); - tracer.index.insert(owner_ns_name.clone(), EMPTY_OBJ_HASH); - tracer.pod_owners = PodOwnersMap::new_from_parts( - HashMap::from([(owner_ns_name.clone(), HashMap::from([(EMPTY_POD_SPEC_HASH, init_lifecycle_data)]))]), - HashMap::from([(ns_name.clone(), (owner_ns_name.clone(), EMPTY_POD_SPEC_HASH, 0))]), - ); + tracer.index.insert(DEPL_GVK.clone(), owner_ns_name.clone(), TEST_DEPL_HASH); + tracer.pod_owners = mock_pod_owners_map(&pod_ns_name, EMPTY_POD_SPEC_HASH, &owner_ns_name, init_lifecycle_data, 0); tracer - .record_pod_lifecycle(&ns_name, Some(test_pod), vec![owner_ref], &new_lifecycle_data) + .record_pod_lifecycle(&pod_ns_name, Some(test_pod), vec![owner_ref], &new_lifecycle_data) .unwrap(); assert_eq!( - tracer.pod_owners.lifecycle_data_for(&owner_ns_name, EMPTY_POD_SPEC_HASH), + tracer + .pod_owners + .lifecycle_data_for(&DEPL_GVK, &owner_ns_name, EMPTY_POD_SPEC_HASH), Some(&expected_lifecycle_data) ); } diff --git a/sk-store/src/watchers/dyn_obj_watcher.rs b/sk-store/src/watchers/dyn_obj_watcher.rs index 7eb23826..62509355 100644 --- a/sk-store/src/watchers/dyn_obj_watcher.rs +++ b/sk-store/src/watchers/dyn_obj_watcher.rs @@ -35,7 +35,7 @@ use sk_core::k8s::{ ApiSet, GVK, }; -use sk_core::prelude::*; +use tracing::*; use crate::{ TraceStorable, @@ -43,7 +43,7 @@ use crate::{ TrackedObjectConfig, }; -pub type KubeObjectStream = Pin>> + Send>>; +pub type KubeObjectStream = Pin)>> + Send>>; // Watch a (customizable) list of objects. Since we don't know what these object types will be at // runtime, we have to use the DynamicObject API, which gives us everything in JSON format that we @@ -67,7 +67,7 @@ impl DynObjWatcher { ) -> anyhow::Result<(DynObjWatcher, Receiver)> { let mut apis = vec![]; for gvk in tracked_objects.keys() { - let stream = build_stream_for_tracked_obj(apiset, gvk).await?; + let stream = build_stream_for_tracked_obj(apiset, gvk.clone()).await?; apis.push(stream); } @@ -91,7 +91,9 @@ impl DynObjWatcher { let ts = self.clock.now_ts(); match res { - Ok(evt) => self.handle_obj_event(evt, ts), + Ok((ref gvk, evt)) => self.handle_obj_event(gvk, evt, ts).unwrap_or_else(|err| { + skerr!(err, "could not handle event"); + }), Err(err) => { skerr!(err, "watcher received error on stream"); }, @@ -99,14 +101,14 @@ impl DynObjWatcher { } } - fn handle_obj_event(&mut self, evt: Event, ts: i64) { + fn handle_obj_event(&mut self, gvk: &GVK, evt: Event, ts: i64) -> EmptyResult { // We don't expect the trace store to panic, but if it does we should panic here too let mut store = self.store.lock().unwrap(); match evt { - Event::Applied(obj) => store.create_or_update_obj(&obj, ts, None), - Event::Deleted(obj) => store.delete_obj(&obj, ts), + Event::Applied(obj) => store.create_or_update_obj(&obj, ts, None)?, + Event::Deleted(obj) => store.delete_obj(&obj, ts)?, Event::Restarted(objs) => { - store.update_all_objs(&objs, ts); + store.update_all_objs_for_gvk(gvk, &objs, ts)?; // When the watcher first starts up it does a List call, which (internally) gets // converted into a "Restarted" event that contains all of the listed objects. @@ -116,29 +118,27 @@ impl DynObjWatcher { self.is_ready = true; // unlike golang, sending is non-blocking - if let Err(e) = self.ready_tx.send(true) { - error!("failed to update dynobjwatcher ready status: {e:?}") - } + self.ready_tx.send(true)?; } }, }; + Ok(()) } } -async fn build_stream_for_tracked_obj(apiset: &mut ApiSet, gvk: &GVK) -> anyhow::Result { +async fn build_stream_for_tracked_obj(apiset: &mut ApiSet, gvk: GVK) -> anyhow::Result { // TODO if this fails (e.g., because some custom resource isn't present in the cluster) // it will prevent the tracer from starting up let api_version = gvk.api_version().clone(); let kind = gvk.kind.clone(); // The "unnamespaced" api variant can list/watch in all namespaces - let (api, _) = apiset.unnamespaced_api_by_gvk(gvk).await?; + let (api, _) = apiset.unnamespaced_api_by_gvk(&gvk).await?; Ok(watcher(api.clone(), Default::default()) // All these objects need to be cloned because they're moved into the stream here - .modify(move |obj| { - sanitize_obj(obj, &api_version, &kind); - }) + .modify(move |obj| sanitize_obj(obj, &api_version, &kind)) + .map_ok(move |obj| (gvk.clone(), obj)) .map_err(|e| e.into()) .boxed()) } diff --git a/sk-store/src/watchers/pod_watcher.rs b/sk-store/src/watchers/pod_watcher.rs index 4abce357..8cc0b3d5 100644 --- a/sk-store/src/watchers/pod_watcher.rs +++ b/sk-store/src/watchers/pod_watcher.rs @@ -28,11 +28,11 @@ use kube::runtime::watcher::{ use sk_core::errors::*; use sk_core::k8s::{ ApiSet, - KubeResourceExt, OwnersCache, PodLifecycleData, }; use sk_core::prelude::*; +use tracing::*; use crate::{ TraceStorable, diff --git a/sk-store/src/watchers/tests/mod.rs b/sk-store/src/watchers/tests/mod.rs index a27b4fc8..e74c883e 100644 --- a/sk-store/src/watchers/tests/mod.rs +++ b/sk-store/src/watchers/tests/mod.rs @@ -1,7 +1,6 @@ mod pod_watcher_test; use rstest::*; -use sk_core::k8s::testutils::*; use tracing_test::traced_test; use super::*; diff --git a/sk-store/src/watchers/tests/pod_watcher_test.rs b/sk-store/src/watchers/tests/pod_watcher_test.rs index 7c1f1e5c..704f00d5 100644 --- a/sk-store/src/watchers/tests/pod_watcher_test.rs +++ b/sk-store/src/watchers/tests/pod_watcher_test.rs @@ -13,7 +13,6 @@ use kube::runtime::watcher::Event; use mockall::predicate; use sk_core::k8s::{ ApiSet, - KubeResourceExt, OwnersCache, PodLifecycleData, }; diff --git a/sk-tracer/src/main.rs b/sk-tracer/src/main.rs index 6e24e017..f0d8c27a 100644 --- a/sk-tracer/src/main.rs +++ b/sk-tracer/src/main.rs @@ -27,6 +27,7 @@ use sk_store::{ TraceStore, TracerConfig, }; +use tracing::*; use crate::errors::ExportResponseError;