Skip to content

Commit

Permalink
chore(deps): Bump to Rust 1.72.0 (vectordotdev#18389)
Browse files Browse the repository at this point in the history
* chore(deps): Bump to Rust 1.72.0

Signed-off-by: Jesse Szwedko <[email protected]>

* clippy --fix

Signed-off-by: Jesse Szwedko <[email protected]>

* clippy

Signed-off-by: Jesse Szwedko <[email protected]>

* clippy

Signed-off-by: Jesse Szwedko <[email protected]>

* Add panic documentation

Signed-off-by: Jesse Szwedko <[email protected]>

* Spelling

Signed-off-by: Jesse Szwedko <[email protected]>

* Add clippy allow for arc_with_non_send_sync

Signed-off-by: Jesse Szwedko <[email protected]>

* additional clippy allow

Signed-off-by: Jesse Szwedko <[email protected]>

* Correct clippy name

Signed-off-by: Jesse Szwedko <[email protected]>

* cargo fmt

Signed-off-by: Jesse Szwedko <[email protected]>

* Move clippy allow up

Couldn't figure out how to get it working when applying directly to structs with derivative

Signed-off-by: Jesse Szwedko <[email protected]>

* Update lib/vector-buffers/src/topology/acks.rs

Co-authored-by: Doug Smith <[email protected]>

---------

Signed-off-by: Jesse Szwedko <[email protected]>
Co-authored-by: Doug Smith <[email protected]>
  • Loading branch information
jszwedko and dsmith3197 authored Sep 1, 2023
1 parent fd0ccd5 commit 7849d80
Show file tree
Hide file tree
Showing 76 changed files with 289 additions and 231 deletions.
2 changes: 1 addition & 1 deletion Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ load('ext://helm_resource', 'helm_resource', 'helm_repo')
docker_build(
ref='timberio/vector',
context='.',
build_args={'RUST_VERSION': '1.71.1'},
build_args={'RUST_VERSION': '1.72.0'},
dockerfile='tilt/Dockerfile'
)

Expand Down
8 changes: 4 additions & 4 deletions lib/prometheus-parser/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ mod test {

#[test]
fn test_parse_text() {
let input = r##"
let input = r#"
# HELP http_requests_total The total number of HTTP requests.
# TYPE http_requests_total counter
http_requests_total{method="post",code="200"} 1027 1395066363000
Expand Down Expand Up @@ -512,7 +512,7 @@ mod test {
rpc_duration_seconds{quantile="0.99"} 76656
rpc_duration_seconds_sum 1.7560473e+07
rpc_duration_seconds_count 4.588206224e+09
"##;
"#;
let output = parse_text(input).unwrap();
assert_eq!(output.len(), 7);
match_group!(output[0], "http_requests_total", Counter => |metrics: &MetricMap<SimpleMetric>| {
Expand Down Expand Up @@ -651,7 +651,7 @@ mod test {

#[test]
fn test_errors() {
let input = r##"name{registry="default" content_type="html"} 1890"##;
let input = r#"name{registry="default" content_type="html"} 1890"#;
let error = parse_text(input).unwrap_err();
assert!(matches!(
error,
Expand Down Expand Up @@ -681,7 +681,7 @@ mod test {
}
));

let input = r##"name{registry="} 1890"##;
let input = r#"name{registry="} 1890"#;
let error = parse_text(input).unwrap_err();
assert!(matches!(
error,
Expand Down
8 changes: 4 additions & 4 deletions lib/prometheus-parser/src/line.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ mod test {
assert_eq!(left, tail);
assert_eq!(r, "");

let input = wrap(r#"a\\ asdf"#);
let input = wrap(r"a\\ asdf");
let (left, r) = Metric::parse_escaped_string(&input).unwrap();
assert_eq!(left, tail);
assert_eq!(r, "a\\ asdf");
Expand All @@ -427,7 +427,7 @@ mod test {
assert_eq!(left, tail);
assert_eq!(r, "\"\\\n");

let input = wrap(r#"\\n"#);
let input = wrap(r"\\n");
let (left, r) = Metric::parse_escaped_string(&input).unwrap();
assert_eq!(left, tail);
assert_eq!(r, "\\n");
Expand Down Expand Up @@ -671,7 +671,7 @@ mod test {

#[test]
fn test_parse_line() {
let input = r##"
let input = r#"
# HELP http_requests_total The total number of HTTP requests.
# TYPE http_requests_total counter
http_requests_total{method="post",code="200"} 1027 1395066363000
Expand Down Expand Up @@ -708,7 +708,7 @@ mod test {
rpc_duration_seconds{quantile="0.99"} 76656
rpc_duration_seconds_sum 1.7560473e+07
rpc_duration_seconds_count 2693
"##;
"#;
assert!(input.lines().map(Line::parse).all(|r| r.is_ok()));
}
}
5 changes: 5 additions & 0 deletions lib/vector-buffers/src/test/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ macro_rules! await_timeout {
}};
}

/// Run a future with a temporary directory.
///
/// # Panics
///
/// Will panic if function cannot create a temp directory.
pub async fn with_temp_dir<F, Fut, V>(f: F) -> V
where
F: FnOnce(&Path) -> Fut,
Expand Down
1 change: 1 addition & 0 deletions lib/vector-buffers/src/test/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ macro_rules! message_wrapper {
}

impl EventCount for $id {
#[allow(clippy::redundant_closure_call)]
fn event_count(&self) -> usize {
usize::try_from($event_count(self)).unwrap_or(usize::MAX)
}
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-buffers/src/test/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl Variant {
let (sender, receiver) = builder
.build(String::from("benches"), Span::none())
.await
.expect("topology build should not fail");
.unwrap_or_else(|_| unreachable!("topology build should not fail"));

(sender, receiver)
}
Expand Down
16 changes: 13 additions & 3 deletions lib/vector-buffers/src/topology/acks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,10 @@ where
///
/// Acknowledgements should be given by the caller to update the acknowledgement state before
/// trying to get any eligible markers.
///
/// # Panics
///
/// Will panic if adding ack amount overflows.
pub fn add_acknowledgements(&mut self, amount: N) {
self.unclaimed_acks = self
.unclaimed_acks
Expand Down Expand Up @@ -315,6 +319,10 @@ where
///
/// When other pending markers are present, and the given ID is logically behind the next
/// expected marker ID, `Err(MarkerError::MonotonicityViolation)` is returned.
///
/// # Panics
///
/// Panics if pending markers is empty when last pending marker is an unknown size.
pub fn add_marker(
&mut self,
id: N,
Expand All @@ -341,7 +349,7 @@ where
let last_marker = self
.pending_markers
.back_mut()
.expect("pending markers should not be empty");
.unwrap_or_else(|| unreachable!("pending markers should not be empty"));

last_marker.len = PendingMarkerLength::Assumed(len);
}
Expand Down Expand Up @@ -425,13 +433,15 @@ where
let PendingMarker { id, data, .. } = self
.pending_markers
.pop_front()
.expect("pending markers cannot be empty");
.unwrap_or_else(|| unreachable!("pending markers cannot be empty"));

if acks_to_claim > N::min_value() {
self.unclaimed_acks = self
.unclaimed_acks
.checked_sub(&acks_to_claim)
.expect("should not be able to claim more acks than are unclaimed");
.unwrap_or_else(|| {
unreachable!("should not be able to claim more acks than are unclaimed")
});
}

self.acked_marker_id = id.wrapping_add(&len.len());
Expand Down
6 changes: 4 additions & 2 deletions lib/vector-buffers/src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ impl<T: Bufferable> TopologyBuilder<T> {
/// This is a convenience method for `vector` as it is used for inter-transform channels, and we
/// can simplifying needing to require callers to do all the boilerplate to create the builder,
/// create the stage, installing buffer usage metrics that aren't required, and so on.
///
#[allow(clippy::print_stderr)]
pub async fn standalone_memory(
max_events: NonZeroUsize,
when_full: WhenFull,
Expand All @@ -193,7 +195,7 @@ impl<T: Bufferable> TopologyBuilder<T> {
let (sender, receiver) = memory_buffer
.into_buffer_parts(usage_handle.clone())
.await
.expect("should not fail to directly create a memory buffer");
.unwrap_or_else(|_| unreachable!("should not fail to directly create a memory buffer"));

let mode = match when_full {
WhenFull::Overflow => WhenFull::Block,
Expand Down Expand Up @@ -228,7 +230,7 @@ impl<T: Bufferable> TopologyBuilder<T> {
let (sender, receiver) = memory_buffer
.into_buffer_parts(usage_handle.clone())
.await
.expect("should not fail to directly create a memory buffer");
.unwrap_or_else(|_| unreachable!("should not fail to directly create a memory buffer"));

let mode = match when_full {
WhenFull::Overflow => WhenFull::Block,
Expand Down
13 changes: 9 additions & 4 deletions lib/vector-buffers/src/topology/channel/limited_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,15 @@ impl<T: Bufferable> LimitedSender<T> {
.limiter
.clone()
.acquire_many_owned(permits_required)
.await else {
return Err(SendError(item))
.await
else {
return Err(SendError(item));
};

self.inner
.data
.push((permits, item))
.expect("acquired permits but channel reported being full");
.unwrap_or_else(|_| unreachable!("acquired permits but channel reported being full"));
self.inner.read_waker.notify_one();

trace!("Sent item.");
Expand All @@ -130,6 +131,10 @@ impl<T: Bufferable> LimitedSender<T> {
/// `Err(TrySendError::Disconnected)` be returned with the given `item`. If the channel has
/// insufficient capacity for the item, then `Err(TrySendError::InsufficientCapacity)` will be
/// returned with the given `item`.
///
/// # Panics
///
/// Will panic if adding ack amount overflows.
pub fn try_send(&mut self, item: T) -> Result<(), TrySendError<T>> {
// Calculate how many permits we need, and try to acquire them all without waiting.
let permits_required = self.get_required_permits_for_item(&item);
Expand All @@ -151,7 +156,7 @@ impl<T: Bufferable> LimitedSender<T> {
self.inner
.data
.push((permits, item))
.expect("acquired permits but channel reported being full");
.unwrap_or_else(|_| unreachable!("acquired permits but channel reported being full"));
self.inner.read_waker.notify_one();

trace!("Attempt to send item succeeded.");
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-buffers/src/topology/channel/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl<T: Bufferable> BufferSender<T> {
sent_to_base = false;
self.overflow
.as_mut()
.expect("overflow must exist")
.unwrap_or_else(|| unreachable!("overflow must exist"))
.send(item)
.await?;
}
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-buffers/src/variants/disk_v2/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ where
return Err(BuildError::InvalidParameter {
param_name: "max_record_size",
reason: "must be less than 2^64 bytes".to_string(),
})
});
};

if max_record_size_converted > max_data_file_size {
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-buffers/src/variants/disk_v2/ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,6 @@ where
)
.field("writer_done", &self.writer_done.load(Ordering::Acquire))
.field("last_flush", &self.last_flush.load())
.finish()
.finish_non_exhaustive()
}
}
2 changes: 1 addition & 1 deletion lib/vector-buffers/src/variants/disk_v2/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,7 @@ where
// If there's an error decoding the item, just fall back to the slow path,
// because this file might actually be where we left off, so we don't want
// to incorrectly skip ahead or anything.
break
break;
};

// We have to remove 1 from the event count here because otherwise the ID would
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl fmt::Debug for Record {
.field("event_count", &self.event_count)
.field("encoded_len", &self.encoded_len())
.field("archived_len", &self.archived_len())
.finish()
.finish_non_exhaustive()
}
}

Expand Down
4 changes: 2 additions & 2 deletions lib/vector-common/src/event_test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub fn contains_name_once(pattern: &str) -> Result<(), String> {
EVENTS_RECORDED.with(|events| {
let mut n_events = 0;
let mut names = String::new();
for event in events.borrow().iter() {
for event in &*events.borrow() {
if event.ends_with(pattern) {
if n_events > 0 {
names.push_str(", ");
Expand Down Expand Up @@ -44,7 +44,7 @@ pub fn clear_recorded_events() {
#[allow(clippy::print_stdout)]
pub fn debug_print_events() {
EVENTS_RECORDED.with(|events| {
for event in events.borrow().iter() {
for event in &*events.borrow() {
println!("{event}");
}
});
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-common/src/finalization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl EventFinalizers {

/// Merges the event finalizers from `other` into the collection.
pub fn merge(&mut self, other: Self) {
self.0.extend(other.0.into_iter());
self.0.extend(other.0);
}

/// Updates the status of all event finalizers in the collection.
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-config-common/src/schema/json_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ impl<T: Clone> Extend<T> for SingleOrVec<T> {
fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
match self {
Self::Single(item) => {
*self = Self::Vec(iter::once(*item.clone()).chain(iter.into_iter()).collect());
*self = Self::Vec(iter::once(*item.clone()).chain(iter).collect());
}
Self::Vec(items) => items.extend(iter),
}
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-config-macros/src/ast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use serde_derive_internals::{ast as serde_ast, attr as serde_attr};

mod container;
mod field;
pub(self) mod util;
mod util;
mod variant;

pub use container::Container;
Expand Down
6 changes: 1 addition & 5 deletions lib/vector-config/src/schema/visitors/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,7 @@ fn merge_schema_instance_type(
source: Option<&SingleOrVec<InstanceType>>,
) {
merge_optional_with(destination, source, |existing, new| {
let mut deduped = existing
.into_iter()
.chain(new.into_iter())
.cloned()
.collect::<Vec<_>>();
let mut deduped = existing.into_iter().chain(new).cloned().collect::<Vec<_>>();
deduped.dedup();

*existing = deduped.into();
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-config/src/schema/visitors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub mod scoped_visit;
mod unevaluated;

#[cfg(test)]
pub(self) mod test;
mod test;

pub use self::human_name::GenerateHumanFriendlyNameVisitor;
pub use self::inline_single::InlineSingleUseReferencesVisitor;
Expand Down
4 changes: 4 additions & 0 deletions lib/vector-core/src/config/log_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ impl LogSchema {
///
/// This should only be used where the result will either be cached,
/// or performance isn't critical, since this requires memory allocation.
///
/// # Panics
///
/// Panics if the path in `self.message_key` is invalid.
pub fn owned_message_path(&self) -> OwnedTargetPath {
self.message_key
.path
Expand Down
4 changes: 2 additions & 2 deletions lib/vector-core/src/event/discriminant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,13 @@ fn hash_f64<H: Hasher>(hasher: &mut H, value: f64) {
}

fn hash_array<H: Hasher>(hasher: &mut H, array: &[Value]) {
for val in array.iter() {
for val in array {
hash_value(hasher, val);
}
}

fn hash_map<H: Hasher>(hasher: &mut H, map: &BTreeMap<String, Value>) {
for (key, val) in map.iter() {
for (key, val) in map {
hasher.write(key.as_bytes());
hash_value(hasher, val);
}
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-core/src/event/log_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ impl LogEvent {
for field in fields {
let field_path = event_path!(field.as_ref());
let Some(incoming_val) = incoming.remove(field_path) else {
continue
continue;
};
match self.get_mut(field_path) {
None => {
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-core/src/event/lua/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl<'a> FromLua<'a> for Event {
from: value.type_name(),
to: "Event",
message: Some("Event should be a Lua table".to_string()),
})
});
};
match (table.raw_get("log")?, table.raw_get("metric")?) {
(LuaValue::Table(log), LuaValue::Nil) => {
Expand Down
4 changes: 4 additions & 0 deletions lib/vector-core/src/event/lua/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ pub fn table_is_timestamp(t: &LuaTable<'_>) -> LuaResult<bool> {
/// # Errors
///
/// This function will fail if the table is malformed.
///
/// # Panics
///
/// Panics if the resulting timestamp is invalid.
#[allow(clippy::needless_pass_by_value)] // constrained by mlua types
pub fn table_to_timestamp(t: LuaTable<'_>) -> LuaResult<DateTime<Utc>> {
let year = t.raw_get("year")?;
Expand Down
Loading

0 comments on commit 7849d80

Please sign in to comment.