feat(sources): multicast udp socket support #22099

Open
wants to merge 22 commits into
base: master
8 changes: 8 additions & 0 deletions changelog.d/5732_multicast_udp_socket_sources.feature.md
@@ -0,0 +1,8 @@
The `socket` source with `udp` mode now supports joining multicast groups via the `multicast_groups` option
of that source. This allows the source to receive multicast packets from the specified multicast groups.

Note that in order to work properly, the `socket` address must be set to `0.0.0.0` and not
to `127.0.0.1` (localhost) or any other specific IP address. If any other IP address is used, the host's interface
will filter out the multicast packets as the packet target IP (multicast) would not match the host's interface IP.

authors: jorgehermo9
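
To illustrate the changelog note, here is a minimal std-only sketch of a receiver that binds to `0.0.0.0` and joins a list of groups. The helper names `is_wildcard_listen_addr` and `bind_and_join` are hypothetical and are not taken from this PR; the sketch only assumes the standard library's `UdpSocket::join_multicast_v4`.

```rust
use std::io;
use std::net::{Ipv4Addr, SocketAddr, UdpSocket};

/// Hypothetical helper: true when `addr` is the IPv4 wildcard address
/// (`0.0.0.0`), which is what the changelog entry asks for.
fn is_wildcard_listen_addr(addr: &SocketAddr) -> bool {
    match addr {
        SocketAddr::V4(v4) => v4.ip().is_unspecified(),
        SocketAddr::V6(_) => false,
    }
}

/// Hypothetical helper: binds a UDP socket on `0.0.0.0:port` and joins
/// every group in `groups`. Passing `Ipv4Addr::UNSPECIFIED` as the
/// interface leaves the choice of interface to the operating system.
fn bind_and_join(port: u16, groups: &[Ipv4Addr]) -> io::Result<UdpSocket> {
    let socket = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, port))?;
    for group in groups {
        socket.join_multicast_v4(group, &Ipv4Addr::UNSPECIFIED)?;
    }
    Ok(socket)
}
```

A caller could check the configured address with `is_wildcard_listen_addr` first and log a warning when it returns `false`, rather than silently receiving no multicast traffic.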
27 changes: 26 additions & 1 deletion lib/vector-config/src/stdlib.rs
@@ -2,7 +2,7 @@ use std::{
cell::RefCell,
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
hash::Hash,
net::SocketAddr,
net::{Ipv4Addr, SocketAddr},
num::{
NonZeroI16, NonZeroI32, NonZeroI64, NonZeroI8, NonZeroU16, NonZeroU32, NonZeroU64,
NonZeroU8, NonZeroUsize,
@@ -402,6 +402,31 @@ impl ToValue for SocketAddr {
}
}

impl Configurable for Ipv4Addr {
fn referenceable_name() -> Option<&'static str> {
Some("stdlib::Ipv4Addr")
}

fn metadata() -> Metadata {
let mut metadata = Metadata::default();
metadata.set_description("An IPv4 address.");
metadata
}

fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {
// TODO: We don't need anything other than a string schema to (de)serialize an `Ipv4Addr`,
// but we eventually should have validation since the format for the possible permutations
// is well-known and can be easily codified.
Ok(generate_string_schema())
}
}

impl ToValue for Ipv4Addr {
fn to_value(&self) -> Value {
Value::String(self.to_string())
}
}

impl Configurable for PathBuf {
fn referenceable_name() -> Option<&'static str> {
Some("stdlib::PathBuf")
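
The TODO above mentions validating the string form of an `Ipv4Addr`. As a rough sketch of what such validation amounts to, the standard library's `FromStr` implementation already rejects malformed input. The helper name `validate_ipv4` is hypothetical; this is not the schema-level validation the TODO has in mind, only an illustration of the accepted format.

```rust
use std::net::{AddrParseError, Ipv4Addr};

/// Hypothetical helper: accepts only a dotted-quad IPv4 address such as
/// `224.0.0.2` and returns the parse error otherwise.
fn validate_ipv4(raw: &str) -> Result<Ipv4Addr, AddrParseError> {
    raw.parse::<Ipv4Addr>()
}
```

With this helper, `validate_ipv4("224.0.0.2")` succeeds, while an out-of-range octet, a missing octet, or an IPv6 literal is rejected.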
40 changes: 40 additions & 0 deletions src/internal_events/socket.rs
@@ -1,3 +1,5 @@
use std::net::Ipv4Addr;

use metrics::{counter, histogram};
use vector_lib::internal_event::{ComponentEventsDropped, InternalEvent, UNINTENTIONAL};
use vector_lib::{
@@ -135,6 +137,44 @@ impl<E: std::fmt::Display> InternalEvent for SocketBindError<E> {
}
}

#[derive(Debug)]
pub struct SocketMulticastGroupJoinError<E> {
Contributor Author:

I have doubts about this naming: SocketMulticastJoinError, UdpSocketMulticastJoinError, UdpSocketMulticastGroupJoinError...?

This error will only be created in UDP mode, as the multicast concept only exists in UDP.

pub error: E,
pub group_addr: Ipv4Addr,
Contributor Author:

Added group_addr and interface because they are the arguments required by join_multicast_v4:

.join_multicast_v4(group_addr, interface)

pub interface: Ipv4Addr,
}

impl<E: std::fmt::Display> InternalEvent for SocketMulticastGroupJoinError<E> {
Contributor Author:

Inspired by the SocketBindError implementation.

fn emit(self) {
// Multicast groups are only used in UDP mode
let mode = SocketMode::Udp.as_str();
let group_addr = self.group_addr.to_string();
let interface = self.interface.to_string();

error!(
message = "Error joining multicast group.",
error = %self.error,
error_code = "socket_multicast_group_join",
error_type = error_type::IO_FAILED,
stage = error_stage::RECEIVING,
Contributor Author:

Is error_stage RECEIVING right here? That stage seems odd in this case; I think INITIALIZING or similar would fit better. However, I see that error_stage::RECEIVING is also used for the socket binding error and others. I just noticed this and wanted to raise it; it does not need to be addressed in this PR.

%mode,
%group_addr,
%interface,
internal_log_rate_limit = true,
);
counter!(
"component_errors_total",
"error_code" => "socket_multicast_group_join",
"error_type" => error_type::IO_FAILED,
"stage" => error_stage::RECEIVING,
"mode" => mode,
"group_addr" => group_addr,
"interface" => interface,
Comment on lines +171 to +172
Contributor Author (@jorgehermo9, Feb 5, 2025):

Is it OK to add those two variables (group_addr and interface) to the counter tags, or should they only be included in the error! log?

)
.increment(1);
}
}

#[derive(Debug)]
pub struct SocketReceiveError<E> {
pub mode: SocketMode,
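
On the question of whether `group_addr` and `interface` belong in the counter tags: the usual concern with extra tags is cardinality, since a metrics backend typically keeps one time series per distinct label set. The sketch below models that with a plain `HashMap`; `LabelledCounter` is a hypothetical stand-in and not the `metrics` crate API. In this PR the number of distinct pairs is bounded by the number of configured groups.

```rust
use std::collections::HashMap;
use std::net::Ipv4Addr;

/// Hypothetical stand-in for a labelled counter: one entry (one "series")
/// per distinct `(group_addr, interface)` label pair.
#[derive(Default)]
struct LabelledCounter {
    series: HashMap<(Ipv4Addr, Ipv4Addr), u64>,
}

impl LabelledCounter {
    /// Increments the series identified by the given label pair,
    /// creating it on first use.
    fn increment(&mut self, group_addr: Ipv4Addr, interface: Ipv4Addr) {
        *self.series.entry((group_addr, interface)).or_insert(0) += 1;
    }

    /// Number of distinct series created so far.
    fn series_count(&self) -> usize {
        self.series.len()
    }
}
```

Two failures for the same group land in one series, while a failure for a different group creates a second one.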
145 changes: 135 additions & 10 deletions src/sources/socket/mod.rs
@@ -319,7 +319,7 @@ mod test {
use approx::assert_relative_eq;
use std::{
collections::HashMap,
net::{SocketAddr, UdpSocket},
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
@@ -373,8 +373,11 @@ mod test {
sources::util::net::SocketListenAddr,
test_util::{
collect_n, collect_n_limited,
components::{assert_source_compliance, SOCKET_PUSH_SOURCE_TAGS},
next_addr, random_string, send_lines, send_lines_tls, wait_for_tcp,
components::{
assert_source_compliance, assert_source_error, COMPONENT_ERROR_TAGS,
SOCKET_PUSH_SOURCE_TAGS,
},
next_addr, next_addr_any, random_string, send_lines, send_lines_tls, wait_for_tcp,
},
tls::{self, TlsConfig, TlsEnableableConfig, TlsSourceConfig},
SourceSender,
@@ -898,21 +901,36 @@ mod test {
}

//////// UDP TESTS ////////
fn send_lines_udp(addr: SocketAddr, lines: impl IntoIterator<Item = String>) -> SocketAddr {
send_packets_udp(addr, lines.into_iter().map(|line| line.into()))
fn send_lines_udp(to: SocketAddr, lines: impl IntoIterator<Item = String>) -> SocketAddr {
send_lines_udp_from(next_addr(), to, lines)
}

fn send_lines_udp_from(
from: SocketAddr,
to: SocketAddr,
lines: impl IntoIterator<Item = String>,
) -> SocketAddr {
send_packets_udp_from(from, to, lines.into_iter().map(|line| line.into()))
}

fn send_packets_udp(addr: SocketAddr, packets: impl IntoIterator<Item = Bytes>) -> SocketAddr {
let bind = next_addr();
let socket = UdpSocket::bind(bind)
fn send_packets_udp(to: SocketAddr, packets: impl IntoIterator<Item = Bytes>) -> SocketAddr {
send_packets_udp_from(next_addr(), to, packets)
}

fn send_packets_udp_from(
from: SocketAddr,
to: SocketAddr,
packets: impl IntoIterator<Item = Bytes>,
) -> SocketAddr {
let socket = UdpSocket::bind(from)
.map_err(|error| panic!("{:}", error))
.ok()
.unwrap();

for packet in packets {
assert_eq!(
socket
.send_to(&packet, addr)
.send_to(&packet, to)
.map_err(|error| panic!("{:}", error))
.ok()
.unwrap(),
@@ -926,7 +944,7 @@ mod test {
thread::sleep(Duration::from_millis(10));

// Done
bind
from
}

async fn init_udp_with_shutdown(
@@ -1303,6 +1321,113 @@ mod test {
.await;
}

#[tokio::test]
async fn multicast_udp_message() {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
// The socket address must be `INADDR_ANY` (0.0.0.0) in order to receive multicast packets
let socket_address = next_addr_any();
let multicast_ip_address: Ipv4Addr = "224.0.0.2".parse().unwrap();
let multicast_socket_address =
SocketAddr::new(IpAddr::V4(multicast_ip_address), socket_address.port());
let mut config = UdpConfig::from_address(socket_address.into());
config.multicast_groups = vec![multicast_ip_address];
init_udp_with_config(tx, config).await;

// We must send packets to the same interface the `socket_address` is bound to
// in order to receive the multicast packets the `from` socket sends.
// To do so, we use the `INADDR_ANY` address
let from = next_addr_any();
send_lines_udp_from(from, multicast_socket_address, ["test".to_string()]);

let event = rx.next().await.expect("must receive an event");
assert_eq!(
event.as_log()[log_schema().message_key().unwrap().to_string()],
"test".into()
);
})
.await;
}

#[tokio::test]
async fn multiple_multicast_addresses_udp_message() {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let socket_address = next_addr_any();
let multicast_ip_addresses = (2..12)
.map(|i| format!("224.0.0.{i}").parse().unwrap())
.collect::<Vec<Ipv4Addr>>();
let multicast_ip_socket_addresses = multicast_ip_addresses
.iter()
.map(|ip_address| SocketAddr::new(IpAddr::V4(*ip_address), socket_address.port()))
.collect::<Vec<SocketAddr>>();
let mut config = UdpConfig::from_address(socket_address.into());
config.multicast_groups = multicast_ip_addresses;
init_udp_with_config(tx, config).await;

let from = next_addr_any();
for multicast_ip_socket_address in multicast_ip_socket_addresses {
send_lines_udp_from(
from,
multicast_ip_socket_address,
[multicast_ip_socket_address.to_string()],
);

let event = rx.next().await.expect("must receive an event");
assert_eq!(
event.as_log()[log_schema().message_key().unwrap().to_string()],
multicast_ip_socket_address.to_string().into()
);
}
})
.await;
}

#[tokio::test]
async fn multicast_and_unicast_udp_message() {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let socket_address = next_addr_any();
let multicast_ip_address: Ipv4Addr = "224.0.0.2".parse().unwrap();
let multicast_socket_address =
SocketAddr::new(IpAddr::V4(multicast_ip_address), socket_address.port());
let mut config = UdpConfig::from_address(socket_address.into());
config.multicast_groups = vec![multicast_ip_address];
init_udp_with_config(tx, config).await;

let from = next_addr_any();
// Send packet to multicast address
send_lines_udp_from(from, multicast_socket_address, ["test".to_string()]);
let event = rx.next().await.expect("must receive an event");
assert_eq!(
event.as_log()[log_schema().message_key().unwrap().to_string()],
"test".into()
);

// Send packet to unicast address
send_lines_udp_from(from, socket_address, ["test".to_string()]);
let event = rx.next().await.expect("must receive an event");
assert_eq!(
event.as_log()[log_schema().message_key().unwrap().to_string()],
"test".into()
);
})
.await;
}

#[tokio::test]
async fn udp_invalid_multicast_group() {
assert_source_error(&COMPONENT_ERROR_TAGS, async {
Contributor Author:

Refactored this a little in dc09eb5. I didn't know that assert_source_error existed. Is it OK to check only that the error_type tag is updated? (COMPONENT_ERROR_TAGS only contains that tag.)

We are using a few more tags in that error:
[image]

let (tx, _rx) = SourceSender::new_test();
let socket_address = next_addr_any();
let invalid_multicast_ip_address: Ipv4Addr = "192.168.0.3".parse().unwrap();
let mut config = UdpConfig::from_address(socket_address.into());
config.multicast_groups = vec![invalid_multicast_ip_address];
init_udp_with_config(tx, config).await;
})
.await;
}

////////////// UNIX TEST LIBS //////////////

#[cfg(unix)]
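
The `udp_invalid_multicast_group` test relies on `192.168.0.3` not being a multicast address, so the join fails. As a minimal sketch of that distinction, the standard library's `Ipv4Addr::is_multicast` reports whether an address falls in `224.0.0.0/4`. The helper name `partition_groups` is hypothetical and is not part of this PR, which lets the operating system reject the invalid group instead.

```rust
use std::net::Ipv4Addr;

/// Hypothetical helper: splits configured groups into
/// (valid multicast addresses, everything else).
fn partition_groups(groups: &[Ipv4Addr]) -> (Vec<Ipv4Addr>, Vec<Ipv4Addr>) {
    groups
        .iter()
        .copied()
        .partition(|addr| addr.is_multicast())
}
```

A pre-check like this could report invalid groups at configuration time, before any socket is bound.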
50 changes: 49 additions & 1 deletion src/sources/socket/udp.rs
@@ -1,3 +1,5 @@
use std::net::{Ipv4Addr, SocketAddr};

use super::default_host_key;
use bytes::BytesMut;
use chrono::Utc;
@@ -20,7 +22,8 @@ use crate::{
codecs::Decoder,
event::Event,
internal_events::{
SocketBindError, SocketEventsReceived, SocketMode, SocketReceiveError, StreamClosedError,
SocketBindError, SocketEventsReceived, SocketMode, SocketMulticastGroupJoinError,
SocketReceiveError, StreamClosedError,
},
net,
serde::default_decoding,
@@ -41,6 +44,21 @@ pub struct UdpConfig {
#[configurable(derived)]
address: SocketListenAddr,

/// List of IPv4 multicast groups to join when the socket is bound.
///
/// In order to read multicast packets, this source's listening address should be set to `0.0.0.0`.
/// If any other address is used (such as `127.0.0.1` or a specific interface address), the
/// listening interface will filter out all multicast packets received,
/// as their target IP would be the one of the multicast group
/// and it will not match the socket's bound IP.
///
/// Note that this setting will only work if the source's address
/// is an IPv4 address (IPv6 and systemd file descriptor as source's address are not supported
/// with multicast groups).
#[serde(default)]
#[configurable(metadata(docs::examples = "['224.0.0.2', '224.0.0.4']"))]
pub(super) multicast_groups: Vec<Ipv4Addr>,

/// The maximum buffer size of incoming messages.
///
/// Messages larger than this are truncated.
@@ -118,6 +136,7 @@ impl UdpConfig {
pub fn from_address(address: SocketListenAddr) -> Self {
Self {
address,
multicast_groups: Vec::new(),
max_length: default_max_length(),
host_key: None,
port_key: default_port_key(),
@@ -152,6 +171,35 @@ pub(super) fn udp(
})
})?;

if !config.multicast_groups.is_empty() {
socket.set_multicast_loop_v4(true).unwrap();
let listen_addr = match config.address() {
SocketListenAddr::SocketAddr(SocketAddr::V4(addr)) => addr,
SocketListenAddr::SocketAddr(SocketAddr::V6(_)) => {
// We could support Ipv6 multicast with the
// https://doc.rust-lang.org/std/net/struct.UdpSocket.html#method.join_multicast_v6 method
// and specifying the interface index as `0`, in order to bind all interfaces.
unimplemented!("IPv6 multicast is not supported")
}
SocketListenAddr::SystemdFd(_) => {
unimplemented!("Multicast for systemd fd sockets is not supported")
}
};
for group_addr in config.multicast_groups {
let interface = *listen_addr.ip();
socket
.join_multicast_v4(group_addr, interface)
.map_err(|error| {
emit!(SocketMulticastGroupJoinError {
Contributor Author (@jorgehermo9, Feb 5, 2025):

As suggested in https://github.com/vectordotdev/vector/pull/22099/files#r1927304213

This is logged as:
[image]

Does it look OK?

error,
group_addr,
interface,
})
})?;
info!(message = "Joined multicast group.", group = %group_addr);
}
}

if let Some(receive_buffer_bytes) = config.receive_buffer_bytes {
if let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes) {
warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
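
The hunk above panics via `unimplemented!` when the listen address is IPv6 or a systemd file descriptor. One possible alternative, sketched here under assumptions, is to return a typed error instead. `ListenAddr` is a hypothetical, simplified stand-in for the source's `SocketListenAddr`, and `MulticastConfigError` and `multicast_interface` are illustrative names that do not appear in the PR.

```rust
use std::fmt;
use std::net::{Ipv4Addr, SocketAddr};

/// Hypothetical, simplified stand-in for the source's listen address type.
#[derive(Debug, Clone, Copy)]
enum ListenAddr {
    Socket(SocketAddr),
    SystemdFd(usize),
}

/// Hypothetical error for listen addresses that cannot be used with multicast.
#[derive(Debug, PartialEq, Eq)]
enum MulticastConfigError {
    Ipv6NotSupported,
    SystemdFdNotSupported,
}

impl fmt::Display for MulticastConfigError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Ipv6NotSupported => write!(f, "IPv6 multicast is not supported"),
            Self::SystemdFdNotSupported => {
                write!(f, "multicast for systemd fd sockets is not supported")
            }
        }
    }
}

impl std::error::Error for MulticastConfigError {}

/// Returns the IPv4 address to pass as the `interface` argument when joining
/// a group, or an error for listen addresses that do not support multicast.
fn multicast_interface(addr: ListenAddr) -> Result<Ipv4Addr, MulticastConfigError> {
    match addr {
        ListenAddr::Socket(SocketAddr::V4(v4)) => Ok(*v4.ip()),
        ListenAddr::Socket(SocketAddr::V6(_)) => Err(MulticastConfigError::Ipv6NotSupported),
        ListenAddr::SystemdFd(_) => Err(MulticastConfigError::SystemdFdNotSupported),
    }
}
```

Returning an error lets the caller surface a configuration problem through the normal error path rather than aborting the process.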
4 changes: 4 additions & 0 deletions src/test_util/mod.rs
@@ -120,6 +120,10 @@ pub fn next_addr() -> SocketAddr {
next_addr_for_ip(IpAddr::V4(Ipv4Addr::LOCALHOST))
}

pub fn next_addr_any() -> SocketAddr {
next_addr_for_ip(IpAddr::V4(Ipv4Addr::UNSPECIFIED))
}

pub fn next_addr_v6() -> SocketAddr {
next_addr_for_ip(IpAddr::V6(Ipv6Addr::LOCALHOST))
}