Skip to content

Commit

Permalink
[Fixbug] doris custom sql work (apache#7464)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangshenghang authored and hawk9821 committed Aug 29, 2024
1 parent b300217 commit 89a5f3f
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,15 @@ public boolean isExistsData(TablePath tablePath) {
}
}

@Override
public void executeSql(TablePath tablePath, String sql) {
try (PreparedStatement ps = conn.prepareStatement(sql)) {
ps.execute();
} catch (SQLException e) {
throw new CatalogException(String.format("Failed executeSql error %s", sql), e);
}
}

@Override
public PreviewResult previewAction(
ActionType actionType, TablePath tablePath, Optional<CatalogTable> catalogTable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,16 @@ public class DorisIT extends AbstractDorisIT {
Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr());
};

@TestTemplate
public void testCustomSql(TestContainer container) throws IOException, InterruptedException {
initializeJdbcTable();
Container.ExecResult execResult =
container.executeJob("/doris_source_and_sink_with_custom_sql.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertEquals(101, tableCount(sinkDB, UNIQUE_TABLE));
clearUniqueTable();
}

@TestTemplate
public void testDoris(TestContainer container) throws IOException, InterruptedException {
initializeJdbcTable();
Expand Down Expand Up @@ -344,6 +354,20 @@ private void checkSourceAndSinkTableDate(String sourceSql, String sinkSql, Strin
Assertions.assertEquals(sourceResultSet.getRow(), sinkResultSet.getRow());
}

private Integer tableCount(String db, String table) {
try (Statement statement = conn.createStatement()) {
String sql = String.format("select count(*) from %s.%s", db, table);
ResultSet source = statement.executeQuery(sql);
if (source.next()) {
int rowCount = source.getInt(1);
return rowCount;
}
} catch (Exception e) {
throw new RuntimeException("Failed to check data in Doris server", e);
}
return -1;
}

private void assertHasData(String db, String table) {
try (Statement statement = conn.createStatement()) {
String sql = String.format("select * from %s.%s limit 1", db, table);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env{
parallelism = 1
job.mode = "BATCH"
job.retry.times = 0
}

source{
FakeSource {
row.num = 100
split.num = 10
string.length = 1
schema = {
fields {
F_ID = "bigint"
F_INT = "int"
F_BIGINT = "bigint"
F_TINYINT = "tinyint"
F_SMALLINT = "smallint"
F_DECIMAL = "decimal(10,2)"
F_LARGEINT = "bigint"
F_BOOLEAN = "boolean"
F_DOUBLE = "double"
F_FLOAT = "float"
F_CHAR = "string"
F_VARCHAR_11 = "string"
F_STRING = "string"
F_DATETIME_P = "timestamp"
F_DATETIME = "timestamp"
F_DATE = "date"
}
}
}
}

transform {}

sink{
Doris {
fenodes = "doris_e2e:8030"
username = root
password = ""
table.identifier = "e2e_sink.doris_e2e_unique_table"
data_save_mode=CUSTOM_PROCESSING
custom_sql="INSERT INTO e2e_sink.doris_e2e_unique_table ( F_ID,F_INT,F_BIGINT) VALUES (1, 123, 1234567890123);"
sink.enable-2pc = "true"
sink.buffer-size = 2
sink.buffer-count = 2
sink.label-prefix = "test_json"
doris.config = {
format="json"
read_json_by_line="true"
}
save_mode_create_template = """CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (${rowtype_fields}) ENGINE=OLAP unique KEY (`F_ID`) DISTRIBUTED BY HASH (`F_ID`) PROPERTIES ("replication_allocation" = "tag.location.default: 1")"""
}
}

0 comments on commit 89a5f3f

Please sign in to comment.