diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp
index d43d9fdcea8e..0a13dc3c0136 100644
--- a/src/Common/ProfileEvents.cpp
+++ b/src/Common/ProfileEvents.cpp
@@ -806,6 +806,8 @@ The server successfully detected this situation and will download merged part fr
     M(GWPAsanAllocateSuccess, "Number of successful allocations done by GWPAsan") \
     M(GWPAsanAllocateFailed, "Number of failed allocations done by GWPAsan (i.e. filled pool)") \
     M(GWPAsanFree, "Number of free operations done by GWPAsan") \
+    M(ParquetMetaDataCacheHits, "Number of times parquet file metadata was found in the cache.") \
+    M(ParquetMetaDataCacheMisses, "Number of times parquet file metadata was not found in the cache and had to be read from the file.") \
 
 #ifdef APPLY_FOR_EXTERNAL_EVENTS
diff --git a/src/Core/ServerSettings.h b/src/Core/ServerSettings.h
index a52be0c8ba9c..c3f3b7384761 100644
--- a/src/Core/ServerSettings.h
+++ b/src/Core/ServerSettings.h
@@ -166,7 +166,8 @@ namespace DB
     M(String, mutation_workload, "default", "Name of workload to be used to access resources for all mutations (may be overridden by a merge tree setting)", 0) \
     M(Bool, prepare_system_log_tables_on_startup, false, "If true, ClickHouse creates all configured `system.*_log` tables before the startup. It can be helpful if some startup scripts depend on these tables.", 0) \
     M(UInt64, config_reload_interval_ms, 2000, "How often clickhouse will reload config and check for new changes", 0) \
-    M(Bool, disable_insertion_and_mutation, false, "Disable all insert/alter/delete queries. This setting will be enabled if someone needs read-only nodes to prevent insertion and mutation affect reading performance.", 0)
+    M(Bool, disable_insertion_and_mutation, false, "Disable all insert/alter/delete queries. This setting will be enabled if someone needs read-only nodes to prevent insertion and mutation affect reading performance.", 0) \
+    M(UInt64, input_format_parquet_metadata_cache_max_entries, 100000, "Maximum number of parquet file metadata entries to cache.", 0)
 
 /// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in StorageSystemServerSettings.cpp
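The server-level knob above caps how many parquet footers stay resident; the cache added later in this diff inherits that bound from ClickHouse's `CacheBase`. The sketch below is a simplified stand-in (not the real `CacheBase` API, and `BoundedLru` is a hypothetical name) showing what a count-bounded LRU does once `max_entries` is reached:

```cpp
// Minimal sketch of a count-bounded LRU cache, assuming shared_ptr values
// keyed by string, as in the parquet metadata cache.
#include <list>
#include <memory>
#include <string>
#include <unordered_map>

template <typename Value>
class BoundedLru
{
public:
    explicit BoundedLru(size_t max_entries_) : max_entries(max_entries_) {}

    std::shared_ptr<Value> get(const std::string & key)
    {
        auto it = index.find(key);
        if (it == index.end())
            return nullptr;
        // Move the entry to the front: it is now the most recently used.
        order.splice(order.begin(), order, it->second);
        return it->second->second;
    }

    void set(const std::string & key, std::shared_ptr<Value> value)
    {
        if (auto it = index.find(key); it != index.end())
        {
            it->second->second = std::move(value);
            order.splice(order.begin(), order, it->second);
            return;
        }
        order.emplace_front(key, std::move(value));
        index[key] = order.begin();
        if (index.size() > max_entries)
        {
            // Over capacity: evict the least recently used entry.
            index.erase(order.back().first);
            order.pop_back();
        }
    }

private:
    using List = std::list<std::pair<std::string, std::shared_ptr<Value>>>;
    size_t max_entries;
    List order;
    std::unordered_map<std::string, typename List::iterator> index;
};

int main()
{
    BoundedLru<int> cache(2);
    cache.set("a", std::make_shared<int>(1));
    cache.set("b", std::make_shared<int>(2));
    cache.get("a");                            // "a" becomes most recently used
    cache.set("c", std::make_shared<int>(3));  // evicts "b", the LRU entry
}
```
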
diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index 4f9abb306274..1982b56178de 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -1293,6 +1293,7 @@ class IColumn;
     M(Bool, precise_float_parsing, false, "Prefer more precise (but slower) float parsing algorithm", 0) \
    M(DateTimeOverflowBehavior, date_time_overflow_behavior, "ignore", "Overflow mode for Date, Date32, DateTime, DateTime64 types. Possible values: 'ignore', 'throw', 'saturate'.", 0) \
     M(Bool, validate_experimental_and_suspicious_types_inside_nested_types, true, "Validate usage of experimental and suspicious types inside nested types like Array/Map/Tuple", 0) \
+    M(Bool, input_format_parquet_use_metadata_cache, false, "Enable parquet file metadata caching.", 0) \
 
 // End of FORMAT_FACTORY_SETTINGS
diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp
index b3e7a59c5e2f..85917d33624a 100644
--- a/src/Core/SettingsChangesHistory.cpp
+++ b/src/Core/SettingsChangesHistory.cpp
@@ -92,7 +92,8 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory::SettingsChanges>> settings_changes_history_initializer =
diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
--- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
+++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
+#include <...>
+#include <...>
 #include <...>
 #include <...>
 #include <...>
@@ -34,6 +37,12 @@ namespace CurrentMetrics
     extern const Metric ParquetDecoderThreadsScheduled;
 }
 
+namespace ProfileEvents
+{
+    extern const Event ParquetMetaDataCacheHits;
+    extern const Event ParquetMetaDataCacheMisses;
+}
+
 namespace DB
 {
@@ -426,6 +435,15 @@ static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaDa
     return hyperrectangle;
 }
 
+ParquetFileMetaDataCache::ParquetFileMetaDataCache(UInt64 max_cache_entries)
+    : CacheBase(max_cache_entries) {}
+
+ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance(UInt64 max_cache_entries)
+{
+    static ParquetFileMetaDataCache instance(max_cache_entries);
+    return &instance;
+}
+
 ParquetBlockInputFormat::ParquetBlockInputFormat(
     ReadBuffer & buf,
     const Block & header_,
@@ -450,20 +468,58 @@ ParquetBlockInputFormat::~ParquetBlockInputFormat()
         pool->wait();
 }
 
-void ParquetBlockInputFormat::initializeIfNeeded()
+std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::readMetadataFromFile()
 {
-    if (std::exchange(is_initialized, true))
+    createArrowFileIfNotCreated();
+    return parquet::ReadMetaData(arrow_file);
+}
+
+std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::getFileMetaData()
+{
+    // The in-memory cache is only implemented for remote files, but the user may still set
+    // `input_format_parquet_use_metadata_cache = 1` for a local file operation, in which case
+    // the cache key is never set. Therefore, metadata_cache.key must be checked as well.
+    if (!metadata_cache.use_cache || metadata_cache.key.empty())
+    {
+        return readMetadataFromFile();
+    }
+
+    auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance(metadata_cache.max_entries)->getOrSet(
+        metadata_cache.key,
+        [&]()
+        {
+            return readMetadataFromFile();
+        }
+    );
+    if (loaded)
+        ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheMisses);
+    else
+        ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheHits);
+    return parquet_file_metadata;
+}
+
+void ParquetBlockInputFormat::createArrowFileIfNotCreated()
+{
+    if (arrow_file)
+    {
         return;
+    }
 
     // Create arrow file adapter.
     // TODO: Make the adapter do prefetching on IO threads, based on the full set of ranges that
     // we'll need to read (which we know in advance). Use max_download_threads for that.
     arrow_file = asArrowFile(*in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
+}
+
+void ParquetBlockInputFormat::initializeIfNeeded()
+{
+    if (std::exchange(is_initialized, true))
+        return;
 
     if (is_stopped)
         return;
 
-    metadata = parquet::ReadMetaData(arrow_file);
+    metadata = getFileMetaData();
 
     std::shared_ptr<arrow::Schema> schema;
     THROW_ARROW_NOT_OK(parquet::arrow::FromParquetSchema(metadata->schema(), &schema));
@@ -494,6 +550,8 @@ void ParquetBlockInputFormat::initializeIfNeeded()
         return std::min(std::max(preferred_num_rows, MIN_ROW_NUM), static_cast<size_t>(format_settings.parquet.max_block_size));
     };
 
+    bool has_row_groups_to_read = false;
+
     for (int row_group = 0; row_group < num_row_groups; ++row_group)
     {
         if (skip_row_groups.contains(row_group))
@@ -515,6 +573,12 @@ void ParquetBlockInputFormat::initializeIfNeeded()
         row_group_batches.back().total_bytes_compressed += metadata->RowGroup(row_group)->total_compressed_size();
         auto rows = adaptive_chunk_size(row_group);
         row_group_batches.back().adaptive_chunk_size = rows ? rows : format_settings.parquet.max_block_size;
+        has_row_groups_to_read = true;
+    }
+
+    if (has_row_groups_to_read)
+    {
+        createArrowFileIfNotCreated();
     }
 }
@@ -843,6 +907,14 @@ const BlockMissingValues & ParquetBlockInputFormat::getMissingValues() const
     return previous_block_missing_values;
 }
 
+void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const ServerSettings & server_settings, const Settings & settings, const String & key_)
+{
+    metadata_cache.key = key_;
+    metadata_cache.use_cache = settings.input_format_parquet_use_metadata_cache;
+    metadata_cache.max_entries = server_settings.input_format_parquet_metadata_cache_max_entries;
+}
+
+
 ParquetSchemaReader::ParquetSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)
     : ISchemaReader(in_), format_settings(format_settings_)
 {
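The heart of the patch is `getFileMetaData()`: one `getOrSet` call either returns the cached footer or runs the loader exactly once, and the returned `loaded` flag drives the hit/miss counters. Below is a self-contained sketch of the same pattern; `SimpleCache`, `Metadata`, and the counters are illustrative stand-ins, not ClickHouse classes:

```cpp
// Sketch of the getOrSet hit/miss pattern: a lookup that either returns the
// cached value (hit) or runs the loader once and stores the result (miss).
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

struct Metadata { /* parsed parquet footer would live here */ };

std::uint64_t cache_hits = 0;
std::uint64_t cache_misses = 0;

class SimpleCache
{
public:
    // Returns {value, loaded}: loaded == true means the loader ran (a miss).
    std::pair<std::shared_ptr<Metadata>, bool>
    getOrSet(const std::string & key, const std::function<std::shared_ptr<Metadata>()> & load)
    {
        std::lock_guard lock(mutex);
        if (auto it = entries.find(key); it != entries.end())
            return {it->second, false};
        auto value = load();
        entries[key] = value;
        return {value, true};
    }

private:
    std::mutex mutex;
    std::unordered_map<std::string, std::shared_ptr<Metadata>> entries;
};

std::shared_ptr<Metadata> getFileMetaData(SimpleCache & cache, const std::string & key)
{
    auto [metadata, loaded] = cache.getOrSet(key, [] { return std::make_shared<Metadata>(); });
    // Mirrors the accounting in the patch: running the loader means a miss.
    if (loaded)
        ++cache_misses;
    else
        ++cache_hits;
    return metadata;
}

int main()
{
    SimpleCache cache;
    getFileMetaData(cache, "bucket/file.parquet:etag1");  // miss: loader runs
    getFileMetaData(cache, "bucket/file.parquet:etag1");  // hit: cached value
}
```
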
diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h
index ed528cc077c3..edcc15e603f0 100644
--- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h
+++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h
@@ -2,6 +2,7 @@
 #include "config.h"
 
 #if USE_PARQUET
+#include <Common/CacheBase.h>
 #include <...>
 #include <...>
 #include <...>
@@ -65,6 +66,8 @@ class ParquetBlockInputFormat : public IInputFormat
 
     size_t getApproxBytesReadForChunk() const override { return previous_approx_bytes_read_for_chunk; }
 
+    void setStorageRelatedUniqueKey(const ServerSettings & server_settings, const Settings & settings, const String & key_) override;
+
 private:
     Chunk read() override;
 
@@ -83,6 +86,11 @@ class ParquetBlockInputFormat : public IInputFormat
 
     void threadFunction(size_t row_group_batch_idx);
 
+    void createArrowFileIfNotCreated();
+    std::shared_ptr<parquet::FileMetaData> readMetadataFromFile();
+
+    std::shared_ptr<parquet::FileMetaData> getFileMetaData();
+
     // Data layout in the file:
     //
     // row group 0
@@ -288,6 +296,12 @@ class ParquetBlockInputFormat : public IInputFormat
     std::exception_ptr background_exception = nullptr;
     std::atomic<int> is_stopped{0};
     bool is_initialized = false;
+    struct Cache
+    {
+        String key;
+        bool use_cache = false;
+        UInt64 max_entries{0};
+    } metadata_cache;
 };
 
 class ParquetSchemaReader : public ISchemaReader
@@ -306,6 +320,16 @@ class ParquetSchemaReader : public ISchemaReader
     std::shared_ptr<parquet::FileMetaData> metadata;
 };
 
+class ParquetFileMetaDataCache : public CacheBase<String, parquet::FileMetaData>
+{
+public:
+    static ParquetFileMetaDataCache * instance(UInt64 max_cache_entries);
+    void clear() {}
+
+private:
+    ParquetFileMetaDataCache(UInt64 max_cache_entries);
+};
+
 }
 
 #endif
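`ParquetFileMetaDataCache::instance()` is a function-local-static (Meyers) singleton that takes the capacity as an argument, so only the first call's value takes effect: whichever query touches the cache first fixes `max_entries` for the server's lifetime. A minimal stand-alone illustration of that property (`Registry` is a hypothetical name):

```cpp
#include <iostream>

class Registry
{
public:
    // The constructor argument is honored only on the first call; later
    // callers get the same instance regardless of what they pass.
    static Registry & instance(unsigned capacity)
    {
        static Registry registry(capacity);
        return registry;
    }

    unsigned capacity() const { return cap; }

private:
    explicit Registry(unsigned capacity_) : cap(capacity_) {}
    unsigned cap;
};

int main()
{
    std::cout << Registry::instance(100).capacity() << '\n';  // prints 100
    std::cout << Registry::instance(999).capacity() << '\n';  // still 100
}
```
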
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
index 04e319cd0b89..24dede94f588 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
@@ -381,6 +381,9 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
     if (need_only_count)
         input_format->needOnlyCount();
 
+    if (object_info->getPath().length())
+        input_format->setStorageRelatedUniqueKey(context_->getServerSettings(), context_->getSettingsRef(), object_info->getPath() + ":" + object_info->metadata->etag);
+
     builder.init(Pipe(input_format));
 
     if (read_from_format_info.columns_description.hasDefaults())
diff --git a/tests/queries/0_stateless/03262_parquet_s3_metadata_cache.reference b/tests/queries/0_stateless/03262_parquet_s3_metadata_cache.reference
new file mode 100644
index 000000000000..51fdf048b8ac
--- /dev/null
+++ b/tests/queries/0_stateless/03262_parquet_s3_metadata_cache.reference
@@ -0,0 +1,3 @@
+10
+10
+10
diff --git a/tests/queries/0_stateless/03262_parquet_s3_metadata_cache.sql b/tests/queries/0_stateless/03262_parquet_s3_metadata_cache.sql
new file mode 100644
index 000000000000..f453dcba0c66
--- /dev/null
+++ b/tests/queries/0_stateless/03262_parquet_s3_metadata_cache.sql
@@ -0,0 +1,28 @@
+-- Tags: no-parallel, no-fasttest
+
+DROP TABLE IF EXISTS t_parquet_03262;
+
+CREATE TABLE t_parquet_03262 (a UInt64)
+ENGINE = S3(s3_conn, filename = 'test_03262_{_partition_id}', format = Parquet)
+PARTITION BY a;
+
+INSERT INTO t_parquet_03262 SELECT number FROM numbers(10) SETTINGS s3_truncate_on_insert=1;
+
+SELECT COUNT(*)
+FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
+SETTINGS input_format_parquet_use_metadata_cache=1;
+
+SELECT COUNT(*)
+FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
+SETTINGS input_format_parquet_use_metadata_cache=1, log_comment='test_03262_parquet_metadata_cache';
+
+SYSTEM FLUSH LOGS;
+
+SELECT ProfileEvents['ParquetMetaDataCacheHits']
+FROM system.query_log
+WHERE log_comment = 'test_03262_parquet_metadata_cache'
+AND type = 'QueryFinish'
+ORDER BY event_time DESC
+LIMIT 1;
+
+DROP TABLE t_parquet_03262;
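The cache key built in `StorageObjectStorageSource` is the object path joined with its etag, so rewriting an object (which changes the etag) yields a new key and stale metadata is never served; superseded entries simply age out of the LRU. A hypothetical helper mirroring that composition (`makeMetadataCacheKey` is not part of the patch):

```cpp
#include <iostream>
#include <string>

// Path and etag joined with ':'. A rewritten object gets a new etag, hence a
// new key, so metadata cached for the old version is never looked up again.
std::string makeMetadataCacheKey(const std::string & path, const std::string & etag)
{
    return path + ":" + etag;
}

int main()
{
    std::cout << makeMetadataCacheKey("bucket/test_03262_1.parquet", "686897696a7c876b7e") << '\n';
}
```
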