Parquet File Metadata caching implementation #541
base: project-antalya
Changes to the Parquet input format implementation (.cpp):

```diff
@@ -3,6 +3,8 @@
 #if USE_PARQUET

+#include <Core/Settings.h>
+#include <Common/ProfileEvents.h>
 #include <Common/logger_useful.h>
 #include <Common/ThreadPool.h>
 #include <Formats/FormatFactory.h>
```
```diff
@@ -34,6 +36,12 @@ namespace CurrentMetrics
     extern const Metric ParquetDecoderThreadsScheduled;
 }

+namespace ProfileEvents
+{
+    extern const Event ParquetMetaDataCacheHits;
+    extern const Event ParquetMetaDataCacheMisses;
+}
+
 namespace DB
 {
```
```diff
@@ -426,6 +434,22 @@ static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaDa
     return hyperrectangle;
 }

+std::mutex ParquetFileMetaDataCache::mutex;
+
+ParquetFileMetaDataCache::ParquetFileMetaDataCache(UInt64 max_cache_entries)
+    : CacheBase(max_cache_entries) {}
+
+ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance(UInt64 max_cache_entries)
+{
+    static ParquetFileMetaDataCache * instance = nullptr;
+    if (!instance)
+    {
+        std::lock_guard lock(mutex);
+        instance = new ParquetFileMetaDataCache(max_cache_entries);
+    }
+    return instance;
+}
+
 ParquetBlockInputFormat::ParquetBlockInputFormat(
     ReadBuffer & buf,
     const Block & header_,
```
```diff
@@ -450,6 +474,28 @@ ParquetBlockInputFormat::~ParquetBlockInputFormat()
     pool->wait();
 }

+std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::getFileMetaData()
+{
+    if (!use_metadata_cache || !metadata_cache_key.length())
+    {
+        ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheMisses);
+        return parquet::ReadMetaData(arrow_file);
+    }
+
+    auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance(metadata_cache_max_entries)->getOrSet(
+        metadata_cache_key,
+        [this]()
+        {
+            return parquet::ReadMetaData(arrow_file);
+        }
+    );
+    if (loaded)
+        ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheMisses);
+    else
+        ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheHits);
+    return parquet_file_metadata;
+}
+
 void ParquetBlockInputFormat::initializeIfNeeded()
 {
     if (std::exchange(is_initialized, true))
```

Review thread (on the `ProfileEvents::increment` in the cache-disabled branch):

> **Reviewer:** Do we really want metadata cache miss metrics being incremented when the cache is disabled?
>
> **Author:** I coded the increment in the first pass, but removed it so that users need not see a new alarming "miss" metric if they are unaware of, or have disabled, the in-memory cache feature (and are satisfied with the existing local disk caching feature). Let me know what you think.
>
> **Reviewer:** I am sorry, I did not quite follow your explanation. As far as I can tell (by reading the code), if the cache is disabled, it'll still increment the cache miss. It doesn't look right to me at first glance, but if there has been a discussion and this was the chosen design, that's ok. Am I missing something?
>
> **Author:** Correct, I mixed up my revisions and reasoning. I have removed the increment if the cache is disabled. Thanks!
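Since the follow-up revision (with the increment removed from the disabled branch) is not part of this diff, here is a minimal sketch of the control flow the thread converged on. Names are taken from the diff above; the exact merged code may differ:

```cpp
// Sketch only: reflects the review outcome, not necessarily the merged code.
std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::getFileMetaData()
{
    /// Cache disabled or no storage-derived key: read metadata directly and
    /// leave the hit/miss counters untouched, so users who never enabled the
    /// feature do not see an alarming "miss" metric.
    if (!use_metadata_cache || metadata_cache_key.empty())
        return parquet::ReadMetaData(arrow_file);

    auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance(metadata_cache_max_entries)->getOrSet(
        metadata_cache_key,
        [this]() { return parquet::ReadMetaData(arrow_file); });

    /// getOrSet() reports whether the value had to be loaded (a miss)
    /// or was already present (a hit).
    ProfileEvents::increment(loaded ? ProfileEvents::ParquetMetaDataCacheMisses
                                    : ProfileEvents::ParquetMetaDataCacheHits);
    return parquet_file_metadata;
}
```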
```diff
@@ -463,7 +509,7 @@ void ParquetBlockInputFormat::initializeIfNeeded()
     if (is_stopped)
         return;

-    metadata = parquet::ReadMetaData(arrow_file);
+    metadata = getFileMetaData();

     std::shared_ptr<arrow::Schema> schema;
     THROW_ARROW_NOT_OK(parquet::arrow::FromParquetSchema(metadata->schema(), &schema));
```
```diff
@@ -843,6 +889,14 @@ const BlockMissingValues & ParquetBlockInputFormat::getMissingValues() const
     return previous_block_missing_values;
 }

+void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const Settings & settings, const String & key_)
+{
+    metadata_cache_key = key_;
+    use_metadata_cache = settings.parquet_use_metadata_cache;
+    metadata_cache_max_entries = settings.parquet_metadata_cache_max_entries;
+}
+
 ParquetSchemaReader::ParquetSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)
     : ISchemaReader(in_), format_settings(format_settings_)
 {
```
Changes to the Parquet input format header (.h):

```diff
@@ -2,6 +2,7 @@
 #include "config.h"
 #if USE_PARQUET

+#include <Common/CacheBase.h>
 #include <Processors/Formats/IInputFormat.h>
 #include <Processors/Formats/ISchemaReader.h>
 #include <Formats/FormatSettings.h>
```
```diff
@@ -65,6 +66,8 @@ class ParquetBlockInputFormat : public IInputFormat
     size_t getApproxBytesReadForChunk() const override { return previous_approx_bytes_read_for_chunk; }

+    void setStorageRelatedUniqueKey(const Settings & settings, const String & key_) override;
+
 private:
     Chunk read() override;
```
```diff
@@ -83,6 +86,8 @@ class ParquetBlockInputFormat : public IInputFormat
     void threadFunction(size_t row_group_batch_idx);

+    std::shared_ptr<parquet::FileMetaData> getFileMetaData();
+
     // Data layout in the file:
     //
     // row group 0
```
```diff
@@ -288,6 +293,9 @@ class ParquetBlockInputFormat : public IInputFormat
     std::exception_ptr background_exception = nullptr;
     std::atomic<int> is_stopped{0};
     bool is_initialized = false;
+    String metadata_cache_key;
+    bool use_metadata_cache = false;
+    UInt64 metadata_cache_max_entries{0};
 };

 class ParquetSchemaReader : public ISchemaReader
```
```diff
@@ -306,6 +314,17 @@ class ParquetSchemaReader : public ISchemaReader
     std::shared_ptr<parquet::FileMetaData> metadata;
 };

+class ParquetFileMetaDataCache : public CacheBase<String, parquet::FileMetaData>
+{
+public:
+    static ParquetFileMetaDataCache * instance(UInt64 max_cache_entries);
+    void clear() {}
+
+private:
+    ParquetFileMetaDataCache(UInt64 max_cache_entries);
+    static std::mutex mutex;
+};
+
 }

 #endif
```

Review thread (on the `static std::mutex mutex;` member):

> **Reviewer:** It appears to me `CacheBase` already has its own mutex. Why do we need another one? And why is it static? I think I am missing something.
>
> **Author:** Yes, basic double-checked locking pattern of singleton initialization. I will replace this with the modern equivalent that uses a function-local `static`.
>
> **Reviewer:** Check my comment in https://github.com/Altinity/ClickHouse/pull/541/files#r1875950244. I might be missing something, tho.
>
> **Author:** Done.
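The "modern equivalent" referred to above is presumably a function-local static (Meyers singleton), whose initialization C++11 guarantees to be thread-safe. It removes both the explicit mutex and the race in the double-checked pattern (the original `instance()` reads the pointer outside the lock and never re-checks it after acquiring it). A minimal sketch, assuming the constructor signature from this diff:

```cpp
// Sketch: max_cache_entries is only honored by the very first caller;
// subsequent calls return the already-constructed cache.
ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance(UInt64 max_cache_entries)
{
    static ParquetFileMetaDataCache cache(max_cache_entries); // thread-safe init since C++11
    return &cache;
}
```

Being a static member function, `instance()` can still reach the private constructor, so no `new`, no mutex, and no leaked pointer are needed.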
New test reference file:

```diff
@@ -0,0 +1,3 @@
+10
+10
+10
```
New test file:

```diff
@@ -0,0 +1,28 @@
+-- Tags: no-parallel, no-fasttest
+
+DROP TABLE IF EXISTS t_parquet_03262;
+
+CREATE TABLE t_parquet_03262 (a UInt64)
+ENGINE = S3(s3_conn, filename = 'test_03262_{_partition_id}', format = Parquet)
+PARTITION BY a;
+
+INSERT INTO t_parquet_03262 SELECT number FROM numbers(10) SETTINGS s3_truncate_on_insert=1;
+
+SELECT COUNT(*)
+FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
+SETTINGS parquet_use_metadata_cache=1;
+
+SELECT COUNT(*)
+FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
+SETTINGS parquet_use_metadata_cache=1, custom_x='test03262';
+
+SYSTEM FLUSH LOGS;
+
+SELECT ProfileEvents['ParquetMetaDataCacheHits']
+FROM system.query_log
+WHERE query LIKE '%test03262%'
+  AND type = 'QueryFinish'
+ORDER BY event_time DESC
+LIMIT 1;
+
+DROP TABLE t_parquet_03262;
```

Review thread (on the test tags line):

> **Reviewer:** Can you also add a few tests for …?
>
> **Author:** That would have to be an integration test, maybe with 10s or 100s of parquet files. I can add it in another PR.
>
> **Reviewer:** If local files also benefited from the metadata cache, an integration test wouldn't be needed, I suppose. But it doesn't look like we want to do that.
>
> **Author:** For local Parquet files, the OS file cache will be in effect.
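A natural companion check, not part of this PR, would assert the cold path as well. This hypothetical addition reuses the `custom_x` log-marker trick from the test above:

```sql
-- Hypothetical extra check (not in this diff): the first, cold query over
-- the files should register as misses rather than hits.
SELECT ProfileEvents['ParquetMetaDataCacheMisses']
FROM system.query_log
WHERE query LIKE '%test03262%'
  AND type = 'QueryFinish'
ORDER BY event_time DESC
LIMIT 1;
```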
Review thread (on `ParquetFileMetaDataCache::instance`):

> **Reviewer:** I understand this has to be a singleton, but what if the user performs queries with different values for `parquet_metadata_cache_max_entries`? I am still not sure how to fix this problem, or whether we need to fix it at all.
>
> **Author:** We need to document that the cache will be sized to the setting value at "first time use"! Dynamic resizing is not possible. I have seen some settings documented like that. Dynamic resize of the cache, maybe upsize only, can be implemented later.
>
> **Reviewer:** https://github.com/Altinity/ClickHouse/pull/541/files#r1890373736
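To make the "first time use" caveat concrete, here is an illustrative sketch against the `instance()` from this diff; the entry counts are made up, not defaults:

```cpp
#include <cassert>

void demo_first_use_sizing()
{
    // The size argument matters solely on the first call; later values are ignored.
    auto * cache_a = ParquetFileMetaDataCache::instance(/*max_cache_entries=*/1000);
    auto * cache_b = ParquetFileMetaDataCache::instance(/*max_cache_entries=*/5000);
    assert(cache_a == cache_b); // same instance, still sized to 1000
}
```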