diff --git a/qlog/src/events/mod.rs b/qlog/src/events/mod.rs index dbd35183ef..227cfe80fd 100644 --- a/qlog/src/events/mod.rs +++ b/qlog/src/events/mod.rs @@ -27,6 +27,7 @@ use crate::Bytes; use crate::Token; use h3::*; +use moqt::MOQTEventType; use qpack::*; use quic::*; @@ -56,6 +57,8 @@ pub enum EventType { GenericEventType(GenericEventType), + MOQTEventType(MOQTEventType), + #[default] None, } @@ -302,6 +305,8 @@ impl From for EventImportance { EventType::QpackEventType(QpackEventType::InstructionParsed) => EventImportance::Base, + EventType::MOQTEventType(_) => EventImportance::Core, + _ => unimplemented!(), } } @@ -396,12 +401,10 @@ impl From<&EventData> for EventType { EventData::MtuUpdated { .. } => EventType::ConnectivityEventType( ConnectivityEventType::MtuUpdated, ), - EventData::KeyUpdated { .. } => EventType::SecurityEventType(SecurityEventType::KeyUpdated), EventData::KeyDiscarded { .. } => EventType::SecurityEventType(SecurityEventType::KeyDiscarded), - EventData::VersionInformation { .. } => EventType::TransportEventType( TransportEventType::VersionInformation, @@ -439,7 +442,6 @@ impl From<&EventData> for EventType { EventType::TransportEventType(TransportEventType::FramesProcessed), EventData::DataMoved { .. } => EventType::TransportEventType(TransportEventType::DataMoved), - EventData::RecoveryParametersSet { .. } => EventType::RecoveryEventType(RecoveryEventType::ParametersSet), EventData::MetricsUpdated { .. } => @@ -456,7 +458,6 @@ impl From<&EventData> for EventType { EventType::RecoveryEventType( RecoveryEventType::MarkedForRetransmit, ), - EventData::H3ParametersSet { .. } => EventType::Http3EventType(Http3EventType::ParametersSet), EventData::H3ParametersRestored { .. } => @@ -469,7 +470,6 @@ impl From<&EventData> for EventType { EventType::Http3EventType(Http3EventType::FrameParsed), EventData::H3PushResolved { .. } => EventType::Http3EventType(Http3EventType::PushResolved), - EventData::QpackStateUpdated { .. } => EventType::QpackEventType(QpackEventType::StateUpdated), EventData::QpackStreamStateUpdated { .. } => @@ -484,7 +484,6 @@ impl From<&EventData> for EventType { EventType::QpackEventType(QpackEventType::InstructionCreated), EventData::QpackInstructionParsed { .. } => EventType::QpackEventType(QpackEventType::InstructionParsed), - EventData::ConnectionError { .. } => EventType::GenericEventType(GenericEventType::ConnectionError), EventData::ApplicationError { .. } => @@ -497,6 +496,41 @@ impl From<&EventData> for EventType { EventType::GenericEventType(GenericEventType::Message), EventData::Marker { .. } => EventType::GenericEventType(GenericEventType::Marker), + + EventData::MOQTControlMessageCreated { .. } => + EventType::MOQTEventType(MOQTEventType::ControlMessageCreated), + EventData::MOQTControlMessageParsed { .. } => + EventType::MOQTEventType(MOQTEventType::ControlMessageParsed), + EventData::MOQTStreamTypeSet { .. } => + EventType::MOQTEventType(MOQTEventType::StreamTypeSet), + EventData::MOQTObjectDatagramCreated { .. } => + EventType::MOQTEventType(MOQTEventType::ObjectDatagramCreated), + EventData::MOQTObjectDatagramParsed { .. } => + EventType::MOQTEventType(MOQTEventType::ObjectDatagramParsed), + EventData::MOQTObjectDatagramStatusCreated { .. } => + EventType::MOQTEventType( + MOQTEventType::ObjectDatagramStatusCreated, + ), + EventData::MOQTObjectDatagramStatusParsed { .. } => + EventType::MOQTEventType( + MOQTEventType::ObjectDatagramStatusParsed, + ), + EventData::MOQTSubgroupHeaderCreated { .. } => + EventType::MOQTEventType(MOQTEventType::SubgroupHeaderCreated), + EventData::MOQTSubgroupHeaderParsed { .. } => + EventType::MOQTEventType(MOQTEventType::SubgroupHeaderParsed), + EventData::MOQTSubgroupObjectCreated { .. } => + EventType::MOQTEventType(MOQTEventType::SubgroupObjectCreated), + EventData::MOQTSubgroupObjectParsed { .. } => + EventType::MOQTEventType(MOQTEventType::SubgroupObjectParsed), + EventData::MOQTFetchHeaderCreated { .. } => + EventType::MOQTEventType(MOQTEventType::FetchHeaderCreated), + EventData::MOQTFetchHeaderParsed { .. } => + EventType::MOQTEventType(MOQTEventType::FetchHeaderParsed), + EventData::MOQTFetchObjectCreated { .. } => + EventType::MOQTEventType(MOQTEventType::FetchObjectCreated), + EventData::MOQTFetchObjectParsed { .. } => + EventType::MOQTEventType(MOQTEventType::FetchObjectParsed), } } } @@ -512,7 +546,7 @@ pub enum DataRecipient { } #[serde_with::skip_serializing_none] -#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug, Default)] pub struct RawInfo { pub length: Option, pub payload_length: Option, @@ -693,6 +727,52 @@ pub enum EventData { marker_type: String, message: Option, }, + + // MOQT + #[serde(rename = "moqt:control_message_created")] + MOQTControlMessageCreated(moqt::MOQTControlMessageCreated), + + #[serde(rename = "moqt:control_message_parsed")] + MOQTControlMessageParsed(moqt::MOQTControlMessageParsed), + + #[serde(rename = "moqt:stream_type_set")] + MOQTStreamTypeSet(moqt::MOQTStreamTypeSet), + + #[serde(rename = "moqt:object_datagram_created")] + MOQTObjectDatagramCreated(moqt::MOQTObjectDatagramCreated), + + #[serde(rename = "moqt:object_datagram_parsed")] + MOQTObjectDatagramParsed(moqt::MOQTObjectDatagramParsed), + + #[serde(rename = "moqt:object_datagram_status_created")] + MOQTObjectDatagramStatusCreated(moqt::MOQTObjectDatagramStatusCreated), + + #[serde(rename = "moqt:object_datagram_status_parsed")] + MOQTObjectDatagramStatusParsed(moqt::MOQTObjectDatagramStatusParsed), + + #[serde(rename = "moqt:subgroup_header_created")] + MOQTSubgroupHeaderCreated(moqt::MOQTSubgroupHeaderCreated), + + #[serde(rename = "moqt:subgroup_header_parsed")] + MOQTSubgroupHeaderParsed(moqt::MOQTSubgroupHeaderParsed), + + #[serde(rename = "moqt:subgroup_object_created")] + MOQTSubgroupObjectCreated(moqt::MOQTSubgroupObjectCreated), + + #[serde(rename = "moqt:subgroup_object_parsed")] + MOQTSubgroupObjectParsed(moqt::MOQTSubgroupObjectParsed), + + #[serde(rename = "moqt:fetch_header_created")] + MOQTFetchHeaderCreated(moqt::MOQTFetchHeaderCreated), + + #[serde(rename = "moqt:fetch_header_parsed")] + MOQTFetchHeaderParsed(moqt::MOQTFetchHeaderParsed), + + #[serde(rename = "moqt:fetch_object_created")] + MOQTFetchObjectCreated(moqt::MOQTFetchObjectCreated), + + #[serde(rename = "moqt:fetch_object_parsed")] + MOQTFetchObjectParsed(moqt::MOQTFetchObjectParsed), } impl EventData { @@ -754,5 +834,6 @@ pub mod quic; pub mod connectivity; pub mod h3; +pub mod moqt; pub mod qpack; pub mod security; diff --git a/qlog/src/events/moqt.rs b/qlog/src/events/moqt.rs new file mode 100644 index 0000000000..1d60cc27e6 --- /dev/null +++ b/qlog/src/events/moqt.rs @@ -0,0 +1,442 @@ +// Copyright (C) 2025, Cloudflare, Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS +// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use serde::Deserialize; +use serde::Serialize; + +use super::RawInfo; + +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] +#[serde(rename_all = "snake_case")] +pub enum MOQTOwner { + Local, + Remote, +} + +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug, Default)] +#[serde(rename_all = "snake_case")] +pub enum MOQTStreamType { + SubgroupHeader, + FetchHeader, + #[default] + Unknown, +} + +#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Debug)] +#[serde(rename_all = "snake_case")] +pub enum MOQTEventType { + StreamTypeSet, + ControlMessageCreated, + ControlMessageParsed, + ObjectDatagramCreated, + ObjectDatagramParsed, + ObjectDatagramStatusCreated, + ObjectDatagramStatusParsed, + SubgroupHeaderCreated, + SubgroupHeaderParsed, + SubgroupObjectCreated, + SubgroupObjectParsed, + FetchHeaderCreated, + FetchHeaderParsed, + FetchObjectCreated, + FetchObjectParsed, +} + +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] +pub struct MOQTParameter { + pub name: String, + pub length: u64, + pub value: Option, +} + +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] +pub struct MOQTExtensionHeader { + header_type: u64, + header_value: Option, + header_length: Option, + payload: Option, +} + +#[serde_with::skip_serializing_none] +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] +#[serde(tag = "frame_type")] +#[serde(rename_all = "snake_case")] +// Strictly, the qlog spec says that all these control messages have a type +// field. But instead of making that a rust object property, just use serde to +// ensure it goes out on the wire. This means that deserialization of control +// messages also works automatically. +pub enum MOQTControlMessage { + ClientSetup { + number_of_supported_versions: u64, + supported_versions: Vec, + number_of_parameters: u64, + setup_parameters: Vec, + }, + + SeverSetup { + selected_version: u64, + number_of_parameters: u64, + setup_parameters: Vec, + }, + + Goaway { + new_session_uri: RawInfo, + }, + + Subscribe { + subscribe_id: u64, + track_alias: u64, + // track_namespace: TODO pending tuple decision + track_name: RawInfo, + subscriber_priority: u8, + group_order: u8, + filter_type: u64, + start_group: Option, + start_object: Option, + end_group: Option, + number_of_parameters: u64, + subscribe_parameters: Vec, + }, + + SubscribeUpdate { + subscribe_id: u64, + start_group: u64, + start_object: u64, + end_group: u64, + subscriber_priority: u8, + number_of_parameters: u64, + subscribe_parameters: Vec, + }, + + Unsubscribe { + subscribe_id: u64, + }, + + Fetch { + subscribe_id: u64, + subscriber_priority: u8, + group_order: u8, + fetch_type: u64, + // track_namespace: TODO pending tuple decision + track_name: Option, + start_group: Option, + start_object: Option, + end_group: Option, + end_object: Option, + joining_subscribe_id: Option, + preceding_group_offset: Option, + number_of_parameters: u64, + parameters: Vec, + }, + + FetchCancel { + subscribe_id: u64, + }, + + AnnounceOk { + // track_namespace: TODO pending tuple decision + }, + + AnnounceError { + // track_namespace: TODO pending tuple decision + error_code: u64, + reason_phrase: RawInfo, + }, + + AnnounceCancel { + // track_namespace: TODO pending tuple decision + error_code: u64, + reason_phrase: RawInfo, + }, + + TrackStatusRequest { + // track_namespace: TODO pending tuple decision + track_name: RawInfo, + }, + + SubscribeAnnounces { + // track_namespace: TODO pending tuple decision + number_of_parameters: u64, + parameters: Vec, + }, + + UnsubscribeAnnounces { + // track_namespace: TODO pending tuple decision + }, + + SubscribeOk { + subscribe_id: u64, + expires: u8, + group_order: u8, + content_exists: u8, + largest_group_id: Option, + largest_object_id: Option, + number_of_parameters: u64, + parameters: Vec, + }, + + SubscribeError { + subscribe_id: u64, + error_code: u64, + reason_phrase: RawInfo, + track_alias: u64, + }, + + FetchOk { + subscribe_id: u64, + group_order: u8, + end_of_track: u8, + largest_group_id: Option, + largest_object_id: Option, + number_of_parameters: u64, + parameters: Vec, + }, + + FetchError { + subscribe_id: u64, + error_code: u64, + reason_phrase: RawInfo, + }, + + SubscribeDone { + subscribe_id: u64, + status_code: u64, + stream_count: u64, + reason_phrase: RawInfo, + }, + + MaxSubscribeId { + subscribe_id: u64, + }, + + SubscribedBlocked { + maximum_subscribe_id: u64, + }, + + Announce { + // track_namespace: TODO pending tuple decision + number_of_parameters: u64, + parameters: Vec, + }, + + Unannounce { + // track_namespace: TODO pending tuple decision + }, + + TrackStatus { + // track_namespace: TODO pending tuple decision + track_name: RawInfo, + status_code: u64, + last_group_id: Option, + last_object_id: Option, + }, + + SubscribeAnnouncesOk { + // track_namespace: TODO pending tuple decision + }, + + SubscribeAnnouncesError { + // track_namespace: TODO pending tuple decision + error_code: u64, + reason_phrase: RawInfo, + }, + + Unknown, +} + +impl Default for MOQTControlMessage { + fn default() -> Self { + Self::Unknown + } +} + +#[serde_with::skip_serializing_none] +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug, Default)] +pub struct MOQTControlMessageCreated { + pub stream_id: u64, + pub length: Option, + pub message: MOQTControlMessage, + + pub raw: Option, +} + +#[serde_with::skip_serializing_none] +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug, Default)] +pub struct MOQTControlMessageParsed { + pub stream_id: u64, + pub length: Option, + pub message: MOQTControlMessage, + + pub raw: Option, +} + +#[serde_with::skip_serializing_none] +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug, Default)] +pub struct MOQTStreamTypeSet { + pub owner: Option, + pub stream_id: u64, + pub stream_type: MOQTStreamType, +} + +#[serde_with::skip_serializing_none] +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug, Default)] +pub struct MOQTObjectDatagramCreated { + pub track_alias: u64, + pub group_id: u64, + pub object_id: u64, + pub publisher_priority: u8, + pub extension_headers_length: u64, + pub extension_headers: Option>, + pub object_payload: RawInfo, +} + +#[serde_with::skip_serializing_none] +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug, Default)] +pub struct MOQTObjectDatagramParsed { + pub track_alias: u64, + pub group_id: u64, + pub object_id: u64, + pub publisher_priority: u8, + pub extension_headers_length: u64, + pub extension_headers: Option>, + pub object_payload: RawInfo, +} + +#[serde_with::skip_serializing_none] +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug, Default)] +pub struct MOQTObjectDatagramStatusCreated { + pub track_alias: u64, + pub group_id: u64, + pub object_id: u64, + pub publisher_priority: u8, + pub extension_headers_length: u64, + pub extension_headers: Option>, + pub object_status: u64, +} + +#[serde_with::skip_serializing_none] +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug, Default)] +pub struct MOQTObjectDatagramStatusParsed { + pub track_alias: u64, + pub group_id: u64, + pub object_id: u64, + pub publisher_priority: u8, + pub extension_headers_length: u64, + pub extension_headers: Option>, + pub object_status: u64, +} + +#[serde_with::skip_serializing_none] +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug, Default)] +pub struct MOQTSubgroupHeaderCreated { + pub stream_id: u64, + pub track_alias: u64, + pub group_id: u64, + pub object_id: u64, + pub publisher_priority: u8, +} + +#[serde_with::skip_serializing_none] +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug, Default)] +pub struct MOQTSubgroupHeaderParsed { + pub stream_id: u64, + pub track_alias: u64, + pub group_id: u64, + pub object_id: u64, + pub publisher_priority: u8, +} + +#[serde_with::skip_serializing_none] +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug, Default)] +pub struct MOQTSubgroupObjectCreated { + pub stream_id: u64, + pub group_id: Option, + pub subgroup_id: Option, + pub object_id: u64, + pub extension_headers_length: u64, + pub extension_headers: Option>, + pub object_payload_length: u64, + pub object_status: Option, + pub object_payload: Option, +} + +#[serde_with::skip_serializing_none] +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug, Default)] +pub struct MOQTSubgroupObjectParsed { + pub stream_id: u64, + pub group_id: Option, + pub subgroup_id: Option, + pub object_id: u64, + pub extension_headers_length: u64, + pub extension_headers: Option>, + pub object_payload_length: u64, + pub object_status: Option, + pub object_payload: Option, +} + +#[serde_with::skip_serializing_none] +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug, Default)] +pub struct MOQTFetchHeaderCreated { + pub stream_id: u64, + pub subscribe_id: u64, +} + +#[serde_with::skip_serializing_none] +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug, Default)] +pub struct MOQTFetchHeaderParsed { + pub stream_id: u64, + pub subscribe_id: u64, +} + +#[serde_with::skip_serializing_none] +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug, Default)] +pub struct MOQTFetchObjectCreated { + pub stream_id: u64, + pub group_id: Option, + pub subgroup_id: Option, + pub object_id: u64, + pub publisher_priority: u8, + pub extension_headers_length: u64, + pub extension_headers: Option>, + pub object_payload_length: u64, + pub object_status: Option, + pub object_payload: Option, +} + +#[serde_with::skip_serializing_none] +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug, Default)] +pub struct MOQTFetchObjectParsed { + pub stream_id: u64, + pub group_id: Option, + pub subgroup_id: Option, + pub object_id: u64, + pub publisher_priority: u8, + pub extension_headers_length: u64, + pub extension_headers: Option>, + pub object_payload_length: u64, + pub object_status: Option, + pub object_payload: Option, +}