Skip to content

Commit

Permalink
fix build
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 committed Feb 2, 2025
1 parent e6372e0 commit 2de95c4
Show file tree
Hide file tree
Showing 23 changed files with 1,222 additions and 1,036 deletions.
2 changes: 1 addition & 1 deletion ydb/core/formats/arrow/accessor/plain/accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ std::optional<ui64> TTrivialArray::DoGetRawSize() const {
std::vector<TChunkedArraySerialized> TTrivialArray::DoSplitBySizes(
const TColumnLoader& saver, const TString& fullSerializedData, const std::vector<ui64>& splitSizes) {
auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector({ std::make_shared<arrow::Field>("f", GetDataType()) }));
auto chunks = NArrow::NSplitter::TSimpleSplitter(saver.GetSerializer())
auto chunks = NSplitter::TSimpleSplitter(saver.GetSerializer())
.SplitBySizes(arrow::RecordBatch::Make(schema, GetRecordsCount(), { Array }), fullSerializedData, splitSizes);
std::vector<TChunkedArraySerialized> result;
for (auto&& i : chunks) {
Expand Down
36 changes: 35 additions & 1 deletion ydb/core/formats/arrow/accessor/plain/accessor.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once
#include <ydb/library/formats/arrow/accessor/abstract/accessor.h>
#include <ydb/library/formats/arrow/arrow_helpers.h>
#include <ydb/library/formats/arrow/validation/validation.h>

namespace NKikimr::NArrow::NAccessor {
Expand All @@ -12,7 +13,8 @@ class TTrivialArray: public IChunkedArray {
protected:
virtual std::optional<ui64> DoGetRawSize() const override;

virtual TLocalDataAddress DoGetLocalData(const std::optional<TCommonChunkAddress>& /*chunkCurrent*/, const ui64 /*position*/) const override {
// Ignores both arguments (their names are commented out) and always returns an
// address built from the single underlying Array, with zeros for the two
// remaining TLocalDataAddress constructor arguments.
virtual TLocalDataAddress DoGetLocalData(
const std::optional<TCommonChunkAddress>& /*chunkCurrent*/, const ui64 /*position*/) const override {
return TLocalDataAddress(Array, 0, 0);
}
virtual std::shared_ptr<arrow::Scalar> DoGetScalar(const ui32 index) const override {
Expand All @@ -31,6 +33,38 @@ class TTrivialArray: public IChunkedArray {
: TBase(data->length(), EType::Array, data->type())
, Array(data) {
}

class TPlainBuilder {
private:
std::unique_ptr<arrow::ArrayBuilder> Builder;
arrow::StringBuilder* ValueBuilder;
std::optional<ui32> LastRecordIndex;

public:
TPlainBuilder() {
Builder = NArrow::MakeBuilder(arrow::utf8());
ValueBuilder = static_cast<arrow::StringBuilder*>(Builder.get());
}

void AddRecord(const ui32 recordIndex, const std::string_view value) {
TStatusValidator::Validate(ValueBuilder->AppendNulls(recordIndex - LastRecordIndex.value_or(0)));
LastRecordIndex = recordIndex;
TStatusValidator::Validate(ValueBuilder->Append(value.data(), value.size()));
}

std::shared_ptr<IChunkedArray> Finish(const ui32 recordsCount) {
if (LastRecordIndex) {
TStatusValidator::Validate(ValueBuilder->AppendNulls(recordsCount - *LastRecordIndex - 1));
} else {
TStatusValidator::Validate(ValueBuilder->AppendNulls(recordsCount));
}
return std::make_shared<TTrivialArray>(NArrow::FinishBuilder(std::move(Builder)));
}
};

// Factory for a fresh, empty utf8 plain-array builder.
static TPlainBuilder MakeBuilderUtf8() {
    return TPlainBuilder{};
}
};

class TTrivialChunkedArray: public IChunkedArray {
Expand Down
35 changes: 35 additions & 0 deletions ydb/core/formats/arrow/accessor/sparsed/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,41 @@ class TSparsedArray: public IChunkedArray {
return *it;
}

class TSparsedBuilder {
private:
std::shared_ptr<arrow::Schema> Schema;
std::vector<std::unique_ptr<arrow::ArrayBuilder>> Builders;
arrow::UInt32Builder* IndexBuilder;
arrow::StringBuilder* ValueBuilder;
ui32 RecordsCount = 0;

public:
TSparsedBuilder() {
arrow::FieldVector fields = { std::make_shared<arrow::Field>("index", arrow::uint32()),
std::make_shared<arrow::Field>("value", arrow::utf8()) };
Schema = std::make_shared<arrow::Schema>(fields);
Builders = NArrow::MakeBuilders(Schema);
IndexBuilder = static_cast<arrow::UInt32Builder*>(Builders[0].get());
ValueBuilder = static_cast<arrow::StringBuilder*>(Builders[1].get());
}

void AddRecord(const ui32 recordIndex, const std::string_view value) {
NArrow::TStatusValidator::Validate(IndexBuilder->Append(recordIndex));
NArrow::TStatusValidator::Validate(ValueBuilder->Append(value.data(), value.size()));
++RecordsCount;
}

std::shared_ptr<IChunkedArray> Finish(const ui32 recordsCount) {
TSparsedArray::TBuilder builder(nullptr, arrow::utf8());
builder.AddChunk(recordsCount, arrow::RecordBatch::Make(Schema, RecordsCount, NArrow::Finish(std::move(Builders))));
return builder.Finish();
}
};

// Factory for a fresh, empty utf8 sparsed-array builder.
static TSparsedBuilder MakeBuilderUtf8() {
    return TSparsedBuilder{};
}

class TBuilder {
private:
ui32 RecordsCount = 0;
Expand Down
54 changes: 42 additions & 12 deletions ydb/core/formats/arrow/accessor/sub_columns/accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ std::vector<TChunkedArraySerialized> TSubColumnsArray::DoSplitBySizes(
return result;
}

TSubColumnsArray::TSubColumnsArray(const std::shared_ptr<IChunkedArray>& sourceArray, const std::shared_ptr<IDataAdapter>& adapter)
TSubColumnsArray::TSubColumnsArray(const std::shared_ptr<IChunkedArray>& sourceArray, const std::shared_ptr<NSubColumns::IDataAdapter>& adapter)
: TBase(sourceArray->GetRecordsCount(), EType::SubColumnsArray, sourceArray->GetDataType()) {
AFL_VERIFY(adapter);
AFL_VERIFY(sourceArray);
Expand Down Expand Up @@ -72,24 +72,20 @@ TSubColumnsArray::TSubColumnsArray(
Records = std::make_shared<TGeneralContainer>(std::move(fields), std::move(arrays));
}

// Wraps an already laid out record batch. The batch's last field must be named
// "__ORIGINAL" (its type becomes the array's data type) and field names must be
// strictly increasing from left to right; both conditions are verified below.
TSubColumnsArray::TSubColumnsArray(const std::shared_ptr<arrow::RecordBatch>& batch)
    : TBase(batch->num_rows(), EType::SubColumnsArray, batch->schema()->field(batch->schema()->num_fields() - 1)->type())
    , Schema(batch->schema()) {
    AFL_VERIFY(batch);
    const auto& batchSchema = batch->schema();
    const int lastFieldIdx = batchSchema->num_fields() - 1;
    AFL_VERIFY(batchSchema->field(lastFieldIdx)->name() == "__ORIGINAL")("schema", batchSchema->ToString());
    // Compare every adjacent pair of field names.
    for (int left = 0; left < lastFieldIdx; ++left) {
        const int right = left + 1;
        AFL_VERIFY(batchSchema->field(left)->name() < batchSchema->field(right)->name())("schema", batchSchema->ToString());
    }
    Records = std::make_shared<TGeneralContainer>(batch);
}

// Builds an array of `recordsCount` records whose only column is "__ORIGINAL"
// of the given type, backed by the null array returned from the thread-local cache.
TSubColumnsArray::TSubColumnsArray(const std::shared_ptr<arrow::DataType>& type, const ui32 recordsCount)
    : TBase(recordsCount, EType::SubColumnsArray, type) {
    arrow::FieldVector originalOnly{ std::make_shared<arrow::Field>("__ORIGINAL", type) };
    Schema = std::make_shared<arrow::Schema>(std::move(originalOnly));

    std::vector<std::shared_ptr<arrow::Array>> nullColumns{ NArrow::TThreadSimpleArraysCache::GetNull(type, recordsCount) };
    Records = std::make_shared<TGeneralContainer>(arrow::RecordBatch::Make(Schema, recordsCount, std::move(nullColumns)));
}

// Takes ownership of prepared columns/others data (both are moved into the
// corresponding members) and records the array's type and record count in the base.
// No validation is performed here.
TSubColumnsArray::TSubColumnsArray(NSubColumns::TColumnsData&& columns, NSubColumns::TOthersData&& others,
const std::shared_ptr<arrow::DataType>& type, const ui32 recordsCount)
: TBase(recordsCount, EType::SubColumnsArray, type)
, ColumnsData(std::move(columns))
, OthersData(std::move(others)) {
}

TString TSubColumnsArray::SerializeToString(const TChunkConstructionData& externalInfo) const {
TString blobData;
NKikimrArrowAccessorProto::TSubColumnsAccessor proto;
Expand Down Expand Up @@ -117,4 +113,38 @@ TString TSubColumnsArray::SerializeToString(const TChunkConstructionData& extern
return result;
}

// Materializes all records into one arrow binary array: for every record a JSON
// object is assembled from its key/value pairs, serialized to BinaryJson and
// appended to the result.
//
// chunkCurrent - optional hint; when present, its length is checked against the
//                number of records actually produced.
//                NOTE(review): confirm that the hint length is the right
//                expectation here rather than the array's own records count.
// position     - unused.
IChunkedArray::TLocalDataAddress TSubColumnsArray::DoGetLocalData(
    const std::optional<TCommonChunkAddress>& chunkCurrent, const ui64 /*position*/) const {
    auto it = BuildUnorderedIterator();
    ui32 recordsCount = 0;
    auto builder = NArrow::MakeBuilder(arrow::binary());
    // Non-owning typed alias; `builder` keeps ownership until FinishBuilder.
    auto* binaryBuilder = static_cast<arrow::BinaryBuilder*>(builder.get());
    while (it.IsValid()) {
        // Fresh JSON object per record; filled by onRecordKV, flushed by onFinishRecord.
        NJson::TJsonValue json;
        const auto onStartRecord = [&recordsCount](const ui32 index) {
            // Records are expected to arrive densely and in order.
            AFL_VERIFY(recordsCount == index)("count", recordsCount)("index", index);
            ++recordsCount;
        };
        const auto onFinishRecord = [&json, binaryBuilder]() {
            auto bJson = NBinaryJson::SerializeToBinaryJson(json.GetStringRobust());
            if (const TString* error = std::get_if<TString>(&bJson)) {
                // The TString alternative is treated as a failure; surface its content.
                AFL_VERIFY(false)("error", *error);
            } else if (const NBinaryJson::TBinaryJson* val = std::get_if<NBinaryJson::TBinaryJson>(&bJson)) {
                NArrow::TStatusValidator::Validate(binaryBuilder->Append(val->data(), val->size()));
            } else {
                AFL_VERIFY(false);
            }
        };
        // The parameter is deliberately not called `value`: it used to shadow the JSON object.
        const auto onRecordKV = [&](const ui32 index, const std::string_view valueView, const bool isColumn) {
            const TString valueStr(valueView.data(), valueView.size());
            if (isColumn) {
                json.InsertValue(ColumnsData.GetStats().GetColumnName(index), valueStr);
            } else {
                json.InsertValue(OthersData.GetStats().GetColumnName(index), valueStr);
            }
        };
        it.ReadRecords(1, onStartRecord, onRecordKV, onFinishRecord);
    }
    if (chunkCurrent) {
        AFL_VERIFY(recordsCount == chunkCurrent->GetLength())("real", recordsCount)("expected", chunkCurrent->GetLength());
    }
    return TLocalDataAddress(NArrow::FinishBuilder(std::move(builder)), 0, 0);
}

} // namespace NKikimr::NArrow::NAccessor
Loading

0 comments on commit 2de95c4

Please sign in to comment.