From 79b8b84c10fd893c929cd3d1dc4ec765938bb9c2 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Sun, 2 Feb 2025 13:56:36 +0300 Subject: [PATCH] fix build --- .../arrow/accessor/sub_columns/accessor.cpp | 121 ++++++-------- .../arrow/accessor/sub_columns/accessor.h | 8 +- .../accessor/sub_columns/columns_storage.h | 6 +- .../accessor/sub_columns/constructor.cpp | 53 ++++-- .../accessor/sub_columns/data_extractor.h | 16 -- .../accessor/sub_columns/direct_builder.cpp | 6 +- .../arrow/accessor/sub_columns/iterators.h | 20 ++- .../accessor/sub_columns/others_storage.h | 10 +- .../arrow/accessor/sub_columns/settings.h | 7 + .../arrow/accessor/sub_columns/stats.cpp | 26 +++ .../arrow/accessor/sub_columns/stats.h | 32 ++++ .../changes/compaction/abstract/merger.h | 4 +- .../changes/compaction/sub_columns/logic.h | 152 ++++++++++++------ .../arrow/accessor/abstract/accessor.h | 34 ++-- ydb/library/formats/arrow/arrow_helpers.cpp | 2 +- .../formats/arrow/protos/accessor.proto | 5 +- 16 files changed, 320 insertions(+), 182 deletions(-) diff --git a/ydb/core/formats/arrow/accessor/sub_columns/accessor.cpp b/ydb/core/formats/arrow/accessor/sub_columns/accessor.cpp index 33b951159fe0..e5020502cbc6 100644 --- a/ydb/core/formats/arrow/accessor/sub_columns/accessor.cpp +++ b/ydb/core/formats/arrow/accessor/sub_columns/accessor.cpp @@ -9,74 +9,38 @@ #include #include +#include +#include + namespace NKikimr::NArrow::NAccessor { std::vector TSubColumnsArray::DoSplitBySizes( - const TColumnLoader& loader, const TString& fullSerializedData, const std::vector& splitSizes) { - std::vector result; - auto table = Records->BuildTableVerified(); - auto rb = NArrow::ToBatch(table); - AFL_VERIFY(GetRecordsCount()); - - ui32 idxCurrent = 0; - for (ui32 i = 0; i < splitSizes.size(); ++i) { - const ui32 recordsCount = 1.0 * splitSizes[i] / fullSerializedData.size() * GetRecordsCount(); - AFL_VERIFY(recordsCount >= 1); - auto rbSlice = rb->Slice(idxCurrent, recordsCount); - auto subColumnsChunk = std::make_shared(rbSlice); - result.emplace_back(subColumnsChunk, - loader.GetAccessorConstructor()->SerializeToString(subColumnsChunk, loader.BuildAccessorContext(GetRecordsCount()))); - } - - return result; + const TColumnLoader& /*loader*/, const TString& /*fullSerializedData*/, const std::vector& /*splitSizes*/) { + AFL_VERIFY(false); + return {}; } -TSubColumnsArray::TSubColumnsArray(const std::shared_ptr& sourceArray, const std::shared_ptr& adapter) - : TBase(sourceArray->GetRecordsCount(), EType::SubColumnsArray, sourceArray->GetDataType()) { +TConclusion> TSubColumnsArray::Make( + const std::shared_ptr& sourceArray, const std::shared_ptr& adapter) { AFL_VERIFY(adapter); AFL_VERIFY(sourceArray); - TDataBuilder builder; - auto builders = NArrow::MakeBuilders(Schema, sourceArray->GetRecordsCount()); + NSubColumns::TDataBuilder builder; IChunkedArray::TReader reader(sourceArray); std::vector> storage; for (ui32 i = 0; i < reader.GetRecordsCount();) { auto address = reader.GetReadChunk(i); storage.emplace_back(address.GetArray()); - adapter->AddDataToBuilders(address.GetArray(), Schema, builders); + adapter->AddDataToBuilders(address.GetArray(), builder); i += address.GetArray()->length(); AFL_VERIFY(i <= reader.GetRecordsCount()); } - auto arrays = NArrow::Finish(std::move(builders)); - - Records = std::make_shared( - arrow::RecordBatch::Make(Schema, sourceArray->GetRecordsCount(), NArrow::Finish(std::move(builders)))); - Records->AddField(std::make_shared("__ORIGINAL", arrow::utf8()), sourceArray).Validate(); -} - -TSubColumnsArray::TSubColumnsArray( - const std::shared_ptr& schema, const std::vector& columns, const TChunkConstructionData& externalInfo) - : TBase(externalInfo.GetRecordsCount(), EType::SubColumnsArray, externalInfo.GetColumnType()) - , Schema(schema) { - AFL_VERIFY(schema); - AFL_VERIFY((ui32)schema->num_fields() == columns.size())("schema", schema->ToString())("columns", columns.size()); - std::vector> fields; - std::vector> arrays; - for (ui32 i = 0; i < (ui32)schema->num_fields(); ++i) { - fields.emplace_back(schema->field(i)); - auto loader = std::make_shared(externalInfo.GetDefaultSerializer(), std::make_shared(), - std::make_shared("__ORIGINAL", externalInfo.GetColumnType()), externalInfo.GetDefaultValue(), 0); - std::vector chunks = { TDeserializeChunkedArray::TChunk(externalInfo.GetRecordsCount(), columns[i]) }; - arrays.emplace_back(std::make_shared(externalInfo.GetRecordsCount(), loader, std::move(chunks))); - } - - Records = std::make_shared(std::move(fields), std::move(arrays)); + return builder.Finish(); } TSubColumnsArray::TSubColumnsArray(const std::shared_ptr& type, const ui32 recordsCount) - : TBase(recordsCount, EType::SubColumnsArray, type) { - Schema = std::make_shared(arrow::FieldVector({ std::make_shared("__ORIGINAL", type) })); - Records = std::make_shared(arrow::RecordBatch::Make( - Schema, recordsCount, std::vector>({ NArrow::TThreadSimpleArraysCache::GetNull(type, recordsCount) }))); + : TBase(recordsCount, EType::SubColumnsArray, type) + , ColumnsData(NSubColumns::TColumnsData::BuildEmpty(recordsCount)) + , OthersData(NSubColumns::TOthersData::BuildEmpty()) { } TSubColumnsArray::TSubColumnsArray(NSubColumns::TColumnsData&& columns, NSubColumns::TOthersData&& others, @@ -89,24 +53,42 @@ TSubColumnsArray::TSubColumnsArray(NSubColumns::TColumnsData&& columns, NSubColu TString TSubColumnsArray::SerializeToString(const TChunkConstructionData& externalInfo) const { TString blobData; NKikimrArrowAccessorProto::TSubColumnsAccessor proto; - const TString blobSchema = NArrow::SerializeSchema(*Schema); - *proto.MutableSchema()->MutableDescriptionSize() = blobSchema.size(); - AFL_VERIFY((ui32)Schema->num_fields() == Records->num_columns()); - std::vector rbBlobs; + std::vector blobRanges; + blobRanges.emplace_back(ColumnsData.GetStats().SerializeAsString(externalInfo.GetDefaultSerializer())); + proto.SetColumnStatsSize(blobRanges.back().size()); + + blobRanges.emplace_back(OthersData.GetStats().SerializeAsString(externalInfo.GetDefaultSerializer())); + proto.SetOtherStatsSize(blobRanges.back().size()); + ui32 columnIdx = 0; + for (auto&& i : ColumnsData.GetRecords()->GetColumns()) { + TChunkConstructionData cData(ColumnsData.GetRecords()->GetRecordsCount(), nullptr, arrow::utf8(), externalInfo.GetDefaultSerializer()); + blobRanges.emplace_back( + ColumnsData.GetStats().GetAccessorConstructor(columnIdx++, ColumnsData.GetRecords()->GetRecordsCount()).SerializeToString(i, cData)); + auto* cInfo = proto.AddKeyColumns(); + cInfo->SetSize(blobRanges.back().size()); + } + + for (auto&& i : OthersData.GetRecords()->GetColumns()) { + TChunkConstructionData cData(ColumnsData.GetRecords()->GetRecordsCount(), nullptr, arrow::utf8(), externalInfo.GetDefaultSerializer()); + blobRanges.emplace_back(NPlain::TConstructor().SerializeToString(i, cData)); + auto* cInfo = proto.AddOtherColumns(); + cInfo->SetSize(blobRanges.back().size()); + } + proto.SetOtherRecordsCount(OthersData.GetRecords()->GetRecordsCount()); + ui64 blobsSize = 0; - for (auto&& i : Schema->fields()) { - auto rb = NArrow::ToBatch(Records->BuildTableVerified(TGeneralContainer::TTableConstructionContext({ i->name() }))); - rbBlobs.emplace_back(externalInfo.GetDefaultSerializer()->SerializePayload(rb)); - *proto.AddColumns()->MutableDescriptionSize() = rbBlobs.back().size(); - blobsSize += rbBlobs.back().size(); + for (auto&& i : blobRanges) { + blobsSize += i.size(); } + const TString protoString = proto.SerializeAsString(); TString result; TStringOutput so(result); so.Reserve(protoString.size() + sizeof(ui32) + blobsSize); const ui32 protoSize = protoString.size(); so.Write(&protoSize, sizeof(protoSize)); - for (auto&& s : rbBlobs) { + so.Write(protoString.data(), protoSize); + for (auto&& s : blobRanges) { so.Write(s.data(), s.size()); } so.Finish(); @@ -114,37 +96,36 @@ TString TSubColumnsArray::SerializeToString(const TChunkConstructionData& extern } IChunkedArray::TLocalDataAddress TSubColumnsArray::DoGetLocalData( - const std::optional& chunkCurrent, const ui64 position) const { + const std::optional& /*chunkCurrent*/, const ui64 /*position*/) const { auto it = BuildUnorderedIterator(); ui32 recordsCount = 0; auto builder = NArrow::MakeBuilder(arrow::binary()); - auto* binaryBuilder = static_cast(builder.get()); + auto* binaryBuilder = static_cast(builder.get()); while (it.IsValid()) { NJson::TJsonValue value; - auto onStartRecord = [](const ui32 index) { + auto onStartRecord = [&](const ui32 index) { AFL_VERIFY(recordsCount++ == index); }; - auto onFinishRecord = []() { + auto onFinishRecord = [&]() { auto bJson = NBinaryJson::SerializeToBinaryJson(value.GetStringRobust()); if (const TString* val = std::get_if(&bJson)) { AFL_VERIFY(false); } else if (const NBinaryJson::TBinaryJson* val = std::get_if(&bJson)) { - binaryBuilder->Append(val->data(), val->size()); + TStatusValidator::Validate(binaryBuilder->Append(val->data(), val->size())); } else { AFL_VERIFY(false); } }; - auto onRecordKV = [&](const ui32 index, const std::string_view value, const bool isColumn) { + auto onRecordKV = [&](const ui32 index, const std::string_view valueView, const bool isColumn) { if (isColumn) { - value.InsertValue(ColumnsData.GetStats().GetColumnName(index), value); + value.InsertValue(ColumnsData.GetStats().GetColumnNameString(index), TString(valueView.data(), valueView.size())); } else { - value.InsertValue(OthersData.GetStats().GetColumnName(index), value); + value.InsertValue(OthersData.GetStats().GetColumnNameString(index), TString(valueView.data(), valueView.size())); } }; it.ReadRecords(1, onStartRecord, onRecordKV, onFinishRecord); } - AFL_VERIFY(recordsCount == chunkCurrent.GetLength())("real", recordsCount)("expected", chunkCurrent.GetLength()); - return TLocalDataAddress(NArrow::FinishBuilder(builder), 0, 0); + return TLocalDataAddress(NArrow::FinishBuilder(std::move(builder)), 0, 0); } } // namespace NKikimr::NArrow::NAccessor diff --git a/ydb/core/formats/arrow/accessor/sub_columns/accessor.h b/ydb/core/formats/arrow/accessor/sub_columns/accessor.h index 9584ae1bb1c1..51210eea405c 100644 --- a/ydb/core/formats/arrow/accessor/sub_columns/accessor.h +++ b/ydb/core/formats/arrow/accessor/sub_columns/accessor.h @@ -57,12 +57,8 @@ class TSubColumnsArray: public IChunkedArray { TSubColumnsArray(NSubColumns::TColumnsData&& columns, NSubColumns::TOthersData&& others, const std::shared_ptr& type, const ui32 recordsCount); - TSubColumnsArray(const std::shared_ptr& batch); - - TSubColumnsArray(const std::shared_ptr& sourceArray, const std::shared_ptr& adapter); - - TSubColumnsArray( - const std::shared_ptr& schema, const std::vector& columns, const TChunkConstructionData& externalInfo); + static TConclusion> Make( + const std::shared_ptr& sourceArray, const std::shared_ptr& adapter); TSubColumnsArray(const std::shared_ptr& type, const ui32 recordsCount); diff --git a/ydb/core/formats/arrow/accessor/sub_columns/columns_storage.h b/ydb/core/formats/arrow/accessor/sub_columns/columns_storage.h index 67ef8c5cbdf6..a9ae90cd4d1d 100644 --- a/ydb/core/formats/arrow/accessor/sub_columns/columns_storage.h +++ b/ydb/core/formats/arrow/accessor/sub_columns/columns_storage.h @@ -13,9 +13,13 @@ namespace NKikimr::NArrow::NAccessor::NSubColumns { class TColumnsData { private: TDictStats Stats; - std::shared_ptr Records; + YDB_READONLY_DEF(std::shared_ptr, Records); public: + static TColumnsData BuildEmpty(const ui32 recordsCount) { + return TColumnsData(TDictStats::BuildEmpty(), std::make_shared(recordsCount)); + } + ui64 GetRawSize() const { return Records->GetRawSizeVerified(); } diff --git a/ydb/core/formats/arrow/accessor/sub_columns/constructor.cpp b/ydb/core/formats/arrow/accessor/sub_columns/constructor.cpp index 9458dccd0405..562c62ad180e 100644 --- a/ydb/core/formats/arrow/accessor/sub_columns/constructor.cpp +++ b/ydb/core/formats/arrow/accessor/sub_columns/constructor.cpp @@ -1,6 +1,9 @@ #include "accessor.h" #include "constructor.h" +#include +#include + namespace NKikimr::NArrow::NAccessor::NSubColumns { TConclusion> TConstructor::DoConstructDefault(const TChunkConstructionData& externalInfo) const { @@ -19,19 +22,49 @@ TConclusion> TConstructor::DoDeserializeFromStrin return TConclusionStatus::Fail("cannot parse proto"); } currentIndex += protoSize; - std::shared_ptr columnStats = externalInfo.GetDefaultSerializer()->Deserialize( - TStringBuf(originalData.data() + currentIndex, proto.GetColumnStatsSize()), TDictStats::GetSchema()); + std::shared_ptr rbColumnStats = TStatusValidator::GetValid(externalInfo.GetDefaultSerializer()->Deserialize( + TString(originalData.data() + currentIndex, proto.GetColumnStatsSize()), TDictStats::GetSchema())); + TDictStats columnStats(rbColumnStats); currentIndex += proto.GetColumnStatsSize(); - std::shared_ptr otherStats = externalInfo.GetDefaultSerializer()->Deserialize( - TStringBuf(originalData.data() + currentIndex, proto.GetOtherStatsSize()), TDictStats::GetSchema()); + std::shared_ptr rbOtherStats = TStatusValidator::GetValid(externalInfo.GetDefaultSerializer()->Deserialize( + TString(originalData.data() + currentIndex, proto.GetOtherStatsSize()), TDictStats::GetSchema())); + TDictStats otherStats(rbOtherStats); currentIndex += proto.GetOtherStatsSize(); - std::vector columns; - for (ui32 i = 0; i < (ui32)schema->num_fields(); ++i) { - columns.emplace_back(originalData.substr(currentIndex, proto.GetColumns(i).GetDescriptionSize())); - currentIndex += proto.GetColumns(i).GetDescriptionSize(); + std::shared_ptr columnKeysContainer; + { + std::vector> columns; + auto schema = columnStats.GetSchema(); + AFL_VERIFY(rbColumnStats->num_rows() == proto.GetKeyColumns().size()); + for (ui32 i = 0; i < (ui32)proto.GetKeyColumns().size(); ++i) { + std::shared_ptr columnLoader = std::make_shared(externalInfo.GetDefaultSerializer(), + columnStats.GetAccessorConstructor(i, externalInfo.GetRecordsCount()), schema->field(i), nullptr, 0); + std::vector chunks = { TDeserializeChunkedArray::TChunk( + externalInfo.GetRecordsCount(), originalData.substr(currentIndex, proto.GetKeyColumns(i).GetSize())) }; + columns.emplace_back(std::make_shared(externalInfo.GetRecordsCount(), columnLoader, std::move(chunks))); + currentIndex += proto.GetKeyColumns(i).GetSize(); + } + columnKeysContainer = std::make_shared(schema, std::move(columns)); + } + std::shared_ptr otherKeysContainer; + { + std::vector> columns; + AFL_VERIFY(rbOtherStats->num_rows() == proto.GetOtherColumns().size()); + auto schema = otherStats.GetSchema(); + for (ui32 i = 0; i < (ui32)proto.GetOtherColumns().size(); ++i) { + std::shared_ptr columnLoader = std::make_shared(externalInfo.GetDefaultSerializer(), + otherStats.GetAccessorConstructor(i, proto.GetOtherRecordsCount()), schema->field(i), nullptr, 0); + std::vector chunks = { TDeserializeChunkedArray::TChunk( + proto.GetOtherRecordsCount(), originalData.substr(currentIndex, proto.GetOtherColumns(i).GetSize())) }; + columns.emplace_back(std::make_shared(proto.GetOtherRecordsCount(), columnLoader, std::move(chunks))); + currentIndex += proto.GetOtherColumns(i).GetSize(); + } + otherKeysContainer = std::make_shared(schema, std::move(columns)); } - return std::make_shared(schema, std::move(columns), externalInfo); + TColumnsData columnData(columnStats, columnKeysContainer); + TOthersData otherData(otherStats, otherKeysContainer); + return std::make_shared( + std::move(columnData), std::move(otherData), externalInfo.GetColumnType(), externalInfo.GetRecordsCount()); } NKikimrArrowAccessorProto::TConstructor TConstructor::DoSerializeToProto() const { @@ -46,7 +79,7 @@ bool TConstructor::DoDeserializeFromProto(const NKikimrArrowAccessorProto::TCons TConclusion> TConstructor::DoConstruct( const std::shared_ptr& originalData, const TChunkConstructionData& /*externalInfo*/) const { - return std::make_shared(originalData, DataExtractor); + return NAccessor::TSubColumnsArray::Make(originalData, DataExtractor).DetachResult(); } TString TConstructor::DoSerializeToString(const std::shared_ptr& columnData, const TChunkConstructionData& externalInfo) const { diff --git a/ydb/core/formats/arrow/accessor/sub_columns/data_extractor.h b/ydb/core/formats/arrow/accessor/sub_columns/data_extractor.h index 7a408fea70cb..59ff373fe078 100644 --- a/ydb/core/formats/arrow/accessor/sub_columns/data_extractor.h +++ b/ydb/core/formats/arrow/accessor/sub_columns/data_extractor.h @@ -11,35 +11,19 @@ namespace NKikimr::NArrow::NAccessor::NSubColumns { class IDataAdapter { private: - virtual TConclusion> DoBuildSchemaForData(const std::shared_ptr& sourceArray) const = 0; virtual TConclusionStatus DoAddDataToBuilders(const std::shared_ptr& sourceArray, TDataBuilder& dataBuilder) const = 0; public: virtual ~IDataAdapter() = default; - TConclusion> BuildSchemaForData(const std::shared_ptr& sourceArray) const { - return DoBuildSchemaForData(sourceArray); - } TConclusionStatus AddDataToBuilders(const std::shared_ptr& sourceArray, TDataBuilder& dataBuilder) const; }; class TFirstLevelSchemaData: public IDataAdapter { private: - virtual TConclusion> DoBuildSchemaForData(const std::shared_ptr& sourceArray) const override; - virtual TConclusionStatus DoAddDataToBuilders(const std::shared_ptr& sourceArray, TDataBuilder& dataBuilder) const override; public: }; -class TFirstLevelKVData: public IDataAdapter { -private: - virtual TConclusion> DoBuildSchemaForData( - const std::shared_ptr& /*sourceArray*/) const override { - return std::shared_ptr(); - } - -public: -}; - } // namespace NKikimr::NArrow::NAccessor diff --git a/ydb/core/formats/arrow/accessor/sub_columns/direct_builder.cpp b/ydb/core/formats/arrow/accessor/sub_columns/direct_builder.cpp index 3f5454a8de0c..6ec14d9cb681 100644 --- a/ydb/core/formats/arrow/accessor/sub_columns/direct_builder.cpp +++ b/ydb/core/formats/arrow/accessor/sub_columns/direct_builder.cpp @@ -37,7 +37,7 @@ std::shared_ptr TDataBuilder::Finish() { for (auto&& i : rIt->second) { if (columnAccessorsCount < TSettings::ColumnAccessorsCountLimit) { columnElements.emplace_back(i); - if (i->GetRecordIndexes().size() * 20 < CurrentRecordIndex) { + if (TSettings::IsSparsed(i->GetRecordIndexes().size(), CurrentRecordIndex)) { i->BuildSparsedAccessor(CurrentRecordIndex); } else { i->BuildPlainAccessor(CurrentRecordIndex); @@ -77,14 +77,14 @@ TOthersData TDataBuilder::MergeOthers(const std::vector& other auto othersBuilder = TOthersData::MakeMergedBuilder(); while (heap.size()) { std::pop_heap(heap.begin(), heap.end()); - othersBuilder.Add(heap.back().GetRecordIndex(), heap.back().GetKeyIndex(), heap.back().GetValue()); + othersBuilder->Add(heap.back().GetRecordIndex(), heap.back().GetKeyIndex(), heap.back().GetValue()); if (!heap.back().Next()) { heap.pop_back(); } else { std::push_heap(heap.begin(), heap.end()); } } - return othersBuilder.Finish(BuildStats(otherKeys)); + return othersBuilder->Finish(BuildStats(otherKeys)); } } // namespace NKikimr::NArrow::NAccessor::NSubColumns diff --git a/ydb/core/formats/arrow/accessor/sub_columns/iterators.h b/ydb/core/formats/arrow/accessor/sub_columns/iterators.h index 6d3c4fef9cfd..6fedf1fef9a2 100644 --- a/ydb/core/formats/arrow/accessor/sub_columns/iterators.h +++ b/ydb/core/formats/arrow/accessor/sub_columns/iterators.h @@ -223,10 +223,21 @@ class TReadIteratorOrderedKeys { } template - void ReadRecords(const ui32 recordsCount, const TStartRecordActor& startRecordActor, const TKVActor& kvActor, + void ReadRecord(const ui32 recordIndex, const TStartRecordActor& startRecordActor, const TKVActor& kvActor, const TFinishRecordActor& finishRecordActor) { - for (ui32 i = 0; i < recordsCount; ++i) { - startRecordActor(i); + while (SortedIterators.size()) { + if (SortedIterators.front()->GetRecordIndex() > recordIndex) { + return; + } else if (SortedIterators.front()->GetRecordIndex() < recordIndex) { + std::pop_heap(SortedIterators.begin(), SortedIterators.end(), TIteratorsComparator()); + if (!itColumn.Next()) { + SortedIterators.pop_back(); + } else { + std::push_heap(SortedIterators.begin(), SortedIterators.end(), TIteratorsComparator()); + } + continue; + } + startRecordActor(recordIndex); while (SortedIterators.size() && SortedIterators.front()->GetRecordIndex() == i) { std::pop_heap(SortedIterators.begin(), SortedIterators.end(), TIteratorsComparator()); auto& itColumn = *SortedIterators.back(); @@ -238,7 +249,10 @@ class TReadIteratorOrderedKeys { } } finishRecordActor(); + return; } + startRecordActor(recordIndex); + finishRecordActor(); } }; diff --git a/ydb/core/formats/arrow/accessor/sub_columns/others_storage.h b/ydb/core/formats/arrow/accessor/sub_columns/others_storage.h index f695c72fbf4e..1a5c156f6081 100644 --- a/ydb/core/formats/arrow/accessor/sub_columns/others_storage.h +++ b/ydb/core/formats/arrow/accessor/sub_columns/others_storage.h @@ -14,9 +14,13 @@ namespace NKikimr::NArrow::NAccessor::NSubColumns { class TOthersData { private: TDictStats Stats; - std::shared_ptr Records; + YDB_READONLY_DEF(std::shared_ptr, Records); public: + static TOthersData BuildEmpty() { + return TOthersData(TDictStats::BuildEmpty(), std::make_shared(0)); + } + ui64 GetRawSize() const { return Records->GetRawSizeVerified(); } @@ -127,8 +131,8 @@ class TOthersData { TOthersData Finish(const TDictStats& stats); }; - static TBuilderWithStats MakeMergedBuilder() { - return TBuilderWithStats(); + static std::shared_ptr MakeMergedBuilder() { + return std::make_shared(); } }; diff --git a/ydb/core/formats/arrow/accessor/sub_columns/settings.h b/ydb/core/formats/arrow/accessor/sub_columns/settings.h index 1df1ab809fe5..96b5d461f73c 100644 --- a/ydb/core/formats/arrow/accessor/sub_columns/settings.h +++ b/ydb/core/formats/arrow/accessor/sub_columns/settings.h @@ -8,8 +8,15 @@ namespace NKikimr::NArrow::NAccessor::NSubColumns { class TSettings { +private: + static const ui32 SparsedDetectorKff = 4; + public: static const ui32 ColumnAccessorsCountLimit = 1024; + static bool IsSparsed(const ui32 keyUsageCount, const ui32 recordsCount) { + AFL_VERIFY(recordsCount); + return keyUsageCount * SparsedDetectorKff < recordsCount; + } }; } // namespace NKikimr::NArrow::NAccessor::NSubColumns diff --git a/ydb/core/formats/arrow/accessor/sub_columns/stats.cpp b/ydb/core/formats/arrow/accessor/sub_columns/stats.cpp index 704242af1a4a..0f57a138804d 100644 --- a/ydb/core/formats/arrow/accessor/sub_columns/stats.cpp +++ b/ydb/core/formats/arrow/accessor/sub_columns/stats.cpp @@ -1,5 +1,10 @@ +#include "settings.h" #include "stats.h" +#include +#include +#include + #include namespace NKikimr::NArrow::NAccessor::NSubColumns { @@ -74,6 +79,27 @@ TDictStats::TDictStats(const std::shared_ptr& original) DataSize = std::static_pointer_cast(Original->column(2)); } +bool TDictStats::IsSparsed(const ui32 columnIndex, const ui32 recordsCount) const { + return TSettings::IsSparsed(GetColumnRecordsCount(columnIndex), recordsCount); +} + +TConstructorContainer TDictStats::GetAccessorConstructor(const ui32 columnIndex, const ui32 recordsCount) const { + if (IsSparsed(columnIndex, recordsCount)) { + return std::make_shared(); + } else { + return std::make_shared(); + } +} + +TDictStats TDictStats::BuildEmpty() { + return TDictStats(MakeEmptyBatch(GetSchema())); +} + +TString TDictStats::SerializeAsString(const std::shared_ptr& serializer) const { + AFL_VERIFY(serializer); + return serializer->SerializePayload(Original); +} + TDictStats::TBuilder::TBuilder() { Builders = NArrow::MakeBuilders(GetSchema()); AFL_VERIFY(Builders.size() == 3); diff --git a/ydb/core/formats/arrow/accessor/sub_columns/stats.h b/ydb/core/formats/arrow/accessor/sub_columns/stats.h index 156b05876131..b2971c29b378 100644 --- a/ydb/core/formats/arrow/accessor/sub_columns/stats.h +++ b/ydb/core/formats/arrow/accessor/sub_columns/stats.h @@ -1,4 +1,6 @@ #pragma once +#include + #include #include @@ -19,6 +21,30 @@ class TDictStats { std::shared_ptr DataSize; public: + static TDictStats BuildEmpty(); + TString SerializeAsString(const std::shared_ptr& serializer) const; + + std::optional GetKeyIndexOptional(const std::string_view keyName) const { + for (ui32 i = 0; i < DataNames->length(); ++i) { + const auto arrView = DataNames->GetView(i); + if (std::string_view(arrView.data(), arrView.size()) == keyName) { + return i; + } + } + return std::nullopt; + } + + ui32 GetKeyIndexVerified(const std::string_view keyName) const { + for (ui32 i = 0; i < DataNames->length(); ++i) { + const auto arrView = DataNames->GetView(i); + if (std::string_view(arrView.data(), arrView.size()) == keyName) { + return i; + } + } + AFL_VERIFY(false); + return 0; + } + class TRTStats { private: YDB_READONLY_DEF(TString, KeyName); @@ -97,6 +123,10 @@ class TDictStats { } std::string_view GetColumnName(const ui32 index) const; + TString GetColumnNameString(const ui32 index) const { + auto view = GetColumnName(index); + return TString(view.data(), view.size()); + } ui32 GetColumnRecordsCount(const ui32 index) const; ui32 GetColumnSize(const ui32 index) const; @@ -107,6 +137,8 @@ class TDictStats { return result; } + TConstructorContainer GetAccessorConstructor(const ui32 columnIndex, const ui32 recordsCount) const; + bool IsSparsed(const ui32 columnIndex, const ui32 recordsCount) const; TDictStats(const std::shared_ptr& original); }; diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/abstract/merger.h b/ydb/core/tx/columnshard/engines/changes/compaction/abstract/merger.h index 6c73b7b2ea8d..a30280483cc2 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/abstract/merger.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction/abstract/merger.h @@ -10,7 +10,7 @@ class TMergingChunkContext { std::shared_ptr RecordIdxArray; public: - const ui32 GetRecordsCount() const { + ui32 GetRecordsCount() const { return IdxArray->length(); } @@ -141,7 +141,7 @@ class IColumnMerger { std::vector Execute(const TChunkMergeContext& context, TMergingContext& mergeContext) { const auto& chunk = mergeContext.GetChunk(context.GetBatchIdx()); - AFL_VERIFY(context.GetRecordsCount() == chunk.GetIdxArray()->length()); + AFL_VERIFY(context.GetRecordsCount() == chunk.GetIdxArray().length()); return DoExecute(context, mergeContext); } }; diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/logic.h b/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/logic.h index a516d205d7f7..5db44afa27b0 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/logic.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction/sub_columns/logic.h @@ -1,6 +1,9 @@ #pragma once +#include #include +#include #include +#include #include #include @@ -11,6 +14,13 @@ class TSubColumnsMerger: public IColumnMerger { private: static inline auto Registrator = TFactory::TRegistrator(NArrow::NAccessor::TGlobalConst::SubColumnsDataAccessorName); using TBase = IColumnMerger; + using TDictStats = NArrow::NAccessor::NSubColumns::TDictStats; + using TOthersData = NArrow::NAccessor::NSubColumns::TOthersData; + using TColumnsData = NArrow::NAccessor::NSubColumns::TColumnsData; + using TSparsedBuilder = NArrow::NAccessor::TSparsedArray::TSparsedBuilder; + using TPlainBuilder = NArrow::NAccessor::TTrivialArray::TPlainBuilder; + using TSubColumnsArray = NArrow::NAccessor::TSubColumnsArray; + using TReadIteratorOrderedKeys = NArrow::NAccessor::NSubColumns::TReadIteratorOrderedKeys; std::vector> Sources; std::vector OrderedIterators; ui32 OutputRecordsCount = 0; @@ -19,6 +29,18 @@ class TSubColumnsMerger: public IColumnMerger { std::optional ResultOtherStats; class TRemapColumns { private: + class TRemapInfo { + private: + YDB_READONLY(ui32, CommonKeyIndex, 0); + YDB_READONLY(bool, IsColumnKey, false); + + public: + TRemapInfo(const ui32 keyIndex, const bool isColumnKey) + : CommonKeyIndex(keyIndex) + , IsColumnKey(isColumnKey) { + } + }; + class TSourceAddress { private: const ui32 SourceIndex; @@ -38,7 +60,6 @@ class TSubColumnsMerger: public IColumnMerger { }; std::map RemapInfo; - std::vector>> RemapInfo; public: void AddRemap(const ui32 sourceIdx, const TDictStats& sourceColumnStats, const TDictStats& sourceOtherStats, @@ -47,7 +68,7 @@ class TSubColumnsMerger: public IColumnMerger { if (auto commonKeyIndex = resultColumnStats.GetKeyIndexOptional(sourceColumnStats.GetColumnName(i))) { AFL_VERIFY(RemapInfo.emplace(TSourceAddress(sourceIdx, i, true), TRemapInfo(*commonKeyIndex, true)).second); } else { - const ui32 commonKeyIndex = resultOtherStats.GetKeyIndexVerified(sourceColumnStats.GetColumnName(i)); + commonKeyIndex = resultOtherStats.GetKeyIndexVerified(sourceColumnStats.GetColumnName(i)); AFL_VERIFY(RemapInfo.emplace(TSourceAddress(sourceIdx, i, true), TRemapInfo(*commonKeyIndex, false)).second); } } @@ -55,24 +76,12 @@ class TSubColumnsMerger: public IColumnMerger { if (auto commonKeyIndex = resultColumnStats.GetKeyIndexOptional(sourceOtherStats.GetColumnName(i))) { AFL_VERIFY(RemapInfo.emplace(TSourceAddress(sourceIdx, i, false), TRemapInfo(*commonKeyIndex, true)).second); } else { - const ui32 commonKeyIndex = resultOtherStats.GetKeyIndexVerified(sourceOtherStats.GetColumnName(i)); + commonKeyIndex = resultOtherStats.GetKeyIndexVerified(sourceOtherStats.GetColumnName(i)); AFL_VERIFY(RemapInfo.emplace(TSourceAddress(sourceIdx, i, false), TRemapInfo(*commonKeyIndex, false)).second); } } } - class TRemapInfo { - private: - YDB_READONLY(ui32, CommonKeyIndex, 0); - YDB_READONLY(bool, IsColumnKey, false); - - public: - TRemapInfo(const ui32 keyIndex, const bool isColumnKey) - : CommonKeyIndex(keyIndex) - , IsColumnKey(isColumnKey) { - } - }; - TRemapInfo RemapIndex(const ui32 sourceIdx, const ui32 sourceKeyIndex, const bool isColumnKey) const { auto it = RemapInfo.find(TSourceAddress(sourceIdx, sourceKeyIndex, isColumnKey)); AFL_VERIFY(it != RemapInfo.end()); @@ -89,7 +98,10 @@ class TSubColumnsMerger: public IColumnMerger { if (i->GetType() == NArrow::NAccessor::IChunkedArray::EType::SubColumnsArray) { Sources.emplace_back(std::static_pointer_cast(i)); } else { - auto subColumnsAccessor = mergeContext.GetAccessorConstructor().Construct(i); + auto subColumnsAccessor = Context.GetLoader() + ->GetAccessorConstructor() + ->Construct(i, Context.GetLoader()->BuildAccessorContext(i->GetRecordsCount())) + .DetachResult(); Sources.emplace_back(std::static_pointer_cast(subColumnsAccessor)); } } @@ -100,13 +112,14 @@ class TSubColumnsMerger: public IColumnMerger { InputRecordsCount += i->GetRecordsCount(); } auto commonStats = TDictStats::Merge(stats); - auto splitted = commonStats.SplitByVolume(NSubColumns::TSettings::ColumnAccessorsCountLimit); + auto splitted = commonStats.SplitByVolume(NArrow::NAccessor::NSubColumns::TSettings::ColumnAccessorsCountLimit); ResultColumnStats = splitted.ExtractColumns(); ResultOtherStats = splitted.ExtractOthers(); - ui32 sourceIdx = 0; - for (auto&& i : Sources) { - RemapKeyIndex.AddRemap(sourceIdx++, i, ResultColumnStats, ResultOtherStats); + for (ui32 sourceIdx = 0; sourceIdx < Sources.size(); ++sourceIdx) { + auto source = Sources[sourceIdx]; + RemapKeyIndex.AddRemap( + sourceIdx, source->GetColumnsData().GetStats(), source->GetOthersData().GetStats(), *ResultColumnStats, *ResultOtherStats); } } @@ -117,33 +130,59 @@ class TSubColumnsMerger: public IColumnMerger { const TDictStats ResultColumnStats; const TDictStats ResultOtherStats; const TChunkMergeContext& Context; - TOthersData::TBuilderWithStats OthersBuilder; + std::shared_ptr OthersBuilder; + ui32 RecordIndex = 0; class TGeneralAccessorBuilder { private: std::variant Builder; + public: + TGeneralAccessorBuilder(TSparsedBuilder&& builder) + : Builder(std::move(builder)) { + } + + TGeneralAccessorBuilder(TPlainBuilder&& builder) + : Builder(std::move(builder)) { + } + void AddRecord(const ui32 recordIndex, const std::string_view value) { struct TVisitor { - void operator(TSparsedBuilder& builder) const { + private: + const ui32 RecordIndex; + const std::string_view Value; + + public: + void operator()(TSparsedBuilder& builder) const { builder.AddRecord(RecordIndex, Value); } - void operator(TPlainBuilder& builder) const { + void operator()(TPlainBuilder& builder) const { builder.AddRecord(RecordIndex, Value); } + TVisitor(const ui32 recordIndex, const std::string_view value) + : RecordIndex(recordIndex) + , Value(value) { + } }; - Builder.visit(TVisitor(recordIndex, value)); + std::visit(TVisitor(recordIndex, value), Builder); } - std::shared_ptr Finish(const ui32 recordIndex, const std::string_view value) { + std::shared_ptr Finish(const ui32 recordsCount) { struct TVisitor { - void operator(TSparsedBuilder& builder) const { - return builder.Finish(); + private: + const ui32 RecordsCount; + + public: + std::shared_ptr operator()(TSparsedBuilder& builder) const { + return builder.Finish(RecordsCount); } - void operator(TPlainBuilder& builder) const { - return builder.Finish(); + std::shared_ptr operator()(TPlainBuilder& builder) const { + return builder.Finish(RecordsCount); + } + TVisitor(const ui32 recordsCount) + : RecordsCount(recordsCount) { } }; - return Builder.visit(TVisitor()); + return std::visit(TVisitor(recordsCount), Builder); } }; @@ -152,30 +191,34 @@ class TSubColumnsMerger: public IColumnMerger { void FlushData() { AFL_VERIFY(RecordIndex); - auto portionOthersData = OthersBuilder.Finish(ResultOtherStats); - std::vector> arrays; + auto portionOthersData = OthersBuilder->Finish(ResultOtherStats); + std::vector> arrays; for (auto&& i : ColumnBuilders) { arrays.emplace_back(i.Finish(RecordIndex)); } - TColumnsData cData(ResultColumnStats, std::make_shared(ResultColumnStats.BuildSchema(), arrays)); - Results.emplace_back(std::make_shared(std::move(cData), std::move(portionOthersData))); + TColumnsData cData( + ResultColumnStats, std::make_shared(ResultColumnStats.BuildSchema()->fields(), std::move(arrays))); + Results.emplace_back( + std::make_shared(std::move(cData), std::move(portionOthersData), arrow::binary(), RecordIndex)); Initialize(); } void Initialize() { ColumnBuilders.clear(); for (ui32 i = 0; i < ResultColumnStats.GetColumnsCount(); ++i) { - if (ResultColumnStats.GetColumnRecordsCount(i) * 10 < OutputRecordsCount) { - ColumnBuilders.emplace_back(TGeneralAccessorBuilder::MakeSparsedBuilder()); + if (ResultColumnStats.IsSparsed(i, OutputRecordsCount)) { + ColumnBuilders.emplace_back(TSparsedBuilder()); } else { - ColumnBuilders.emplace_back(TGeneralAccessorBuilder::MakePlainBuilder()); + ColumnBuilders.emplace_back(TPlainBuilder()); } } OthersBuilder = TOthersData::MakeMergedBuilder(); RecordIndex = 0; } + public: - TMergedBuilder(const TDictStats& columnStats, const TDictStats& otherStats, const ui32 inputRecordsCount, const ui32 outputRecordsCount, + TMergedBuilder(const NArrow::NAccessor::NSubColumns::TDictStats& columnStats, + const NArrow::NAccessor::NSubColumns::TDictStats& otherStats, const ui32 inputRecordsCount, const ui32 outputRecordsCount, const TChunkMergeContext& context) : OutputRecordsCount(outputRecordsCount) , InputRecordsCount(inputRecordsCount) @@ -183,13 +226,17 @@ class TSubColumnsMerger: public IColumnMerger { , ResultOtherStats(otherStats) , Context(context) , OthersBuilder(TOthersData::MakeMergedBuilder()) { + Y_UNUSED(InputRecordsCount); Initialize(); } class TPortionColumn: public TColumnPortionResult { public: void AddChunk(const std::shared_ptr& cArray, const TColumnMergeContext& cmContext) { - Chunks.emplace_back(std::make_shared(cArray->SerializeToString(), cArray, + AFL_VERIFY(cArray); + AFL_VERIFY(cArray->GetRecordsCount()); + auto accContext = cmContext.GetLoader()->BuildAccessorContext(cArray->GetRecordsCount()); + Chunks.emplace_back(std::make_shared(cArray->SerializeToString(accContext), cArray, TChunkAddress(cmContext.GetColumnId(), Chunks.size()), cmContext.GetIndexInfo().GetColumnFeaturesVerified(cmContext.GetColumnId()))); } @@ -201,8 +248,9 @@ class TSubColumnsMerger: public IColumnMerger { } std::vector portions; for (auto&& i : Results) { - portions.emplace_back(TPortionColumn(cmContext.GetColumnId())); - portions.back().AddChunk(i, cmContext); + TPortionColumn pColumn(cmContext.GetColumnId()); + pColumn.AddChunk(i, cmContext); + portions.emplace_back(std::move(pColumn)); } return std::move(portions); } @@ -217,37 +265,37 @@ class TSubColumnsMerger: public IColumnMerger { } void AddColumnKV(const ui32 commonKeyIndex, const std::string_view value) { AFL_VERIFY(commonKeyIndex < ColumnBuilders.size()); - ColumnBuilders[commonKeyIndex].AddRecord(RecordIndex, value) + ColumnBuilders[commonKeyIndex].AddRecord(RecordIndex, value); } void AddOtherKV(const ui32 commonKeyIndex, const std::string_view value) { - OthersBuilder.Add(RecordIndex, commonKeyIndex, value); + OthersBuilder->Add(RecordIndex, commonKeyIndex, value); } }; virtual std::vector DoExecute(const TChunkMergeContext& context, TMergingContext& mergeContext) override { AFL_VERIFY(ResultColumnStats && ResultOtherStats); auto& mergeChunkContext = mergeContext.GetChunk(context.GetBatchIdx()); - TMergedBuilder builder(*ResultColumnStats, *ResultOtherStats); + TMergedBuilder builder(*ResultColumnStats, *ResultOtherStats, InputRecordsCount, OutputRecordsCount, context); for (ui32 i = 0; i < context.GetRecordsCount(); ++i) { - const ui32 sourceIdx = mergeChunkContext.GetIdxArray()->Value(i); - const ui32 recordIdx = mergeChunkContext.GetRecordIdxArray()->Value(i); - const auto startRecord = [](const ui32 /*sourceRecordIndex*/) { + const ui32 sourceIdx = mergeChunkContext.GetIdxArray().Value(i); + const ui32 recordIdx = mergeChunkContext.GetRecordIdxArray().Value(i); + const auto startRecord = [&](const ui32 /*sourceRecordIndex*/) { builder.StartRecord(); }; - const auto addKV = [sourceIdx](const ui32 sourceKeyIndex, const std::string_view value, const bool isColumnKey) { + const auto addKV = [&](const ui32 sourceKeyIndex, const std::string_view value, const bool isColumnKey) { auto commonKeyInfo = RemapKeyIndex.RemapIndex(sourceIdx, sourceKeyIndex, isColumnKey); - if (commonKeyInfo.IsColumnKey()) { + if (commonKeyInfo.GetIsColumnKey()) { builder.AddColumnKV(commonKeyInfo.GetCommonKeyIndex(), value); } else { builder.AddOtherKV(commonKeyInfo.GetCommonKeyIndex(), value); } }; - const auto finishRecord = []() { + const auto finishRecord = [&]() { builder.FinishRecord(); }; - OrderedIterators[sourceIdx].ReadRecords(1, startRecord, addKV, finishRecord); + OrderedIterators[sourceIdx].ReadRecords(recordIdx, startRecord, addKV, finishRecord); } - return builder.Finish(); + return builder.Finish(Context); } public: diff --git a/ydb/library/formats/arrow/accessor/abstract/accessor.h b/ydb/library/formats/arrow/accessor/abstract/accessor.h index 985c6028edab..143e5ad7a830 100644 --- a/ydb/library/formats/arrow/accessor/abstract/accessor.h +++ b/ydb/library/formats/arrow/accessor/abstract/accessor.h @@ -1,4 +1,5 @@ #pragma once + #include #include @@ -8,6 +9,10 @@ #include #include +namespace NKikimr::NArrow::NSerialization { +class ISerializer; +} + namespace NKikimr::NArrow::NAccessor { class TColumnLoader; @@ -106,8 +111,7 @@ class IChunkedArray { TString DebugString() const { TStringBuilder sb; - sb << "start=" << GlobalStartPosition << ";finish=" << GlobalFinishPosition - << ";addresses_count=" << Addresses.size() << ";"; + sb << "start=" << GlobalStartPosition << ";finish=" << GlobalFinishPosition << ";addresses_count=" << Addresses.size() << ";"; for (auto&& i : Addresses) { sb << "addresses=" << i.DebugString() << ";"; } @@ -123,8 +127,7 @@ class IChunkedArray { public: TFullChunkedArrayAddress(const std::shared_ptr& arr, TAddressChain&& address) : Array(arr) - , Address(std::move(address)) - { + , Address(std::move(address)) { AFL_VERIFY(Address.GetSize()); AFL_VERIFY(Array); AFL_VERIFY(Array->GetRecordsCount()); @@ -168,8 +171,7 @@ class IChunkedArray { TFullDataAddress(const std::shared_ptr& arr, TAddressChain&& address) : Array(arr) - , Address(std::move(address)) - { + , Address(std::move(address)) { AFL_VERIFY(Array); AFL_VERIFY(Address.GetSize()); } @@ -187,8 +189,7 @@ class IChunkedArray { TLocalDataAddress(const std::shared_ptr& arr, const ui32 start, const ui32 chunkIdx) : Array(arr) - , Address(start, start + TValidator::CheckNotNull(arr)->length(), chunkIdx) - { + , Address(start, start + TValidator::CheckNotNull(arr)->length(), chunkIdx) { } TLocalDataAddress(const std::shared_ptr& arr, const TCommonChunkAddress& address) @@ -214,8 +215,7 @@ class IChunkedArray { TAddress(const std::shared_ptr& arr, const ui64 position) : Array(arr) - , Position(position) - { + , Position(position) { AFL_VERIFY(!!Array); AFL_VERIFY(position < (ui32)Array->length()); } @@ -230,13 +230,20 @@ class IChunkedArray { virtual std::optional DoGetRawSize() const = 0; virtual std::shared_ptr DoGetScalar(const ui32 index) const = 0; - virtual TLocalChunkedArrayAddress DoGetLocalChunkedArray(const std::optional& /*chunkCurrent*/, const ui64 /*position*/) const { + virtual TLocalChunkedArrayAddress DoGetLocalChunkedArray( + const std::optional& /*chunkCurrent*/, const ui64 /*position*/) const { AFL_VERIFY(false); return TLocalChunkedArrayAddress(nullptr, 0, 0); } virtual TLocalDataAddress DoGetLocalData(const std::optional& chunkCurrent, const ui64 position) const = 0; protected: + std::shared_ptr GetArraySchema() const { + const arrow::FieldVector fields = { std::make_shared("val", GetDataType()) }; + return std::make_shared(fields); + } + + TLocalChunkedArrayAddress GetLocalChunkedArray(const std::optional& chunkCurrent, const ui64 position) const { return DoGetLocalChunkedArray(chunkCurrent, position); } @@ -251,8 +258,7 @@ class IChunkedArray { ui64 idx = 0; if (chunkCurrent) { if (position < chunkCurrent->GetFinishPosition()) { - return accessor.OnArray( - chunkCurrent->GetChunkIndex(), chunkCurrent->GetStartPosition()); + return accessor.OnArray(chunkCurrent->GetChunkIndex(), chunkCurrent->GetStartPosition()); } else if (position == chunkCurrent->GetFinishPosition() && chunkCurrent->GetChunkIndex() + 1 < accessor.GetChunksCount()) { return accessor.OnArray(chunkCurrent->GetChunkIndex() + 1, position); } @@ -301,8 +307,10 @@ class IChunkedArray { public: TLocalDataAddress GetLocalData(const std::optional& chunkCurrent, const ui64 position) const { + AFL_VERIFY(position < GetRecordsCount())("position", position)("records_count", GetRecordsCount()); return DoGetLocalData(chunkCurrent, position); } + class TReader { private: std::shared_ptr ChunkedArray; diff --git a/ydb/library/formats/arrow/arrow_helpers.cpp b/ydb/library/formats/arrow/arrow_helpers.cpp index 4314fab348b5..d8120f5cd4d3 100644 --- a/ydb/library/formats/arrow/arrow_helpers.cpp +++ b/ydb/library/formats/arrow/arrow_helpers.cpp @@ -210,7 +210,7 @@ std::unique_ptr MakeBuilder(const std::shared_ptr FinishBuilder(std::unique_ptr&& builders) { +std::shared_ptr FinishBuilder(std::unique_ptr&& builder) { std::shared_ptr array; TStatusValidator::Validate(builder->Finish(&array)); return array; diff --git a/ydb/library/formats/arrow/protos/accessor.proto b/ydb/library/formats/arrow/protos/accessor.proto index 07ed5fb869d5..a0c460243570 100644 --- a/ydb/library/formats/arrow/protos/accessor.proto +++ b/ydb/library/formats/arrow/protos/accessor.proto @@ -39,6 +39,7 @@ message TSubColumnsAccessor { message TColumn { optional uint32 Size = 1; } - repeated TColumn Columns = 3; - optional TColumn Others = 4; + repeated TColumn KeyColumns = 3; + repeated TColumn OtherColumns = 4; + optional uint32 OtherRecordsCount = 5; } \ No newline at end of file