Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into jdbc-clickhouse
Browse files Browse the repository at this point in the history
  • Loading branch information
WenDing-Y committed Jun 1, 2023
2 parents 3dc7c4a + 7a18a94 commit bced473
Show file tree
Hide file tree
Showing 14 changed files with 68 additions and 69 deletions.
1 change: 0 additions & 1 deletion flink-connector-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ under the License.
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mssqlserver</artifactId>
<version>1.17.3</version>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,29 +497,36 @@ protected List<String> extractColumnValuesBySQL(
Predicate<String> filterFunc,
Object... params) {

List<String> columnValues = Lists.newArrayList();

try (Connection conn = DriverManager.getConnection(connUrl, username, pwd);
PreparedStatement ps = conn.prepareStatement(sql)) {
if (Objects.nonNull(params) && params.length > 0) {
for (int i = 0; i < params.length; i++) {
ps.setObject(i + 1, params[i]);
}
return extractColumnValuesByStatement(ps, columnIndex, filterFunc, params);

} catch (Exception e) {
throw new CatalogException(
String.format(
"The following SQL query could not be executed (%s): %s", connUrl, sql),
e);
}
}

protected static List<String> extractColumnValuesByStatement(
PreparedStatement ps, int columnIndex, Predicate<String> filterFunc, Object... params)
throws SQLException {
List<String> columnValues = Lists.newArrayList();
if (Objects.nonNull(params) && params.length > 0) {
for (int i = 0; i < params.length; i++) {
ps.setObject(i + 1, params[i]);
}
ResultSet rs = ps.executeQuery();
}
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
String columnValue = rs.getString(columnIndex);
if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) {
columnValues.add(columnValue);
}
}
return columnValues;
} catch (Exception e) {
throw new CatalogException(
String.format(
"The following SQL query could not be executed (%s): %s", connUrl, sql),
e);
}
return columnValues;
}

