Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Bumped parquet2 (#1035)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Jun 1, 2022
1 parent 47d5bd2 commit fe1e576
Show file tree
Hide file tree
Showing 11 changed files with 9 additions and 32 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ futures = { version = "0.3", optional = true }
ahash = { version = "0.7", optional = true }

# parquet support
parquet2 = { version = "0.12", optional = true, default_features = false, features = ["stream"] }
parquet2 = { version = "0.13", optional = true, default_features = false }

# avro support
avro-schema = { version = "0.2", optional = true }
Expand Down
1 change: 0 additions & 1 deletion arrow-parquet-integration-testing/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ fn main() -> Result<()> {

let mut writer = FileWriter::try_new(writer, schema, options)?;

writer.start()?;
for group in row_groups {
writer.write(group?)?;
}
Expand Down
1 change: 0 additions & 1 deletion benches/write_parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ fn write(array: &dyn Array, encoding: Encoding) -> Result<()> {

let mut writer = FileWriter::try_new(writer, schema, options)?;

writer.start()?;
for group in row_groups {
writer.write(group?)?;
}
Expand Down
1 change: 0 additions & 1 deletion examples/parquet_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ fn write_batch(path: &str, schema: Schema, columns: Chunk<Arc<dyn Array>>) -> Re

let mut writer = FileWriter::try_new(file, schema, options)?;

writer.start()?;
for group in row_groups {
writer.write(group?)?;
}
Expand Down
1 change: 0 additions & 1 deletion examples/parquet_write_parallel/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ fn parallel_write(path: &str, schema: &Schema, batches: &[Chunk]) -> Result<()>
let mut writer = FileWriter::try_new(file, schema, options)?;

// Write the file.
writer.start()?;
for group in row_groups {
writer.write(group?)?;
}
Expand Down
1 change: 0 additions & 1 deletion src/doc/lib.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ fn main() -> Result<()> {
let mut writer = FileWriter::try_new(file, schema, options)?;

// Write the file.
writer.start()?;
for group in row_groups {
writer.write(group?)?;
}
Expand Down
5 changes: 0 additions & 5 deletions src/io/parquet/write/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,6 @@ impl<W: Write> FileWriter<W> {
})
}

/// Writes the header of the file
pub fn start(&mut self) -> Result<()> {
Ok(self.writer.start()?)
}

/// Writes a row group to the file.
pub fn write(&mut self, row_group: RowGroupIter<'_, Error>) -> Result<()> {
Ok(self.writer.write(row_group)?)
Expand Down
26 changes: 8 additions & 18 deletions src/io/parquet/write/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ where
) -> Result<Self, Error> {
let parquet_schema = crate::io::parquet::write::to_parquet_schema(&schema)?;
let created_by = Some("Arrow2 - Native Rust implementation of Arrow".to_string());
let mut writer = FileStreamer::new(
let writer = FileStreamer::new(
writer,
parquet_schema.clone(),
ParquetWriteOptions {
Expand All @@ -91,16 +91,9 @@ where
},
created_by,
);
let task = Some(
async move {
writer.start().await?;
Ok(Some(writer))
}
.boxed(),
);
Ok(Self {
writer: None,
task,
writer: Some(writer),
task: None,
options,
schema,
encoding,
Expand Down Expand Up @@ -196,7 +189,7 @@ where
match futures::ready!(this.poll_complete(cx)) {
Ok(()) => {
let writer = this.writer.take();
if let Some(writer) = writer {
if let Some(mut writer) = writer {
let meta = std::mem::take(&mut this.metadata);
let metadata = if meta.is_empty() {
None
Expand All @@ -209,13 +202,10 @@ where
};
let kv_meta = add_arrow_schema(&this.schema, metadata);

this.task = Some(
writer
.end(kv_meta)
.map_ok(|_| None)
.map_err(Error::from)
.boxed(),
);
this.task = Some(Box::pin(async move {
writer.end(kv_meta).map_err(Error::from).await?;
Ok(None)
}));
this.poll_complete(cx)
} else {
Poll::Ready(Ok(()))
Expand Down
1 change: 0 additions & 1 deletion tests/it/io/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,6 @@ fn integration_write(schema: &Schema, batches: &[Chunk<Arc<dyn Array>>]) -> Resu

let mut writer = FileWriter::try_new(writer, schema.clone(), options)?;

writer.start()?;
for group in row_groups {
writer.write(group?)?;
}
Expand Down
1 change: 0 additions & 1 deletion tests/it/io/parquet/read_indexes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ fn read_with_indexes(
let writer = vec![];
let mut writer = FileWriter::try_new(writer, schema, options)?;

writer.start()?;
writer.write(row_group)?;
writer.end(None)?;
let data = writer.into_inner();
Expand Down
1 change: 0 additions & 1 deletion tests/it/io/parquet/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ fn round_trip(
let writer = Cursor::new(vec![]);
let mut writer = FileWriter::try_new(writer, schema, options)?;

writer.start()?;
for group in row_groups {
writer.write(group?)?;
}
Expand Down

0 comments on commit fe1e576

Please sign in to comment.