From 8918c66af5bca25603be2ef491c07afff350a4f5 Mon Sep 17 00:00:00 2001
From: Nathan Fox
Date: Tue, 15 Aug 2023 14:17:48 -0400
Subject: [PATCH] fix(kubernetes_logs source): Fix events being empty when log
 namespacing is enabled (#18244)

* save

* save

* save

* added tests

* cleanup

* cleanup

* clippy cleanup

* fix formatting inside macro
---
 Cargo.lock                                    |   1 +
 lib/vector-core/Cargo.toml                    |   1 +
 lib/vector-core/src/stream/expiration_map.rs  | 132 +++++++
 lib/vector-core/src/stream/mod.rs             |   1 +
 src/sources/kubernetes_logs/mod.rs            |  26 +-
 .../kubernetes_logs/partial_events_merger.rs  | 346 +++++++++++++++---
 .../kubernetes_logs/transform_utils/mod.rs    |  14 +
 src/transforms/reduce/mod.rs                  |  82 ++---
 8 files changed, 490 insertions(+), 113 deletions(-)
 create mode 100644 lib/vector-core/src/stream/expiration_map.rs

diff --git a/Cargo.lock b/Cargo.lock
index ad21b481f6a2c..0c47d8b7d4d4f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -9739,6 +9739,7 @@ name = "vector-core"
 version = "0.1.0"
 dependencies = [
  "async-graphql 5.0.10",
+ "async-stream",
  "async-trait",
  "base64 0.21.2",
  "bitmask-enum",
diff --git a/lib/vector-core/Cargo.toml b/lib/vector-core/Cargo.toml
index d96f12f09ec0b..ef12810782d0e 100644
--- a/lib/vector-core/Cargo.toml
+++ b/lib/vector-core/Cargo.toml
@@ -7,6 +7,7 @@ publish = false
 [dependencies]
 async-graphql = { version = "5.0.10", default-features = false, features = ["playground" ], optional = true }
+async-stream = { version = "0.3.5", default-features = false }
 async-trait = { version = "0.1", default-features = false }
 bitmask-enum = { version = "2.2.2", default-features = false }
 bytes = { version = "1.4.0", default-features = false, features = ["serde"] }
diff --git a/lib/vector-core/src/stream/expiration_map.rs b/lib/vector-core/src/stream/expiration_map.rs
new file mode 100644
index 0000000000000..985fb5afb7bb0
--- /dev/null
+++ b/lib/vector-core/src/stream/expiration_map.rs
@@ -0,0 +1,132 @@
+use async_stream::stream;
+use futures::{Stream, StreamExt};
+use std::time::Duration;
+
+#[derive(Default)]
+pub struct Emitter<T> {
+    values: Vec<T>,
+}
+
+impl<T> Emitter<T> {
+    pub fn new() -> Self {
+        Self { values: vec![] }
+    }
+    pub fn emit(&mut self, value: T) {
+        self.values.push(value);
+    }
+}
+
+/// Similar to `stream.filter_map(..).flatten(..)` but also allows checking for expired events
+/// and flushing when the input stream ends.
+pub fn map_with_expiration<S, T, M, E, F>(
+    initial_state: S,
+    input: impl Stream<Item = T> + 'static,
+    expiration_interval: Duration,
+    // called for each event
+    mut map_fn: M,
+    // called periodically to allow expiring internal state
+    mut expiration_fn: E,
+    // called once at the end of the input stream
+    mut flush_fn: F,
+) -> impl Stream<Item = T>
+where
+    M: FnMut(&mut S, T, &mut Emitter<T>),
+    E: FnMut(&mut S, &mut Emitter<T>),
+    F: FnMut(&mut S, &mut Emitter<T>),
+{
+    let mut state = initial_state;
+    let mut flush_stream = tokio::time::interval(expiration_interval);
+
+    Box::pin(stream! {
+        futures_util::pin_mut!(input);
+        loop {
+            let mut emitter = Emitter::<T>::new();
+            let done = tokio::select! {
+                _ = flush_stream.tick() => {
+                    expiration_fn(&mut state, &mut emitter);
+                    false
+                }
+                maybe_event = input.next() => {
+                    match maybe_event {
+                        None => {
+                            flush_fn(&mut state, &mut emitter);
+                            true
+                        }
+                        Some(event) => {
+                            map_fn(&mut state, event, &mut emitter);
+                            false
+                        }
+                    }
+                }
+            };
+            yield futures::stream::iter(emitter.values.into_iter());
+            if done { break }
+        }
+
+    })
+    .flatten()
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[tokio::test]
+    async fn test_simple() {
+        let input = futures::stream::iter([1, 2, 3]);
+
+        let map_fn = |state: &mut i32, event, emitter: &mut Emitter<i32>| {
+            *state += event;
+            emitter.emit(*state);
+        };
+        let expiration_fn = |_state: &mut i32, _emitter: &mut Emitter<i32>| {
+            // do nothing
+        };
+        let flush_fn = |state: &mut i32, emitter: &mut Emitter<i32>| {
+            emitter.emit(*state);
+        };
+        let stream: Vec<i32> = map_with_expiration(
+            0_i32,
+            input,
+            Duration::from_secs(100),
+            map_fn,
+            expiration_fn,
+            flush_fn,
+        )
+        .take(4)
+        .collect()
+        .await;
+
+        assert_eq!(vec![1, 3, 6, 6], stream);
+    }
+
+    #[tokio::test]
+    async fn test_expiration() {
+        // an input that never ends (to test expiration)
+        let input = futures::stream::iter([1, 2, 3]).chain(futures::stream::pending());
+
+        let map_fn = |state: &mut i32, event, emitter: &mut Emitter<i32>| {
+            *state += event;
+            emitter.emit(*state);
+        };
+        let expiration_fn = |state: &mut i32, emitter: &mut Emitter<i32>| {
+            emitter.emit(*state);
+        };
+        let flush_fn = |_state: &mut i32, _emitter: &mut Emitter<i32>| {
+            // do nothing
+        };
+        let stream: Vec<i32> = map_with_expiration(
+            0_i32,
+            input,
+            Duration::from_secs(1),
+            map_fn,
+            expiration_fn,
+            flush_fn,
+        )
+        .take(4)
+        .collect()
+        .await;
+
+        assert_eq!(vec![1, 3, 6, 6], stream);
+    }
+}
diff --git a/lib/vector-core/src/stream/mod.rs b/lib/vector-core/src/stream/mod.rs
index e81e0b8b1c073..5101109694032 100644
--- a/lib/vector-core/src/stream/mod.rs
+++ b/lib/vector-core/src/stream/mod.rs
@@ -1,6 +1,7 @@
 pub mod batcher;
 mod concurrent_map;
 mod driver;
+pub mod expiration_map;
 mod futures_unordered_count;
 mod partitioned_batcher;
diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs
index 2965c4fad4171..817bd09ad6ddf 100644
--- a/src/sources/kubernetes_logs/mod.rs
+++ b/src/sources/kubernetes_logs/mod.rs
@@ -4,7 +4,6 @@
 //! running inside the cluster as a DaemonSet.
 
 #![deny(missing_docs)]
-
 use std::{path::PathBuf, time::Duration};
 
 use bytes::Bytes;
@@ -32,11 +31,10 @@ use vector_common::{
     TimeZone,
 };
 use vector_config::configurable_component;
-use vector_core::{
-    config::LegacyKey, config::LogNamespace, transform::TaskTransform, EstimatedJsonEncodedSizeOf,
-};
+use vector_core::{config::LegacyKey, config::LogNamespace, EstimatedJsonEncodedSizeOf};
 use vrl::value::{kind::Collection, Kind};
 
+use crate::sources::kubernetes_logs::partial_events_merger::merge_partial_events;
 use crate::{
     config::{
         log_schema, ComponentKey, DataType, GenerateConfig, GlobalOptions, SourceConfig,
@@ -72,9 +70,6 @@ use self::node_metadata_annotator::NodeMetadataAnnotator;
 use self::parser::Parser;
 use self::pod_metadata_annotator::PodMetadataAnnotator;
 
-/// The key we use for `file` field.
-const FILE_KEY: &str = "file";
-
 /// The `self_node_name` value env var key.
 const SELF_NODE_NAME_ENV_KEY: &str = "VECTOR_SELF_NODE_NAME";
@@ -781,12 +776,6 @@ impl Source {
         let (file_source_tx, file_source_rx) =
             futures::channel::mpsc::channel::<Vec<Line>>(2);
 
-        let mut parser = Parser::new(log_namespace);
-        let partial_events_merger = Box::new(partial_events_merger::build(
-            auto_partial_merge,
-            log_namespace,
-        ));
-
         let checkpoints = checkpointer.view();
         let events = file_source_rx.flat_map(futures::stream::iter);
         let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
@@ -800,6 +789,7 @@ impl Source {
             ingestion_timestamp_field.as_ref(),
             log_namespace,
         );
+
         let file_info = annotator.annotate(&mut event, &line.filename);
 
         emit!(KubernetesLogsEventsReceived {
@@ -834,14 +824,22 @@ impl Source {
             checkpoints.update(line.file_id, line.end_offset);
             event
         });
+
+        let mut parser = Parser::new(log_namespace);
         let events = events.flat_map(move |event| {
             let mut buf = OutputBuffer::with_capacity(1);
             parser.transform(&mut buf, event);
             futures::stream::iter(buf.into_events())
         });
+        let (events_count, _) = events.size_hint();
 
-        let mut stream = partial_events_merger.transform(Box::pin(events));
+        let mut stream = if auto_partial_merge {
+            merge_partial_events(events, log_namespace).left_stream()
+        } else {
+            events.right_stream()
+        };
+
         let event_processing_loop = out.send_event_stream(&mut stream);
 
         let mut lifecycle = Lifecycle::new();
diff --git a/src/sources/kubernetes_logs/partial_events_merger.rs b/src/sources/kubernetes_logs/partial_events_merger.rs
index e4e1cf28c9bc4..3c94f05c5cd53 100644
--- a/src/sources/kubernetes_logs/partial_events_merger.rs
+++ b/src/sources/kubernetes_logs/partial_events_merger.rs
@@ -1,57 +1,303 @@
 #![deny(missing_docs)]
-use enrichment::TableRegistry;
-use indexmap::IndexMap;
+use futures::{Stream, StreamExt};
+use std::collections::HashMap;
+use std::time::{Duration, Instant};
 use vector_core::config::LogNamespace;
+use vrl::owned_value_path;
 
-use super::{transform_utils::optional::Optional, FILE_KEY};
-use crate::sources::kubernetes_logs::transform_utils::get_message_field;
-use crate::{
-    conditions::AnyCondition,
-    event,
-    transforms::reduce::{MergeStrategy, Reduce, ReduceConfig},
-};
-
-/// Partial event merger.
-pub type PartialEventsMerger = Optional<Reduce>;
-
-pub fn build(enabled: bool, log_namespace: LogNamespace) -> PartialEventsMerger {
-    let reducer = if enabled {
-        let key = get_message_field(log_namespace);
-
-        // Merge the message field of each event by concatenating it, with a space delimiter.
-        let mut merge_strategies = IndexMap::new();
-        merge_strategies.insert(key, MergeStrategy::ConcatRaw);
-
-        // Group events by their file.
-        let group_by = vec![FILE_KEY.to_string()];
-
-        // As soon as we see an event that has no "partial" field, that's when we've hit the end of the split-up message
-        // we've been incrementally aggregating.. or the message was never split up to begin with because it was already
-        // small enough.
-        let ends_when = Some(AnyCondition::String(format!(
-            "!exists(.{})",
-            event::PARTIAL
-        )));
-
-        // This will default to expiring yet-to-be-completed reduced events after 30 seconds of inactivity, with an
-        // interval of 1 second between checking if any reduced events have expired.
-        let reduce_config = ReduceConfig {
-            group_by,
-            merge_strategies,
-            ends_when,
-            ..Default::default()
-        };
-
-        // TODO: This is _slightly_ gross because the semantics of `Reduce::new` could change and break things in a way
-        // that isn't super visible in unit tests, if at all visible.
-        let reduce = Reduce::new(&reduce_config, &TableRegistry::default())
-            .expect("should not fail to build `kubernetes_logs`-specific partial event reducer");
-
-        Some(reduce)
-    } else {
-        None
+use crate::event;
+use crate::event::{Event, LogEvent, Value};
+use crate::sources::kubernetes_logs::transform_utils::get_message_path;
+
+/// The key we use for `file` field.
+const FILE_KEY: &str = "file";
+
+const EXPIRATION_TIME: Duration = Duration::from_secs(30);
+
+use bytes::BytesMut;
+use lookup::OwnedTargetPath;
+use vector_core::stream::expiration_map::{map_with_expiration, Emitter};
+
+struct PartialEventMergeState {
+    buckets: HashMap<String, Bucket>,
+}
+
+impl PartialEventMergeState {
+    fn add_event(
+        &mut self,
+        event: LogEvent,
+        file: &str,
+        message_path: &OwnedTargetPath,
+        expiration_time: Duration,
+    ) {
+        if let Some(bucket) = self.buckets.get_mut(file) {
+            // merging with existing event
+
+            if let (Some(Value::Bytes(prev_value)), Some(Value::Bytes(new_value))) =
+                (bucket.event.get_mut(message_path), event.get(message_path))
+            {
+                let mut bytes_mut = BytesMut::new();
+                bytes_mut.extend_from_slice(prev_value);
+                bytes_mut.extend_from_slice(new_value);
+                *prev_value = bytes_mut.freeze();
+            }
+        } else {
+            // new event
+            self.buckets.insert(
+                file.to_owned(),
+                Bucket {
+                    event,
+                    expiration: Instant::now() + expiration_time,
+                },
+            );
+        }
+    }
+
+    fn remove_event(&mut self, file: &str) -> Option<LogEvent> {
+        self.buckets.remove(file).map(|bucket| bucket.event)
+    }
+
+    fn emit_expired_events(&mut self, emitter: &mut Emitter<LogEvent>) {
+        let now = Instant::now();
+        self.buckets.retain(|_key, bucket| {
+            let expired = now >= bucket.expiration;
+            if expired {
+                emitter.emit(bucket.event.clone());
+            }
+            !expired
+        });
+    }
+
+    fn flush_events(&mut self, emitter: &mut Emitter<LogEvent>) {
+        for (_, bucket) in self.buckets.drain() {
+            emitter.emit(bucket.event);
+        }
+    }
+}
+
+struct Bucket {
+    event: LogEvent,
+    expiration: Instant,
+}
+
+pub fn merge_partial_events(
+    stream: impl Stream<Item = Event> + 'static,
+    log_namespace: LogNamespace,
+) -> impl Stream<Item = Event> {
+    merge_partial_events_with_custom_expiration(stream, log_namespace, EXPIRATION_TIME)
+}
+
+// internal function that allows customizing the expiration time (for testing)
+fn merge_partial_events_with_custom_expiration(
+    stream: impl Stream<Item = Event> + 'static,
+    log_namespace: LogNamespace,
+    expiration_time: Duration,
+) -> impl Stream<Item = Event> {
+    let partial_flag_path = match log_namespace {
+        LogNamespace::Vector => {
+            OwnedTargetPath::metadata(owned_value_path!(super::Config::NAME, event::PARTIAL))
+        }
+        LogNamespace::Legacy => OwnedTargetPath::event(owned_value_path!(event::PARTIAL)),
+    };
+
+    let file_path = match log_namespace {
+        LogNamespace::Vector => {
+            OwnedTargetPath::metadata(owned_value_path!(super::Config::NAME, FILE_KEY))
+        }
+        LogNamespace::Legacy => OwnedTargetPath::event(owned_value_path!(FILE_KEY)),
     };
 
-    Optional(reducer)
+    let state = PartialEventMergeState {
+        buckets: HashMap::new(),
+    };
+
+    let message_path = get_message_path(log_namespace);
+
+    map_with_expiration(
+        state,
+        stream.map(|e| e.into_log()),
+        Duration::from_secs(1),
+        move |state: &mut PartialEventMergeState,
+              event: LogEvent,
+              emitter: &mut Emitter<LogEvent>| {
+            // called for each event
+            let is_partial = event
+                .get(&partial_flag_path)
+                .and_then(|x| x.as_boolean())
+                .unwrap_or(false);
+
+            let file = event
+                .get(&file_path)
+                .and_then(|x| x.as_str())
+                .map(|x| x.to_string())
+                .unwrap_or_else(String::new);
+
+            state.add_event(event, &file, &message_path, expiration_time);
+
+            if !is_partial {
+                if let Some(log_event) = state.remove_event(&file) {
+                    emitter.emit(log_event);
+                }
+            }
+        },
+        |state: &mut PartialEventMergeState, emitter: &mut Emitter<LogEvent>| {
+            // check for expired events
+            state.emit_expired_events(emitter)
+        },
+        |state: &mut PartialEventMergeState, emitter: &mut Emitter<LogEvent>| {
+            // the source is ending, flush all pending events
+            state.flush_events(emitter);
+        },
+    )
+    // LogEvent -> Event
+    .map(|e| e.into())
 }
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use vector_core::event::LogEvent;
+    use vrl::value;
+
+    #[tokio::test]
+    async fn merge_single_event_legacy() {
+        let mut e_1 = LogEvent::from("test message 1");
+        e_1.insert("foo", 1);
+
+        let input_stream = futures::stream::iter([e_1.into()]);
+        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy);
+
+        let output: Vec<Event> = output_stream.collect().await;
+        assert_eq!(output.len(), 1);
+        assert_eq!(
+            output[0].as_log().get(".message"),
+            Some(&value!("test message 1"))
+        );
+    }
+
+    #[tokio::test]
+    async fn merge_multiple_events_legacy() {
+        let mut e_1 = LogEvent::from("test message 1");
+        e_1.insert("foo", 1);
+        e_1.insert("_partial", true);
+
+        let mut e_2 = LogEvent::from("test message 2");
+        e_2.insert("foo2", 1);
+
+        let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
+        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy);
+
+        let output: Vec<Event> = output_stream.collect().await;
+        assert_eq!(output.len(), 1);
+        assert_eq!(
+            output[0].as_log().get(".message"),
+            Some(&value!("test message 1test message 2"))
+        );
+    }
+
+    #[tokio::test]
+    async fn multiple_events_flush_legacy() {
+        let mut e_1 = LogEvent::from("test message 1");
+        e_1.insert("foo", 1);
+        e_1.insert("_partial", true);
+
+        let mut e_2 = LogEvent::from("test message 2");
+        e_2.insert("foo2", 1);
+        e_1.insert("_partial", true);
+
+        let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
+        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy);
+
+        let output: Vec<Event> = output_stream.collect().await;
+        assert_eq!(output.len(), 1);
+        assert_eq!(
+            output[0].as_log().get(".message"),
+            Some(&value!("test message 1test message 2"))
+        );
+    }
+
+    #[tokio::test]
+    async fn multiple_events_expire_legacy() {
+        let mut e_1 = LogEvent::from("test message");
+        e_1.insert(FILE_KEY, "foo1");
+        e_1.insert("_partial", true);
+
+        let mut e_2 = LogEvent::from("test message");
+        e_2.insert(FILE_KEY, "foo2");
+        e_1.insert("_partial", true);
+
+        // an input stream that never ends
+        let input_stream =
+            futures::stream::iter([e_1.into(), e_2.into()]).chain(futures::stream::pending());
+
+        let output_stream = merge_partial_events_with_custom_expiration(
+            input_stream,
+            LogNamespace::Legacy,
+            Duration::from_secs(1),
+        );
+
+        let output: Vec<Event> = output_stream.take(2).collect().await;
+        assert_eq!(output.len(), 2);
+        assert_eq!(
+            output[0].as_log().get(".message"),
+            Some(&value!("test message"))
+        );
+        assert_eq!(
+            output[1].as_log().get(".message"),
+            Some(&value!("test message"))
+        );
+    }
+
+    #[tokio::test]
+    async fn merge_single_event_vector_namespace() {
+        let mut e_1 = LogEvent::from(value!("test message 1"));
+        e_1.insert(
+            vrl::metadata_path!(super::super::Config::NAME, FILE_KEY),
+            "foo1",
+        );
+
+        let input_stream = futures::stream::iter([e_1.into()]);
+        let output_stream = merge_partial_events(input_stream, LogNamespace::Vector);
+
+        let output: Vec<Event> = output_stream.collect().await;
+        assert_eq!(output.len(), 1);
1"))); + assert_eq!( + output[0].as_log().get("%kubernetes_logs.file"), + Some(&value!("foo1")) + ); + } + + #[tokio::test] + async fn merge_multiple_events_vector_namespace() { + let mut e_1 = LogEvent::from(value!("test message 1")); + e_1.insert( + vrl::metadata_path!(super::super::Config::NAME, "_partial"), + true, + ); + e_1.insert( + vrl::metadata_path!(super::super::Config::NAME, FILE_KEY), + "foo1", + ); + + let mut e_2 = LogEvent::from(value!("test message 2")); + e_2.insert( + vrl::metadata_path!(super::super::Config::NAME, FILE_KEY), + "foo1", + ); + + let input_stream = futures::stream::iter([e_1.into(), e_2.into()]); + let output_stream = merge_partial_events(input_stream, LogNamespace::Vector); + + let output: Vec = output_stream.collect().await; + assert_eq!(output.len(), 1); + assert_eq!( + output[0].as_log().get("."), + Some(&value!("test message 1test message 2")) + ); + assert_eq!( + output[0].as_log().get("%kubernetes_logs.file"), + Some(&value!("foo1")) + ); + } } diff --git a/src/sources/kubernetes_logs/transform_utils/mod.rs b/src/sources/kubernetes_logs/transform_utils/mod.rs index c0dc4a07c3e1f..b40ee81533754 100644 --- a/src/sources/kubernetes_logs/transform_utils/mod.rs +++ b/src/sources/kubernetes_logs/transform_utils/mod.rs @@ -1,4 +1,6 @@ use vector_core::config::{log_schema, LogNamespace}; +use vrl::owned_value_path; +use vrl::path::OwnedTargetPath; pub mod optional; @@ -12,3 +14,15 @@ pub(crate) fn get_message_field(log_namespace: LogNamespace) -> String { .to_string(), } } + +pub(crate) fn get_message_path(log_namespace: LogNamespace) -> OwnedTargetPath { + match log_namespace { + LogNamespace::Vector => OwnedTargetPath::event(owned_value_path!()), + LogNamespace::Legacy => OwnedTargetPath::event( + log_schema() + .message_key() + .expect("global log_schema.message_key to be valid path") + .clone(), + ), + } +} diff --git a/src/transforms/reduce/mod.rs b/src/transforms/reduce/mod.rs index 90c9294b0cb63..ba0ad8ffbd0be 100644 --- a/src/transforms/reduce/mod.rs +++ b/src/transforms/reduce/mod.rs @@ -1,3 +1,8 @@ +use futures::Stream; +use indexmap::IndexMap; +use lookup::lookup_v2::parse_target_path; +use lookup::PathPrefix; +use serde_with::serde_as; use std::collections::BTreeMap; use std::{ collections::{hash_map, HashMap}, @@ -5,13 +10,6 @@ use std::{ pin::Pin, time::{Duration, Instant}, }; - -use async_stream::stream; -use futures::{stream, Stream, StreamExt}; -use indexmap::IndexMap; -use lookup::lookup_v2::parse_target_path; -use lookup::PathPrefix; -use serde_with::serde_as; use vector_config::configurable_component; use crate::config::OutputId; @@ -30,6 +28,7 @@ use crate::config::schema::Definition; use crate::event::Value; pub use merge_strategy::*; use vector_core::config::LogNamespace; +use vector_core::stream::expiration_map::{map_with_expiration, Emitter}; use vrl::value::kind::Collection; use vrl::value::Kind; @@ -351,7 +350,7 @@ impl Reduce { }) } - fn flush_into(&mut self, output: &mut Vec) { + fn flush_into(&mut self, emitter: &mut Emitter) { let mut flush_discriminants = Vec::new(); let now = Instant::now(); for (k, t) in &self.reduce_merge_states { @@ -362,15 +361,15 @@ impl Reduce { for k in &flush_discriminants { if let Some(t) = self.reduce_merge_states.remove(k) { emit!(ReduceStaleEventFlushed); - output.push(Event::from(t.flush())); + emitter.emit(Event::from(t.flush())); } } } - fn flush_all_into(&mut self, output: &mut Vec) { + fn flush_all_into(&mut self, emitter: &mut Emitter) { self.reduce_merge_states .drain() - .for_each(|(_, 
-            .for_each(|(_, s)| output.push(Event::from(s.flush())));
+            .for_each(|(_, s)| emitter.emit(Event::from(s.flush())));
     }
 
     fn push_or_new_reduce_state(&mut self, event: LogEvent, discriminant: Discriminant) {
@@ -386,7 +385,7 @@ impl Reduce {
         }
     }
 
-    fn transform_one(&mut self, output: &mut Vec<Event>, event: Event) {
+    pub(crate) fn transform_one(&mut self, emitter: &mut Emitter<Event>, event: Event) {
         let (starts_here, event) = match &self.starts_when {
             Some(condition) => condition.check(event),
             None => (false, event),
@@ -413,12 +412,12 @@ impl Reduce {
 
         if starts_here {
             if let Some(state) = self.reduce_merge_states.remove(&discriminant) {
-                output.push(state.flush().into());
+                emitter.emit(state.flush().into());
             }
 
             self.push_or_new_reduce_state(event, discriminant)
         } else if ends_here {
-            output.push(match self.reduce_merge_states.remove(&discriminant) {
+            emitter.emit(match self.reduce_merge_states.remove(&discriminant) {
                 Some(mut state) => {
                     state.add_event(event, &self.merge_strategies);
                     state.flush().into()
                 }
@@ -438,45 +437,30 @@ impl Reduce {
 impl TaskTransform<Event> for Reduce {
     fn transform(
         self: Box<Self>,
-        mut input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
+        input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
     ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
     where
         Self: 'static,
     {
-        let mut me = self;
-
-        let poll_period = me.flush_period;
-
-        let mut flush_stream = tokio::time::interval(poll_period);
-
-        Box::pin(
-            stream! {
-                loop {
-                    let mut output = Vec::new();
-                    let done = tokio::select! {
-                        _ = flush_stream.tick() => {
-                            me.flush_into(&mut output);
-                            false
-                        }
-                        maybe_event = input_rx.next() => {
-                            match maybe_event {
-                                None => {
-                                    me.flush_all_into(&mut output);
-                                    true
-                                }
-                                Some(event) => {
-                                    me.transform_one(&mut output, event);
-                                    false
-                                }
-                            }
-                        }
-                    };
-                    yield stream::iter(output.into_iter());
-                    if done { break }
-                }
-            }
-            .flatten(),
-        )
+        let flush_period = self.flush_period;
+
+        Box::pin(map_with_expiration(
+            self,
+            input_rx,
+            flush_period,
+            |me: &mut Box<Reduce>, event, emitter: &mut Emitter<Event>| {
+                // called for each event
+                me.transform_one(emitter, event);
+            },
+            |me: &mut Box<Reduce>, emitter: &mut Emitter<Event>| {
+                // called periodically to check for expired events
+                me.flush_into(emitter);
+            },
+            |me: &mut Box<Reduce>, emitter: &mut Emitter<Event>| {
+                // called when the input stream ends
+                me.flush_all_into(emitter);
+            },
+        ))
    }
}
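
Reviewer note: a minimal, standalone sketch of how the new `map_with_expiration` helper composes, assuming the `vector_core` crate from this branch and a Tokio runtime. The newline-delimited grouping and `String` state below are illustrative stand-ins for the source's file-keyed merge buckets, not part of this patch:

use std::time::Duration;

use futures::StreamExt;
use vector_core::stream::expiration_map::{map_with_expiration, Emitter};

#[tokio::main]
async fn main() {
    // Illustrative input: string fragments, where '\n' marks the end of a
    // logical line (the kubernetes_logs source uses the `_partial` flag instead).
    let input = futures::stream::iter(["foo ", "bar\n", "baz\n"].map(String::from));

    let output: Vec<String> = map_with_expiration(
        String::new(),          // state: the partially assembled line
        input,
        Duration::from_secs(1), // how often expiration_fn is invoked
        |state: &mut String, event: String, emitter: &mut Emitter<String>| {
            // called for each event: accumulate, emit once the line is complete
            state.push_str(&event);
            if state.ends_with('\n') {
                emitter.emit(std::mem::take(state));
            }
        },
        |_state: &mut String, _emitter: &mut Emitter<String>| {
            // nothing expires in this sketch
        },
        |state: &mut String, emitter: &mut Emitter<String>| {
            // input ended: flush whatever is still buffered
            if !state.is_empty() {
                emitter.emit(std::mem::take(state));
            }
        },
    )
    .collect()
    .await;

    assert_eq!(output, vec!["foo bar\n".to_string(), "baz\n".to_string()]);
}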