protected DataType fromJDBCType(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Catalog for PostgreSQL. */
@Internal
Expand Down Expand Up @@ -107,31 +109,34 @@ public List<String> listTables(String databaseName)

List<String> tables = Lists.newArrayList();

// get all schemas
List<String> schemas =
extractColumnValuesBySQL(
baseUrl + databaseName,
"SELECT schema_name FROM information_schema.schemata;",
1,
pgSchema -> !builtinSchemas.contains(pgSchema));

// get all tables
for (String schema : schemas) {
// position 1 is database name, position 2 is schema name, position 3 is table name
List<String> pureTables =
extractColumnValuesBySQL(
baseUrl + databaseName,
final String url = baseUrl + databaseName;
try (Connection conn = DriverManager.getConnection(url, username, pwd)) {
// get all schemas
List<String> schemas;
try (PreparedStatement ps =
conn.prepareStatement("SELECT schema_name FROM information_schema.schemata;")) {
schemas =
extractColumnValuesByStatement(
ps, 1, pgSchema -> !builtinSchemas.contains(pgSchema));
}

// get all tables
try (PreparedStatement ps =
conn.prepareStatement(
"SELECT * FROM information_schema.tables "
+ "WHERE table_type = 'BASE TABLE' "
+ "AND table_schema = ? "
+ "ORDER BY table_type, table_name;",
3,
null,
schema);
tables.addAll(
pureTables.stream()
+ "ORDER BY table_type, table_name;")) {
for (String schema : schemas) {
// Column index 1 is database name, 2 is schema name, 3 is table name
extractColumnValuesByStatement(ps, 3, null, schema).stream()
.map(pureTable -> schema + "." + pureTable)
.collect(Collectors.toList()));
.forEach(tables::add);
}
}
} catch (Exception e) {
throw new CatalogException(
String.format("Failed to list tables for database %s", databaseName), e);
}
return tables;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import org.apache.flink.util.CollectionUtil;

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.sql.Connection;
Expand All @@ -53,8 +51,6 @@
*/
class UnsignedTypeConversionITCase extends AbstractTestBase implements MySqlTestBase {

private static final Logger LOGGER =
LoggerFactory.getLogger(UnsignedTypeConversionITCase.class);
private static final String TABLE_SOURCE = "jdbc_source";
private static final String TABLE_SINK = "jdbc_sink";
private static final String TABLE_DATA = "data";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ void before() throws Exception {
}

@Test
void testJdbcSource() throws Exception {
void testJdbcSource() {
tEnv.executeSql(INPUT_TABLE.getCreateQueryForFlink(getMetadata(), INPUT_TABLE_NAME));
Iterator<Row> collected = tEnv.executeSql("SELECT * FROM " + INPUT_TABLE_NAME).collect();
List<String> result =
Expand All @@ -123,7 +123,7 @@ void testJdbcSource() throws Exception {
}

@Test
void testProject() throws Exception {
void testProject() {
tEnv.executeSql(
INPUT_TABLE.getCreateQueryForFlink(
getMetadata(),
Expand Down Expand Up @@ -158,7 +158,7 @@ void testProject() throws Exception {
}

@Test
void testLimit() throws Exception {
void testLimit() {
tEnv.executeSql(
INPUT_TABLE.getCreateQueryForFlink(
getMetadata(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
Expand All @@ -39,8 +37,6 @@
/** Test base for {@link PostgresCatalog}. */
class PostgresCatalogTestBase implements JdbcITCaseBase, PostgresTestBase {

public static final Logger LOG = LoggerFactory.getLogger(PostgresCatalogTestBase.class);

private static DatabaseMetadata getStaticMetadata() {
return PostgresDatabase.getMetadata();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -41,15 +38,13 @@
/** Test for {@link JdbcCatalogFactory}. */
class JdbcCatalogFactoryTest implements PostgresTestBase {

public static final Logger LOG = LoggerFactory.getLogger(JdbcCatalogFactoryTest.class);

protected static String baseUrl;
protected static JdbcCatalog catalog;

protected static final String TEST_CATALOG_NAME = "mypg";

@BeforeEach
void setup() throws SQLException {
void setup() {
// jdbc:postgresql://localhost:50807/postgres?user=postgres
String jdbcUrl = getMetadata().getJdbcUrl();
// jdbc:postgresql://localhost:50807/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ void before() throws SQLException {
}

@Test
void testJdbcSource() throws Exception {
void testJdbcSource() {
createFlinkTable();
Iterator<Row> collected = tEnv.executeSql("SELECT * FROM " + INPUT_TABLE_NAME).collect();
List<String> result =
Expand All @@ -131,7 +131,7 @@ void testJdbcSource() throws Exception {
}

@Test
void testProject() throws Exception {
void testProject() {
createFlinkTable();
Iterator<Row> collected =
tEnv.executeSql("SELECT id,datetime_col,decimal_col FROM " + INPUT_TABLE_NAME)
Expand All @@ -151,7 +151,7 @@ void testProject() throws Exception {
}

@Test
void testFilter() throws Exception {
void testFilter() {
createFlinkTable();
Iterator<Row> collected =
tEnv.executeSql(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,6 @@ void clearOutputTable() throws Exception {
try (Connection conn = getMetadata().getConnection();
Statement stat = conn.createStatement()) {
stat.execute("DELETE FROM " + OUTPUT_TABLE);

stat.close();
conn.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void setup() {
}

@Test
void testMaxRetry() throws Exception {
void testMaxRetry() {
assertThatThrownBy(
() -> {
format =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ protected List<Row> getTestData() {
}

@BeforeEach
void beforeAll() throws SQLException {
void beforeEach() throws SQLException {
try (Connection conn = getMetadata().getConnection()) {
inputTable.insertIntoTableValues(conn, getTestData());
}
Expand Down Expand Up @@ -153,7 +153,7 @@ void testProject() {
}

@Test
public void testLimit() throws Exception {
public void testLimit() {
String testTable = "testTable";
tEnv.executeSql(
inputTable.getCreateQueryForFlink(
Expand All @@ -174,7 +174,7 @@ public void testLimit() throws Exception {
}

@Test
public void testFilter() throws Exception {
public void testFilter() {
String testTable = "testTable";
tEnv.executeSql(inputTable.getCreateQueryForFlink(getMetadata(), testTable));

Expand Down Expand Up @@ -271,7 +271,7 @@ public void testFilter() throws Exception {

@ParameterizedTest
@EnumSource(Caching.class)
void testLookupJoin(Caching caching) throws Exception {
void testLookupJoin(Caching caching) {
// Create JDBC lookup table
List<String> cachingOptions = Collections.emptyList();
if (caching.equals(Caching.ENABLE_CACHE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class JdbcOutputFormatTest extends JdbcDataTestBase {
private static InternalTypeInfo<RowData> rowDataTypeInfo = InternalTypeInfo.of(rowType);

@AfterEach
void tearDown() throws Exception {
void tearDown() {
if (outputFormat != null) {
outputFormat.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,4 @@ public interface SqlServerImages {
DockerImageName MSSQL_AZURE_SQL_EDGE =
DockerImageName.parse("mcr.microsoft.com/azure-sql-edge")
.asCompatibleSubstituteFor("mcr.microsoft.com/mssql/server");

String MSSQL_SERVER_2017 = "mcr.microsoft.com/mssql/server:2017-CU12";
String MSSQL_SERVER_2019 = "mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04";
}
9 changes: 8 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ under the License.
<junit4.version>4.13.2</junit4.version>
<junit5.version>5.9.1</junit5.version>
<assertj.version>3.23.1</assertj.version>
<testcontainers.version>1.17.2</testcontainers.version>
<testcontainers.version>1.18.2</testcontainers.version>
<mockito.version>2.21.0</mockito.version>

<japicmp.referenceVersion>3.0.0-1.16</japicmp.referenceVersion>
Expand Down Expand Up @@ -293,6 +293,13 @@ under the License.
<version>2.1</version>
</dependency>

<!-- For dependency convergence -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.23.0</version>
</dependency>

<!-- For dependency convergence on mockito/powermock mismatch -->
<dependency>
<groupId>net.bytebuddy</groupId>
Expand Down

0 comments on commit bced473

Please sign in to comment.