diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 70cfd1836efe..8d62d0a858ac 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -230,7 +230,7 @@ impl BatchSerializer for JsonSerializer { } /// Implements [`DataSink`] for writing to a Json file. -struct JsonSink { +pub struct JsonSink { /// Config options for writing data config: FileSinkConfig, } @@ -258,10 +258,16 @@ impl DisplayAs for JsonSink { } impl JsonSink { - fn new(config: FileSinkConfig) -> Self { + /// Create from config. + pub fn new(config: FileSinkConfig) -> Self { Self { config } } + /// Retrieve the inner [`FileSinkConfig`]. + pub fn config(&self) -> &FileSinkConfig { + &self.config + } + async fn append_all( &self, data: SendableRecordBatchStream, diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index 627d58e13781..4eeb58974aba 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -151,11 +151,21 @@ impl FileSinkExec { } } + /// Input execution plan + pub fn input(&self) -> &Arc { + &self.input + } + /// Returns insert sink pub fn sink(&self) -> &dyn DataSink { self.sink.as_ref() } + /// Optional sort order for output data + pub fn sort_order(&self) -> &Option> { + &self.sort_order + } + /// Returns the metrics of the underlying [DataSink] pub fn metrics(&self) -> Option { self.sink.metrics() @@ -170,7 +180,7 @@ impl DisplayAs for FileSinkExec { ) -> std::fmt::Result { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "InsertExec: sink=")?; + write!(f, "FileSinkExec: sink=")?; self.sink.fmt_as(t, f) } } diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 9b6a0448f810..bc6de2348e8d 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1130,9 +1130,63 @@ message PhysicalPlanNode { SortPreservingMergeExecNode sort_preserving_merge = 21; NestedLoopJoinExecNode nested_loop_join = 22; AnalyzeExecNode analyze = 23; + JsonSinkExecNode json_sink = 24; } } +enum FileWriterMode { + APPEND = 0; + PUT = 1; + PUT_MULTIPART = 2; +} + +enum CompressionTypeVariant { + GZIP = 0; + BZIP2 = 1; + XZ = 2; + ZSTD = 3; + UNCOMPRESSED = 4; +} + +message PartitionColumn { + string name = 1; + ArrowType arrow_type = 2; +} + +message FileTypeWriterOptions { + oneof FileType { + JsonWriterOptions json_options = 1; + } +} + +message JsonWriterOptions { + CompressionTypeVariant compression = 1; +} + +message FileSinkConfig { + string object_store_url = 1; + repeated PartitionedFile file_groups = 2; + repeated string table_paths = 3; + Schema output_schema = 4; + repeated PartitionColumn table_partition_cols = 5; + FileWriterMode writer_mode = 6; + bool single_file_output = 7; + bool unbounded_input = 8; + bool overwrite = 9; + FileTypeWriterOptions file_type_writer_options = 10; +} + +message JsonSink { + FileSinkConfig config = 1; +} + +message JsonSinkExecNode { + PhysicalPlanNode input = 1; + JsonSink sink = 2; + Schema sink_schema = 3; + PhysicalSortExprNodeCollection sort_order = 4; +} + message PhysicalExtensionNode { bytes node = 1; repeated PhysicalPlanNode inputs = 2; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 3eeb060f8d01..659a25f9fa35 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -3421,6 +3421,86 @@ impl<'de> serde::Deserialize<'de> for ColumnStats { deserializer.deserialize_struct("datafusion.ColumnStats", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for CompressionTypeVariant { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + let variant = match self { + Self::Gzip => "GZIP", + Self::Bzip2 => "BZIP2", + Self::Xz => "XZ", + Self::Zstd => "ZSTD", + Self::Uncompressed => "UNCOMPRESSED", + }; + serializer.serialize_str(variant) + } +} +impl<'de> serde::Deserialize<'de> for CompressionTypeVariant { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "GZIP", + "BZIP2", + "XZ", + "ZSTD", + "UNCOMPRESSED", + ]; + + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = CompressionTypeVariant; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + fn visit_i64(self, v: i64) -> std::result::Result + where + E: serde::de::Error, + { + i32::try_from(v) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_else(|| { + serde::de::Error::invalid_value(serde::de::Unexpected::Signed(v), &self) + }) + } + + fn visit_u64(self, v: u64) -> std::result::Result + where + E: serde::de::Error, + { + i32::try_from(v) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_else(|| { + serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(v), &self) + }) + } + + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "GZIP" => Ok(CompressionTypeVariant::Gzip), + "BZIP2" => Ok(CompressionTypeVariant::Bzip2), + "XZ" => Ok(CompressionTypeVariant::Xz), + "ZSTD" => Ok(CompressionTypeVariant::Zstd), + "UNCOMPRESSED" => Ok(CompressionTypeVariant::Uncompressed), + _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), + } + } + } + deserializer.deserialize_any(GeneratedVisitor) + } +} impl serde::Serialize for Constraint { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -7206,7 +7286,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { deserializer.deserialize_struct("datafusion.FileScanExecConf", FIELDS, GeneratedVisitor) } } -impl serde::Serialize for FilterExecNode { +impl serde::Serialize for FileSinkConfig { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result where @@ -7214,37 +7294,112 @@ impl serde::Serialize for FilterExecNode { { use serde::ser::SerializeStruct; let mut len = 0; - if self.input.is_some() { + if !self.object_store_url.is_empty() { len += 1; } - if self.expr.is_some() { + if !self.file_groups.is_empty() { len += 1; } - let mut struct_ser = serializer.serialize_struct("datafusion.FilterExecNode", len)?; - if let Some(v) = self.input.as_ref() { - struct_ser.serialize_field("input", v)?; + if !self.table_paths.is_empty() { + len += 1; } - if let Some(v) = self.expr.as_ref() { - struct_ser.serialize_field("expr", v)?; + if self.output_schema.is_some() { + len += 1; + } + if !self.table_partition_cols.is_empty() { + len += 1; + } + if self.writer_mode != 0 { + len += 1; + } + if self.single_file_output { + len += 1; + } + if self.unbounded_input { + len += 1; + } + if self.overwrite { + len += 1; + } + if self.file_type_writer_options.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.FileSinkConfig", len)?; + if !self.object_store_url.is_empty() { + struct_ser.serialize_field("objectStoreUrl", &self.object_store_url)?; + } + if !self.file_groups.is_empty() { + struct_ser.serialize_field("fileGroups", &self.file_groups)?; + } + if !self.table_paths.is_empty() { + struct_ser.serialize_field("tablePaths", &self.table_paths)?; + } + if let Some(v) = self.output_schema.as_ref() { + struct_ser.serialize_field("outputSchema", v)?; + } + if !self.table_partition_cols.is_empty() { + struct_ser.serialize_field("tablePartitionCols", &self.table_partition_cols)?; + } + if self.writer_mode != 0 { + let v = FileWriterMode::try_from(self.writer_mode) + .map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.writer_mode)))?; + struct_ser.serialize_field("writerMode", &v)?; + } + if self.single_file_output { + struct_ser.serialize_field("singleFileOutput", &self.single_file_output)?; + } + if self.unbounded_input { + struct_ser.serialize_field("unboundedInput", &self.unbounded_input)?; + } + if self.overwrite { + struct_ser.serialize_field("overwrite", &self.overwrite)?; + } + if let Some(v) = self.file_type_writer_options.as_ref() { + struct_ser.serialize_field("fileTypeWriterOptions", v)?; } struct_ser.end() } } -impl<'de> serde::Deserialize<'de> for FilterExecNode { +impl<'de> serde::Deserialize<'de> for FileSinkConfig { #[allow(deprecated)] fn deserialize(deserializer: D) -> std::result::Result where D: serde::Deserializer<'de>, { const FIELDS: &[&str] = &[ - "input", - "expr", + "object_store_url", + "objectStoreUrl", + "file_groups", + "fileGroups", + "table_paths", + "tablePaths", + "output_schema", + "outputSchema", + "table_partition_cols", + "tablePartitionCols", + "writer_mode", + "writerMode", + "single_file_output", + "singleFileOutput", + "unbounded_input", + "unboundedInput", + "overwrite", + "file_type_writer_options", + "fileTypeWriterOptions", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { - Input, - Expr, + ObjectStoreUrl, + FileGroups, + TablePaths, + OutputSchema, + TablePartitionCols, + WriterMode, + SingleFileOutput, + UnboundedInput, + Overwrite, + FileTypeWriterOptions, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -7266,8 +7421,16 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode { E: serde::de::Error, { match value { - "input" => Ok(GeneratedField::Input), - "expr" => Ok(GeneratedField::Expr), + "objectStoreUrl" | "object_store_url" => Ok(GeneratedField::ObjectStoreUrl), + "fileGroups" | "file_groups" => Ok(GeneratedField::FileGroups), + "tablePaths" | "table_paths" => Ok(GeneratedField::TablePaths), + "outputSchema" | "output_schema" => Ok(GeneratedField::OutputSchema), + "tablePartitionCols" | "table_partition_cols" => Ok(GeneratedField::TablePartitionCols), + "writerMode" | "writer_mode" => Ok(GeneratedField::WriterMode), + "singleFileOutput" | "single_file_output" => Ok(GeneratedField::SingleFileOutput), + "unboundedInput" | "unbounded_input" => Ok(GeneratedField::UnboundedInput), + "overwrite" => Ok(GeneratedField::Overwrite), + "fileTypeWriterOptions" | "file_type_writer_options" => Ok(GeneratedField::FileTypeWriterOptions), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -7277,44 +7440,108 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode { } struct GeneratedVisitor; impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = FilterExecNode; + type Value = FileSinkConfig; fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - formatter.write_str("struct datafusion.FilterExecNode") + formatter.write_str("struct datafusion.FileSinkConfig") } - fn visit_map(self, mut map_: V) -> std::result::Result + fn visit_map(self, mut map_: V) -> std::result::Result where V: serde::de::MapAccess<'de>, { - let mut input__ = None; - let mut expr__ = None; + let mut object_store_url__ = None; + let mut file_groups__ = None; + let mut table_paths__ = None; + let mut output_schema__ = None; + let mut table_partition_cols__ = None; + let mut writer_mode__ = None; + let mut single_file_output__ = None; + let mut unbounded_input__ = None; + let mut overwrite__ = None; + let mut file_type_writer_options__ = None; while let Some(k) = map_.next_key()? { match k { - GeneratedField::Input => { - if input__.is_some() { - return Err(serde::de::Error::duplicate_field("input")); + GeneratedField::ObjectStoreUrl => { + if object_store_url__.is_some() { + return Err(serde::de::Error::duplicate_field("objectStoreUrl")); } - input__ = map_.next_value()?; + object_store_url__ = Some(map_.next_value()?); } - GeneratedField::Expr => { - if expr__.is_some() { - return Err(serde::de::Error::duplicate_field("expr")); + GeneratedField::FileGroups => { + if file_groups__.is_some() { + return Err(serde::de::Error::duplicate_field("fileGroups")); } - expr__ = map_.next_value()?; + file_groups__ = Some(map_.next_value()?); + } + GeneratedField::TablePaths => { + if table_paths__.is_some() { + return Err(serde::de::Error::duplicate_field("tablePaths")); + } + table_paths__ = Some(map_.next_value()?); + } + GeneratedField::OutputSchema => { + if output_schema__.is_some() { + return Err(serde::de::Error::duplicate_field("outputSchema")); + } + output_schema__ = map_.next_value()?; + } + GeneratedField::TablePartitionCols => { + if table_partition_cols__.is_some() { + return Err(serde::de::Error::duplicate_field("tablePartitionCols")); + } + table_partition_cols__ = Some(map_.next_value()?); + } + GeneratedField::WriterMode => { + if writer_mode__.is_some() { + return Err(serde::de::Error::duplicate_field("writerMode")); + } + writer_mode__ = Some(map_.next_value::()? as i32); + } + GeneratedField::SingleFileOutput => { + if single_file_output__.is_some() { + return Err(serde::de::Error::duplicate_field("singleFileOutput")); + } + single_file_output__ = Some(map_.next_value()?); + } + GeneratedField::UnboundedInput => { + if unbounded_input__.is_some() { + return Err(serde::de::Error::duplicate_field("unboundedInput")); + } + unbounded_input__ = Some(map_.next_value()?); + } + GeneratedField::Overwrite => { + if overwrite__.is_some() { + return Err(serde::de::Error::duplicate_field("overwrite")); + } + overwrite__ = Some(map_.next_value()?); + } + GeneratedField::FileTypeWriterOptions => { + if file_type_writer_options__.is_some() { + return Err(serde::de::Error::duplicate_field("fileTypeWriterOptions")); + } + file_type_writer_options__ = map_.next_value()?; } } } - Ok(FilterExecNode { - input: input__, - expr: expr__, + Ok(FileSinkConfig { + object_store_url: object_store_url__.unwrap_or_default(), + file_groups: file_groups__.unwrap_or_default(), + table_paths: table_paths__.unwrap_or_default(), + output_schema: output_schema__, + table_partition_cols: table_partition_cols__.unwrap_or_default(), + writer_mode: writer_mode__.unwrap_or_default(), + single_file_output: single_file_output__.unwrap_or_default(), + unbounded_input: unbounded_input__.unwrap_or_default(), + overwrite: overwrite__.unwrap_or_default(), + file_type_writer_options: file_type_writer_options__, }) } } - deserializer.deserialize_struct("datafusion.FilterExecNode", FIELDS, GeneratedVisitor) + deserializer.deserialize_struct("datafusion.FileSinkConfig", FIELDS, GeneratedVisitor) } } -impl serde::Serialize for FixedSizeBinary { +impl serde::Serialize for FileTypeWriterOptions { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result where @@ -7322,29 +7549,34 @@ impl serde::Serialize for FixedSizeBinary { { use serde::ser::SerializeStruct; let mut len = 0; - if self.length != 0 { + if self.file_type.is_some() { len += 1; } - let mut struct_ser = serializer.serialize_struct("datafusion.FixedSizeBinary", len)?; - if self.length != 0 { - struct_ser.serialize_field("length", &self.length)?; + let mut struct_ser = serializer.serialize_struct("datafusion.FileTypeWriterOptions", len)?; + if let Some(v) = self.file_type.as_ref() { + match v { + file_type_writer_options::FileType::JsonOptions(v) => { + struct_ser.serialize_field("jsonOptions", v)?; + } + } } struct_ser.end() } } -impl<'de> serde::Deserialize<'de> for FixedSizeBinary { +impl<'de> serde::Deserialize<'de> for FileTypeWriterOptions { #[allow(deprecated)] fn deserialize(deserializer: D) -> std::result::Result where D: serde::Deserializer<'de>, { const FIELDS: &[&str] = &[ - "length", + "json_options", + "jsonOptions", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { - Length, + JsonOptions, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -7366,7 +7598,7 @@ impl<'de> serde::Deserialize<'de> for FixedSizeBinary { E: serde::de::Error, { match value { - "length" => Ok(GeneratedField::Length), + "jsonOptions" | "json_options" => Ok(GeneratedField::JsonOptions), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -7376,102 +7608,376 @@ impl<'de> serde::Deserialize<'de> for FixedSizeBinary { } struct GeneratedVisitor; impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = FixedSizeBinary; + type Value = FileTypeWriterOptions; fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - formatter.write_str("struct datafusion.FixedSizeBinary") + formatter.write_str("struct datafusion.FileTypeWriterOptions") } - fn visit_map(self, mut map_: V) -> std::result::Result + fn visit_map(self, mut map_: V) -> std::result::Result where V: serde::de::MapAccess<'de>, { - let mut length__ = None; + let mut file_type__ = None; while let Some(k) = map_.next_key()? { match k { - GeneratedField::Length => { - if length__.is_some() { - return Err(serde::de::Error::duplicate_field("length")); + GeneratedField::JsonOptions => { + if file_type__.is_some() { + return Err(serde::de::Error::duplicate_field("jsonOptions")); } - length__ = - Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) - ; + file_type__ = map_.next_value::<::std::option::Option<_>>()?.map(file_type_writer_options::FileType::JsonOptions) +; } } } - Ok(FixedSizeBinary { - length: length__.unwrap_or_default(), + Ok(FileTypeWriterOptions { + file_type: file_type__, }) } } - deserializer.deserialize_struct("datafusion.FixedSizeBinary", FIELDS, GeneratedVisitor) + deserializer.deserialize_struct("datafusion.FileTypeWriterOptions", FIELDS, GeneratedVisitor) } } -impl serde::Serialize for FixedSizeList { +impl serde::Serialize for FileWriterMode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result where S: serde::Serializer, { - use serde::ser::SerializeStruct; - let mut len = 0; - if self.field_type.is_some() { - len += 1; - } - if self.list_size != 0 { - len += 1; - } - let mut struct_ser = serializer.serialize_struct("datafusion.FixedSizeList", len)?; - if let Some(v) = self.field_type.as_ref() { - struct_ser.serialize_field("fieldType", v)?; - } - if self.list_size != 0 { - struct_ser.serialize_field("listSize", &self.list_size)?; - } - struct_ser.end() + let variant = match self { + Self::Append => "APPEND", + Self::Put => "PUT", + Self::PutMultipart => "PUT_MULTIPART", + }; + serializer.serialize_str(variant) } } -impl<'de> serde::Deserialize<'de> for FixedSizeList { +impl<'de> serde::Deserialize<'de> for FileWriterMode { #[allow(deprecated)] fn deserialize(deserializer: D) -> std::result::Result where D: serde::Deserializer<'de>, { const FIELDS: &[&str] = &[ - "field_type", - "fieldType", - "list_size", - "listSize", + "APPEND", + "PUT", + "PUT_MULTIPART", ]; - #[allow(clippy::enum_variant_names)] - enum GeneratedField { - FieldType, - ListSize, - } - impl<'de> serde::Deserialize<'de> for GeneratedField { - fn deserialize(deserializer: D) -> std::result::Result - where - D: serde::Deserializer<'de>, - { - struct GeneratedVisitor; + struct GeneratedVisitor; - impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = GeneratedField; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = FileWriterMode; - fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(formatter, "expected one of: {:?}", &FIELDS) - } + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } - #[allow(unused_variables)] - fn visit_str(self, value: &str) -> std::result::Result - where - E: serde::de::Error, - { - match value { - "fieldType" | "field_type" => Ok(GeneratedField::FieldType), - "listSize" | "list_size" => Ok(GeneratedField::ListSize), - _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + fn visit_i64(self, v: i64) -> std::result::Result + where + E: serde::de::Error, + { + i32::try_from(v) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_else(|| { + serde::de::Error::invalid_value(serde::de::Unexpected::Signed(v), &self) + }) + } + + fn visit_u64(self, v: u64) -> std::result::Result + where + E: serde::de::Error, + { + i32::try_from(v) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_else(|| { + serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(v), &self) + }) + } + + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "APPEND" => Ok(FileWriterMode::Append), + "PUT" => Ok(FileWriterMode::Put), + "PUT_MULTIPART" => Ok(FileWriterMode::PutMultipart), + _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), + } + } + } + deserializer.deserialize_any(GeneratedVisitor) + } +} +impl serde::Serialize for FilterExecNode { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.input.is_some() { + len += 1; + } + if self.expr.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.FilterExecNode", len)?; + if let Some(v) = self.input.as_ref() { + struct_ser.serialize_field("input", v)?; + } + if let Some(v) = self.expr.as_ref() { + struct_ser.serialize_field("expr", v)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for FilterExecNode { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "input", + "expr", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Input, + Expr, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "input" => Ok(GeneratedField::Input), + "expr" => Ok(GeneratedField::Expr), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = FilterExecNode; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.FilterExecNode") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut input__ = None; + let mut expr__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Input => { + if input__.is_some() { + return Err(serde::de::Error::duplicate_field("input")); + } + input__ = map_.next_value()?; + } + GeneratedField::Expr => { + if expr__.is_some() { + return Err(serde::de::Error::duplicate_field("expr")); + } + expr__ = map_.next_value()?; + } + } + } + Ok(FilterExecNode { + input: input__, + expr: expr__, + }) + } + } + deserializer.deserialize_struct("datafusion.FilterExecNode", FIELDS, GeneratedVisitor) + } +} +impl serde::Serialize for FixedSizeBinary { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.length != 0 { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.FixedSizeBinary", len)?; + if self.length != 0 { + struct_ser.serialize_field("length", &self.length)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for FixedSizeBinary { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "length", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Length, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "length" => Ok(GeneratedField::Length), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = FixedSizeBinary; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.FixedSizeBinary") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut length__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Length => { + if length__.is_some() { + return Err(serde::de::Error::duplicate_field("length")); + } + length__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + } + } + Ok(FixedSizeBinary { + length: length__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.FixedSizeBinary", FIELDS, GeneratedVisitor) + } +} +impl serde::Serialize for FixedSizeList { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.field_type.is_some() { + len += 1; + } + if self.list_size != 0 { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.FixedSizeList", len)?; + if let Some(v) = self.field_type.as_ref() { + struct_ser.serialize_field("fieldType", v)?; + } + if self.list_size != 0 { + struct_ser.serialize_field("listSize", &self.list_size)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for FixedSizeList { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "field_type", + "fieldType", + "list_size", + "listSize", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + FieldType, + ListSize, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "fieldType" | "field_type" => Ok(GeneratedField::FieldType), + "listSize" | "list_size" => Ok(GeneratedField::ListSize), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } } @@ -10091,119 +10597,447 @@ impl<'de> serde::Deserialize<'de> for JoinSide { }) } - fn visit_u64(self, v: u64) -> std::result::Result - where - E: serde::de::Error, - { - i32::try_from(v) - .ok() - .and_then(|x| x.try_into().ok()) - .ok_or_else(|| { - serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(v), &self) - }) + fn visit_u64(self, v: u64) -> std::result::Result + where + E: serde::de::Error, + { + i32::try_from(v) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_else(|| { + serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(v), &self) + }) + } + + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "LEFT_SIDE" => Ok(JoinSide::LeftSide), + "RIGHT_SIDE" => Ok(JoinSide::RightSide), + _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), + } + } + } + deserializer.deserialize_any(GeneratedVisitor) + } +} +impl serde::Serialize for JoinType { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + let variant = match self { + Self::Inner => "INNER", + Self::Left => "LEFT", + Self::Right => "RIGHT", + Self::Full => "FULL", + Self::Leftsemi => "LEFTSEMI", + Self::Leftanti => "LEFTANTI", + Self::Rightsemi => "RIGHTSEMI", + Self::Rightanti => "RIGHTANTI", + }; + serializer.serialize_str(variant) + } +} +impl<'de> serde::Deserialize<'de> for JoinType { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "INNER", + "LEFT", + "RIGHT", + "FULL", + "LEFTSEMI", + "LEFTANTI", + "RIGHTSEMI", + "RIGHTANTI", + ]; + + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = JoinType; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + fn visit_i64(self, v: i64) -> std::result::Result + where + E: serde::de::Error, + { + i32::try_from(v) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_else(|| { + serde::de::Error::invalid_value(serde::de::Unexpected::Signed(v), &self) + }) + } + + fn visit_u64(self, v: u64) -> std::result::Result + where + E: serde::de::Error, + { + i32::try_from(v) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_else(|| { + serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(v), &self) + }) + } + + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "INNER" => Ok(JoinType::Inner), + "LEFT" => Ok(JoinType::Left), + "RIGHT" => Ok(JoinType::Right), + "FULL" => Ok(JoinType::Full), + "LEFTSEMI" => Ok(JoinType::Leftsemi), + "LEFTANTI" => Ok(JoinType::Leftanti), + "RIGHTSEMI" => Ok(JoinType::Rightsemi), + "RIGHTANTI" => Ok(JoinType::Rightanti), + _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), + } + } + } + deserializer.deserialize_any(GeneratedVisitor) + } +} +impl serde::Serialize for JsonSink { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.config.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.JsonSink", len)?; + if let Some(v) = self.config.as_ref() { + struct_ser.serialize_field("config", v)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for JsonSink { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "config", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Config, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "config" => Ok(GeneratedField::Config), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = JsonSink; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.JsonSink") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut config__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Config => { + if config__.is_some() { + return Err(serde::de::Error::duplicate_field("config")); + } + config__ = map_.next_value()?; + } + } + } + Ok(JsonSink { + config: config__, + }) + } + } + deserializer.deserialize_struct("datafusion.JsonSink", FIELDS, GeneratedVisitor) + } +} +impl serde::Serialize for JsonSinkExecNode { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.input.is_some() { + len += 1; + } + if self.sink.is_some() { + len += 1; + } + if self.sink_schema.is_some() { + len += 1; + } + if self.sort_order.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.JsonSinkExecNode", len)?; + if let Some(v) = self.input.as_ref() { + struct_ser.serialize_field("input", v)?; + } + if let Some(v) = self.sink.as_ref() { + struct_ser.serialize_field("sink", v)?; + } + if let Some(v) = self.sink_schema.as_ref() { + struct_ser.serialize_field("sinkSchema", v)?; + } + if let Some(v) = self.sort_order.as_ref() { + struct_ser.serialize_field("sortOrder", v)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for JsonSinkExecNode { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "input", + "sink", + "sink_schema", + "sinkSchema", + "sort_order", + "sortOrder", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Input, + Sink, + SinkSchema, + SortOrder, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "input" => Ok(GeneratedField::Input), + "sink" => Ok(GeneratedField::Sink), + "sinkSchema" | "sink_schema" => Ok(GeneratedField::SinkSchema), + "sortOrder" | "sort_order" => Ok(GeneratedField::SortOrder), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = JsonSinkExecNode; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.JsonSinkExecNode") } - fn visit_str(self, value: &str) -> std::result::Result - where - E: serde::de::Error, + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, { - match value { - "LEFT_SIDE" => Ok(JoinSide::LeftSide), - "RIGHT_SIDE" => Ok(JoinSide::RightSide), - _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), + let mut input__ = None; + let mut sink__ = None; + let mut sink_schema__ = None; + let mut sort_order__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Input => { + if input__.is_some() { + return Err(serde::de::Error::duplicate_field("input")); + } + input__ = map_.next_value()?; + } + GeneratedField::Sink => { + if sink__.is_some() { + return Err(serde::de::Error::duplicate_field("sink")); + } + sink__ = map_.next_value()?; + } + GeneratedField::SinkSchema => { + if sink_schema__.is_some() { + return Err(serde::de::Error::duplicate_field("sinkSchema")); + } + sink_schema__ = map_.next_value()?; + } + GeneratedField::SortOrder => { + if sort_order__.is_some() { + return Err(serde::de::Error::duplicate_field("sortOrder")); + } + sort_order__ = map_.next_value()?; + } + } } + Ok(JsonSinkExecNode { + input: input__, + sink: sink__, + sink_schema: sink_schema__, + sort_order: sort_order__, + }) } } - deserializer.deserialize_any(GeneratedVisitor) + deserializer.deserialize_struct("datafusion.JsonSinkExecNode", FIELDS, GeneratedVisitor) } } -impl serde::Serialize for JoinType { +impl serde::Serialize for JsonWriterOptions { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result where S: serde::Serializer, { - let variant = match self { - Self::Inner => "INNER", - Self::Left => "LEFT", - Self::Right => "RIGHT", - Self::Full => "FULL", - Self::Leftsemi => "LEFTSEMI", - Self::Leftanti => "LEFTANTI", - Self::Rightsemi => "RIGHTSEMI", - Self::Rightanti => "RIGHTANTI", - }; - serializer.serialize_str(variant) + use serde::ser::SerializeStruct; + let mut len = 0; + if self.compression != 0 { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.JsonWriterOptions", len)?; + if self.compression != 0 { + let v = CompressionTypeVariant::try_from(self.compression) + .map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.compression)))?; + struct_ser.serialize_field("compression", &v)?; + } + struct_ser.end() } } -impl<'de> serde::Deserialize<'de> for JoinType { +impl<'de> serde::Deserialize<'de> for JsonWriterOptions { #[allow(deprecated)] fn deserialize(deserializer: D) -> std::result::Result where D: serde::Deserializer<'de>, { const FIELDS: &[&str] = &[ - "INNER", - "LEFT", - "RIGHT", - "FULL", - "LEFTSEMI", - "LEFTANTI", - "RIGHTSEMI", - "RIGHTANTI", + "compression", ]; - struct GeneratedVisitor; + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Compression, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; - impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = JoinType; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; - fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(formatter, "expected one of: {:?}", &FIELDS) - } + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } - fn visit_i64(self, v: i64) -> std::result::Result - where - E: serde::de::Error, - { - i32::try_from(v) - .ok() - .and_then(|x| x.try_into().ok()) - .ok_or_else(|| { - serde::de::Error::invalid_value(serde::de::Unexpected::Signed(v), &self) - }) + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "compression" => Ok(GeneratedField::Compression), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = JsonWriterOptions; - fn visit_u64(self, v: u64) -> std::result::Result - where - E: serde::de::Error, - { - i32::try_from(v) - .ok() - .and_then(|x| x.try_into().ok()) - .ok_or_else(|| { - serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(v), &self) - }) + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.JsonWriterOptions") } - fn visit_str(self, value: &str) -> std::result::Result - where - E: serde::de::Error, + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, { - match value { - "INNER" => Ok(JoinType::Inner), - "LEFT" => Ok(JoinType::Left), - "RIGHT" => Ok(JoinType::Right), - "FULL" => Ok(JoinType::Full), - "LEFTSEMI" => Ok(JoinType::Leftsemi), - "LEFTANTI" => Ok(JoinType::Leftanti), - "RIGHTSEMI" => Ok(JoinType::Rightsemi), - "RIGHTANTI" => Ok(JoinType::Rightanti), - _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), + let mut compression__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Compression => { + if compression__.is_some() { + return Err(serde::de::Error::duplicate_field("compression")); + } + compression__ = Some(map_.next_value::()? as i32); + } + } } + Ok(JsonWriterOptions { + compression: compression__.unwrap_or_default(), + }) } } - deserializer.deserialize_any(GeneratedVisitor) + deserializer.deserialize_struct("datafusion.JsonWriterOptions", FIELDS, GeneratedVisitor) } } impl serde::Serialize for LikeNode { @@ -14141,6 +14975,115 @@ impl<'de> serde::Deserialize<'de> for PartiallySortedPartitionSearchMode { deserializer.deserialize_struct("datafusion.PartiallySortedPartitionSearchMode", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for PartitionColumn { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.name.is_empty() { + len += 1; + } + if self.arrow_type.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.PartitionColumn", len)?; + if !self.name.is_empty() { + struct_ser.serialize_field("name", &self.name)?; + } + if let Some(v) = self.arrow_type.as_ref() { + struct_ser.serialize_field("arrowType", v)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for PartitionColumn { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "name", + "arrow_type", + "arrowType", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Name, + ArrowType, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "name" => Ok(GeneratedField::Name), + "arrowType" | "arrow_type" => Ok(GeneratedField::ArrowType), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = PartitionColumn; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.PartitionColumn") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut name__ = None; + let mut arrow_type__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Name => { + if name__.is_some() { + return Err(serde::de::Error::duplicate_field("name")); + } + name__ = Some(map_.next_value()?); + } + GeneratedField::ArrowType => { + if arrow_type__.is_some() { + return Err(serde::de::Error::duplicate_field("arrowType")); + } + arrow_type__ = map_.next_value()?; + } + } + } + Ok(PartitionColumn { + name: name__.unwrap_or_default(), + arrow_type: arrow_type__, + }) + } + } + deserializer.deserialize_struct("datafusion.PartitionColumn", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for PartitionMode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -16812,6 +17755,9 @@ impl serde::Serialize for PhysicalPlanNode { physical_plan_node::PhysicalPlanType::Analyze(v) => { struct_ser.serialize_field("analyze", v)?; } + physical_plan_node::PhysicalPlanType::JsonSink(v) => { + struct_ser.serialize_field("jsonSink", v)?; + } } } struct_ser.end() @@ -16856,6 +17802,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { "nested_loop_join", "nestedLoopJoin", "analyze", + "json_sink", + "jsonSink", ]; #[allow(clippy::enum_variant_names)] @@ -16882,6 +17830,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { SortPreservingMerge, NestedLoopJoin, Analyze, + JsonSink, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -16925,6 +17874,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { "sortPreservingMerge" | "sort_preserving_merge" => Ok(GeneratedField::SortPreservingMerge), "nestedLoopJoin" | "nested_loop_join" => Ok(GeneratedField::NestedLoopJoin), "analyze" => Ok(GeneratedField::Analyze), + "jsonSink" | "json_sink" => Ok(GeneratedField::JsonSink), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -17099,6 +18049,13 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { return Err(serde::de::Error::duplicate_field("analyze")); } physical_plan_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_plan_node::PhysicalPlanType::Analyze) +; + } + GeneratedField::JsonSink => { + if physical_plan_type__.is_some() { + return Err(serde::de::Error::duplicate_field("jsonSink")); + } + physical_plan_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_plan_node::PhysicalPlanType::JsonSink) ; } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index d18bacfb3bcc..75050e9d3dfa 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1486,7 +1486,7 @@ pub mod owned_table_reference { pub struct PhysicalPlanNode { #[prost( oneof = "physical_plan_node::PhysicalPlanType", - tags = "1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23" + tags = "1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24" )] pub physical_plan_type: ::core::option::Option, } @@ -1541,10 +1541,83 @@ pub mod physical_plan_node { NestedLoopJoin(::prost::alloc::boxed::Box), #[prost(message, tag = "23")] Analyze(::prost::alloc::boxed::Box), + #[prost(message, tag = "24")] + JsonSink(::prost::alloc::boxed::Box), + } +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PartitionColumn { + #[prost(string, tag = "1")] + pub name: ::prost::alloc::string::String, + #[prost(message, optional, tag = "2")] + pub arrow_type: ::core::option::Option, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct FileTypeWriterOptions { + #[prost(oneof = "file_type_writer_options::FileType", tags = "1")] + pub file_type: ::core::option::Option, +} +/// Nested message and enum types in `FileTypeWriterOptions`. +pub mod file_type_writer_options { + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum FileType { + #[prost(message, tag = "1")] + JsonOptions(super::JsonWriterOptions), } } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct JsonWriterOptions { + #[prost(enumeration = "CompressionTypeVariant", tag = "1")] + pub compression: i32, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct FileSinkConfig { + #[prost(string, tag = "1")] + pub object_store_url: ::prost::alloc::string::String, + #[prost(message, repeated, tag = "2")] + pub file_groups: ::prost::alloc::vec::Vec, + #[prost(string, repeated, tag = "3")] + pub table_paths: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(message, optional, tag = "4")] + pub output_schema: ::core::option::Option, + #[prost(message, repeated, tag = "5")] + pub table_partition_cols: ::prost::alloc::vec::Vec, + #[prost(enumeration = "FileWriterMode", tag = "6")] + pub writer_mode: i32, + #[prost(bool, tag = "7")] + pub single_file_output: bool, + #[prost(bool, tag = "8")] + pub unbounded_input: bool, + #[prost(bool, tag = "9")] + pub overwrite: bool, + #[prost(message, optional, tag = "10")] + pub file_type_writer_options: ::core::option::Option, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct JsonSink { + #[prost(message, optional, tag = "1")] + pub config: ::core::option::Option, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct JsonSinkExecNode { + #[prost(message, optional, boxed, tag = "1")] + pub input: ::core::option::Option<::prost::alloc::boxed::Box>, + #[prost(message, optional, tag = "2")] + pub sink: ::core::option::Option, + #[prost(message, optional, tag = "3")] + pub sink_schema: ::core::option::Option, + #[prost(message, optional, tag = "4")] + pub sort_order: ::core::option::Option, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct PhysicalExtensionNode { #[prost(bytes = "vec", tag = "1")] pub node: ::prost::alloc::vec::Vec, @@ -3078,6 +3151,70 @@ impl UnionMode { } #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] +pub enum FileWriterMode { + Append = 0, + Put = 1, + PutMultipart = 2, +} +impl FileWriterMode { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + FileWriterMode::Append => "APPEND", + FileWriterMode::Put => "PUT", + FileWriterMode::PutMultipart => "PUT_MULTIPART", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "APPEND" => Some(Self::Append), + "PUT" => Some(Self::Put), + "PUT_MULTIPART" => Some(Self::PutMultipart), + _ => None, + } + } +} +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum CompressionTypeVariant { + Gzip = 0, + Bzip2 = 1, + Xz = 2, + Zstd = 3, + Uncompressed = 4, +} +impl CompressionTypeVariant { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + CompressionTypeVariant::Gzip => "GZIP", + CompressionTypeVariant::Bzip2 => "BZIP2", + CompressionTypeVariant::Xz => "XZ", + CompressionTypeVariant::Zstd => "ZSTD", + CompressionTypeVariant::Uncompressed => "UNCOMPRESSED", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "GZIP" => Some(Self::Gzip), + "BZIP2" => Some(Self::Bzip2), + "XZ" => Some(Self::Xz), + "ZSTD" => Some(Self::Zstd), + "UNCOMPRESSED" => Some(Self::Uncompressed), + _ => None, + } + } +} +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] pub enum PartitionMode { CollectLeft = 0, Partitioned = 1, diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index a956eded9032..a628523f0e74 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -23,9 +23,11 @@ use std::sync::Arc; use arrow::compute::SortOptions; use datafusion::arrow::datatypes::Schema; -use datafusion::datasource::listing::{FileRange, PartitionedFile}; +use datafusion::datasource::file_format::json::JsonSink; +use datafusion::datasource::file_format::write::FileWriterMode; +use datafusion::datasource::listing::{FileRange, ListingTableUrl, PartitionedFile}; use datafusion::datasource::object_store::ObjectStoreUrl; -use datafusion::datasource::physical_plan::FileScanConfig; +use datafusion::datasource::physical_plan::{FileScanConfig, FileSinkConfig}; use datafusion::execution::context::ExecutionProps; use datafusion::execution::FunctionRegistry; use datafusion::logical_expr::window_function::WindowFunction; @@ -39,8 +41,12 @@ use datafusion::physical_plan::windows::create_window_expr; use datafusion::physical_plan::{ functions, ColumnStatistics, Partitioning, PhysicalExpr, Statistics, WindowExpr, }; +use datafusion_common::file_options::json_writer::JsonWriterOptions; +use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::stats::Precision; -use datafusion_common::{not_impl_err, DataFusionError, JoinSide, Result, ScalarValue}; +use datafusion_common::{ + not_impl_err, DataFusionError, FileTypeWriterOptions, JoinSide, Result, ScalarValue, +}; use crate::common::proto_error; use crate::convert_required; @@ -697,3 +703,86 @@ impl TryFrom<&protobuf::Statistics> for Statistics { }) } } + +impl TryFrom<&protobuf::JsonSink> for JsonSink { + type Error = DataFusionError; + + fn try_from(value: &protobuf::JsonSink) -> Result { + Ok(Self::new(convert_required!(value.config)?)) + } +} + +impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig { + type Error = DataFusionError; + + fn try_from(conf: &protobuf::FileSinkConfig) -> Result { + let file_groups = conf + .file_groups + .iter() + .map(TryInto::try_into) + .collect::>>()?; + let table_paths = conf + .table_paths + .iter() + .map(ListingTableUrl::parse) + .collect::>>()?; + let table_partition_cols = conf + .table_partition_cols + .iter() + .map(|protobuf::PartitionColumn { name, arrow_type }| { + let data_type = convert_required!(arrow_type)?; + Ok((name.clone(), data_type)) + }) + .collect::>>()?; + Ok(Self { + object_store_url: ObjectStoreUrl::parse(&conf.object_store_url)?, + file_groups, + table_paths, + output_schema: Arc::new(convert_required!(conf.output_schema)?), + table_partition_cols, + writer_mode: conf.writer_mode().into(), + single_file_output: conf.single_file_output, + unbounded_input: conf.unbounded_input, + overwrite: conf.overwrite, + file_type_writer_options: convert_required!(conf.file_type_writer_options)?, + }) + } +} + +impl From for FileWriterMode { + fn from(value: protobuf::FileWriterMode) -> Self { + match value { + protobuf::FileWriterMode::Append => Self::Append, + protobuf::FileWriterMode::Put => Self::Put, + protobuf::FileWriterMode::PutMultipart => Self::PutMultipart, + } + } +} + +impl From for CompressionTypeVariant { + fn from(value: protobuf::CompressionTypeVariant) -> Self { + match value { + protobuf::CompressionTypeVariant::Gzip => Self::GZIP, + protobuf::CompressionTypeVariant::Bzip2 => Self::BZIP2, + protobuf::CompressionTypeVariant::Xz => Self::XZ, + protobuf::CompressionTypeVariant::Zstd => Self::ZSTD, + protobuf::CompressionTypeVariant::Uncompressed => Self::UNCOMPRESSED, + } + } +} + +impl TryFrom<&protobuf::FileTypeWriterOptions> for FileTypeWriterOptions { + type Error = DataFusionError; + + fn try_from(value: &protobuf::FileTypeWriterOptions) -> Result { + let file_type = value + .file_type + .as_ref() + .ok_or_else(|| proto_error("Missing required field in protobuf"))?; + match file_type { + protobuf::file_type_writer_options::FileType::JsonOptions(opts) => Ok( + Self::JSON(JsonWriterOptions::new(opts.compression().into())), + ), + } + } +} diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 431b8e42cdaf..1eedbe987ec1 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use datafusion::arrow::compute::SortOptions; use datafusion::arrow::datatypes::SchemaRef; use datafusion::datasource::file_format::file_compression_type::FileCompressionType; +use datafusion::datasource::file_format::json::JsonSink; #[cfg(feature = "parquet")] use datafusion::datasource::physical_plan::ParquetExec; use datafusion::datasource::physical_plan::{AvroExec, CsvExec}; @@ -36,6 +37,7 @@ use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_plan::explain::ExplainExec; use datafusion::physical_plan::expressions::{Column, PhysicalSortExpr}; use datafusion::physical_plan::filter::FilterExec; +use datafusion::physical_plan::insert::FileSinkExec; use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter}; use datafusion::physical_plan::joins::{CrossJoinExec, NestedLoopJoinExec}; use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode}; @@ -64,7 +66,9 @@ use crate::protobuf::physical_aggregate_expr_node::AggregateFunction; use crate::protobuf::physical_expr_node::ExprType; use crate::protobuf::physical_plan_node::PhysicalPlanType; use crate::protobuf::repartition_exec_node::PartitionMethod; -use crate::protobuf::{self, window_agg_exec_node, PhysicalPlanNode}; +use crate::protobuf::{ + self, window_agg_exec_node, PhysicalPlanNode, PhysicalSortExprNodeCollection, +}; use crate::{convert_required, into_required}; use self::from_proto::parse_physical_window_expr; @@ -782,7 +786,38 @@ impl AsExecutionPlan for PhysicalPlanNode { analyze.verbose, analyze.show_statistics, input, - Arc::new(analyze.schema.as_ref().unwrap().try_into()?), + Arc::new(convert_required!(analyze.schema)?), + ))) + } + PhysicalPlanType::JsonSink(sink) => { + let input = + into_physical_plan(&sink.input, registry, runtime, extension_codec)?; + + let data_sink: JsonSink = sink + .sink + .as_ref() + .ok_or_else(|| proto_error("Missing required field in protobuf"))? + .try_into()?; + let sink_schema = convert_required!(sink.sink_schema)?; + let sort_order = sink + .sort_order + .as_ref() + .map(|collection| { + collection + .physical_sort_expr_nodes + .iter() + .map(|proto| { + parse_physical_sort_expr(proto, registry, &sink_schema) + .map(Into::into) + }) + .collect::>>() + }) + .transpose()?; + Ok(Arc::new(FileSinkExec::new( + input, + Arc::new(data_sink), + Arc::new(sink_schema), + sort_order, ))) } } @@ -1415,6 +1450,48 @@ impl AsExecutionPlan for PhysicalPlanNode { }); } + if let Some(exec) = plan.downcast_ref::() { + let input = protobuf::PhysicalPlanNode::try_from_physical_plan( + exec.input().to_owned(), + extension_codec, + )?; + let sort_order = match exec.sort_order() { + Some(requirements) => { + let expr = requirements + .iter() + .map(|requirement| { + let expr: PhysicalSortExpr = requirement.to_owned().into(); + let sort_expr = protobuf::PhysicalSortExprNode { + expr: Some(Box::new(expr.expr.to_owned().try_into()?)), + asc: !expr.options.descending, + nulls_first: expr.options.nulls_first, + }; + Ok(sort_expr) + }) + .collect::>>()?; + Some(PhysicalSortExprNodeCollection { + physical_sort_expr_nodes: expr, + }) + } + None => None, + }; + + if let Some(sink) = exec.sink().as_any().downcast_ref::() { + return Ok(protobuf::PhysicalPlanNode { + physical_plan_type: Some(PhysicalPlanType::JsonSink(Box::new( + protobuf::JsonSinkExecNode { + input: Some(Box::new(input)), + sink: Some(sink.try_into()?), + sink_schema: Some(exec.schema().as_ref().try_into()?), + sort_order, + }, + ))), + }); + } + + // If unknown DataSink then let extension handle it + } + let mut buf: Vec = vec![]; match extension_codec.try_encode(plan_clone.clone(), &mut buf) { Ok(_) => { diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 114baab6ccc4..8201ef86b528 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -27,9 +27,14 @@ use crate::protobuf::{ physical_aggregate_expr_node, PhysicalSortExprNode, PhysicalSortExprNodeCollection, ScalarValue, }; - -use datafusion::datasource::listing::{FileRange, PartitionedFile}; -use datafusion::datasource::physical_plan::FileScanConfig; +use datafusion::datasource::{ + file_format::json::JsonSink, physical_plan::FileScanConfig, +}; +use datafusion::datasource::{ + file_format::write::FileWriterMode, + listing::{FileRange, PartitionedFile}, + physical_plan::FileSinkConfig, +}; use datafusion::logical_expr::BuiltinScalarFunction; use datafusion::physical_expr::expressions::{GetFieldAccessExpr, GetIndexedFieldExpr}; use datafusion::physical_expr::window::{NthValueKind, SlidingAggregateWindowExpr}; @@ -50,7 +55,15 @@ use datafusion::physical_plan::{ AggregateExpr, ColumnStatistics, PhysicalExpr, Statistics, WindowExpr, }; use datafusion_common::{ - internal_err, not_impl_err, stats::Precision, DataFusionError, JoinSide, Result, + file_options::{ + arrow_writer::ArrowWriterOptions, avro_writer::AvroWriterOptions, + csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions, + parquet_writer::ParquetWriterOptions, + }, + internal_err, not_impl_err, + parsers::CompressionTypeVariant, + stats::Precision, + DataFusionError, FileTypeWriterOptions, JoinSide, Result, }; impl TryFrom> for protobuf::PhysicalExprNode { @@ -790,3 +803,110 @@ impl TryFrom for protobuf::PhysicalSortExprNode { }) } } + +impl TryFrom<&JsonSink> for protobuf::JsonSink { + type Error = DataFusionError; + + fn try_from(value: &JsonSink) -> Result { + Ok(Self { + config: Some(value.config().try_into()?), + }) + } +} + +impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig { + type Error = DataFusionError; + + fn try_from(conf: &FileSinkConfig) -> Result { + let writer_mode: protobuf::FileWriterMode = conf.writer_mode.into(); + let file_groups = conf + .file_groups + .iter() + .map(TryInto::try_into) + .collect::>>()?; + let table_paths = conf + .table_paths + .iter() + .map(ToString::to_string) + .collect::>(); + let table_partition_cols = conf + .table_partition_cols + .iter() + .map(|(name, data_type)| { + Ok(protobuf::PartitionColumn { + name: name.to_owned(), + arrow_type: Some(data_type.try_into()?), + }) + }) + .collect::>>()?; + let file_type_writer_options = &conf.file_type_writer_options; + Ok(Self { + object_store_url: conf.object_store_url.to_string(), + file_groups, + table_paths, + output_schema: Some(conf.output_schema.as_ref().try_into()?), + table_partition_cols, + writer_mode: writer_mode.into(), + single_file_output: conf.single_file_output, + unbounded_input: conf.unbounded_input, + overwrite: conf.overwrite, + file_type_writer_options: Some(file_type_writer_options.try_into()?), + }) + } +} + +impl From for protobuf::FileWriterMode { + fn from(value: FileWriterMode) -> Self { + match value { + FileWriterMode::Append => Self::Append, + FileWriterMode::Put => Self::Put, + FileWriterMode::PutMultipart => Self::PutMultipart, + } + } +} + +impl From<&CompressionTypeVariant> for protobuf::CompressionTypeVariant { + fn from(value: &CompressionTypeVariant) -> Self { + match value { + CompressionTypeVariant::GZIP => Self::Gzip, + CompressionTypeVariant::BZIP2 => Self::Bzip2, + CompressionTypeVariant::XZ => Self::Xz, + CompressionTypeVariant::ZSTD => Self::Zstd, + CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed, + } + } +} + +impl TryFrom<&FileTypeWriterOptions> for protobuf::FileTypeWriterOptions { + type Error = DataFusionError; + + fn try_from(opts: &FileTypeWriterOptions) -> Result { + let file_type = match opts { + #[cfg(feature = "parquet")] + FileTypeWriterOptions::Parquet(ParquetWriterOptions { + writer_options: _, + }) => return not_impl_err!("Parquet file sink protobuf serialization"), + FileTypeWriterOptions::CSV(CsvWriterOptions { + writer_options: _, + compression: _, + }) => return not_impl_err!("CSV file sink protobuf serialization"), + FileTypeWriterOptions::JSON(JsonWriterOptions { compression }) => { + let compression: protobuf::CompressionTypeVariant = compression.into(); + protobuf::file_type_writer_options::FileType::JsonOptions( + protobuf::JsonWriterOptions { + compression: compression.into(), + }, + ) + } + FileTypeWriterOptions::Avro(AvroWriterOptions {}) => { + return not_impl_err!("Avro file sink protobuf serialization") + } + FileTypeWriterOptions::Arrow(ArrowWriterOptions {}) => { + return not_impl_err!("Arrow file sink protobuf serialization") + } + }; + Ok(Self { + file_type: Some(file_type), + }) + } +} diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 01a0916d8cd2..81e66d5ead36 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -21,15 +21,19 @@ use std::sync::Arc; use datafusion::arrow::array::ArrayRef; use datafusion::arrow::compute::kernels::sort::SortOptions; use datafusion::arrow::datatypes::{DataType, Field, Fields, IntervalUnit, Schema}; -use datafusion::datasource::listing::PartitionedFile; +use datafusion::datasource::file_format::json::JsonSink; +use datafusion::datasource::file_format::write::FileWriterMode; +use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile}; use datafusion::datasource::object_store::ObjectStoreUrl; -use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec}; +use datafusion::datasource::physical_plan::{ + FileScanConfig, FileSinkConfig, ParquetExec, +}; use datafusion::execution::context::ExecutionProps; use datafusion::logical_expr::{ create_udf, BuiltinScalarFunction, JoinType, Operator, Volatility, }; use datafusion::physical_expr::window::SlidingAggregateWindowExpr; -use datafusion::physical_expr::ScalarFunctionExpr; +use datafusion::physical_expr::{PhysicalSortRequirement, ScalarFunctionExpr}; use datafusion::physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, }; @@ -41,6 +45,7 @@ use datafusion::physical_plan::expressions::{ }; use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::functions::make_scalar_function; +use datafusion::physical_plan::insert::FileSinkExec; use datafusion::physical_plan::joins::{HashJoinExec, NestedLoopJoinExec, PartitionMode}; use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion::physical_plan::projection::ProjectionExec; @@ -53,8 +58,10 @@ use datafusion::physical_plan::{ }; use datafusion::prelude::SessionContext; use datafusion::scalar::ScalarValue; +use datafusion_common::file_options::json_writer::JsonWriterOptions; +use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::stats::Precision; -use datafusion_common::Result; +use datafusion_common::{FileTypeWriterOptions, Result}; use datafusion_expr::{ Accumulator, AccumulatorFactoryFunction, AggregateUDF, ReturnTypeFunction, Signature, StateTypeFunction, WindowFrame, WindowFrameBound, @@ -698,7 +705,7 @@ fn roundtrip_get_indexed_field_list_range() -> Result<()> { } #[test] -fn rountrip_analyze() -> Result<()> { +fn roundtrip_analyze() -> Result<()> { let field_a = Field::new("plan_type", DataType::Utf8, false); let field_b = Field::new("plan", DataType::Utf8, false); let schema = Schema::new(vec![field_a, field_b]); @@ -711,3 +718,41 @@ fn rountrip_analyze() -> Result<()> { Arc::new(schema), ))) } + +#[test] +fn roundtrip_json_sink() -> Result<()> { + let field_a = Field::new("plan_type", DataType::Utf8, false); + let field_b = Field::new("plan", DataType::Utf8, false); + let schema = Arc::new(Schema::new(vec![field_a, field_b])); + let input = Arc::new(EmptyExec::new(true, schema.clone())); + + let file_sink_config = FileSinkConfig { + object_store_url: ObjectStoreUrl::local_filesystem(), + file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)], + table_paths: vec![ListingTableUrl::parse("file:///")?], + output_schema: schema.clone(), + table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)], + writer_mode: FileWriterMode::Put, + single_file_output: true, + unbounded_input: false, + overwrite: true, + file_type_writer_options: FileTypeWriterOptions::JSON(JsonWriterOptions::new( + CompressionTypeVariant::UNCOMPRESSED, + )), + }; + let data_sink = Arc::new(JsonSink::new(file_sink_config)); + let sort_order = vec![PhysicalSortRequirement::new( + Arc::new(Column::new("plan_type", 0)), + Some(SortOptions { + descending: true, + nulls_first: false, + }), + )]; + + roundtrip_test(Arc::new(FileSinkExec::new( + input, + data_sink, + schema.clone(), + Some(sort_order), + ))) +} diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt index f2fe216ee864..6e4a711a0115 100644 --- a/datafusion/sqllogictest/test_files/copy.slt +++ b/datafusion/sqllogictest/test_files/copy.slt @@ -32,7 +32,7 @@ logical_plan CopyTo: format=parquet output_url=test_files/scratch/copy/table single_file_output=false options: (compression 'zstd(10)') --TableScan: source_table projection=[col1, col2] physical_plan -InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[]) +FileSinkExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[]) --MemoryExec: partitions=1, partition_sizes=[1] # Error case diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 066a31590ccd..766b983a7962 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -168,7 +168,7 @@ Dml: op=[Insert Into] table=[sink_table] ----Sort: aggregate_test_100.c1 ASC NULLS LAST ------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13] physical_plan -InsertExec: sink=CsvSink(writer_mode=Append, file_groups=[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]) +FileSinkExec: sink=CsvSink(writer_mode=Append, file_groups=[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]) --ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, c5@4 as c5, c6@5 as c6, c7@6 as c7, c8@7 as c8, c9@8 as c9, c10@9 as c10, c11@10 as c11, c12@11 as c12, c13@12 as c13] ----SortExec: expr=[c1@0 ASC NULLS LAST] ------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true diff --git a/datafusion/sqllogictest/test_files/insert.slt b/datafusion/sqllogictest/test_files/insert.slt index cc04c6227721..0c63a3481996 100644 --- a/datafusion/sqllogictest/test_files/insert.slt +++ b/datafusion/sqllogictest/test_files/insert.slt @@ -64,7 +64,7 @@ Dml: op=[Insert Into] table=[table_without_values] --------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] ----------TableScan: aggregate_test_100 projection=[c1, c4, c9] physical_plan -InsertExec: sink=MemoryTable (partitions=1) +FileSinkExec: sink=MemoryTable (partitions=1) --ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2] ----SortPreservingMergeExec: [c1@2 ASC NULLS LAST] ------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, c1@0 as c1] @@ -125,7 +125,7 @@ Dml: op=[Insert Into] table=[table_without_values] ----WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] ------TableScan: aggregate_test_100 projection=[c1, c4, c9] physical_plan -InsertExec: sink=MemoryTable (partitions=1) +FileSinkExec: sink=MemoryTable (partitions=1) --CoalescePartitionsExec ----ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2] ------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted] @@ -175,7 +175,7 @@ Dml: op=[Insert Into] table=[table_without_values] --------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] ----------TableScan: aggregate_test_100 projection=[c1, c4, c9] physical_plan -InsertExec: sink=MemoryTable (partitions=8) +FileSinkExec: sink=MemoryTable (partitions=8) --ProjectionExec: expr=[a1@0 as a1, a2@1 as a2] ----SortPreservingMergeExec: [c1@2 ASC NULLS LAST] ------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as a1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as a2, c1@0 as c1] @@ -217,7 +217,7 @@ Dml: op=[Insert Into] table=[table_without_values] ----Sort: aggregate_test_100.c1 ASC NULLS LAST ------TableScan: aggregate_test_100 projection=[c1] physical_plan -InsertExec: sink=MemoryTable (partitions=1) +FileSinkExec: sink=MemoryTable (partitions=1) --ProjectionExec: expr=[c1@0 as c1] ----SortExec: expr=[c1@0 ASC NULLS LAST] ------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt index 8b01a14568e7..fa1d646d1413 100644 --- a/datafusion/sqllogictest/test_files/insert_to_external.slt +++ b/datafusion/sqllogictest/test_files/insert_to_external.slt @@ -100,7 +100,7 @@ Dml: op=[Insert Into] table=[ordered_insert_test] --Projection: column1 AS a, column2 AS b ----Values: (Int64(5), Int64(1)), (Int64(4), Int64(2)), (Int64(7), Int64(7)), (Int64(7), Int64(8)), (Int64(7), Int64(9))... physical_plan -InsertExec: sink=CsvSink(writer_mode=PutMultipart, file_groups=[]) +FileSinkExec: sink=CsvSink(writer_mode=PutMultipart, file_groups=[]) --SortExec: expr=[a@0 ASC NULLS LAST,b@1 DESC] ----ProjectionExec: expr=[column1@0 as a, column2@1 as b] ------ValuesExec @@ -315,7 +315,7 @@ Dml: op=[Insert Into] table=[table_without_values] --------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] ----------TableScan: aggregate_test_100 projection=[c1, c4, c9] physical_plan -InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[]) +FileSinkExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[]) --ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2] ----SortPreservingMergeExec: [c1@2 ASC NULLS LAST] ------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, c1@0 as c1] @@ -378,7 +378,7 @@ Dml: op=[Insert Into] table=[table_without_values] ----WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] ------TableScan: aggregate_test_100 projection=[c1, c4, c9] physical_plan -InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[]) +FileSinkExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[]) --CoalescePartitionsExec ----ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2] ------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted] @@ -422,7 +422,7 @@ Dml: op=[Insert Into] table=[table_without_values] ----Sort: aggregate_test_100.c1 ASC NULLS LAST ------TableScan: aggregate_test_100 projection=[c1] physical_plan -InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[]) +FileSinkExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[]) --ProjectionExec: expr=[c1@0 as c1] ----SortExec: expr=[c1@0 ASC NULLS LAST] ------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true