diff --git a/Cargo.lock b/Cargo.lock index b5b3dce754083..456bb45b250f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3386,6 +3386,8 @@ dependencies = [ "mz-repr", "mz-stash", "num_enum", + "proptest", + "proptest-derive", "prost", "prost-build", "rdkafka", diff --git a/src/dataflow-types/Cargo.toml b/src/dataflow-types/Cargo.toml index 29a0eb84e92eb..71cfcaccfc3b1 100644 --- a/src/dataflow-types/Cargo.toml +++ b/src/dataflow-types/Cargo.toml @@ -52,6 +52,8 @@ tokio-util = { version = "0.7.1", features = ["codec"] } tracing = "0.1.34" url = { version = "2.2.2", features = ["serde"] } uuid = { version = "0.8.2", features = ["serde", "v4"] } +proptest = { git = "https://github.com/MaterializeInc/proptest.git", default-features = false, features = ["std"]} +proptest-derive = { git = "https://github.com/MaterializeInc/proptest.git"} [build-dependencies] prost-build = { version = "0.10.1", features = ["vendored"] } diff --git a/src/dataflow-types/build.rs b/src/dataflow-types/build.rs index 9d58e4ae90de7..7bfb2331048a8 100644 --- a/src/dataflow-types/build.rs +++ b/src/dataflow-types/build.rs @@ -9,7 +9,18 @@ fn main() { prost_build::Config::new() - .type_attribute(".", "#[derive(Eq, serde::Serialize, serde::Deserialize)]") - .compile_protos(&["dataflow-types/src/postgres_source.proto"], &[".."]) + .extern_path(".mz_repr.global_id", "::mz_repr::global_id") + .extern_path(".mz_repr.proto", "::mz_repr::proto") + .type_attribute( + ".mz_dataflow_types.postgres_source", + "#[derive(Eq, serde::Serialize, serde::Deserialize)]", + ) + .compile_protos( + &[ + "dataflow-types/src/logging.proto", + "dataflow-types/src/postgres_source.proto", + ], + &[".."], + ) .unwrap(); } diff --git a/src/dataflow-types/src/logging.proto b/src/dataflow-types/src/logging.proto new file mode 100644 index 0000000000000..d9afb16415e67 --- /dev/null +++ b/src/dataflow-types/src/logging.proto @@ -0,0 +1,66 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +syntax = "proto3"; + +import "google/protobuf/empty.proto"; +import "repr/src/global_id.proto"; +import "repr/src/proto.proto"; + +package mz_dataflow_types.logging; + +message ProtoActiveLog { + ProtoLogVariant key = 1; + mz_repr.global_id.ProtoGlobalId value = 2; +} + +message ProtoTimelyLog { + oneof kind { + google.protobuf.Empty operates = 1; + google.protobuf.Empty channels = 2; + google.protobuf.Empty elapsed = 3; + google.protobuf.Empty histogram = 4; + google.protobuf.Empty addresses = 5; + google.protobuf.Empty parks = 6; + google.protobuf.Empty messages_sent = 7; + google.protobuf.Empty messages_received = 8; + google.protobuf.Empty reachability = 9; + } +} + +message ProtoDifferentialLog { + oneof kind { + google.protobuf.Empty arrangement_batches = 1; + google.protobuf.Empty arrangement_records = 2; + google.protobuf.Empty sharing = 3; + } +} + +message ProtoMaterializedLog { + oneof kind { + google.protobuf.Empty dataflow_current = 1; + google.protobuf.Empty dataflow_dependency = 2; + google.protobuf.Empty frontier_current = 3; + google.protobuf.Empty peek_current = 4; + google.protobuf.Empty peek_duration = 5; + } +} +message ProtoLogVariant { + oneof kind { + ProtoTimelyLog timely = 1; + ProtoDifferentialLog differential = 2; + ProtoMaterializedLog materialized = 3; + } +} + +message ProtoLoggingConfig { + mz_repr.proto.ProtoU128 granularity_ns = 1; + repeated ProtoActiveLog active_logs = 2; + bool log_logging = 3; +} diff --git a/src/dataflow-types/src/logging.rs b/src/dataflow-types/src/logging.rs index e115580454d18..42cd397ef235f 100644 --- a/src/dataflow-types/src/logging.rs +++ b/src/dataflow-types/src/logging.rs @@ -9,12 +9,16 @@ use std::collections::HashMap; +use proptest_derive::Arbitrary; use serde::{Deserialize, Serialize}; +use mz_repr::proto::{FromProtoIfSome, ProtoRepr, TryFromProtoError, TryIntoIfSome}; use mz_repr::{GlobalId, RelationDesc, ScalarType}; +include!(concat!(env!("OUT_DIR"), "/mz_dataflow_types.logging.rs")); + /// Logging configuration. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Arbitrary, Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct LoggingConfig { pub granularity_ns: u128, pub active_logs: HashMap, @@ -29,14 +33,97 @@ impl LoggingConfig { } } -#[derive(Hash, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] +impl From<&LoggingConfig> for ProtoLoggingConfig { + fn from(x: &LoggingConfig) -> Self { + ProtoLoggingConfig { + granularity_ns: Some(x.granularity_ns.into_proto()), + active_logs: x.active_logs.iter().map(Into::into).collect(), + log_logging: x.log_logging, + } + } +} + +impl TryFrom for LoggingConfig { + type Error = TryFromProtoError; + + fn try_from(x: ProtoLoggingConfig) -> Result { + Ok(LoggingConfig { + granularity_ns: x + .granularity_ns + .from_proto_if_some("ProtoLoggingConfig::granularity_ns")?, + active_logs: x + .active_logs + .into_iter() + .map(TryInto::try_into) + .collect::, _>>()?, + log_logging: x.log_logging, + }) + } +} + +impl From<(&LogVariant, &GlobalId)> for ProtoActiveLog { + fn from(x: (&LogVariant, &GlobalId)) -> Self { + ProtoActiveLog { + key: Some(x.0.into()), + value: Some(x.1.into()), + } + } +} + +impl TryFrom for (LogVariant, GlobalId) { + type Error = TryFromProtoError; + + fn try_from(x: ProtoActiveLog) -> Result { + Ok(( + x.key.try_into_if_some("ProtoActiveLog::key")?, + x.value.try_into_if_some("ProtoActiveLog::value")?, + )) + } +} + +#[derive(Arbitrary, Hash, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] pub enum LogVariant { Timely(TimelyLog), Differential(DifferentialLog), Materialized(MaterializedLog), } -#[derive(Hash, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] +impl From<&LogVariant> for ProtoLogVariant { + fn from(x: &LogVariant) -> Self { + ProtoLogVariant { + kind: Some(match x { + LogVariant::Timely(timely) => proto_log_variant::Kind::Timely(timely.into()), + LogVariant::Differential(differential) => { + proto_log_variant::Kind::Differential(differential.into()) + } + LogVariant::Materialized(materialize) => { + proto_log_variant::Kind::Materialized(materialize.into()) + } + }), + } + } +} + +impl TryFrom for LogVariant { + type Error = TryFromProtoError; + + fn try_from(x: ProtoLogVariant) -> Result { + match x.kind { + Some(proto_log_variant::Kind::Timely(timely)) => { + Ok(LogVariant::Timely(timely.try_into()?)) + } + Some(proto_log_variant::Kind::Differential(differential)) => { + Ok(LogVariant::Differential(differential.try_into()?)) + } + Some(proto_log_variant::Kind::Materialized(materialized)) => { + Ok(LogVariant::Materialized(materialized.try_into()?)) + } + None => Err(TryFromProtoError::missing_field("ProtoLogVariant::kind")), + } + } +} + +#[derive(Arbitrary, Hash, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] pub enum TimelyLog { Operates, Channels, @@ -49,14 +136,82 @@ pub enum TimelyLog { Reachability, } -#[derive(Hash, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] +impl From<&TimelyLog> for ProtoTimelyLog { + fn from(x: &TimelyLog) -> Self { + use proto_timely_log::Kind::*; + ProtoTimelyLog { + kind: Some(match x { + TimelyLog::Operates => Operates(()), + TimelyLog::Channels => Channels(()), + TimelyLog::Elapsed => Elapsed(()), + TimelyLog::Histogram => Histogram(()), + TimelyLog::Addresses => Addresses(()), + TimelyLog::Parks => Parks(()), + TimelyLog::MessagesSent => MessagesSent(()), + TimelyLog::MessagesReceived => MessagesReceived(()), + TimelyLog::Reachability => Reachability(()), + }), + } + } +} + +impl TryFrom for TimelyLog { + type Error = TryFromProtoError; + + fn try_from(x: ProtoTimelyLog) -> Result { + use proto_timely_log::Kind::*; + match x.kind { + Some(Operates(())) => Ok(TimelyLog::Operates), + Some(Channels(())) => Ok(TimelyLog::Channels), + Some(Elapsed(())) => Ok(TimelyLog::Elapsed), + Some(Histogram(())) => Ok(TimelyLog::Histogram), + Some(Addresses(())) => Ok(TimelyLog::Addresses), + Some(Parks(())) => Ok(TimelyLog::Parks), + Some(MessagesSent(())) => Ok(TimelyLog::MessagesSent), + Some(MessagesReceived(())) => Ok(TimelyLog::MessagesReceived), + Some(Reachability(())) => Ok(TimelyLog::Reachability), + None => Err(TryFromProtoError::missing_field("ProtoTimelyLog::kind")), + } + } +} + +#[derive(Arbitrary, Hash, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] pub enum DifferentialLog { ArrangementBatches, ArrangementRecords, Sharing, } -#[derive(Hash, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] +impl From<&DifferentialLog> for ProtoDifferentialLog { + fn from(x: &DifferentialLog) -> Self { + use proto_differential_log::Kind::*; + ProtoDifferentialLog { + kind: Some(match x { + DifferentialLog::ArrangementBatches => ArrangementBatches(()), + DifferentialLog::ArrangementRecords => ArrangementRecords(()), + DifferentialLog::Sharing => Sharing(()), + }), + } + } +} + +impl TryFrom for DifferentialLog { + type Error = TryFromProtoError; + + fn try_from(x: ProtoDifferentialLog) -> Result { + use proto_differential_log::Kind::*; + match x.kind { + Some(ArrangementBatches(())) => Ok(DifferentialLog::ArrangementBatches), + Some(ArrangementRecords(())) => Ok(DifferentialLog::ArrangementRecords), + Some(Sharing(())) => Ok(DifferentialLog::Sharing), + None => Err(TryFromProtoError::missing_field( + "ProtoDifferentialLog::kind", + )), + } + } +} + +#[derive(Arbitrary, Hash, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] pub enum MaterializedLog { DataflowCurrent, DataflowDependency, @@ -65,6 +220,39 @@ pub enum MaterializedLog { PeekDuration, } +impl From<&MaterializedLog> for ProtoMaterializedLog { + fn from(x: &MaterializedLog) -> Self { + use proto_materialized_log::Kind::*; + ProtoMaterializedLog { + kind: Some(match x { + MaterializedLog::DataflowCurrent => DataflowCurrent(()), + MaterializedLog::DataflowDependency => DataflowDependency(()), + MaterializedLog::FrontierCurrent => FrontierCurrent(()), + MaterializedLog::PeekCurrent => PeekCurrent(()), + MaterializedLog::PeekDuration => PeekDuration(()), + }), + } + } +} + +impl TryFrom for MaterializedLog { + type Error = TryFromProtoError; + + fn try_from(x: ProtoMaterializedLog) -> Result { + use proto_materialized_log::Kind::*; + match x.kind { + Some(DataflowCurrent(())) => Ok(MaterializedLog::DataflowCurrent), + Some(DataflowDependency(())) => Ok(MaterializedLog::DataflowDependency), + Some(FrontierCurrent(())) => Ok(MaterializedLog::FrontierCurrent), + Some(PeekCurrent(())) => Ok(MaterializedLog::PeekCurrent), + Some(PeekDuration(())) => Ok(MaterializedLog::PeekDuration), + None => Err(TryFromProtoError::missing_field( + "ProtoMaterializedLog::kind", + )), + } + } +} + impl LogVariant { /// By which columns should the logs be indexed. /// @@ -232,3 +420,19 @@ impl LogVariant { } } } + +#[cfg(test)] +mod tests { + use super::*; + use mz_repr::proto::protobuf_roundtrip; + use proptest::prelude::*; + + proptest! { + #[test] + fn logging_config_protobuf_roundtrip(expect in any::()) { + let actual = protobuf_roundtrip::<_, ProtoLoggingConfig>(&expect); + assert!(actual.is_ok()); + assert_eq!(actual.unwrap(), expect); + } + } +} diff --git a/src/repr/build.rs b/src/repr/build.rs index 0864f5e6d0be3..a7da1cea9a96c 100644 --- a/src/repr/build.rs +++ b/src/repr/build.rs @@ -13,6 +13,7 @@ fn main() { &[ "repr/src/chrono.proto", "repr/src/global_id.proto", + "repr/src/proto.proto", "repr/src/row.proto", "repr/src/strconv.proto", "repr/src/relation_and_scalar.proto", diff --git a/src/repr/src/proto.proto b/src/repr/src/proto.proto new file mode 100644 index 0000000000000..e8bd9d648c7cb --- /dev/null +++ b/src/repr/src/proto.proto @@ -0,0 +1,17 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +syntax = "proto3"; + +package mz_repr.proto; + +message ProtoU128{ + uint64 hi = 1; + uint64 lo = 2; +} diff --git a/src/repr/src/proto.rs b/src/repr/src/proto.rs index f6af451537c4c..fa576d1955046 100644 --- a/src/repr/src/proto.rs +++ b/src/repr/src/proto.rs @@ -12,6 +12,8 @@ use mz_ore::cast::CastFrom; use std::{char::CharTryFromError, num::TryFromIntError}; +include!(concat!(env!("OUT_DIR"), "/mz_repr.proto.rs")); + /// An error thrown when trying to convert from a `*.proto`-generated type /// `Proto$T` to `$T`. #[derive(Debug)] @@ -131,6 +133,20 @@ impl ProtoRepr for u8 { } } +impl ProtoRepr for u128 { + type Repr = ProtoU128; + + fn into_proto(self: Self) -> Self::Repr { + let lo = (self & (u64::MAX as u128)) as u64; + let hi = (self >> 64) as u64; + ProtoU128 { hi, lo } + } + + fn from_proto(repr: Self::Repr) -> Result { + Ok((repr.hi as u128) << 64 | (repr.lo as u128)) + } +} + impl ProtoRepr for Option { type Repr = Option; @@ -162,6 +178,24 @@ where } } +/// Convenience syntax for trying to convert a `Self` value of type +/// `Option` to `T` if the value is `Some(value)`, or returning +/// [`TryFromProtoError::MissingField`] if the value is `None`. +pub trait FromProtoIfSome { + fn from_proto_if_some(self, field: S) -> Result; +} + +/// A blanket implementation for `Option` where `U` is the +/// `ProtoRepr::Repr` type for `T`. +impl FromProtoIfSome for Option +where + T: ProtoRepr, +{ + fn from_proto_if_some(self, field: S) -> Result { + T::from_proto(self.ok_or_else(|| TryFromProtoError::missing_field(field))?) + } +} + pub fn protobuf_roundtrip<'t, T, U>(val: &'t T) -> anyhow::Result where T: TryFrom,