Skip to content

Commit

Permalink
YQL-17493: Wrap returned YQL-typed columns with ToPg (ydb-platform#1059)
Browse files Browse the repository at this point in the history
  • Loading branch information
marsaly79 authored Jan 17, 2024
1 parent 09f346f commit 2d2adb6
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 60 deletions.
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
auto result = session.ExecuteSchemeQuery(R"(
CREATE TABLE test (id int8,PRIMARY KEY (id)))"
CREATE TABLE test (id int16,PRIMARY KEY (id)))"
).GetValueSync();

UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
Expand Down
113 changes: 59 additions & 54 deletions ydb/library/yql/core/common_opt/yql_co_pgselect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1699,53 +1699,74 @@ TExprNode::TPtr BuildProjectionLambda(TPositionHandle pos, const TExprNode::TPtr
.Param("row")
.Callable("AsStruct")
.Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
ui32 index = 0;
THashMap<TString, TExprNode*> overrideColumns;
if (emitPgStar) {
for (const auto& x : result->Tail().Children()) {
if (x->HeadPtr()->IsAtom()) {
overrideColumns.emplace(TString(x->HeadPtr()->Content()), x.Get());
}
}
}
auto addAtomToList = [] (TExprNodeBuilder& listBuilder, TExprNode* x) -> void {
listBuilder.Add(0, x->HeadPtr());
listBuilder.Apply(1, x->TailPtr())
auto addResultItem = [] (TExprNodeBuilder& builder, ui32 idx, TExprNode* resultItem) {
builder.Apply(idx, resultItem->TailPtr())
.With(0, "row")
.Seal();
listBuilder.Seal();
};

auto addAtomToListWithCast = [&addAtomToList] (TExprNodeBuilder& listBuilder, TExprNode* x,
const TTypeAnnotationNode* expectedTypeNode) -> void {
auto actualType = x->GetTypeAnn()->Cast<TPgExprType>();
auto addStructMember = [] (TExprNodeBuilder& builder, ui32 idx, const TStringBuf& memberName) {
builder.Callable(idx, "Member")
.Arg(0, "row")
.Atom(1, memberName)
.Seal();
};

auto addPgCast = [&finalType, &columnNamesMap, &pos, &ctx] (TExprNodeBuilder& builder, ui32 idx, const TStringBuf& columnName,
const TTypeAnnotationNode* actualTypeNode, const std::function<void(TExprNodeBuilder&, ui32)>& buildCore) {

const auto expectedTypeNode = finalType->FindItemType(columnNamesMap[columnName]);
Y_ENSURE(expectedTypeNode);
const auto expectedType = expectedTypeNode->Cast<TPgExprType>();

if (actualType == expectedType) {
addAtomToList(listBuilder, x);
return;
}
listBuilder.Add(0, x->HeadPtr());
listBuilder.Callable(1, "PgCast")
.Apply(0, x->TailPtr())
.With(0, "row")
.Seal()
.Callable(1, "PgType")
ui32 actualPgTypeId;
bool convertToPg;
Y_ENSURE(ExtractPgType(actualTypeNode, actualPgTypeId, convertToPg, pos, ctx));

auto needPgCast = (expectedType->GetId() != actualPgTypeId);

if (convertToPg) {
Y_ENSURE(!needPgCast, TStringBuilder()
<< "Conversion to PG type is different at typization (" << expectedType->GetId()
<< ") and optimization (" << actualPgTypeId << ") stages.");

auto wrapBuilder = builder.Callable(idx, "ToPg");
buildCore(wrapBuilder, 0);
wrapBuilder.Seal();
} else if (needPgCast) {
auto wrapBuilder = builder.Callable(idx, "PgCast");
buildCore(wrapBuilder, 0);
wrapBuilder.Callable(1, "PgType")
.Atom(0, NPg::LookupType(expectedType->GetId()).Name)
.Seal();
listBuilder.Seal();
wrapBuilder.Seal();
} else {
buildCore(builder, idx);
}
builder.Seal();
};

ui32 index = 0;
THashMap<TString, TExprNode*> overrideColumns;
if (emitPgStar) {
for (const auto& x : result->Tail().Children()) {
if (x->HeadPtr()->IsAtom()) {
overrideColumns.emplace(TString(x->HeadPtr()->Content()), x.Get());
}
}
}

for (const auto& x : result->Tail().Children()) {
if (x->HeadPtr()->IsAtom()) {
if (!emitPgStar) {
const auto& columnName = x->Child(0)->Content();
auto listBuilder = parent.List(index++);
const auto expectedType = finalType->FindItemType(columnNamesMap[columnName]);
Y_ENSURE(expectedType);
addAtomToListWithCast(listBuilder, x.Get(), expectedType);
if (emitPgStar) {
continue;
}
const auto& columnName = x->Head().Content();

auto listBuilder = parent.List(index++);
listBuilder.Add(0, x->HeadPtr());
addPgCast(listBuilder, 1, columnName, x->GetTypeAnn(),
[&addResultItem, &x] (TExprNodeBuilder& builder, ui32 idx) { addResultItem(builder, idx, x.Get()); });
} else {
auto type = x->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
Y_ENSURE(type);
Expand All @@ -1755,31 +1776,15 @@ TExprNode::TPtr BuildProjectionLambda(TPositionHandle pos, const TExprNode::TPtr
auto columnName = subLink ? column : NTypeAnnImpl::RemoveAlias(column);

auto listBuilder = parent.List(index++);
if (overrideColumns.contains(columnName)) {
if (auto* columnNode = overrideColumns.FindPtr(columnName)) {
// we never get here while processing SELECTs,
// so no need to add PgCasts due to query combining with UNION ALL et al
addAtomToList(listBuilder, overrideColumns[columnName]);
listBuilder.Add(0, (*columnNode)->HeadPtr());
addResultItem(listBuilder, 1, *columnNode);
} else {
listBuilder.Atom(0, columnName);

const auto expectedType = finalType->FindItemType(columnNamesMap[columnName]);
Y_ENSURE(expectedType);
if (item->GetItemType() == expectedType) {
listBuilder.Callable(1, "Member")
.Arg(0, "row")
.Atom(1, column)
.Seal();
} else {
listBuilder.Callable(1, "PgCast")
.Callable(0, "Member")
.Arg(0, "row")
.Atom(1, column)
.Seal()
.Callable(1, "PgType")
.Atom(0, NPg::LookupType(expectedType->Cast<TPgExprType>()->GetId()).Name)
.Seal()
.Seal();
}
addPgCast(listBuilder, 1, columnName, item->GetItemType(),
[&addStructMember, &column] (TExprNodeBuilder& builder, ui32 idx) { addStructMember(builder, idx, column); });
}
}
}
Expand Down
14 changes: 9 additions & 5 deletions ydb/library/yql/core/type_ann/type_ann_pg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,15 @@ IGraphTransformer::TStatus InferPgCommonType(TPositionHandle pos, const TExprNod
for (const auto& col : *childColumnOrder) {
auto itemIdx = structType->FindItem(col);
YQL_ENSURE(itemIdx);
pgTypes[j].push_back(structType->GetItems()[*itemIdx]->GetItemType()->Cast<TPgExprType>()->GetId());

const auto* type = structType->GetItems()[*itemIdx]->GetItemType();
ui32 pgType;
bool convertToPg;
if (!ExtractPgType(type, pgType, convertToPg, child->Pos(), ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}

pgTypes[j].push_back(pgType);

++j;
}
Expand Down Expand Up @@ -4282,7 +4290,6 @@ IGraphTransformer::TStatus PgSelectWrapper(const TExprNode::TPtr& input, TExprNo
return IGraphTransformer::TStatus::Error;
}

// const TStructExprType* outputRowType = nullptr;
TExprNode* setItems = nullptr;
TExprNode* setOps = nullptr;
bool hasSort = false;
Expand Down Expand Up @@ -4343,9 +4350,6 @@ IGraphTransformer::TStatus PgSelectWrapper(const TExprNode::TPtr& input, TExprNo
}

setItems = &option->Tail();
} else {
/*outputRowType = */option->Tail().Head().GetTypeAnn()->Cast<TListExprType>()->GetItemType()->
Cast<TStructExprType>();
}
} else if (optionName == "limit" || optionName == "offset") {
if (pass != 0) {
Expand Down
22 changes: 22 additions & 0 deletions ydb/library/yql/tests/sql/dq_file/part5/canondata/result.json
Original file line number Diff line number Diff line change
Expand Up @@ -2210,6 +2210,28 @@
}
],
"test.test[pg-select_win_count-default.txt-Results]": [],
"test.test[pg-select_yql_type--Analyze]": [
{
"checksum": "8ff95379719438cd7e637f0c2b6945c5",
"size": 7622,
"uri": "https://{canondata_backend}/1031349/1e1ff3377d9e6463687741aa3509395b92a00445/resource.tar.gz#test.test_pg-select_yql_type--Analyze_/plan.txt"
}
],
"test.test[pg-select_yql_type--Debug]": [
{
"checksum": "3dde0f6689824d0af2cae2e11a23fe80",
"size": 2702,
"uri": "https://{canondata_backend}/1031349/1e1ff3377d9e6463687741aa3509395b92a00445/resource.tar.gz#test.test_pg-select_yql_type--Debug_/opt.yql_patched"
}
],
"test.test[pg-select_yql_type--Plan]": [
{
"checksum": "8ff95379719438cd7e637f0c2b6945c5",
"size": 7622,
"uri": "https://{canondata_backend}/1031349/1e1ff3377d9e6463687741aa3509395b92a00445/resource.tar.gz#test.test_pg-select_yql_type--Plan_/plan.txt"
}
],
"test.test[pg-select_yql_type--Results]": [],
"test.test[pg-str_lookup_pg-default.txt-Analyze]": [
{
"checksum": "a48ccc9922567dfee1170d2c2df45b6e",
Expand Down
7 changes: 7 additions & 0 deletions ydb/library/yql/tests/sql/sql2yql/canondata/result.json
Original file line number Diff line number Diff line change
Expand Up @@ -12046,6 +12046,13 @@
"uri": "https://{canondata_backend}/1599023/af9c2f81df0601cf266a0926b5ce73b6101b9115/resource.tar.gz#test_sql2yql.test_pg-select_win_sum_null_/sql.yql"
}
],
"test_sql2yql.test[pg-select_yql_type]": [
{
"checksum": "7447752d11b2fc790e27004b7112fe31",
"size": 1975,
"uri": "https://{canondata_backend}/1937367/379ba745ff59ebd29c817a5b281c005a67df3be4/resource.tar.gz#test_sql2yql.test_pg-select_yql_type_/sql.yql"
}
],
"test_sql2yql.test[pg-set_of_as_records]": [
{
"checksum": "439ee1feaf9750daf11386fe54adbc05",
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/tests/sql/suites/pg/select_yql_type.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
in Input input_name.txt
15 changes: 15 additions & 0 deletions ydb/library/yql/tests/sql/suites/pg/select_yql_type.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
--!syntax_pg
SELECT
key, index, index + 1
FROM plato."Input"
ORDER BY index;

SELECT
*
FROM plato."Input"
ORDER BY index;

SELECT
t.*
FROM plato."Input" as t
ORDER BY index;
Original file line number Diff line number Diff line change
Expand Up @@ -2075,6 +2075,27 @@
"uri": "https://{canondata_backend}/1773845/f8e51a5dd98665ee7534cca6737c91ce5a753b8e/resource.tar.gz#test.test_pg-select_win_count-default.txt-Results_/results.txt"
}
],
"test.test[pg-select_yql_type--Debug]": [
{
"checksum": "f94603a4a31fd031cd1be6adf9bde1fd",
"size": 3816,
"uri": "https://{canondata_backend}/1031349/2368626953942b3adafcb76228785c3ee4e852f5/resource.tar.gz#test.test_pg-select_yql_type--Debug_/opt.yql"
}
],
"test.test[pg-select_yql_type--Plan]": [
{
"checksum": "5889de5da2fd54b7327aaf12f536e2d9",
"size": 6736,
"uri": "https://{canondata_backend}/1031349/2368626953942b3adafcb76228785c3ee4e852f5/resource.tar.gz#test.test_pg-select_yql_type--Plan_/plan.txt"
}
],
"test.test[pg-select_yql_type--Results]": [
{
"checksum": "aa4c3b838298a4c407b27345733a5fa1",
"size": 3812,
"uri": "https://{canondata_backend}/1031349/2368626953942b3adafcb76228785c3ee4e852f5/resource.tar.gz#test.test_pg-select_yql_type--Results_/results.txt"
}
],
"test.test[pg-str_lookup_pg-default.txt-Debug]": [
{
"checksum": "66a86824b404ec795db5472f15e7ef6d",
Expand Down

0 comments on commit 2d2adb6

Please sign in to comment.