Skip to content

Commit

Permalink
fix build
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 committed Feb 2, 2025
1 parent 2de95c4 commit 79b8b84
Show file tree
Hide file tree
Showing 16 changed files with 320 additions and 182 deletions.
121 changes: 51 additions & 70 deletions ydb/core/formats/arrow/accessor/sub_columns/accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,74 +9,38 @@
#include <ydb/library/formats/arrow/protos/accessor.pb.h>
#include <ydb/library/formats/arrow/simple_arrays_cache.h>

#include <yql/essentials/types/binary_json/format.h>
#include <yql/essentials/types/binary_json/write.h>

namespace NKikimr::NArrow::NAccessor {

std::vector<TChunkedArraySerialized> TSubColumnsArray::DoSplitBySizes(
const TColumnLoader& loader, const TString& fullSerializedData, const std::vector<ui64>& splitSizes) {
std::vector<TChunkedArraySerialized> result;
auto table = Records->BuildTableVerified();
auto rb = NArrow::ToBatch(table);
AFL_VERIFY(GetRecordsCount());

ui32 idxCurrent = 0;
for (ui32 i = 0; i < splitSizes.size(); ++i) {
const ui32 recordsCount = 1.0 * splitSizes[i] / fullSerializedData.size() * GetRecordsCount();
AFL_VERIFY(recordsCount >= 1);
auto rbSlice = rb->Slice(idxCurrent, recordsCount);
auto subColumnsChunk = std::make_shared<TSubColumnsArray>(rbSlice);
result.emplace_back(subColumnsChunk,
loader.GetAccessorConstructor()->SerializeToString(subColumnsChunk, loader.BuildAccessorContext(GetRecordsCount())));
}

return result;
const TColumnLoader& /*loader*/, const TString& /*fullSerializedData*/, const std::vector<ui64>& /*splitSizes*/) {
AFL_VERIFY(false);
return {};
}

TSubColumnsArray::TSubColumnsArray(const std::shared_ptr<IChunkedArray>& sourceArray, const std::shared_ptr<NSubColumns::IDataAdapter>& adapter)
: TBase(sourceArray->GetRecordsCount(), EType::SubColumnsArray, sourceArray->GetDataType()) {
TConclusion<std::shared_ptr<TSubColumnsArray>> TSubColumnsArray::Make(
const std::shared_ptr<IChunkedArray>& sourceArray, const std::shared_ptr<NSubColumns::IDataAdapter>& adapter) {
AFL_VERIFY(adapter);
AFL_VERIFY(sourceArray);
TDataBuilder builder;
auto builders = NArrow::MakeBuilders(Schema, sourceArray->GetRecordsCount());
NSubColumns::TDataBuilder builder;
IChunkedArray::TReader reader(sourceArray);
std::vector<std::shared_ptr<arrow::Array>> storage;
for (ui32 i = 0; i < reader.GetRecordsCount();) {
auto address = reader.GetReadChunk(i);
storage.emplace_back(address.GetArray());
adapter->AddDataToBuilders(address.GetArray(), Schema, builders);
adapter->AddDataToBuilders(address.GetArray(), builder);
i += address.GetArray()->length();
AFL_VERIFY(i <= reader.GetRecordsCount());
}
auto arrays = NArrow::Finish(std::move(builders));

Records = std::make_shared<TGeneralContainer>(
arrow::RecordBatch::Make(Schema, sourceArray->GetRecordsCount(), NArrow::Finish(std::move(builders))));
Records->AddField(std::make_shared<arrow::Field>("__ORIGINAL", arrow::utf8()), sourceArray).Validate();
}

TSubColumnsArray::TSubColumnsArray(
const std::shared_ptr<arrow::Schema>& schema, const std::vector<TString>& columns, const TChunkConstructionData& externalInfo)
: TBase(externalInfo.GetRecordsCount(), EType::SubColumnsArray, externalInfo.GetColumnType())
, Schema(schema) {
AFL_VERIFY(schema);
AFL_VERIFY((ui32)schema->num_fields() == columns.size())("schema", schema->ToString())("columns", columns.size());
std::vector<std::shared_ptr<arrow::Field>> fields;
std::vector<std::shared_ptr<IChunkedArray>> arrays;
for (ui32 i = 0; i < (ui32)schema->num_fields(); ++i) {
fields.emplace_back(schema->field(i));
auto loader = std::make_shared<TColumnLoader>(externalInfo.GetDefaultSerializer(), std::make_shared<NPlain::TConstructor>(),
std::make_shared<arrow::Field>("__ORIGINAL", externalInfo.GetColumnType()), externalInfo.GetDefaultValue(), 0);
std::vector<TDeserializeChunkedArray::TChunk> chunks = { TDeserializeChunkedArray::TChunk(externalInfo.GetRecordsCount(), columns[i]) };
arrays.emplace_back(std::make_shared<TDeserializeChunkedArray>(externalInfo.GetRecordsCount(), loader, std::move(chunks)));
}

Records = std::make_shared<TGeneralContainer>(std::move(fields), std::move(arrays));
return builder.Finish();
}

TSubColumnsArray::TSubColumnsArray(const std::shared_ptr<arrow::DataType>& type, const ui32 recordsCount)
: TBase(recordsCount, EType::SubColumnsArray, type) {
Schema = std::make_shared<arrow::Schema>(arrow::FieldVector({ std::make_shared<arrow::Field>("__ORIGINAL", type) }));
Records = std::make_shared<TGeneralContainer>(arrow::RecordBatch::Make(
Schema, recordsCount, std::vector<std::shared_ptr<arrow::Array>>({ NArrow::TThreadSimpleArraysCache::GetNull(type, recordsCount) })));
: TBase(recordsCount, EType::SubColumnsArray, type)
, ColumnsData(NSubColumns::TColumnsData::BuildEmpty(recordsCount))
, OthersData(NSubColumns::TOthersData::BuildEmpty()) {
}

TSubColumnsArray::TSubColumnsArray(NSubColumns::TColumnsData&& columns, NSubColumns::TOthersData&& others,
Expand All @@ -89,62 +53,79 @@ TSubColumnsArray::TSubColumnsArray(NSubColumns::TColumnsData&& columns, NSubColu
TString TSubColumnsArray::SerializeToString(const TChunkConstructionData& externalInfo) const {
TString blobData;
NKikimrArrowAccessorProto::TSubColumnsAccessor proto;
const TString blobSchema = NArrow::SerializeSchema(*Schema);
*proto.MutableSchema()->MutableDescriptionSize() = blobSchema.size();
AFL_VERIFY((ui32)Schema->num_fields() == Records->num_columns());
std::vector<TString> rbBlobs;
std::vector<TString> blobRanges;
blobRanges.emplace_back(ColumnsData.GetStats().SerializeAsString(externalInfo.GetDefaultSerializer()));
proto.SetColumnStatsSize(blobRanges.back().size());

blobRanges.emplace_back(OthersData.GetStats().SerializeAsString(externalInfo.GetDefaultSerializer()));
proto.SetOtherStatsSize(blobRanges.back().size());
ui32 columnIdx = 0;
for (auto&& i : ColumnsData.GetRecords()->GetColumns()) {
TChunkConstructionData cData(ColumnsData.GetRecords()->GetRecordsCount(), nullptr, arrow::utf8(), externalInfo.GetDefaultSerializer());
blobRanges.emplace_back(
ColumnsData.GetStats().GetAccessorConstructor(columnIdx++, ColumnsData.GetRecords()->GetRecordsCount()).SerializeToString(i, cData));
auto* cInfo = proto.AddKeyColumns();
cInfo->SetSize(blobRanges.back().size());
}

for (auto&& i : OthersData.GetRecords()->GetColumns()) {
TChunkConstructionData cData(ColumnsData.GetRecords()->GetRecordsCount(), nullptr, arrow::utf8(), externalInfo.GetDefaultSerializer());
blobRanges.emplace_back(NPlain::TConstructor().SerializeToString(i, cData));
auto* cInfo = proto.AddOtherColumns();
cInfo->SetSize(blobRanges.back().size());
}
proto.SetOtherRecordsCount(OthersData.GetRecords()->GetRecordsCount());

ui64 blobsSize = 0;
for (auto&& i : Schema->fields()) {
auto rb = NArrow::ToBatch(Records->BuildTableVerified(TGeneralContainer::TTableConstructionContext({ i->name() })));
rbBlobs.emplace_back(externalInfo.GetDefaultSerializer()->SerializePayload(rb));
*proto.AddColumns()->MutableDescriptionSize() = rbBlobs.back().size();
blobsSize += rbBlobs.back().size();
for (auto&& i : blobRanges) {
blobsSize += i.size();
}

const TString protoString = proto.SerializeAsString();
TString result;
TStringOutput so(result);
so.Reserve(protoString.size() + sizeof(ui32) + blobsSize);
const ui32 protoSize = protoString.size();
so.Write(&protoSize, sizeof(protoSize));
for (auto&& s : rbBlobs) {
so.Write(protoString.data(), protoSize);
for (auto&& s : blobRanges) {
so.Write(s.data(), s.size());
}
so.Finish();
return result;
}

IChunkedArray::TLocalDataAddress TSubColumnsArray::DoGetLocalData(
const std::optional<TCommonChunkAddress>& chunkCurrent, const ui64 position) const {
const std::optional<TCommonChunkAddress>& /*chunkCurrent*/, const ui64 /*position*/) const {
auto it = BuildUnorderedIterator();
ui32 recordsCount = 0;
auto builder = NArrow::MakeBuilder(arrow::binary());
auto* binaryBuilder = static_cast<arrow::BinaryBuilder>(builder.get());
auto* binaryBuilder = static_cast<arrow::BinaryBuilder*>(builder.get());
while (it.IsValid()) {
NJson::TJsonValue value;
auto onStartRecord = [](const ui32 index) {
auto onStartRecord = [&](const ui32 index) {
AFL_VERIFY(recordsCount++ == index);
};
auto onFinishRecord = []() {
auto onFinishRecord = [&]() {
auto bJson = NBinaryJson::SerializeToBinaryJson(value.GetStringRobust());
if (const TString* val = std::get_if<TString>(&bJson)) {
AFL_VERIFY(false);
} else if (const NBinaryJson::TBinaryJson* val = std::get_if<NBinaryJson::TBinaryJson>(&bJson)) {
binaryBuilder->Append(val->data(), val->size());
TStatusValidator::Validate(binaryBuilder->Append(val->data(), val->size()));
} else {
AFL_VERIFY(false);
}
};
auto onRecordKV = [&](const ui32 index, const std::string_view value, const bool isColumn) {
auto onRecordKV = [&](const ui32 index, const std::string_view valueView, const bool isColumn) {
if (isColumn) {
value.InsertValue(ColumnsData.GetStats().GetColumnName(index), value);
value.InsertValue(ColumnsData.GetStats().GetColumnNameString(index), TString(valueView.data(), valueView.size()));
} else {
value.InsertValue(OthersData.GetStats().GetColumnName(index), value);
value.InsertValue(OthersData.GetStats().GetColumnNameString(index), TString(valueView.data(), valueView.size()));
}
};
it.ReadRecords(1, onStartRecord, onRecordKV, onFinishRecord);
}
AFL_VERIFY(recordsCount == chunkCurrent.GetLength())("real", recordsCount)("expected", chunkCurrent.GetLength());
return TLocalDataAddress(NArrow::FinishBuilder(builder), 0, 0);
return TLocalDataAddress(NArrow::FinishBuilder(std::move(builder)), 0, 0);
}

} // namespace NKikimr::NArrow::NAccessor
8 changes: 2 additions & 6 deletions ydb/core/formats/arrow/accessor/sub_columns/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,8 @@ class TSubColumnsArray: public IChunkedArray {
TSubColumnsArray(NSubColumns::TColumnsData&& columns, NSubColumns::TOthersData&& others, const std::shared_ptr<arrow::DataType>& type,
const ui32 recordsCount);

TSubColumnsArray(const std::shared_ptr<arrow::RecordBatch>& batch);

TSubColumnsArray(const std::shared_ptr<IChunkedArray>& sourceArray, const std::shared_ptr<NSubColumns::IDataAdapter>& adapter);

TSubColumnsArray(
const std::shared_ptr<arrow::Schema>& schema, const std::vector<TString>& columns, const TChunkConstructionData& externalInfo);
static TConclusion<std::shared_ptr<TSubColumnsArray>> Make(
const std::shared_ptr<IChunkedArray>& sourceArray, const std::shared_ptr<NSubColumns::IDataAdapter>& adapter);

TSubColumnsArray(const std::shared_ptr<arrow::DataType>& type, const ui32 recordsCount);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@ namespace NKikimr::NArrow::NAccessor::NSubColumns {
class TColumnsData {
private:
TDictStats Stats;
std::shared_ptr<TGeneralContainer> Records;
YDB_READONLY_DEF(std::shared_ptr<TGeneralContainer>, Records);

public:
static TColumnsData BuildEmpty(const ui32 recordsCount) {
return TColumnsData(TDictStats::BuildEmpty(), std::make_shared<TGeneralContainer>(recordsCount));
}

ui64 GetRawSize() const {
return Records->GetRawSizeVerified();
}
Expand Down
53 changes: 43 additions & 10 deletions ydb/core/formats/arrow/accessor/sub_columns/constructor.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#include "accessor.h"
#include "constructor.h"

#include <ydb/core/formats/arrow/accessor/composite_serial/accessor.h>
#include <ydb/core/formats/arrow/serializer/abstract.h>

namespace NKikimr::NArrow::NAccessor::NSubColumns {

TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoConstructDefault(const TChunkConstructionData& externalInfo) const {
Expand All @@ -19,19 +22,49 @@ TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoDeserializeFromStrin
return TConclusionStatus::Fail("cannot parse proto");
}
currentIndex += protoSize;
std::shared_ptr<arrow::RecordBatch> columnStats = externalInfo.GetDefaultSerializer()->Deserialize(
TStringBuf(originalData.data() + currentIndex, proto.GetColumnStatsSize()), TDictStats::GetSchema());
std::shared_ptr<arrow::RecordBatch> rbColumnStats = TStatusValidator::GetValid(externalInfo.GetDefaultSerializer()->Deserialize(
TString(originalData.data() + currentIndex, proto.GetColumnStatsSize()), TDictStats::GetSchema()));
TDictStats columnStats(rbColumnStats);
currentIndex += proto.GetColumnStatsSize();
std::shared_ptr<arrow::RecordBatch> otherStats = externalInfo.GetDefaultSerializer()->Deserialize(
TStringBuf(originalData.data() + currentIndex, proto.GetOtherStatsSize()), TDictStats::GetSchema());
std::shared_ptr<arrow::RecordBatch> rbOtherStats = TStatusValidator::GetValid(externalInfo.GetDefaultSerializer()->Deserialize(
TString(originalData.data() + currentIndex, proto.GetOtherStatsSize()), TDictStats::GetSchema()));
TDictStats otherStats(rbOtherStats);
currentIndex += proto.GetOtherStatsSize();

std::vector<TString> columns;
for (ui32 i = 0; i < (ui32)schema->num_fields(); ++i) {
columns.emplace_back(originalData.substr(currentIndex, proto.GetColumns(i).GetDescriptionSize()));
currentIndex += proto.GetColumns(i).GetDescriptionSize();
std::shared_ptr<TGeneralContainer> columnKeysContainer;
{
std::vector<std::shared_ptr<IChunkedArray>> columns;
auto schema = columnStats.GetSchema();
AFL_VERIFY(rbColumnStats->num_rows() == proto.GetKeyColumns().size());
for (ui32 i = 0; i < (ui32)proto.GetKeyColumns().size(); ++i) {
std::shared_ptr<TColumnLoader> columnLoader = std::make_shared<TColumnLoader>(externalInfo.GetDefaultSerializer(),
columnStats.GetAccessorConstructor(i, externalInfo.GetRecordsCount()), schema->field(i), nullptr, 0);
std::vector<TDeserializeChunkedArray::TChunk> chunks = { TDeserializeChunkedArray::TChunk(
externalInfo.GetRecordsCount(), originalData.substr(currentIndex, proto.GetKeyColumns(i).GetSize())) };
columns.emplace_back(std::make_shared<TDeserializeChunkedArray>(externalInfo.GetRecordsCount(), columnLoader, std::move(chunks)));
currentIndex += proto.GetKeyColumns(i).GetSize();
}
columnKeysContainer = std::make_shared<TGeneralContainer>(schema, std::move(columns));
}
std::shared_ptr<TGeneralContainer> otherKeysContainer;
{
std::vector<std::shared_ptr<IChunkedArray>> columns;
AFL_VERIFY(rbOtherStats->num_rows() == proto.GetOtherColumns().size());
auto schema = otherStats.GetSchema();
for (ui32 i = 0; i < (ui32)proto.GetOtherColumns().size(); ++i) {
std::shared_ptr<TColumnLoader> columnLoader = std::make_shared<TColumnLoader>(externalInfo.GetDefaultSerializer(),
otherStats.GetAccessorConstructor(i, proto.GetOtherRecordsCount()), schema->field(i), nullptr, 0);
std::vector<TDeserializeChunkedArray::TChunk> chunks = { TDeserializeChunkedArray::TChunk(
proto.GetOtherRecordsCount(), originalData.substr(currentIndex, proto.GetOtherColumns(i).GetSize())) };
columns.emplace_back(std::make_shared<TDeserializeChunkedArray>(proto.GetOtherRecordsCount(), columnLoader, std::move(chunks)));
currentIndex += proto.GetOtherColumns(i).GetSize();
}
otherKeysContainer = std::make_shared<TGeneralContainer>(schema, std::move(columns));
}
return std::make_shared<TSubColumnsArray>(schema, std::move(columns), externalInfo);
TColumnsData columnData(columnStats, columnKeysContainer);
TOthersData otherData(otherStats, otherKeysContainer);
return std::make_shared<TSubColumnsArray>(
std::move(columnData), std::move(otherData), externalInfo.GetColumnType(), externalInfo.GetRecordsCount());
}

NKikimrArrowAccessorProto::TConstructor TConstructor::DoSerializeToProto() const {
Expand All @@ -46,7 +79,7 @@ bool TConstructor::DoDeserializeFromProto(const NKikimrArrowAccessorProto::TCons

TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoConstruct(
const std::shared_ptr<IChunkedArray>& originalData, const TChunkConstructionData& /*externalInfo*/) const {
return std::make_shared<NAccessor::TSubColumnsArray>(originalData, DataExtractor);
return NAccessor::TSubColumnsArray::Make(originalData, DataExtractor).DetachResult();
}

TString TConstructor::DoSerializeToString(const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const {
Expand Down
16 changes: 0 additions & 16 deletions ydb/core/formats/arrow/accessor/sub_columns/data_extractor.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,19 @@ namespace NKikimr::NArrow::NAccessor::NSubColumns {

class IDataAdapter {
private:
virtual TConclusion<std::shared_ptr<arrow::Schema>> DoBuildSchemaForData(const std::shared_ptr<IChunkedArray>& sourceArray) const = 0;
virtual TConclusionStatus DoAddDataToBuilders(const std::shared_ptr<arrow::Array>& sourceArray, TDataBuilder& dataBuilder) const = 0;

public:
virtual ~IDataAdapter() = default;

TConclusion<std::shared_ptr<arrow::Schema>> BuildSchemaForData(const std::shared_ptr<IChunkedArray>& sourceArray) const {
return DoBuildSchemaForData(sourceArray);
}
TConclusionStatus AddDataToBuilders(const std::shared_ptr<arrow::Array>& sourceArray, TDataBuilder& dataBuilder) const;
};

class TFirstLevelSchemaData: public IDataAdapter {
private:
virtual TConclusion<std::shared_ptr<arrow::Schema>> DoBuildSchemaForData(const std::shared_ptr<IChunkedArray>& sourceArray) const override;

virtual TConclusionStatus DoAddDataToBuilders(const std::shared_ptr<arrow::Array>& sourceArray, TDataBuilder& dataBuilder) const override;

public:
};

class TFirstLevelKVData: public IDataAdapter {
private:
virtual TConclusion<std::shared_ptr<arrow::Schema>> DoBuildSchemaForData(
const std::shared_ptr<IChunkedArray>& /*sourceArray*/) const override {
return std::shared_ptr<arrow::Schema>();
}

public:
};

} // namespace NKikimr::NArrow::NAccessor
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ std::shared_ptr<TSubColumnsArray> TDataBuilder::Finish() {
for (auto&& i : rIt->second) {
if (columnAccessorsCount < TSettings::ColumnAccessorsCountLimit) {
columnElements.emplace_back(i);
if (i->GetRecordIndexes().size() * 20 < CurrentRecordIndex) {
if (TSettings::IsSparsed(i->GetRecordIndexes().size(), CurrentRecordIndex)) {
i->BuildSparsedAccessor(CurrentRecordIndex);
} else {
i->BuildPlainAccessor(CurrentRecordIndex);
Expand Down Expand Up @@ -77,14 +77,14 @@ TOthersData TDataBuilder::MergeOthers(const std::vector<TColumnElements*>& other
auto othersBuilder = TOthersData::MakeMergedBuilder();
while (heap.size()) {
std::pop_heap(heap.begin(), heap.end());
othersBuilder.Add(heap.back().GetRecordIndex(), heap.back().GetKeyIndex(), heap.back().GetValue());
othersBuilder->Add(heap.back().GetRecordIndex(), heap.back().GetKeyIndex(), heap.back().GetValue());
if (!heap.back().Next()) {
heap.pop_back();
} else {
std::push_heap(heap.begin(), heap.end());
}
}
return othersBuilder.Finish(BuildStats(otherKeys));
return othersBuilder->Finish(BuildStats(otherKeys));
}

} // namespace NKikimr::NArrow::NAccessor::NSubColumns
Loading

0 comments on commit 79b8b84

Please sign in to comment.