diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.cpp index f9caa0ccea8..0520be85d3b 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.cpp +++ b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.cpp @@ -16,22 +16,17 @@ namespace DB::DM { -DMFileReaderPool & DMFileReaderPool::instance() -{ - static DMFileReaderPool reader_pool; - return reader_pool; -} -void DMFileReaderPool::add(DMFileReader & reader) +void DMFileReaderPoolSharding::add(const String & path, DMFileReader & reader) { std::lock_guard lock(mtx); - readers[reader.path()].insert(&reader); + readers[path].insert(&reader); } -void DMFileReaderPool::del(DMFileReader & reader) +void DMFileReaderPoolSharding::del(const String & path, DMFileReader & reader) { std::lock_guard lock(mtx); - auto itr = readers.find(reader.path()); + auto itr = readers.find(path); if (itr == readers.end()) { return; @@ -43,10 +38,16 @@ void DMFileReaderPool::del(DMFileReader & reader) } } -void DMFileReaderPool::set(DMFileReader & from_reader, int64_t col_id, size_t start, size_t count, ColumnPtr & col) +void DMFileReaderPoolSharding::set( + const String & path, + DMFileReader & from_reader, + int64_t col_id, + size_t start, + size_t count, + ColumnPtr & col) { std::lock_guard lock(mtx); - auto itr = readers.find(from_reader.path()); + auto itr = readers.find(path); if (itr == readers.end()) { return; @@ -62,17 +63,58 @@ void DMFileReaderPool::set(DMFileReader & from_reader, int64_t col_id, size_t st } // Check is there any concurrent DMFileReader with `from_reader`. -bool DMFileReaderPool::hasConcurrentReader(DMFileReader & from_reader) +bool DMFileReaderPoolSharding::hasConcurrentReader(const String & path) { std::lock_guard lock(mtx); - auto itr = readers.find(from_reader.path()); + auto itr = readers.find(path); return itr != readers.end() && itr->second.size() >= 2; } -DMFileReader * DMFileReaderPool::get(const std::string & name) +DMFileReader * DMFileReaderPoolSharding::get(const std::string & path) { std::lock_guard lock(mtx); - auto itr = readers.find(name); + auto itr = readers.find(path); return itr != readers.end() && !itr->second.empty() ? *(itr->second.begin()) : nullptr; } + +DMFileReaderPool & DMFileReaderPool::instance() +{ + static DMFileReaderPool reader_pool; + return reader_pool; +} + +DMFileReaderPoolSharding & DMFileReaderPool::getSharding(const String & path) +{ + return shardings[std::hash{}(path) % shardings.size()]; +} + +void DMFileReaderPool::add(DMFileReader & reader) +{ + auto path = reader.path(); + getSharding(path).add(path, reader); +} + +void DMFileReaderPool::del(DMFileReader & reader) +{ + auto path = reader.path(); + getSharding(path).del(path, reader); +} + +void DMFileReaderPool::set(DMFileReader & from_reader, int64_t col_id, size_t start, size_t count, ColumnPtr & col) +{ + auto path = from_reader.path(); + getSharding(path).set(path, from_reader, col_id, start, count, col); +} + +// Check is there any concurrent DMFileReader with `from_reader`. +bool DMFileReaderPool::hasConcurrentReader(DMFileReader & from_reader) +{ + auto path = from_reader.path(); + return getSharding(path).hasConcurrentReader(path); +} + +DMFileReader * DMFileReaderPool::get(const std::string & path) +{ + return getSharding(path).get(path); +} } // namespace DB::DM \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h index e5ec22d0545..384dcef71a4 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h @@ -218,6 +218,27 @@ class ColumnSharingCacheMap class DMFileReader; +class DMFileReaderPoolSharding +{ +public: + void add(const String & path, DMFileReader & reader); + void del(const String & path, DMFileReader & reader); + void set( + const String & path, + DMFileReader & from_reader, + int64_t col_id, + size_t start, + size_t count, + ColumnPtr & col); + bool hasConcurrentReader(const String & path); + // `get` is just for test. + DMFileReader * get(const std::string & path); + +private: + std::mutex mtx; + std::unordered_map> readers; +}; + // DMFileReaderPool holds all the DMFileReader objects, so we can easily find DMFileReader objects with the same DMFile ID. // When a DMFileReader object successfully reads a column's packs, it will try to put these packs into other DMFileReader objects' cache. class DMFileReaderPool @@ -232,14 +253,14 @@ class DMFileReaderPool void set(DMFileReader & from_reader, int64_t col_id, size_t start, size_t count, ColumnPtr & col); bool hasConcurrentReader(DMFileReader & from_reader); // `get` is just for test. - DMFileReader * get(const std::string & name); + DMFileReader * get(const std::string & path); private: DMFileReaderPool() = default; + DMFileReaderPoolSharding & getSharding(const String & path); private: - std::mutex mtx; - std::unordered_map> readers; + std::array shardings; }; } // namespace DB::DM