Parquet File Metadata caching implementation #541

Open · wants to merge 10 commits into base: project-antalya
2 changes: 2 additions & 0 deletions src/Common/ProfileEvents.cpp
@@ -806,6 +806,8 @@ The server successfully detected this situation and will download merged part fr
M(GWPAsanAllocateSuccess, "Number of successful allocations done by GWPAsan") \
M(GWPAsanAllocateFailed, "Number of failed allocations done by GWPAsan (i.e. filled pool)") \
M(GWPAsanFree, "Number of free operations done by GWPAsan") \
M(ParquetMetaDataCacheHits, "Number of times Parquet file metadata was found in the metadata cache.") \
M(ParquetMetaDataCacheMisses, "Number of times Parquet file metadata was not found in the metadata cache and had to be read from the file.") \


#ifdef APPLY_FOR_EXTERNAL_EVENTS
2 changes: 2 additions & 0 deletions src/Core/Settings.h
@@ -1293,6 +1293,8 @@ class IColumn;
M(Bool, precise_float_parsing, false, "Prefer more precise (but slower) float parsing algorithm", 0) \
M(DateTimeOverflowBehavior, date_time_overflow_behavior, "ignore", "Overflow mode for Date, Date32, DateTime, DateTime64 types. Possible values: 'ignore', 'throw', 'saturate'.", 0) \
M(Bool, validate_experimental_and_suspicious_types_inside_nested_types, true, "Validate usage of experimental and suspicious types inside nested types like Array/Map/Tuple", 0) \
M(Bool, parquet_use_metadata_cache, false, "Enable Parquet file metadata caching.", 0) \
M(UInt64, parquet_metadata_cache_max_entries, 100000, "Maximum number of Parquet file metadata entries to cache. The cache is sized at first use; later changes to this setting have no effect.", 0) \


// End of FORMAT_FACTORY_SETTINGS
3 changes: 3 additions & 0 deletions src/Processors/Formats/IInputFormat.h
@@ -66,6 +66,9 @@ class IInputFormat : public SourceWithKeyCondition

void needOnlyCount() { need_only_count = true; }

/// Set an additional key/id related to the underlying storage of the ReadBuffer
virtual void setStorageRelatedUniqueKey(const Settings & /*settings*/, const String & /*key*/) {}

protected:
ReadBuffer & getReadBuffer() const { chassert(in); return *in; }

53 changes: 52 additions & 1 deletion src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
@@ -3,6 +3,8 @@

#if USE_PARQUET

#include <Core/Settings.h>
#include <Common/ProfileEvents.h>
#include <Common/logger_useful.h>
#include <Common/ThreadPool.h>
#include <Formats/FormatFactory.h>
@@ -34,6 +36,12 @@ namespace CurrentMetrics
extern const Metric ParquetDecoderThreadsScheduled;
}

namespace ProfileEvents
{
extern const Event ParquetMetaDataCacheHits;
extern const Event ParquetMetaDataCacheMisses;
}

namespace DB
{

@@ -426,6 +434,19 @@ static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaDa
return hyperrectangle;
}

ParquetFileMetaDataCache::ParquetFileMetaDataCache(UInt64 max_cache_entries)
: CacheBase(max_cache_entries) {}

ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance(UInt64 max_cache_entries)
{
    static ParquetFileMetaDataCache * instance = nullptr;
    static std::once_flag once;
    std::call_once(once, [&] {
        instance = new ParquetFileMetaDataCache(max_cache_entries);
    });
    return instance;
}

Review thread on ParquetFileMetaDataCache::instance():

Collaborator: I understand this has to be a singleton, but what if the user performs queries with different values for parquet_metadata_cache_max_entries? I am still not sure how to fix this problem, and whether we need to fix it at all.

Author: We need to document that the cache will be sized to the setting value at first use; dynamic resizing is not possible. I have seen some settings documented like that. Dynamic resizing of the cache, perhaps upsize only, can be implemented later.

Collaborator: OK, let's take a step back. IIRC, static initialization in C++11 or greater is guaranteed to happen only once and is thread safe. From the C++ standard:

"such a variable is initialized the first time control passes through its declaration; such a variable is considered initialized upon the completion of its initialization. [...] If control enters the declaration concurrently while the variable is being initialized, the concurrent execution shall wait for completion of the initialization."

So, wouldn't something like the below suffice?

    static ParquetFileMetaDataCache instance(max_cache_entries);
    return &instance;

Author: Done.
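
The first-use sizing caveat from this thread is easy to demonstrate in isolation. A minimal standalone sketch (hypothetical names, not part of the PR) of the function-local-static pattern the reviewer proposed:

#include <cassert>
#include <cstdint>

/// Minimal stand-in for the singleton above: the function-local static is
/// initialized exactly once, thread-safely, on the first call (C++11).
struct Cache
{
    explicit Cache(uint64_t max_entries_) : max_entries(max_entries_) {}
    uint64_t max_entries;
};

Cache * instance(uint64_t max_entries)
{
    static Cache cache(max_entries);   /// sized here, on first use only
    return &cache;
}

int main()
{
    Cache * a = instance(100000);      /// first call: cache holds 100000 entries
    Cache * b = instance(42);          /// later call: 42 is silently ignored
    assert(a == b && b->max_entries == 100000);
}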

ParquetBlockInputFormat::ParquetBlockInputFormat(
ReadBuffer & buf,
const Block & header_,
@@ -450,6 +471,28 @@ ParquetBlockInputFormat::~ParquetBlockInputFormat()
pool->wait();
}

std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::getFileMetaData()
{
    if (!metadata_cache.use_cache || metadata_cache.key.empty())
        return parquet::ReadMetaData(arrow_file);

    auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance(metadata_cache.max_entries)->getOrSet(
        metadata_cache.key,
        [this]()
        {
            return parquet::ReadMetaData(arrow_file);
        }
    );
    if (loaded)
        ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheMisses);
    else
        ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheHits);
    return parquet_file_metadata;
}

Review thread on getFileMetaData():

Collaborator: Do we really want metadata cache miss metrics being incremented when use_metadata_cache = false?

Author: I coded the increment in the first pass, but removed it so that users need not see a new, alarming "miss" metric if they are unaware of the in-memory cache feature or have disabled it (and are satisfied with the existing local disk caching feature). Let me know what you think.

Collaborator: I am sorry, I did not quite follow your explanation. As far as I can tell (by reading the code), whenever initializeIfNeeded() is called (and it is called once per file, IIRC), it will try to fetch metadata either from the cache or from the input source. If the cache is disabled, it will still increment the cache miss. It doesn't look right to me at first glance, but if there has been a discussion and this was the chosen design, that's OK. Am I missing something?

Author: Correct, I mixed up my revisions and reasoning. I have removed the increment for when the cache is disabled. Thanks!
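
The hit/miss accounting above relies on the pair returned by getOrSet: per the diff, its second element (loaded) is true exactly when the loader lambda ran, i.e. a cache miss, and false when an existing entry was reused. A standalone sketch of that contract, with a hypothetical simplified cache standing in for ClickHouse's CacheBase:

#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <utility>

template <typename Key, typename Value>
class SimpleCache
{
public:
    /// Returns {value, loaded}: loaded == true means the loader ran (cache miss).
    std::pair<std::shared_ptr<Value>, bool> getOrSet(const Key & key, const std::function<std::shared_ptr<Value>()> & load)
    {
        if (auto it = entries.find(key); it != entries.end())
            return {it->second, false};   /// hit: reuse the stored entry
        auto value = load();
        entries.emplace(key, value);
        return {value, true};             /// miss: loader produced the entry
    }

private:
    std::map<Key, std::shared_ptr<Value>> entries;
};

int main()
{
    SimpleCache<std::string, std::string> cache;
    auto load = [] { return std::make_shared<std::string>("parquet footer"); };

    auto [v1, miss1] = cache.getOrSet("bucket/file.parquet:etag1", load);
    auto [v2, miss2] = cache.getOrSet("bucket/file.parquet:etag1", load);

    std::cout << miss1 << ' ' << miss2 << '\n'; /// prints "1 0": first call misses, second hits
}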

void ParquetBlockInputFormat::initializeIfNeeded()
{
if (std::exchange(is_initialized, true))
@@ -463,7 +506,7 @@ void ParquetBlockInputFormat::initializeIfNeeded()
if (is_stopped)
return;

metadata = parquet::ReadMetaData(arrow_file);
metadata = getFileMetaData();

std::shared_ptr<arrow::Schema> schema;
THROW_ARROW_NOT_OK(parquet::arrow::FromParquetSchema(metadata->schema(), &schema));
@@ -843,6 +886,14 @@ const BlockMissingValues & ParquetBlockInputFormat::getMissingValues() const
return previous_block_missing_values;
}

void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const Settings & settings, const String & key_)
{
metadata_cache.key = key_;
metadata_cache.use_cache = settings.parquet_use_metadata_cache;
metadata_cache.max_entries = settings.parquet_metadata_cache_max_entries;
}


ParquetSchemaReader::ParquetSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)
: ISchemaReader(in_), format_settings(format_settings_)
{
21 changes: 21 additions & 0 deletions src/Processors/Formats/Impl/ParquetBlockInputFormat.h
@@ -2,6 +2,7 @@
#include "config.h"
#if USE_PARQUET

#include <Common/CacheBase.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Formats/ISchemaReader.h>
#include <Formats/FormatSettings.h>
@@ -65,6 +66,8 @@ class ParquetBlockInputFormat : public IInputFormat

size_t getApproxBytesReadForChunk() const override { return previous_approx_bytes_read_for_chunk; }

void setStorageRelatedUniqueKey(const Settings & settings, const String & key_) override;

private:
Chunk read() override;

@@ -83,6 +86,8 @@ class ParquetBlockInputFormat : public IInputFormat

void threadFunction(size_t row_group_batch_idx);

std::shared_ptr<parquet::FileMetaData> getFileMetaData();

// Data layout in the file:
//
// row group 0
@@ -288,6 +293,12 @@ class ParquetBlockInputFormat : public IInputFormat
std::exception_ptr background_exception = nullptr;
std::atomic<int> is_stopped{0};
bool is_initialized = false;
struct Cache
{
String key;
bool use_cache = false;
UInt64 max_entries{0};
} metadata_cache;
};

class ParquetSchemaReader : public ISchemaReader
@@ -306,6 +317,16 @@
std::shared_ptr<parquet::FileMetaData> metadata;
};

class ParquetFileMetaDataCache : public CacheBase<String, parquet::FileMetaData>
{
public:
static ParquetFileMetaDataCache * instance(UInt64 max_cache_entries);
void clear() {}

private:
ParquetFileMetaDataCache(UInt64 max_cache_entries);
};

}

#endif
3 changes: 3 additions & 0 deletions src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
@@ -381,6 +381,9 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
if (need_only_count)
input_format->needOnlyCount();

if (object_info->getPath().length())
input_format->setStorageRelatedUniqueKey(context_->getSettingsRef(), object_info->getPath() + ":" + object_info->metadata->etag);

builder.init(Pipe(input_format));

if (read_from_format_info.columns_description.hasDefaults())
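The cache key built here is the object path, a ":" separator, and the object's ETag, so overwriting an object (which changes its ETag) yields a new key and stale metadata is never served. A minimal sketch of that construction (helper name hypothetical, not part of the PR):

#include <cassert>
#include <string>

/// Hypothetical helper mirroring the key format used above: path + ":" + etag.
std::string makeMetadataCacheKey(const std::string & path, const std::string & etag)
{
    return path + ":" + etag;
}

int main()
{
    /// Rewriting the same object changes the ETag, and hence the cache key.
    assert(makeMetadataCacheKey("bucket/test.parquet", "etag-v1")
        != makeMetadataCacheKey("bucket/test.parquet", "etag-v2"));
}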
3 changes: 3 additions & 0 deletions tests/queries/0_stateless/03262_parquet_s3_metadata_cache.reference
@@ -0,0 +1,3 @@
10
10
10
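
The two COUNT(*) queries in the test below each return 10; the final query_log probe reports 10 metadata cache hits for the second read, presumably one per partitioned file.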
28 changes: 28 additions & 0 deletions tests/queries/0_stateless/03262_parquet_s3_metadata_cache.sql
@@ -0,0 +1,28 @@
-- Tags: no-parallel, no-fasttest

DROP TABLE IF EXISTS t_parquet_03262;

CREATE TABLE t_parquet_03262 (a UInt64)
ENGINE = S3(s3_conn, filename = 'test_03262_{_partition_id}', format = Parquet)
PARTITION BY a;

INSERT INTO t_parquet_03262 SELECT number FROM numbers(10) SETTINGS s3_truncate_on_insert=1;

SELECT COUNT(*)
FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
SETTINGS parquet_use_metadata_cache=1;

SELECT COUNT(*)
FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
SETTINGS parquet_use_metadata_cache=1, custom_x='test03262';

SYSTEM FLUSH LOGS;

SELECT ProfileEvents['ParquetMetaDataCacheHits']
FROM system.query_log
WHERE query LIKE '%test03262%'
AND type = 'QueryFinish'
ORDER BY event_time DESC
LIMIT 1;

DROP TABLE t_parquet_03262;

Review thread on the test:

Collaborator: Can you also add a few tests for parquet_metadata_cache_max_entries?

Author: That would have to be an integration test, maybe with tens or hundreds of Parquet files. I can add it in another PR.

Collaborator: If local files also benefited from the metadata cache, an integration test wouldn't be needed, I suppose. But it doesn't look like we want to do that.

Author: For local Parquet files, the OS file cache will be in effect.