Merge pull request #466 from influxdata/hiltontj/filter-leftover-gen1
fix: filter leftover gen1 files from compactor based on time
hiltontj authored Jan 26, 2025
2 parents fd67b25 + a869992 commit 7d92329
Showing 3 changed files with 16 additions and 10 deletions.
2 changes: 1 addition & 1 deletion influxdb3_enterprise/buffer/src/modes/read.rs
@@ -139,7 +139,7 @@ impl ChunkContainer for ReadMode {
             .get_parquet_files_and_writer_markers(
                 &db_schema.name,
                 &table_def.table_name,
-                filter.original_filters(),
+                filter,
             );
 
         buffer_chunks.extend(
2 changes: 1 addition & 1 deletion influxdb3_enterprise/buffer/src/modes/read_write.rs
@@ -219,7 +219,7 @@ impl ChunkContainer for ReadWriteMode {
             .get_parquet_files_and_writer_markers(
                 &db_schema.name,
                 &table_def.table_name,
-                filter.original_filters(),
+                filter,
             );
 
         chunks.extend(
22 changes: 14 additions & 8 deletions influxdb3_enterprise/compactor/src/compacted_data.rs
@@ -1,7 +1,6 @@
 //! Module has a view of all compacted data from a given compactor id.
 use crate::catalog::CompactedCatalog;
-use datafusion::prelude::Expr;
 use hashbrown::HashMap;
 use influxdb3_enterprise_data_layout::persist::{get_compaction_detail, get_generation_detail};
 use influxdb3_enterprise_data_layout::{
@@ -11,7 +10,7 @@ use influxdb3_enterprise_data_layout::{
 };
 use influxdb3_enterprise_index::memory::FileIndex;
 use influxdb3_id::{DbId, TableId};
-use influxdb3_write::ParquetFile;
+use influxdb3_write::{ChunkFilter, ParquetFile};
 use object_store::ObjectStore;
 use parking_lot::RwLock;
 use std::fmt::Debug;
@@ -143,7 +142,7 @@ impl CompactedData {
         &self,
         database_name: &str,
         table_name: &str,
-        filters: &[Expr],
+        filter: &ChunkFilter<'_>,
     ) -> (Vec<Arc<ParquetFile>>, Vec<Arc<NodeSnapshotMarker>>) {
         let Some(db) = self.compacted_catalog.db_schema(database_name) else {
             return Default::default();
@@ -158,7 +157,7 @@ impl CompactedData {
             return Default::default();
         };
 
-        table.get_parquet_files_and_host_markers(filters)
+        table.get_parquet_files_and_host_markers(filter)
     }
 
     pub(crate) fn update_detail_with_generations(
@@ -373,12 +372,19 @@ impl CompactedTable {
 
     fn get_parquet_files_and_host_markers(
         &self,
-        filters: &[Expr],
+        filter: &ChunkFilter<'_>,
     ) -> (Vec<Arc<ParquetFile>>, Vec<Arc<NodeSnapshotMarker>>) {
-        let mut parquet_files = self.file_index.parquet_files_for_filter(filters);
+        let mut parquet_files = self
+            .file_index
+            .parquet_files_for_filter(filter.original_filters());
 
-        // add the gen1 files
-        for f in &self.compaction_detail.leftover_gen1_files {
+        // add the gen1 files, filtered by their min/max times
+        for f in self
+            .compaction_detail
+            .leftover_gen1_files
+            .iter()
+            .filter(|file| filter.test_time_stamp_min_max(file.file.min_time, file.file.max_time))
+        {
             parquet_files.push(Arc::clone(&f.file));
         }
 
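Note: the substance of the fix is the new `.filter(...)` over `leftover_gen1_files`. Previously every leftover gen1 file was returned regardless of the query's time bounds, so time-predicated queries could be handed files they would never read. Below is a minimal, self-contained sketch of the interval-overlap test that `ChunkFilter::test_time_stamp_min_max` presumably performs; `TimeRange`, its field names, and the nanosecond units are hypothetical stand-ins for illustration, not the actual `influxdb3_write` internals.

// Sketch only: assumed behavior of the min/max time check, not the
// real `ChunkFilter` implementation from `influxdb3_write`.

/// Hypothetical stand-in for the time bounds a `ChunkFilter` derives
/// from a query's time predicates (nanoseconds since epoch, inclusive).
struct TimeRange {
    min: i64,
    max: i64,
}

impl TimeRange {
    /// A file is worth scanning only if its [file_min, file_max] span
    /// intersects the query range: two closed intervals overlap exactly
    /// when each one starts no later than the other one ends.
    fn overlaps(&self, file_min: i64, file_max: i64) -> bool {
        file_min <= self.max && file_max >= self.min
    }
}

fn main() {
    let query = TimeRange { min: 100, max: 200 };

    // (file_min, file_max, should_be_kept)
    let leftover_gen1 = [
        (0, 50, false),    // entirely before the query range: pruned
        (50, 150, true),   // straddles the lower bound: kept
        (120, 180, true),  // fully inside the range: kept
        (250, 300, false), // entirely after the query range: pruned
    ];

    for (file_min, file_max, expected) in leftover_gen1 {
        assert_eq!(query.overlaps(file_min, file_max), expected);
    }
    println!("all overlap checks passed");
}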
