From 8a81a3364b2fab7ae45726d667460ddebf8f6ac7 Mon Sep 17 00:00:00 2001 From: vmaltsev Date: Thu, 9 Jun 2022 22:11:28 +0300 Subject: [PATCH 01/11] Postgres Source: fixed unsupposted date-time datatypes during incremental sync --- .../postgres/PostgresSourceOperations.java | 58 ++++++++++++++++--- 1 file changed, 49 insertions(+), 9 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index 4d8247798a792..6901e416e2d4d 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -27,9 +27,7 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.LocalTime; +import java.time.*; import java.util.Collections; import org.postgresql.jdbc.PgResultSetMetaData; import org.slf4j.Logger; @@ -79,15 +77,57 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException { } @Override - protected void setDate(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException { - try { - Date date = Date.valueOf(value); - preparedStatement.setDate(parameterIndex, date); - } catch (final Exception e) { - throw new RuntimeException(e); + public void setStatementField(final PreparedStatement preparedStatement, + final int parameterIndex, + final JDBCType cursorFieldType, + final String value) + throws SQLException { + switch (cursorFieldType) { + + case TIMESTAMP -> setTimestamp(preparedStatement, parameterIndex, value); + case TIMESTAMP_WITH_TIMEZONE -> setTimestampWithTimezone(preparedStatement, parameterIndex, value); + case TIME -> setTime(preparedStatement, parameterIndex, value); + case TIME_WITH_TIMEZONE -> setTimeWithTimezone(preparedStatement, parameterIndex, value); + case DATE -> setDate(preparedStatement, parameterIndex, value); + case BIT -> setBit(preparedStatement, parameterIndex, value); + case BOOLEAN -> setBoolean(preparedStatement, parameterIndex, value); + case TINYINT, SMALLINT -> setShortInt(preparedStatement, parameterIndex, value); + case INTEGER -> setInteger(preparedStatement, parameterIndex, value); + case BIGINT -> setBigInteger(preparedStatement, parameterIndex, value); + case FLOAT, DOUBLE -> setDouble(preparedStatement, parameterIndex, value); + case REAL -> setReal(preparedStatement, parameterIndex, value); + case NUMERIC, DECIMAL -> setDecimal(preparedStatement, parameterIndex, value); + case CHAR, NCHAR, NVARCHAR, VARCHAR, LONGVARCHAR -> setString(preparedStatement, parameterIndex, value); + case BINARY, BLOB -> setBinary(preparedStatement, parameterIndex, value); + // since cursor are expected to be comparable, handle cursor typing strictly and error on + // unrecognized types + default -> throw new IllegalArgumentException(String.format("%s is not supported.", cursorFieldType)); } } + private void setTimeWithTimezone(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException { + preparedStatement.setObject(parameterIndex, OffsetTime.parse(value)); + } + + private void setTimestampWithTimezone(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException { + preparedStatement.setObject(parameterIndex, OffsetDateTime.parse(value)); + } + + @Override + protected void setTimestamp(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException { + preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value)); + } + + @Override + protected void setTime(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException { + preparedStatement.setObject(parameterIndex, LocalTime.parse(value)); + } + + @Override + protected void setDate(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException { + preparedStatement.setObject(parameterIndex, LocalDate.parse(value)); + } + @Override public void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { final PgResultSetMetaData metadata = (PgResultSetMetaData) resultSet.getMetaData(); From 84d9f893a694fa89331beb068ef394c0ce7d9df7 Mon Sep 17 00:00:00 2001 From: vmaltsev Date: Thu, 9 Jun 2022 22:21:03 +0300 Subject: [PATCH 02/11] updated CHANGELOG --- .../source/postgres/PostgresSourceOperations.java | 7 +++++-- docs/integrations/sources/postgres.md | 1 + 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index 6901e416e2d4d..23010e34a07c3 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -21,13 +21,16 @@ import io.airbyte.db.jdbc.JdbcSourceOperations; import io.airbyte.protocol.models.JsonSchemaType; import java.math.BigDecimal; -import java.sql.Date; import java.sql.JDBCType; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; -import java.time.*; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.OffsetTime; import java.util.Collections; import org.postgresql.jdbc.PgResultSetMetaData; import org.slf4j.Logger; diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index d7a3f8884de52..4b3781a3e55ce 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -275,6 +275,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | Version | Date | Pull Request | Subject | |:--------|:-----------|:-------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------| +| 0.4.22 | 2022-06-09 | [13655](https://github.com/airbytehq/airbyte/pull/13655) | Fixed bug with unsupported date-time datatypes during incremental sync | | 0.4.21 | 2022-06-06 | [13435](https://github.com/airbytehq/airbyte/pull/13435) | Adjust JDBC fetch size based on max memory and max row size | | 0.4.20 | 2022-06-02 | [13367](https://github.com/airbytehq/airbyte/pull/13367) | Added convertion hstore to json format | | 0.4.19 | 2022-05-25 | [13166](https://github.com/airbytehq/airbyte/pull/13166) | Added timezone awareness and handle BC dates | From 15c8918925ba4c42f61ad82b0bf7194714dad17a Mon Sep 17 00:00:00 2001 From: vmaltsev Date: Fri, 10 Jun 2022 08:18:43 +0300 Subject: [PATCH 03/11] add tests for incremental cursor check --- .../jdbc/test/JdbcSourceAcceptanceTest.java | 119 +++++---- .../postgres/PostgresSourceOperations.java | 2 +- .../PostgresJdbcSourceAcceptanceTest.java | 247 ++++++++++++++++-- 3 files changed, 300 insertions(+), 68 deletions(-) diff --git a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java index be29c888993f9..adc8589e617b0 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java @@ -388,6 +388,13 @@ void testReadOneColumn() throws Exception { setEmittedAtToNull(actualMessages); + final List expectedMessages = getAirbyteMessagesReadOneColumn(); + assertTrue(expectedMessages.size() == actualMessages.size()); + assertTrue(expectedMessages.containsAll(actualMessages)); + assertTrue(actualMessages.containsAll(expectedMessages)); + } + + protected List getAirbyteMessagesReadOneColumn() { final List expectedMessages = getTestMessages().stream() .map(Jsons::clone) .peek(m -> { @@ -397,9 +404,7 @@ void testReadOneColumn() throws Exception { convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt())); }) .collect(Collectors.toList()); - assertTrue(expectedMessages.size() == actualMessages.size()); - assertTrue(expectedMessages.containsAll(actualMessages)); - assertTrue(actualMessages.containsAll(expectedMessages)); + return expectedMessages; } @Test @@ -432,17 +437,7 @@ void testReadMultipleTables() throws Exception { Field.of(COL_ID, JsonSchemaType.NUMBER), Field.of(COL_NAME, JsonSchemaType.STRING))); - final List secondStreamExpectedMessages = getTestMessages() - .stream() - .map(Jsons::clone) - .peek(m -> { - m.getRecord().setStream(streamName2); - m.getRecord().setNamespace(getDefaultNamespace()); - ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); - ((ObjectNode) m.getRecord().getData()).replace(COL_ID, - convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt())); - }) - .collect(Collectors.toList()); + final List secondStreamExpectedMessages = getAirbyteMessagesSecondSync(streamName2); expectedMessages.addAll(secondStreamExpectedMessages); } @@ -456,6 +451,21 @@ void testReadMultipleTables() throws Exception { assertTrue(actualMessages.containsAll(expectedMessages)); } + protected List getAirbyteMessagesSecondSync(String streamName2) { + return getTestMessages() + .stream() + .map(Jsons::clone) + .peek(m -> { + m.getRecord().setStream(streamName2); + m.getRecord().setNamespace(getDefaultNamespace()); + ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); + ((ObjectNode) m.getRecord().getData()).replace(COL_ID, + convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt())); + }) + .collect(Collectors.toList()); + + } + @Test void testTablesWithQuoting() throws Exception { final ConfiguredAirbyteStream streamForTableWithSpaces = createTableWithSpaces(); @@ -469,7 +479,17 @@ void testTablesWithQuoting() throws Exception { setEmittedAtToNull(actualMessages); - final List secondStreamExpectedMessages = getTestMessages() + final List secondStreamExpectedMessages = getAirbyteMessagesForTablesWithQuoting(streamForTableWithSpaces); + final List expectedMessages = new ArrayList<>(getTestMessages()); + expectedMessages.addAll(secondStreamExpectedMessages); + + assertTrue(expectedMessages.size() == actualMessages.size()); + assertTrue(expectedMessages.containsAll(actualMessages)); + assertTrue(actualMessages.containsAll(expectedMessages)); + } + + protected List getAirbyteMessagesForTablesWithQuoting(ConfiguredAirbyteStream streamForTableWithSpaces) { + return getTestMessages() .stream() .map(Jsons::clone) .peek(m -> { @@ -481,12 +501,6 @@ void testTablesWithQuoting() throws Exception { convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt())); }) .collect(Collectors.toList()); - final List expectedMessages = new ArrayList<>(getTestMessages()); - expectedMessages.addAll(secondStreamExpectedMessages); - - assertTrue(expectedMessages.size() == actualMessages.size()); - assertTrue(expectedMessages.containsAll(actualMessages)); - assertTrue(actualMessages.containsAll(expectedMessages)); } @SuppressWarnings("ResultOfMethodCallIgnored") @@ -532,6 +546,17 @@ void testIncrementalStringCheckCursor() throws Exception { void testIncrementalStringCheckCursorSpaceInColumnName() throws Exception { final ConfiguredAirbyteStream streamWithSpaces = createTableWithSpaces(); + final ArrayList expectedRecordMessages = getAirbyteMessagesCheckCursorSpaceInColumnName(streamWithSpaces); + incrementalCursorCheck( + COL_LAST_NAME_WITH_SPACE, + COL_LAST_NAME_WITH_SPACE, + "patent", + "vash", + expectedRecordMessages, + streamWithSpaces); + } + + protected ArrayList getAirbyteMessagesCheckCursorSpaceInColumnName(ConfiguredAirbyteStream streamWithSpaces) { final AirbyteMessage firstMessage = getTestMessages().get(0); firstMessage.getRecord().setStream(streamWithSpaces.getStream().getName()); ((ObjectNode) firstMessage.getRecord().getData()).remove(COL_UPDATED_AT); @@ -546,13 +571,7 @@ void testIncrementalStringCheckCursorSpaceInColumnName() throws Exception { Lists.newArrayList(getTestMessages().get(0), getTestMessages().get(2)); - incrementalCursorCheck( - COL_LAST_NAME_WITH_SPACE, - COL_LAST_NAME_WITH_SPACE, - "patent", - "vash", - Lists.newArrayList(firstMessage, secondMessage), - streamWithSpaces); + return Lists.newArrayList(firstMessage, secondMessage); } @Test @@ -600,14 +619,7 @@ void testReadOneTableIncrementallyTwice() throws Exception { .filter(r -> r.getType() == Type.STATE).findFirst(); assertTrue(stateAfterFirstSyncOptional.isPresent()); - database.execute(connection -> { - connection.createStatement().execute( - String.format("INSERT INTO %s(id, name, updated_at) VALUES (4,'riker', '2006-10-19')", - getFullyQualifiedTableName(TABLE_NAME))); - connection.createStatement().execute( - String.format("INSERT INTO %s(id, name, updated_at) VALUES (5, 'data', '2006-10-19')", - getFullyQualifiedTableName(TABLE_NAME))); - }); + executeStatementReadIncrementallyTwice(); final List actualMessagesSecondSync = MoreIterators .toList(source.read(config, configuredCatalog, @@ -624,6 +636,17 @@ void testReadOneTableIncrementallyTwice() throws Exception { assertTrue(actualMessagesSecondSync.containsAll(expectedMessages)); } + protected void executeStatementReadIncrementallyTwice() throws SQLException { + database.execute(connection -> { + connection.createStatement().execute( + String.format("INSERT INTO %s(id, name, updated_at) VALUES (4,'riker', '2006-10-19')", + getFullyQualifiedTableName(TABLE_NAME))); + connection.createStatement().execute( + String.format("INSERT INTO %s(id, name, updated_at) VALUES (5, 'data', '2006-10-19')", + getFullyQualifiedTableName(TABLE_NAME))); + }); + } + protected List getExpectedAirbyteMessagesSecondSync(String namespace) { final List expectedMessages = new ArrayList<>(); expectedMessages.add(new AirbyteMessage().withType(Type.RECORD) @@ -696,16 +719,7 @@ void testReadMultipleTablesIncrementally() throws Exception { // we know the second streams messages are the same as the first minus the updated at column. so we // cheat and generate the expected messages off of the first expected messages. - final List secondStreamExpectedMessages = getTestMessages() - .stream() - .map(Jsons::clone) - .peek(m -> { - m.getRecord().setStream(streamName2); - ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); - ((ObjectNode) m.getRecord().getData()).replace(COL_ID, - convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt())); - }) - .collect(Collectors.toList()); + final List secondStreamExpectedMessages = getAirbyteMessagesSecondStreamWithNamespace(streamName2); final List expectedMessagesFirstSync = new ArrayList<>(getTestMessages()); expectedMessagesFirstSync.add(new AirbyteMessage() .withType(Type.STATE) @@ -748,6 +762,19 @@ void testReadMultipleTablesIncrementally() throws Exception { assertTrue(actualMessagesFirstSync.containsAll(expectedMessagesFirstSync)); } + protected List getAirbyteMessagesSecondStreamWithNamespace(String streamName2) { + return getTestMessages() + .stream() + .map(Jsons::clone) + .peek(m -> { + m.getRecord().setStream(streamName2); + ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); + ((ObjectNode) m.getRecord().getData()).replace(COL_ID, + convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt())); + }) + .collect(Collectors.toList()); + } + // when initial and final cursor fields are the same. protected void incrementalCursorCheck( final String cursorField, diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index 23010e34a07c3..798286efb2977 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -84,7 +84,7 @@ public void setStatementField(final PreparedStatement preparedStatement, final int parameterIndex, final JDBCType cursorFieldType, final String value) - throws SQLException { + throws SQLException { switch (cursorFieldType) { case TIMESTAMP -> setTimestamp(preparedStatement, parameterIndex, value); diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java index bb25b4493fc20..cc7814ac3b147 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java @@ -7,6 +7,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -14,25 +15,23 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; import io.airbyte.commons.string.Strings; +import io.airbyte.db.factory.DataSourceFactory; import io.airbyte.db.jdbc.JdbcSourceOperations; +import io.airbyte.db.jdbc.JdbcUtils; +import io.airbyte.db.jdbc.StreamingJdbcDatabase; +import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; import io.airbyte.integrations.source.relationaldb.models.DbState; import io.airbyte.integrations.source.relationaldb.models.DbStreamState; -import io.airbyte.protocol.models.AirbyteCatalog; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteRecordMessage; -import io.airbyte.protocol.models.AirbyteStateMessage; -import io.airbyte.protocol.models.CatalogHelpers; -import io.airbyte.protocol.models.ConnectorSpecification; -import io.airbyte.protocol.models.Field; -import io.airbyte.protocol.models.JsonSchemaType; -import io.airbyte.protocol.models.SyncMode; +import io.airbyte.protocol.models.*; import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.sql.JDBCType; +import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -43,8 +42,8 @@ class PostgresJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest { private static PostgreSQLContainer PSQL_DB; - - private JsonNode config; + public static String COL_WAKEUP = "wakeup"; + public static String COL_BIRTH = "birth"; @BeforeAll static void init() { @@ -55,6 +54,9 @@ static void init() { @BeforeEach public void setup() throws Exception { final String dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase(); + COLUMN_CLAUSE_WITH_PK = "id INTEGER, name VARCHAR(200), updated_at DATE, wakeup TIMETZ, birth TIMESTAMPTZ"; + COLUMN_CLAUSE_WITHOUT_PK = "id INTEGER, name VARCHAR(200), updated_at DATE, wakeup TIMETZ, birth TIMESTAMPTZ"; + COLUMN_CLAUSE_WITH_COMPOSITE_PK = "first_name VARCHAR(200), last_name VARCHAR(200), updated_at DATE, wakeup TIMETZ, birth TIMESTAMPTZ"; config = Jsons.jsonNode(ImmutableMap.builder() .put("host", PSQL_DB.getHost()) @@ -70,7 +72,164 @@ public void setup() throws Exception { final String tmpFilePath = IOs.writeFileToRandomTmpDir(initScriptName, "CREATE DATABASE " + dbName + ";"); PostgreSQLContainerHelper.runSqlScript(MountableFile.forHostPath(tmpFilePath), PSQL_DB); - super.setup(); + source = getSource(); + final JsonNode jdbcConfig = getToDatabaseConfigFunction().apply(config); + + streamName = TABLE_NAME; + + dataSource = DataSourceFactory.create( + jdbcConfig.get("username").asText(), + jdbcConfig.has("password") ? jdbcConfig.get("password").asText() : null, + getDriverClass(), + jdbcConfig.get("jdbc_url").asText(), + JdbcUtils.parseJdbcParameters(jdbcConfig, "connection_properties", getJdbcParameterDelimiter())); + + database = new StreamingJdbcDatabase(dataSource, + JdbcUtils.getDefaultSourceOperations(), + AdaptiveStreamingQueryConfig::new); + + createSchemas(); + + database.execute(connection -> { + + connection.createStatement().execute( + createTableQuery(getFullyQualifiedTableName(TABLE_NAME), COLUMN_CLAUSE_WITH_PK, + primaryKeyClause(Collections.singletonList("id")))); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(id, name, updated_at, wakeup, birth) VALUES (1,'picard', '2004-10-19','10:10:10.123456-05:00','2004-10-19T17:23:54.123456Z')", + getFullyQualifiedTableName(TABLE_NAME))); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(id, name, updated_at, wakeup, birth) VALUES (2, 'crusher', '2005-10-19','11:11:11.123456-05:00','2005-10-19T17:23:54.123456Z')", + getFullyQualifiedTableName(TABLE_NAME))); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(id, name, updated_at, wakeup, birth) VALUES (3, 'vash', '2006-10-19','12:12:12.123456-05:00','2006-10-19T17:23:54.123456Z')", + getFullyQualifiedTableName(TABLE_NAME))); + + connection.createStatement().execute( + createTableQuery(getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK), + COLUMN_CLAUSE_WITHOUT_PK, "")); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(id, name, updated_at, wakeup, birth) VALUES (1,'picard', '2004-10-19','12:12:12.123456-05:00','2004-10-19T17:23:54.123456Z')", + getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK))); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(id, name, updated_at, wakeup, birth) VALUES (2, 'crusher', '2005-10-19','11:11:11.123456-05:00','2005-10-19T17:23:54.123456Z')", + getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK))); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(id, name, updated_at, wakeup, birth) VALUES (3, 'vash', '2006-10-19','10:10:10.123456-05:00','2006-10-19T17:23:54.123456Z')", + getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK))); + + connection.createStatement().execute( + createTableQuery(getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK), + COLUMN_CLAUSE_WITH_COMPOSITE_PK, + primaryKeyClause(ImmutableList.of("first_name", "last_name")))); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(first_name, last_name, updated_at, wakeup, birth) VALUES ('first' ,'picard', '2004-10-19','12:12:12.123456-05:00','2004-10-19T17:23:54.123456Z')", + getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK))); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(first_name, last_name, updated_at, wakeup, birth) VALUES ('second', 'crusher', '2005-10-19','11:11:11.123456-05:00','2005-10-19T17:23:54.123456Z')", + getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK))); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(first_name, last_name, updated_at, wakeup, birth) VALUES ('third', 'vash', '2006-10-19','10:10:10.123456-05:00','2006-10-19T17:23:54.123456Z')", + getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK))); + + }); + + } + + @Override + protected List getAirbyteMessagesReadOneColumn() { + return getTestMessages().stream() + .map(Jsons::clone) + .peek(m -> { + ((ObjectNode) m.getRecord().getData()).remove(COL_NAME); + ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_WAKEUP); + ((ObjectNode) m.getRecord().getData()).remove(COL_BIRTH); + ((ObjectNode) m.getRecord().getData()).replace(COL_ID, + Jsons.jsonNode(m.getRecord().getData().get(COL_ID).asInt())); + }) + .collect(Collectors.toList()); + } + + @Override + protected ArrayList getAirbyteMessagesCheckCursorSpaceInColumnName(ConfiguredAirbyteStream streamWithSpaces) { + final AirbyteMessage firstMessage = getTestMessages().get(0); + firstMessage.getRecord().setStream(streamWithSpaces.getStream().getName()); + ((ObjectNode) firstMessage.getRecord().getData()).remove(COL_UPDATED_AT); + ((ObjectNode) firstMessage.getRecord().getData()).remove(COL_WAKEUP); + ((ObjectNode) firstMessage.getRecord().getData()).remove(COL_BIRTH); + ((ObjectNode) firstMessage.getRecord().getData()).set(COL_LAST_NAME_WITH_SPACE, + ((ObjectNode) firstMessage.getRecord().getData()).remove(COL_NAME)); + + final AirbyteMessage secondMessage = getTestMessages().get(2); + secondMessage.getRecord().setStream(streamWithSpaces.getStream().getName()); + ((ObjectNode) secondMessage.getRecord().getData()).remove(COL_UPDATED_AT); + ((ObjectNode) secondMessage.getRecord().getData()).remove(COL_WAKEUP); + ((ObjectNode) secondMessage.getRecord().getData()).remove(COL_BIRTH); + ((ObjectNode) secondMessage.getRecord().getData()).set(COL_LAST_NAME_WITH_SPACE, + ((ObjectNode) secondMessage.getRecord().getData()).remove(COL_NAME)); + + Lists.newArrayList(getTestMessages().get(0), getTestMessages().get(2)); + + return Lists.newArrayList(firstMessage, secondMessage); + } + + @Override + protected List getAirbyteMessagesSecondSync(String streamName2) { + return getTestMessages() + .stream() + .map(Jsons::clone) + .peek(m -> { + m.getRecord().setStream(streamName2); + m.getRecord().setNamespace(getDefaultNamespace()); + ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_WAKEUP); + ((ObjectNode) m.getRecord().getData()).remove(COL_BIRTH); + ((ObjectNode) m.getRecord().getData()).replace(COL_ID, + Jsons.jsonNode(m.getRecord().getData().get(COL_ID).asInt())); + }) + .collect(Collectors.toList()); + } + + protected List getAirbyteMessagesSecondStreamWithNamespace(String streamName2) { + return getTestMessages() + .stream() + .map(Jsons::clone) + .peek(m -> { + m.getRecord().setStream(streamName2); + ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_WAKEUP); + ((ObjectNode) m.getRecord().getData()).remove(COL_BIRTH); + ((ObjectNode) m.getRecord().getData()).replace(COL_ID, + Jsons.jsonNode(m.getRecord().getData().get(COL_ID).asInt())); + }) + .collect(Collectors.toList()); + } + + protected List getAirbyteMessagesForTablesWithQuoting(ConfiguredAirbyteStream streamForTableWithSpaces) { + return getTestMessages() + .stream() + .map(Jsons::clone) + .peek(m -> { + m.getRecord().setStream(streamForTableWithSpaces.getStream().getName()); + ((ObjectNode) m.getRecord().getData()).set(COL_LAST_NAME_WITH_SPACE, + ((ObjectNode) m.getRecord().getData()).remove(COL_NAME)); + ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_BIRTH); + ((ObjectNode) m.getRecord().getData()).remove(COL_WAKEUP); + ((ObjectNode) m.getRecord().getData()).replace(COL_ID, + Jsons.jsonNode(m.getRecord().getData().get(COL_ID).asInt())); + }) + .collect(Collectors.toList()); } @Override @@ -114,20 +273,38 @@ protected List getTestMessages() { .withData(Jsons.jsonNode(ImmutableMap .of(COL_ID, ID_VALUE_1, COL_NAME, "picard", - COL_UPDATED_AT, "2004-10-19")))), + COL_UPDATED_AT, "2004-10-19", + COL_WAKEUP, "10:10:10.123456-05:00", + COL_BIRTH, "2004-10-19T17:23:54.123456Z")))), new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) .withData(Jsons.jsonNode(ImmutableMap .of(COL_ID, ID_VALUE_2, COL_NAME, "crusher", - COL_UPDATED_AT, - "2005-10-19")))), + COL_UPDATED_AT, "2005-10-19", + COL_WAKEUP, "11:11:11.123456-05:00", + COL_BIRTH, "2005-10-19T17:23:54.123456Z")))), new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) .withData(Jsons.jsonNode(ImmutableMap .of(COL_ID, ID_VALUE_3, COL_NAME, "vash", - COL_UPDATED_AT, "2006-10-19"))))); + COL_UPDATED_AT, "2006-10-19", + COL_WAKEUP, "12:12:12.123456-05:00", + COL_BIRTH, "2006-10-19T17:23:54.123456Z"))))); + } + + protected void executeStatementReadIncrementallyTwice() throws SQLException { + database.execute(connection -> { + connection.createStatement().execute( + String.format( + "INSERT INTO %s(id, name, updated_at, wakeup, birth) VALUES (4,'riker', '2006-10-19','12:12:12.123456-05:00','2006-10-19T17:23:54.123456Z')", + getFullyQualifiedTableName(TABLE_NAME))); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(id, name, updated_at, wakeup, birth) VALUES (5, 'data', '2006-10-19','12:12:12.123456-05:00','2006-10-19T17:23:54.123456Z')", + getFullyQualifiedTableName(TABLE_NAME))); + }); } @Override @@ -138,7 +315,9 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) { defaultNamespace, Field.of(COL_ID, JsonSchemaType.NUMBER), Field.of(COL_NAME, JsonSchemaType.STRING), - Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) + Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE), + Field.of(COL_WAKEUP, JsonSchemaType.STRING_TIME_WITH_TIMEZONE), + Field.of(COL_BIRTH, JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))), CatalogHelpers.createAirbyteStream( @@ -146,7 +325,9 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) { defaultNamespace, Field.of(COL_ID, JsonSchemaType.NUMBER), Field.of(COL_NAME, JsonSchemaType.STRING), - Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) + Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE), + Field.of(COL_WAKEUP, JsonSchemaType.STRING_TIME_WITH_TIMEZONE), + Field.of(COL_BIRTH, JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey(Collections.emptyList()), CatalogHelpers.createAirbyteStream( @@ -154,7 +335,9 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) { defaultNamespace, Field.of(COL_FIRST_NAME, JsonSchemaType.STRING), Field.of(COL_LAST_NAME, JsonSchemaType.STRING), - Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) + Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE), + Field.of(COL_WAKEUP, JsonSchemaType.STRING_TIME_WITH_TIMEZONE), + Field.of(COL_BIRTH, JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey( List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME))))); @@ -169,6 +352,24 @@ protected void incrementalTimestampCheck() throws Exception { getTestMessages().get(2))); } + @Test + void incrementalTimeTzCheck() throws Exception { + super.incrementalCursorCheck(COL_WAKEUP, + "11:09:11.123456-05:00", + "12:12:12.123456-05:00", + Lists.newArrayList(getTestMessages().get(1), + getTestMessages().get(2))); + } + + @Test + void incrementalTimestampTzCheck() throws Exception { + super.incrementalCursorCheck(COL_BIRTH, + "2005-10-18T17:23:54.123456Z", + "2006-10-19T17:23:54.123456Z", + Lists.newArrayList(getTestMessages().get(1), + getTestMessages().get(2))); + } + @Override protected JdbcSourceOperations getSourceOperations() { return new PostgresSourceOperations(); @@ -182,13 +383,17 @@ protected List getExpectedAirbyteMessagesSecondSync(String names .withData(Jsons.jsonNode(ImmutableMap .of(COL_ID, ID_VALUE_4, COL_NAME, "riker", - COL_UPDATED_AT, "2006-10-19"))))); + COL_UPDATED_AT, "2006-10-19", + COL_WAKEUP, "12:12:12.123456-05:00", + COL_BIRTH, "2006-10-19T17:23:54.123456Z"))))); expectedMessages.add(new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace) .withData(Jsons.jsonNode(ImmutableMap .of(COL_ID, ID_VALUE_5, COL_NAME, "data", - COL_UPDATED_AT, "2006-10-19"))))); + COL_UPDATED_AT, "2006-10-19", + COL_WAKEUP, "12:12:12.123456-05:00", + COL_BIRTH, "2006-10-19T17:23:54.123456Z"))))); expectedMessages.add(new AirbyteMessage() .withType(AirbyteMessage.Type.STATE) .withState(new AirbyteStateMessage() From eb72e6ad506f5f868dcbffb7394dab243fdda742 Mon Sep 17 00:00:00 2001 From: vmaltsev Date: Fri, 10 Jun 2022 08:23:09 +0300 Subject: [PATCH 04/11] removed star import --- .../postgres/PostgresJdbcSourceAcceptanceTest.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java index cc7814ac3b147..1bf9187d7eb99 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java @@ -24,7 +24,16 @@ import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; import io.airbyte.integrations.source.relationaldb.models.DbState; import io.airbyte.integrations.source.relationaldb.models.DbStreamState; -import io.airbyte.protocol.models.*; +import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.protocol.models.Field; +import io.airbyte.protocol.models.JsonSchemaType; +import io.airbyte.protocol.models.SyncMode; import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.sql.JDBCType; import java.sql.SQLException; From 40c123ca90d24075fd66ff4a14e8d7d4fdbde08d Mon Sep 17 00:00:00 2001 From: vmaltsev Date: Thu, 9 Jun 2022 22:11:28 +0300 Subject: [PATCH 05/11] Postgres Source: fixed unsupposted date-time datatypes during incremental sync --- .../postgres/PostgresSourceOperations.java | 58 ++++++++++++++++--- 1 file changed, 49 insertions(+), 9 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index 4d8247798a792..6901e416e2d4d 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -27,9 +27,7 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.LocalTime; +import java.time.*; import java.util.Collections; import org.postgresql.jdbc.PgResultSetMetaData; import org.slf4j.Logger; @@ -79,15 +77,57 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException { } @Override - protected void setDate(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException { - try { - Date date = Date.valueOf(value); - preparedStatement.setDate(parameterIndex, date); - } catch (final Exception e) { - throw new RuntimeException(e); + public void setStatementField(final PreparedStatement preparedStatement, + final int parameterIndex, + final JDBCType cursorFieldType, + final String value) + throws SQLException { + switch (cursorFieldType) { + + case TIMESTAMP -> setTimestamp(preparedStatement, parameterIndex, value); + case TIMESTAMP_WITH_TIMEZONE -> setTimestampWithTimezone(preparedStatement, parameterIndex, value); + case TIME -> setTime(preparedStatement, parameterIndex, value); + case TIME_WITH_TIMEZONE -> setTimeWithTimezone(preparedStatement, parameterIndex, value); + case DATE -> setDate(preparedStatement, parameterIndex, value); + case BIT -> setBit(preparedStatement, parameterIndex, value); + case BOOLEAN -> setBoolean(preparedStatement, parameterIndex, value); + case TINYINT, SMALLINT -> setShortInt(preparedStatement, parameterIndex, value); + case INTEGER -> setInteger(preparedStatement, parameterIndex, value); + case BIGINT -> setBigInteger(preparedStatement, parameterIndex, value); + case FLOAT, DOUBLE -> setDouble(preparedStatement, parameterIndex, value); + case REAL -> setReal(preparedStatement, parameterIndex, value); + case NUMERIC, DECIMAL -> setDecimal(preparedStatement, parameterIndex, value); + case CHAR, NCHAR, NVARCHAR, VARCHAR, LONGVARCHAR -> setString(preparedStatement, parameterIndex, value); + case BINARY, BLOB -> setBinary(preparedStatement, parameterIndex, value); + // since cursor are expected to be comparable, handle cursor typing strictly and error on + // unrecognized types + default -> throw new IllegalArgumentException(String.format("%s is not supported.", cursorFieldType)); } } + private void setTimeWithTimezone(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException { + preparedStatement.setObject(parameterIndex, OffsetTime.parse(value)); + } + + private void setTimestampWithTimezone(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException { + preparedStatement.setObject(parameterIndex, OffsetDateTime.parse(value)); + } + + @Override + protected void setTimestamp(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException { + preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value)); + } + + @Override + protected void setTime(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException { + preparedStatement.setObject(parameterIndex, LocalTime.parse(value)); + } + + @Override + protected void setDate(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException { + preparedStatement.setObject(parameterIndex, LocalDate.parse(value)); + } + @Override public void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { final PgResultSetMetaData metadata = (PgResultSetMetaData) resultSet.getMetaData(); From fc5eadb2cadf84b00968534ae0525af0124397ef Mon Sep 17 00:00:00 2001 From: vmaltsev Date: Thu, 9 Jun 2022 22:21:03 +0300 Subject: [PATCH 06/11] updated CHANGELOG --- .../source/postgres/PostgresSourceOperations.java | 7 +++++-- docs/integrations/sources/postgres.md | 1 + 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index 6901e416e2d4d..23010e34a07c3 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -21,13 +21,16 @@ import io.airbyte.db.jdbc.JdbcSourceOperations; import io.airbyte.protocol.models.JsonSchemaType; import java.math.BigDecimal; -import java.sql.Date; import java.sql.JDBCType; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; -import java.time.*; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.OffsetTime; import java.util.Collections; import org.postgresql.jdbc.PgResultSetMetaData; import org.slf4j.Logger; diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index d7a3f8884de52..4b3781a3e55ce 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -275,6 +275,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | Version | Date | Pull Request | Subject | |:--------|:-----------|:-------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------| +| 0.4.22 | 2022-06-09 | [13655](https://github.com/airbytehq/airbyte/pull/13655) | Fixed bug with unsupported date-time datatypes during incremental sync | | 0.4.21 | 2022-06-06 | [13435](https://github.com/airbytehq/airbyte/pull/13435) | Adjust JDBC fetch size based on max memory and max row size | | 0.4.20 | 2022-06-02 | [13367](https://github.com/airbytehq/airbyte/pull/13367) | Added convertion hstore to json format | | 0.4.19 | 2022-05-25 | [13166](https://github.com/airbytehq/airbyte/pull/13166) | Added timezone awareness and handle BC dates | From 52da9a9a82c2a14add0c809cf1073fbb32e7ce2c Mon Sep 17 00:00:00 2001 From: vmaltsev Date: Fri, 10 Jun 2022 08:18:43 +0300 Subject: [PATCH 07/11] add tests for incremental cursor check --- .../jdbc/test/JdbcSourceAcceptanceTest.java | 119 +++++---- .../postgres/PostgresSourceOperations.java | 2 +- .../PostgresJdbcSourceAcceptanceTest.java | 247 ++++++++++++++++-- 3 files changed, 300 insertions(+), 68 deletions(-) diff --git a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java index be29c888993f9..adc8589e617b0 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java @@ -388,6 +388,13 @@ void testReadOneColumn() throws Exception { setEmittedAtToNull(actualMessages); + final List expectedMessages = getAirbyteMessagesReadOneColumn(); + assertTrue(expectedMessages.size() == actualMessages.size()); + assertTrue(expectedMessages.containsAll(actualMessages)); + assertTrue(actualMessages.containsAll(expectedMessages)); + } + + protected List getAirbyteMessagesReadOneColumn() { final List expectedMessages = getTestMessages().stream() .map(Jsons::clone) .peek(m -> { @@ -397,9 +404,7 @@ void testReadOneColumn() throws Exception { convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt())); }) .collect(Collectors.toList()); - assertTrue(expectedMessages.size() == actualMessages.size()); - assertTrue(expectedMessages.containsAll(actualMessages)); - assertTrue(actualMessages.containsAll(expectedMessages)); + return expectedMessages; } @Test @@ -432,17 +437,7 @@ void testReadMultipleTables() throws Exception { Field.of(COL_ID, JsonSchemaType.NUMBER), Field.of(COL_NAME, JsonSchemaType.STRING))); - final List secondStreamExpectedMessages = getTestMessages() - .stream() - .map(Jsons::clone) - .peek(m -> { - m.getRecord().setStream(streamName2); - m.getRecord().setNamespace(getDefaultNamespace()); - ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); - ((ObjectNode) m.getRecord().getData()).replace(COL_ID, - convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt())); - }) - .collect(Collectors.toList()); + final List secondStreamExpectedMessages = getAirbyteMessagesSecondSync(streamName2); expectedMessages.addAll(secondStreamExpectedMessages); } @@ -456,6 +451,21 @@ void testReadMultipleTables() throws Exception { assertTrue(actualMessages.containsAll(expectedMessages)); } + protected List getAirbyteMessagesSecondSync(String streamName2) { + return getTestMessages() + .stream() + .map(Jsons::clone) + .peek(m -> { + m.getRecord().setStream(streamName2); + m.getRecord().setNamespace(getDefaultNamespace()); + ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); + ((ObjectNode) m.getRecord().getData()).replace(COL_ID, + convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt())); + }) + .collect(Collectors.toList()); + + } + @Test void testTablesWithQuoting() throws Exception { final ConfiguredAirbyteStream streamForTableWithSpaces = createTableWithSpaces(); @@ -469,7 +479,17 @@ void testTablesWithQuoting() throws Exception { setEmittedAtToNull(actualMessages); - final List secondStreamExpectedMessages = getTestMessages() + final List secondStreamExpectedMessages = getAirbyteMessagesForTablesWithQuoting(streamForTableWithSpaces); + final List expectedMessages = new ArrayList<>(getTestMessages()); + expectedMessages.addAll(secondStreamExpectedMessages); + + assertTrue(expectedMessages.size() == actualMessages.size()); + assertTrue(expectedMessages.containsAll(actualMessages)); + assertTrue(actualMessages.containsAll(expectedMessages)); + } + + protected List getAirbyteMessagesForTablesWithQuoting(ConfiguredAirbyteStream streamForTableWithSpaces) { + return getTestMessages() .stream() .map(Jsons::clone) .peek(m -> { @@ -481,12 +501,6 @@ void testTablesWithQuoting() throws Exception { convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt())); }) .collect(Collectors.toList()); - final List expectedMessages = new ArrayList<>(getTestMessages()); - expectedMessages.addAll(secondStreamExpectedMessages); - - assertTrue(expectedMessages.size() == actualMessages.size()); - assertTrue(expectedMessages.containsAll(actualMessages)); - assertTrue(actualMessages.containsAll(expectedMessages)); } @SuppressWarnings("ResultOfMethodCallIgnored") @@ -532,6 +546,17 @@ void testIncrementalStringCheckCursor() throws Exception { void testIncrementalStringCheckCursorSpaceInColumnName() throws Exception { final ConfiguredAirbyteStream streamWithSpaces = createTableWithSpaces(); + final ArrayList expectedRecordMessages = getAirbyteMessagesCheckCursorSpaceInColumnName(streamWithSpaces); + incrementalCursorCheck( + COL_LAST_NAME_WITH_SPACE, + COL_LAST_NAME_WITH_SPACE, + "patent", + "vash", + expectedRecordMessages, + streamWithSpaces); + } + + protected ArrayList getAirbyteMessagesCheckCursorSpaceInColumnName(ConfiguredAirbyteStream streamWithSpaces) { final AirbyteMessage firstMessage = getTestMessages().get(0); firstMessage.getRecord().setStream(streamWithSpaces.getStream().getName()); ((ObjectNode) firstMessage.getRecord().getData()).remove(COL_UPDATED_AT); @@ -546,13 +571,7 @@ void testIncrementalStringCheckCursorSpaceInColumnName() throws Exception { Lists.newArrayList(getTestMessages().get(0), getTestMessages().get(2)); - incrementalCursorCheck( - COL_LAST_NAME_WITH_SPACE, - COL_LAST_NAME_WITH_SPACE, - "patent", - "vash", - Lists.newArrayList(firstMessage, secondMessage), - streamWithSpaces); + return Lists.newArrayList(firstMessage, secondMessage); } @Test @@ -600,14 +619,7 @@ void testReadOneTableIncrementallyTwice() throws Exception { .filter(r -> r.getType() == Type.STATE).findFirst(); assertTrue(stateAfterFirstSyncOptional.isPresent()); - database.execute(connection -> { - connection.createStatement().execute( - String.format("INSERT INTO %s(id, name, updated_at) VALUES (4,'riker', '2006-10-19')", - getFullyQualifiedTableName(TABLE_NAME))); - connection.createStatement().execute( - String.format("INSERT INTO %s(id, name, updated_at) VALUES (5, 'data', '2006-10-19')", - getFullyQualifiedTableName(TABLE_NAME))); - }); + executeStatementReadIncrementallyTwice(); final List actualMessagesSecondSync = MoreIterators .toList(source.read(config, configuredCatalog, @@ -624,6 +636,17 @@ void testReadOneTableIncrementallyTwice() throws Exception { assertTrue(actualMessagesSecondSync.containsAll(expectedMessages)); } + protected void executeStatementReadIncrementallyTwice() throws SQLException { + database.execute(connection -> { + connection.createStatement().execute( + String.format("INSERT INTO %s(id, name, updated_at) VALUES (4,'riker', '2006-10-19')", + getFullyQualifiedTableName(TABLE_NAME))); + connection.createStatement().execute( + String.format("INSERT INTO %s(id, name, updated_at) VALUES (5, 'data', '2006-10-19')", + getFullyQualifiedTableName(TABLE_NAME))); + }); + } + protected List getExpectedAirbyteMessagesSecondSync(String namespace) { final List expectedMessages = new ArrayList<>(); expectedMessages.add(new AirbyteMessage().withType(Type.RECORD) @@ -696,16 +719,7 @@ void testReadMultipleTablesIncrementally() throws Exception { // we know the second streams messages are the same as the first minus the updated at column. so we // cheat and generate the expected messages off of the first expected messages. - final List secondStreamExpectedMessages = getTestMessages() - .stream() - .map(Jsons::clone) - .peek(m -> { - m.getRecord().setStream(streamName2); - ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); - ((ObjectNode) m.getRecord().getData()).replace(COL_ID, - convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt())); - }) - .collect(Collectors.toList()); + final List secondStreamExpectedMessages = getAirbyteMessagesSecondStreamWithNamespace(streamName2); final List expectedMessagesFirstSync = new ArrayList<>(getTestMessages()); expectedMessagesFirstSync.add(new AirbyteMessage() .withType(Type.STATE) @@ -748,6 +762,19 @@ void testReadMultipleTablesIncrementally() throws Exception { assertTrue(actualMessagesFirstSync.containsAll(expectedMessagesFirstSync)); } + protected List getAirbyteMessagesSecondStreamWithNamespace(String streamName2) { + return getTestMessages() + .stream() + .map(Jsons::clone) + .peek(m -> { + m.getRecord().setStream(streamName2); + ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); + ((ObjectNode) m.getRecord().getData()).replace(COL_ID, + convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt())); + }) + .collect(Collectors.toList()); + } + // when initial and final cursor fields are the same. protected void incrementalCursorCheck( final String cursorField, diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index 23010e34a07c3..798286efb2977 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -84,7 +84,7 @@ public void setStatementField(final PreparedStatement preparedStatement, final int parameterIndex, final JDBCType cursorFieldType, final String value) - throws SQLException { + throws SQLException { switch (cursorFieldType) { case TIMESTAMP -> setTimestamp(preparedStatement, parameterIndex, value); diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java index bb25b4493fc20..cc7814ac3b147 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java @@ -7,6 +7,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -14,25 +15,23 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; import io.airbyte.commons.string.Strings; +import io.airbyte.db.factory.DataSourceFactory; import io.airbyte.db.jdbc.JdbcSourceOperations; +import io.airbyte.db.jdbc.JdbcUtils; +import io.airbyte.db.jdbc.StreamingJdbcDatabase; +import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; import io.airbyte.integrations.source.relationaldb.models.DbState; import io.airbyte.integrations.source.relationaldb.models.DbStreamState; -import io.airbyte.protocol.models.AirbyteCatalog; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteRecordMessage; -import io.airbyte.protocol.models.AirbyteStateMessage; -import io.airbyte.protocol.models.CatalogHelpers; -import io.airbyte.protocol.models.ConnectorSpecification; -import io.airbyte.protocol.models.Field; -import io.airbyte.protocol.models.JsonSchemaType; -import io.airbyte.protocol.models.SyncMode; +import io.airbyte.protocol.models.*; import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.sql.JDBCType; +import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -43,8 +42,8 @@ class PostgresJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest { private static PostgreSQLContainer PSQL_DB; - - private JsonNode config; + public static String COL_WAKEUP = "wakeup"; + public static String COL_BIRTH = "birth"; @BeforeAll static void init() { @@ -55,6 +54,9 @@ static void init() { @BeforeEach public void setup() throws Exception { final String dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase(); + COLUMN_CLAUSE_WITH_PK = "id INTEGER, name VARCHAR(200), updated_at DATE, wakeup TIMETZ, birth TIMESTAMPTZ"; + COLUMN_CLAUSE_WITHOUT_PK = "id INTEGER, name VARCHAR(200), updated_at DATE, wakeup TIMETZ, birth TIMESTAMPTZ"; + COLUMN_CLAUSE_WITH_COMPOSITE_PK = "first_name VARCHAR(200), last_name VARCHAR(200), updated_at DATE, wakeup TIMETZ, birth TIMESTAMPTZ"; config = Jsons.jsonNode(ImmutableMap.builder() .put("host", PSQL_DB.getHost()) @@ -70,7 +72,164 @@ public void setup() throws Exception { final String tmpFilePath = IOs.writeFileToRandomTmpDir(initScriptName, "CREATE DATABASE " + dbName + ";"); PostgreSQLContainerHelper.runSqlScript(MountableFile.forHostPath(tmpFilePath), PSQL_DB); - super.setup(); + source = getSource(); + final JsonNode jdbcConfig = getToDatabaseConfigFunction().apply(config); + + streamName = TABLE_NAME; + + dataSource = DataSourceFactory.create( + jdbcConfig.get("username").asText(), + jdbcConfig.has("password") ? jdbcConfig.get("password").asText() : null, + getDriverClass(), + jdbcConfig.get("jdbc_url").asText(), + JdbcUtils.parseJdbcParameters(jdbcConfig, "connection_properties", getJdbcParameterDelimiter())); + + database = new StreamingJdbcDatabase(dataSource, + JdbcUtils.getDefaultSourceOperations(), + AdaptiveStreamingQueryConfig::new); + + createSchemas(); + + database.execute(connection -> { + + connection.createStatement().execute( + createTableQuery(getFullyQualifiedTableName(TABLE_NAME), COLUMN_CLAUSE_WITH_PK, + primaryKeyClause(Collections.singletonList("id")))); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(id, name, updated_at, wakeup, birth) VALUES (1,'picard', '2004-10-19','10:10:10.123456-05:00','2004-10-19T17:23:54.123456Z')", + getFullyQualifiedTableName(TABLE_NAME))); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(id, name, updated_at, wakeup, birth) VALUES (2, 'crusher', '2005-10-19','11:11:11.123456-05:00','2005-10-19T17:23:54.123456Z')", + getFullyQualifiedTableName(TABLE_NAME))); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(id, name, updated_at, wakeup, birth) VALUES (3, 'vash', '2006-10-19','12:12:12.123456-05:00','2006-10-19T17:23:54.123456Z')", + getFullyQualifiedTableName(TABLE_NAME))); + + connection.createStatement().execute( + createTableQuery(getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK), + COLUMN_CLAUSE_WITHOUT_PK, "")); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(id, name, updated_at, wakeup, birth) VALUES (1,'picard', '2004-10-19','12:12:12.123456-05:00','2004-10-19T17:23:54.123456Z')", + getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK))); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(id, name, updated_at, wakeup, birth) VALUES (2, 'crusher', '2005-10-19','11:11:11.123456-05:00','2005-10-19T17:23:54.123456Z')", + getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK))); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(id, name, updated_at, wakeup, birth) VALUES (3, 'vash', '2006-10-19','10:10:10.123456-05:00','2006-10-19T17:23:54.123456Z')", + getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK))); + + connection.createStatement().execute( + createTableQuery(getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK), + COLUMN_CLAUSE_WITH_COMPOSITE_PK, + primaryKeyClause(ImmutableList.of("first_name", "last_name")))); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(first_name, last_name, updated_at, wakeup, birth) VALUES ('first' ,'picard', '2004-10-19','12:12:12.123456-05:00','2004-10-19T17:23:54.123456Z')", + getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK))); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(first_name, last_name, updated_at, wakeup, birth) VALUES ('second', 'crusher', '2005-10-19','11:11:11.123456-05:00','2005-10-19T17:23:54.123456Z')", + getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK))); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(first_name, last_name, updated_at, wakeup, birth) VALUES ('third', 'vash', '2006-10-19','10:10:10.123456-05:00','2006-10-19T17:23:54.123456Z')", + getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK))); + + }); + + } + + @Override + protected List getAirbyteMessagesReadOneColumn() { + return getTestMessages().stream() + .map(Jsons::clone) + .peek(m -> { + ((ObjectNode) m.getRecord().getData()).remove(COL_NAME); + ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_WAKEUP); + ((ObjectNode) m.getRecord().getData()).remove(COL_BIRTH); + ((ObjectNode) m.getRecord().getData()).replace(COL_ID, + Jsons.jsonNode(m.getRecord().getData().get(COL_ID).asInt())); + }) + .collect(Collectors.toList()); + } + + @Override + protected ArrayList getAirbyteMessagesCheckCursorSpaceInColumnName(ConfiguredAirbyteStream streamWithSpaces) { + final AirbyteMessage firstMessage = getTestMessages().get(0); + firstMessage.getRecord().setStream(streamWithSpaces.getStream().getName()); + ((ObjectNode) firstMessage.getRecord().getData()).remove(COL_UPDATED_AT); + ((ObjectNode) firstMessage.getRecord().getData()).remove(COL_WAKEUP); + ((ObjectNode) firstMessage.getRecord().getData()).remove(COL_BIRTH); + ((ObjectNode) firstMessage.getRecord().getData()).set(COL_LAST_NAME_WITH_SPACE, + ((ObjectNode) firstMessage.getRecord().getData()).remove(COL_NAME)); + + final AirbyteMessage secondMessage = getTestMessages().get(2); + secondMessage.getRecord().setStream(streamWithSpaces.getStream().getName()); + ((ObjectNode) secondMessage.getRecord().getData()).remove(COL_UPDATED_AT); + ((ObjectNode) secondMessage.getRecord().getData()).remove(COL_WAKEUP); + ((ObjectNode) secondMessage.getRecord().getData()).remove(COL_BIRTH); + ((ObjectNode) secondMessage.getRecord().getData()).set(COL_LAST_NAME_WITH_SPACE, + ((ObjectNode) secondMessage.getRecord().getData()).remove(COL_NAME)); + + Lists.newArrayList(getTestMessages().get(0), getTestMessages().get(2)); + + return Lists.newArrayList(firstMessage, secondMessage); + } + + @Override + protected List getAirbyteMessagesSecondSync(String streamName2) { + return getTestMessages() + .stream() + .map(Jsons::clone) + .peek(m -> { + m.getRecord().setStream(streamName2); + m.getRecord().setNamespace(getDefaultNamespace()); + ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_WAKEUP); + ((ObjectNode) m.getRecord().getData()).remove(COL_BIRTH); + ((ObjectNode) m.getRecord().getData()).replace(COL_ID, + Jsons.jsonNode(m.getRecord().getData().get(COL_ID).asInt())); + }) + .collect(Collectors.toList()); + } + + protected List getAirbyteMessagesSecondStreamWithNamespace(String streamName2) { + return getTestMessages() + .stream() + .map(Jsons::clone) + .peek(m -> { + m.getRecord().setStream(streamName2); + ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_WAKEUP); + ((ObjectNode) m.getRecord().getData()).remove(COL_BIRTH); + ((ObjectNode) m.getRecord().getData()).replace(COL_ID, + Jsons.jsonNode(m.getRecord().getData().get(COL_ID).asInt())); + }) + .collect(Collectors.toList()); + } + + protected List getAirbyteMessagesForTablesWithQuoting(ConfiguredAirbyteStream streamForTableWithSpaces) { + return getTestMessages() + .stream() + .map(Jsons::clone) + .peek(m -> { + m.getRecord().setStream(streamForTableWithSpaces.getStream().getName()); + ((ObjectNode) m.getRecord().getData()).set(COL_LAST_NAME_WITH_SPACE, + ((ObjectNode) m.getRecord().getData()).remove(COL_NAME)); + ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_BIRTH); + ((ObjectNode) m.getRecord().getData()).remove(COL_WAKEUP); + ((ObjectNode) m.getRecord().getData()).replace(COL_ID, + Jsons.jsonNode(m.getRecord().getData().get(COL_ID).asInt())); + }) + .collect(Collectors.toList()); } @Override @@ -114,20 +273,38 @@ protected List getTestMessages() { .withData(Jsons.jsonNode(ImmutableMap .of(COL_ID, ID_VALUE_1, COL_NAME, "picard", - COL_UPDATED_AT, "2004-10-19")))), + COL_UPDATED_AT, "2004-10-19", + COL_WAKEUP, "10:10:10.123456-05:00", + COL_BIRTH, "2004-10-19T17:23:54.123456Z")))), new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) .withData(Jsons.jsonNode(ImmutableMap .of(COL_ID, ID_VALUE_2, COL_NAME, "crusher", - COL_UPDATED_AT, - "2005-10-19")))), + COL_UPDATED_AT, "2005-10-19", + COL_WAKEUP, "11:11:11.123456-05:00", + COL_BIRTH, "2005-10-19T17:23:54.123456Z")))), new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) .withData(Jsons.jsonNode(ImmutableMap .of(COL_ID, ID_VALUE_3, COL_NAME, "vash", - COL_UPDATED_AT, "2006-10-19"))))); + COL_UPDATED_AT, "2006-10-19", + COL_WAKEUP, "12:12:12.123456-05:00", + COL_BIRTH, "2006-10-19T17:23:54.123456Z"))))); + } + + protected void executeStatementReadIncrementallyTwice() throws SQLException { + database.execute(connection -> { + connection.createStatement().execute( + String.format( + "INSERT INTO %s(id, name, updated_at, wakeup, birth) VALUES (4,'riker', '2006-10-19','12:12:12.123456-05:00','2006-10-19T17:23:54.123456Z')", + getFullyQualifiedTableName(TABLE_NAME))); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(id, name, updated_at, wakeup, birth) VALUES (5, 'data', '2006-10-19','12:12:12.123456-05:00','2006-10-19T17:23:54.123456Z')", + getFullyQualifiedTableName(TABLE_NAME))); + }); } @Override @@ -138,7 +315,9 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) { defaultNamespace, Field.of(COL_ID, JsonSchemaType.NUMBER), Field.of(COL_NAME, JsonSchemaType.STRING), - Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) + Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE), + Field.of(COL_WAKEUP, JsonSchemaType.STRING_TIME_WITH_TIMEZONE), + Field.of(COL_BIRTH, JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))), CatalogHelpers.createAirbyteStream( @@ -146,7 +325,9 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) { defaultNamespace, Field.of(COL_ID, JsonSchemaType.NUMBER), Field.of(COL_NAME, JsonSchemaType.STRING), - Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) + Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE), + Field.of(COL_WAKEUP, JsonSchemaType.STRING_TIME_WITH_TIMEZONE), + Field.of(COL_BIRTH, JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey(Collections.emptyList()), CatalogHelpers.createAirbyteStream( @@ -154,7 +335,9 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) { defaultNamespace, Field.of(COL_FIRST_NAME, JsonSchemaType.STRING), Field.of(COL_LAST_NAME, JsonSchemaType.STRING), - Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) + Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE), + Field.of(COL_WAKEUP, JsonSchemaType.STRING_TIME_WITH_TIMEZONE), + Field.of(COL_BIRTH, JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey( List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME))))); @@ -169,6 +352,24 @@ protected void incrementalTimestampCheck() throws Exception { getTestMessages().get(2))); } + @Test + void incrementalTimeTzCheck() throws Exception { + super.incrementalCursorCheck(COL_WAKEUP, + "11:09:11.123456-05:00", + "12:12:12.123456-05:00", + Lists.newArrayList(getTestMessages().get(1), + getTestMessages().get(2))); + } + + @Test + void incrementalTimestampTzCheck() throws Exception { + super.incrementalCursorCheck(COL_BIRTH, + "2005-10-18T17:23:54.123456Z", + "2006-10-19T17:23:54.123456Z", + Lists.newArrayList(getTestMessages().get(1), + getTestMessages().get(2))); + } + @Override protected JdbcSourceOperations getSourceOperations() { return new PostgresSourceOperations(); @@ -182,13 +383,17 @@ protected List getExpectedAirbyteMessagesSecondSync(String names .withData(Jsons.jsonNode(ImmutableMap .of(COL_ID, ID_VALUE_4, COL_NAME, "riker", - COL_UPDATED_AT, "2006-10-19"))))); + COL_UPDATED_AT, "2006-10-19", + COL_WAKEUP, "12:12:12.123456-05:00", + COL_BIRTH, "2006-10-19T17:23:54.123456Z"))))); expectedMessages.add(new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace) .withData(Jsons.jsonNode(ImmutableMap .of(COL_ID, ID_VALUE_5, COL_NAME, "data", - COL_UPDATED_AT, "2006-10-19"))))); + COL_UPDATED_AT, "2006-10-19", + COL_WAKEUP, "12:12:12.123456-05:00", + COL_BIRTH, "2006-10-19T17:23:54.123456Z"))))); expectedMessages.add(new AirbyteMessage() .withType(AirbyteMessage.Type.STATE) .withState(new AirbyteStateMessage() From c34dff8671a896c96c6244a9084942696bec31dd Mon Sep 17 00:00:00 2001 From: vmaltsev Date: Fri, 10 Jun 2022 08:23:09 +0300 Subject: [PATCH 08/11] removed star import --- .../postgres/PostgresJdbcSourceAcceptanceTest.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java index cc7814ac3b147..1bf9187d7eb99 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java @@ -24,7 +24,16 @@ import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; import io.airbyte.integrations.source.relationaldb.models.DbState; import io.airbyte.integrations.source.relationaldb.models.DbStreamState; -import io.airbyte.protocol.models.*; +import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.protocol.models.Field; +import io.airbyte.protocol.models.JsonSchemaType; +import io.airbyte.protocol.models.SyncMode; import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.sql.JDBCType; import java.sql.SQLException; From 3738f11202e4e82696891f07c4cb1d1199df849c Mon Sep 17 00:00:00 2001 From: vmaltsev Date: Sat, 11 Jun 2022 18:26:56 +0300 Subject: [PATCH 09/11] add timestamp datatype test --- .../jdbc/test/JdbcSourceAcceptanceTest.java | 6 +- ...StrictEncryptJdbcSourceAcceptanceTest.java | 2 +- .../PostgresJdbcSourceAcceptanceTest.java | 121 +++++++++++------- 3 files changed, 78 insertions(+), 51 deletions(-) diff --git a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java index adc8589e617b0..802d8ac79bc72 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java @@ -575,11 +575,11 @@ protected ArrayList getAirbyteMessagesCheckCursorSpaceInColumnNa } @Test - void testIncrementalTimestampCheckCursor() throws Exception { - incrementalTimestampCheck(); + void testIncrementalDateCheckCursor() throws Exception { + incrementalDateCheck(); } - protected void incrementalTimestampCheck() throws Exception { + protected void incrementalDateCheck() throws Exception { incrementalCursorCheck( COL_UPDATED_AT, "2005-10-18T00:00:00Z", diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java index ba0d126de1fef..aa7cda5d248c2 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java @@ -164,7 +164,7 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) { } @Override - protected void incrementalTimestampCheck() throws Exception { + protected void incrementalDateCheck() throws Exception { super.incrementalCursorCheck(COL_UPDATED_AT, "2005-10-18", "2006-10-19", diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java index 1bf9187d7eb99..459a44fa86e33 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java @@ -51,8 +51,9 @@ class PostgresJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest { private static PostgreSQLContainer PSQL_DB; - public static String COL_WAKEUP = "wakeup"; - public static String COL_BIRTH = "birth"; + public static String COL_WAKEUP_AT = "wakeup_at"; + public static String COL_LAST_VISITED_AT = "last_visited_at"; + public static String COL_LAST_COMMENT_AT = "last_comment_at"; @BeforeAll static void init() { @@ -63,9 +64,12 @@ static void init() { @BeforeEach public void setup() throws Exception { final String dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase(); - COLUMN_CLAUSE_WITH_PK = "id INTEGER, name VARCHAR(200), updated_at DATE, wakeup TIMETZ, birth TIMESTAMPTZ"; - COLUMN_CLAUSE_WITHOUT_PK = "id INTEGER, name VARCHAR(200), updated_at DATE, wakeup TIMETZ, birth TIMESTAMPTZ"; - COLUMN_CLAUSE_WITH_COMPOSITE_PK = "first_name VARCHAR(200), last_name VARCHAR(200), updated_at DATE, wakeup TIMETZ, birth TIMESTAMPTZ"; + COLUMN_CLAUSE_WITH_PK = + "id INTEGER, name VARCHAR(200), updated_at DATE, wakeup_at TIMETZ, last_visited_at TIMESTAMPTZ, last_comment_at TIMESTAMP"; + COLUMN_CLAUSE_WITHOUT_PK = + "id INTEGER, name VARCHAR(200), updated_at DATE, wakeup_at TIMETZ, last_visited_at TIMESTAMPTZ, last_comment_at TIMESTAMP"; + COLUMN_CLAUSE_WITH_COMPOSITE_PK = + "first_name VARCHAR(200), last_name VARCHAR(200), updated_at DATE, wakeup_at TIMETZ, last_visited_at TIMESTAMPTZ, last_comment_at TIMESTAMP"; config = Jsons.jsonNode(ImmutableMap.builder() .put("host", PSQL_DB.getHost()) @@ -106,15 +110,15 @@ public void setup() throws Exception { primaryKeyClause(Collections.singletonList("id")))); connection.createStatement().execute( String.format( - "INSERT INTO %s(id, name, updated_at, wakeup, birth) VALUES (1,'picard', '2004-10-19','10:10:10.123456-05:00','2004-10-19T17:23:54.123456Z')", + "INSERT INTO %s(id, name, updated_at, wakeup_at, last_visited_at, last_comment_at) VALUES (1,'picard', '2004-10-19','10:10:10.123456-05:00','2004-10-19T17:23:54.123456Z','2004-01-01T17:23:54.123456')", getFullyQualifiedTableName(TABLE_NAME))); connection.createStatement().execute( String.format( - "INSERT INTO %s(id, name, updated_at, wakeup, birth) VALUES (2, 'crusher', '2005-10-19','11:11:11.123456-05:00','2005-10-19T17:23:54.123456Z')", + "INSERT INTO %s(id, name, updated_at, wakeup_at, last_visited_at, last_comment_at) VALUES (2, 'crusher', '2005-10-19','11:11:11.123456-05:00','2005-10-19T17:23:54.123456Z','2005-01-01T17:23:54.123456')", getFullyQualifiedTableName(TABLE_NAME))); connection.createStatement().execute( String.format( - "INSERT INTO %s(id, name, updated_at, wakeup, birth) VALUES (3, 'vash', '2006-10-19','12:12:12.123456-05:00','2006-10-19T17:23:54.123456Z')", + "INSERT INTO %s(id, name, updated_at, wakeup_at, last_visited_at, last_comment_at) VALUES (3, 'vash', '2006-10-19','12:12:12.123456-05:00','2006-10-19T17:23:54.123456Z','2006-01-01T17:23:54.123456')", getFullyQualifiedTableName(TABLE_NAME))); connection.createStatement().execute( @@ -122,15 +126,15 @@ public void setup() throws Exception { COLUMN_CLAUSE_WITHOUT_PK, "")); connection.createStatement().execute( String.format( - "INSERT INTO %s(id, name, updated_at, wakeup, birth) VALUES (1,'picard', '2004-10-19','12:12:12.123456-05:00','2004-10-19T17:23:54.123456Z')", + "INSERT INTO %s(id, name, updated_at, wakeup_at, last_visited_at, last_comment_at) VALUES (1,'picard', '2004-10-19','12:12:12.123456-05:00','2004-10-19T17:23:54.123456Z','2004-01-01T17:23:54.123456')", getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK))); connection.createStatement().execute( String.format( - "INSERT INTO %s(id, name, updated_at, wakeup, birth) VALUES (2, 'crusher', '2005-10-19','11:11:11.123456-05:00','2005-10-19T17:23:54.123456Z')", + "INSERT INTO %s(id, name, updated_at, wakeup_at, last_visited_at, last_comment_at) VALUES (2, 'crusher', '2005-10-19','11:11:11.123456-05:00','2005-10-19T17:23:54.123456Z','2005-01-01T17:23:54.123456')", getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK))); connection.createStatement().execute( String.format( - "INSERT INTO %s(id, name, updated_at, wakeup, birth) VALUES (3, 'vash', '2006-10-19','10:10:10.123456-05:00','2006-10-19T17:23:54.123456Z')", + "INSERT INTO %s(id, name, updated_at, wakeup_at, last_visited_at, last_comment_at) VALUES (3, 'vash', '2006-10-19','10:10:10.123456-05:00','2006-10-19T17:23:54.123456Z','2006-01-01T17:23:54.123456')", getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK))); connection.createStatement().execute( @@ -139,15 +143,15 @@ public void setup() throws Exception { primaryKeyClause(ImmutableList.of("first_name", "last_name")))); connection.createStatement().execute( String.format( - "INSERT INTO %s(first_name, last_name, updated_at, wakeup, birth) VALUES ('first' ,'picard', '2004-10-19','12:12:12.123456-05:00','2004-10-19T17:23:54.123456Z')", + "INSERT INTO %s(first_name, last_name, updated_at, wakeup_at, last_visited_at, last_comment_at) VALUES ('first' ,'picard', '2004-10-19','12:12:12.123456-05:00','2004-10-19T17:23:54.123456Z','2004-01-01T17:23:54.123456')", getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK))); connection.createStatement().execute( String.format( - "INSERT INTO %s(first_name, last_name, updated_at, wakeup, birth) VALUES ('second', 'crusher', '2005-10-19','11:11:11.123456-05:00','2005-10-19T17:23:54.123456Z')", + "INSERT INTO %s(first_name, last_name, updated_at, wakeup_at, last_visited_at, last_comment_at) VALUES ('second', 'crusher', '2005-10-19','11:11:11.123456-05:00','2005-10-19T17:23:54.123456Z','2005-01-01T17:23:54.123456')", getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK))); connection.createStatement().execute( String.format( - "INSERT INTO %s(first_name, last_name, updated_at, wakeup, birth) VALUES ('third', 'vash', '2006-10-19','10:10:10.123456-05:00','2006-10-19T17:23:54.123456Z')", + "INSERT INTO %s(first_name, last_name, updated_at, wakeup_at, last_visited_at, last_comment_at) VALUES ('third', 'vash', '2006-10-19','10:10:10.123456-05:00','2006-10-19T17:23:54.123456Z','2006-01-01T17:23:54.123456')", getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK))); }); @@ -161,8 +165,9 @@ protected List getAirbyteMessagesReadOneColumn() { .peek(m -> { ((ObjectNode) m.getRecord().getData()).remove(COL_NAME); ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); - ((ObjectNode) m.getRecord().getData()).remove(COL_WAKEUP); - ((ObjectNode) m.getRecord().getData()).remove(COL_BIRTH); + ((ObjectNode) m.getRecord().getData()).remove(COL_WAKEUP_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_LAST_VISITED_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_LAST_COMMENT_AT); ((ObjectNode) m.getRecord().getData()).replace(COL_ID, Jsons.jsonNode(m.getRecord().getData().get(COL_ID).asInt())); }) @@ -174,16 +179,18 @@ protected ArrayList getAirbyteMessagesCheckCursorSpaceInColumnNa final AirbyteMessage firstMessage = getTestMessages().get(0); firstMessage.getRecord().setStream(streamWithSpaces.getStream().getName()); ((ObjectNode) firstMessage.getRecord().getData()).remove(COL_UPDATED_AT); - ((ObjectNode) firstMessage.getRecord().getData()).remove(COL_WAKEUP); - ((ObjectNode) firstMessage.getRecord().getData()).remove(COL_BIRTH); + ((ObjectNode) firstMessage.getRecord().getData()).remove(COL_WAKEUP_AT); + ((ObjectNode) firstMessage.getRecord().getData()).remove(COL_LAST_VISITED_AT); + ((ObjectNode) firstMessage.getRecord().getData()).remove(COL_LAST_COMMENT_AT); ((ObjectNode) firstMessage.getRecord().getData()).set(COL_LAST_NAME_WITH_SPACE, ((ObjectNode) firstMessage.getRecord().getData()).remove(COL_NAME)); final AirbyteMessage secondMessage = getTestMessages().get(2); secondMessage.getRecord().setStream(streamWithSpaces.getStream().getName()); ((ObjectNode) secondMessage.getRecord().getData()).remove(COL_UPDATED_AT); - ((ObjectNode) secondMessage.getRecord().getData()).remove(COL_WAKEUP); - ((ObjectNode) secondMessage.getRecord().getData()).remove(COL_BIRTH); + ((ObjectNode) secondMessage.getRecord().getData()).remove(COL_WAKEUP_AT); + ((ObjectNode) secondMessage.getRecord().getData()).remove(COL_LAST_VISITED_AT); + ((ObjectNode) secondMessage.getRecord().getData()).remove(COL_LAST_COMMENT_AT); ((ObjectNode) secondMessage.getRecord().getData()).set(COL_LAST_NAME_WITH_SPACE, ((ObjectNode) secondMessage.getRecord().getData()).remove(COL_NAME)); @@ -201,8 +208,9 @@ protected List getAirbyteMessagesSecondSync(String streamName2) m.getRecord().setStream(streamName2); m.getRecord().setNamespace(getDefaultNamespace()); ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); - ((ObjectNode) m.getRecord().getData()).remove(COL_WAKEUP); - ((ObjectNode) m.getRecord().getData()).remove(COL_BIRTH); + ((ObjectNode) m.getRecord().getData()).remove(COL_WAKEUP_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_LAST_VISITED_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_LAST_COMMENT_AT); ((ObjectNode) m.getRecord().getData()).replace(COL_ID, Jsons.jsonNode(m.getRecord().getData().get(COL_ID).asInt())); }) @@ -216,8 +224,9 @@ protected List getAirbyteMessagesSecondStreamWithNamespace(Strin .peek(m -> { m.getRecord().setStream(streamName2); ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); - ((ObjectNode) m.getRecord().getData()).remove(COL_WAKEUP); - ((ObjectNode) m.getRecord().getData()).remove(COL_BIRTH); + ((ObjectNode) m.getRecord().getData()).remove(COL_WAKEUP_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_LAST_VISITED_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_LAST_COMMENT_AT); ((ObjectNode) m.getRecord().getData()).replace(COL_ID, Jsons.jsonNode(m.getRecord().getData().get(COL_ID).asInt())); }) @@ -233,8 +242,9 @@ protected List getAirbyteMessagesForTablesWithQuoting(Configured ((ObjectNode) m.getRecord().getData()).set(COL_LAST_NAME_WITH_SPACE, ((ObjectNode) m.getRecord().getData()).remove(COL_NAME)); ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); - ((ObjectNode) m.getRecord().getData()).remove(COL_BIRTH); - ((ObjectNode) m.getRecord().getData()).remove(COL_WAKEUP); + ((ObjectNode) m.getRecord().getData()).remove(COL_LAST_VISITED_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_LAST_COMMENT_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_WAKEUP_AT); ((ObjectNode) m.getRecord().getData()).replace(COL_ID, Jsons.jsonNode(m.getRecord().getData().get(COL_ID).asInt())); }) @@ -283,35 +293,38 @@ protected List getTestMessages() { .of(COL_ID, ID_VALUE_1, COL_NAME, "picard", COL_UPDATED_AT, "2004-10-19", - COL_WAKEUP, "10:10:10.123456-05:00", - COL_BIRTH, "2004-10-19T17:23:54.123456Z")))), + COL_WAKEUP_AT, "10:10:10.123456-05:00", + COL_LAST_VISITED_AT, "2004-10-19T17:23:54.123456Z", + COL_LAST_COMMENT_AT, "2004-01-01T17:23:54.123456")))), new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) .withData(Jsons.jsonNode(ImmutableMap .of(COL_ID, ID_VALUE_2, COL_NAME, "crusher", COL_UPDATED_AT, "2005-10-19", - COL_WAKEUP, "11:11:11.123456-05:00", - COL_BIRTH, "2005-10-19T17:23:54.123456Z")))), + COL_WAKEUP_AT, "11:11:11.123456-05:00", + COL_LAST_VISITED_AT, "2005-10-19T17:23:54.123456Z", + COL_LAST_COMMENT_AT, "2005-01-01T17:23:54.123456")))), new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) .withData(Jsons.jsonNode(ImmutableMap .of(COL_ID, ID_VALUE_3, COL_NAME, "vash", COL_UPDATED_AT, "2006-10-19", - COL_WAKEUP, "12:12:12.123456-05:00", - COL_BIRTH, "2006-10-19T17:23:54.123456Z"))))); + COL_WAKEUP_AT, "12:12:12.123456-05:00", + COL_LAST_VISITED_AT, "2006-10-19T17:23:54.123456Z", + COL_LAST_COMMENT_AT, "2006-01-01T17:23:54.123456"))))); } protected void executeStatementReadIncrementallyTwice() throws SQLException { database.execute(connection -> { connection.createStatement().execute( String.format( - "INSERT INTO %s(id, name, updated_at, wakeup, birth) VALUES (4,'riker', '2006-10-19','12:12:12.123456-05:00','2006-10-19T17:23:54.123456Z')", + "INSERT INTO %s(id, name, updated_at, wakeup_at, last_visited_at, last_comment_at) VALUES (4,'riker', '2006-10-19','12:12:12.123456-05:00','2006-10-19T17:23:54.123456Z','2006-01-01T17:23:54.123456')", getFullyQualifiedTableName(TABLE_NAME))); connection.createStatement().execute( String.format( - "INSERT INTO %s(id, name, updated_at, wakeup, birth) VALUES (5, 'data', '2006-10-19','12:12:12.123456-05:00','2006-10-19T17:23:54.123456Z')", + "INSERT INTO %s(id, name, updated_at, wakeup_at, last_visited_at, last_comment_at) VALUES (5, 'data', '2006-10-19','12:12:12.123456-05:00','2006-10-19T17:23:54.123456Z','2006-01-01T17:23:54.123456')", getFullyQualifiedTableName(TABLE_NAME))); }); } @@ -325,8 +338,9 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) { Field.of(COL_ID, JsonSchemaType.NUMBER), Field.of(COL_NAME, JsonSchemaType.STRING), Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE), - Field.of(COL_WAKEUP, JsonSchemaType.STRING_TIME_WITH_TIMEZONE), - Field.of(COL_BIRTH, JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE)) + Field.of(COL_WAKEUP_AT, JsonSchemaType.STRING_TIME_WITH_TIMEZONE), + Field.of(COL_LAST_VISITED_AT, JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE), + Field.of(COL_LAST_COMMENT_AT, JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))), CatalogHelpers.createAirbyteStream( @@ -335,8 +349,9 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) { Field.of(COL_ID, JsonSchemaType.NUMBER), Field.of(COL_NAME, JsonSchemaType.STRING), Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE), - Field.of(COL_WAKEUP, JsonSchemaType.STRING_TIME_WITH_TIMEZONE), - Field.of(COL_BIRTH, JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE)) + Field.of(COL_WAKEUP_AT, JsonSchemaType.STRING_TIME_WITH_TIMEZONE), + Field.of(COL_LAST_VISITED_AT, JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE), + Field.of(COL_LAST_COMMENT_AT, JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey(Collections.emptyList()), CatalogHelpers.createAirbyteStream( @@ -345,15 +360,16 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) { Field.of(COL_FIRST_NAME, JsonSchemaType.STRING), Field.of(COL_LAST_NAME, JsonSchemaType.STRING), Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE), - Field.of(COL_WAKEUP, JsonSchemaType.STRING_TIME_WITH_TIMEZONE), - Field.of(COL_BIRTH, JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE)) + Field.of(COL_WAKEUP_AT, JsonSchemaType.STRING_TIME_WITH_TIMEZONE), + Field.of(COL_LAST_VISITED_AT, JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE), + Field.of(COL_LAST_COMMENT_AT, JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey( List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME))))); } @Override - protected void incrementalTimestampCheck() throws Exception { + protected void incrementalDateCheck() throws Exception { super.incrementalCursorCheck(COL_UPDATED_AT, "2005-10-18", "2006-10-19", @@ -363,7 +379,7 @@ protected void incrementalTimestampCheck() throws Exception { @Test void incrementalTimeTzCheck() throws Exception { - super.incrementalCursorCheck(COL_WAKEUP, + super.incrementalCursorCheck(COL_WAKEUP_AT, "11:09:11.123456-05:00", "12:12:12.123456-05:00", Lists.newArrayList(getTestMessages().get(1), @@ -372,13 +388,22 @@ void incrementalTimeTzCheck() throws Exception { @Test void incrementalTimestampTzCheck() throws Exception { - super.incrementalCursorCheck(COL_BIRTH, + super.incrementalCursorCheck(COL_LAST_VISITED_AT, "2005-10-18T17:23:54.123456Z", "2006-10-19T17:23:54.123456Z", Lists.newArrayList(getTestMessages().get(1), getTestMessages().get(2))); } + @Test + void incrementalTimestampCheck() throws Exception { + super.incrementalCursorCheck(COL_LAST_COMMENT_AT, + "2004-12-12T17:23:54.123456", + "2006-01-01T17:23:54.123456", + Lists.newArrayList(getTestMessages().get(1), + getTestMessages().get(2))); + } + @Override protected JdbcSourceOperations getSourceOperations() { return new PostgresSourceOperations(); @@ -393,16 +418,18 @@ protected List getExpectedAirbyteMessagesSecondSync(String names .of(COL_ID, ID_VALUE_4, COL_NAME, "riker", COL_UPDATED_AT, "2006-10-19", - COL_WAKEUP, "12:12:12.123456-05:00", - COL_BIRTH, "2006-10-19T17:23:54.123456Z"))))); + COL_WAKEUP_AT, "12:12:12.123456-05:00", + COL_LAST_VISITED_AT, "2006-10-19T17:23:54.123456Z", + COL_LAST_COMMENT_AT, "2006-01-01T17:23:54.123456"))))); expectedMessages.add(new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace) .withData(Jsons.jsonNode(ImmutableMap .of(COL_ID, ID_VALUE_5, COL_NAME, "data", COL_UPDATED_AT, "2006-10-19", - COL_WAKEUP, "12:12:12.123456-05:00", - COL_BIRTH, "2006-10-19T17:23:54.123456Z"))))); + COL_WAKEUP_AT, "12:12:12.123456-05:00", + COL_LAST_VISITED_AT, "2006-10-19T17:23:54.123456Z", + COL_LAST_COMMENT_AT, "2006-01-01T17:23:54.123456"))))); expectedMessages.add(new AirbyteMessage() .withType(AirbyteMessage.Type.STATE) .withState(new AirbyteStateMessage() From 5771f4fd00a83a036ccd6bf9eb2791256b9e7b5f Mon Sep 17 00:00:00 2001 From: grishick Date: Mon, 13 Jun 2022 13:12:59 -0700 Subject: [PATCH 10/11] Bump version in Dockerfile --- airbyte-integrations/connectors/source-postgres/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index 42f2984e039df..1b07db6a77492 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.4.21 +LABEL io.airbyte.version=0.4.22 LABEL io.airbyte.name=airbyte/source-postgres From d5f6eaac75ab840d2c0445a777111a18c0f802ad Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Mon, 13 Jun 2022 20:53:47 +0000 Subject: [PATCH 11/11] auto-bump connector version --- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- airbyte-config/init/src/main/resources/seed/source_specs.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index b791b8affd0d3..ad55438cdc0ca 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -715,7 +715,7 @@ - name: Postgres sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 dockerRepository: airbyte/source-postgres - dockerImageTag: 0.4.21 + dockerImageTag: 0.4.22 documentationUrl: https://docs.airbyte.io/integrations/sources/postgres icon: postgresql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 99606929ea518..e009be787db7f 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -6719,7 +6719,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-postgres:0.4.21" +- dockerImage: "airbyte/source-postgres:0.4.22" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres" connectionSpecification: