Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Postgres Source: fixed unsupported date-time datatypes during incremental sync #13655

Merged
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,13 @@ void testReadOneColumn() throws Exception {

setEmittedAtToNull(actualMessages);

final List<AirbyteMessage> expectedMessages = getAirbyteMessagesReadOneColumn();
assertTrue(expectedMessages.size() == actualMessages.size());
assertTrue(expectedMessages.containsAll(actualMessages));
assertTrue(actualMessages.containsAll(expectedMessages));
}

protected List<AirbyteMessage> getAirbyteMessagesReadOneColumn() {
final List<AirbyteMessage> expectedMessages = getTestMessages().stream()
.map(Jsons::clone)
.peek(m -> {
Expand All @@ -397,9 +404,7 @@ void testReadOneColumn() throws Exception {
convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt()));
})
.collect(Collectors.toList());
assertTrue(expectedMessages.size() == actualMessages.size());
assertTrue(expectedMessages.containsAll(actualMessages));
assertTrue(actualMessages.containsAll(expectedMessages));
return expectedMessages;
}

@Test
Expand Down Expand Up @@ -432,17 +437,7 @@ void testReadMultipleTables() throws Exception {
Field.of(COL_ID, JsonSchemaType.NUMBER),
Field.of(COL_NAME, JsonSchemaType.STRING)));

final List<AirbyteMessage> secondStreamExpectedMessages = getTestMessages()
.stream()
.map(Jsons::clone)
.peek(m -> {
m.getRecord().setStream(streamName2);
m.getRecord().setNamespace(getDefaultNamespace());
((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT);
((ObjectNode) m.getRecord().getData()).replace(COL_ID,
convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt()));
})
.collect(Collectors.toList());
final List<AirbyteMessage> secondStreamExpectedMessages = getAirbyteMessagesSecondSync(streamName2);
expectedMessages.addAll(secondStreamExpectedMessages);
}

Expand All @@ -456,6 +451,21 @@ void testReadMultipleTables() throws Exception {
assertTrue(actualMessages.containsAll(expectedMessages));
}

protected List<AirbyteMessage> getAirbyteMessagesSecondSync(String streamName2) {
return getTestMessages()
.stream()
.map(Jsons::clone)
.peek(m -> {
m.getRecord().setStream(streamName2);
m.getRecord().setNamespace(getDefaultNamespace());
((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT);
((ObjectNode) m.getRecord().getData()).replace(COL_ID,
convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt()));
})
.collect(Collectors.toList());

}

@Test
void testTablesWithQuoting() throws Exception {
final ConfiguredAirbyteStream streamForTableWithSpaces = createTableWithSpaces();
Expand All @@ -469,7 +479,17 @@ void testTablesWithQuoting() throws Exception {

setEmittedAtToNull(actualMessages);

final List<AirbyteMessage> secondStreamExpectedMessages = getTestMessages()
final List<AirbyteMessage> secondStreamExpectedMessages = getAirbyteMessagesForTablesWithQuoting(streamForTableWithSpaces);
final List<AirbyteMessage> expectedMessages = new ArrayList<>(getTestMessages());
expectedMessages.addAll(secondStreamExpectedMessages);

assertTrue(expectedMessages.size() == actualMessages.size());
assertTrue(expectedMessages.containsAll(actualMessages));
assertTrue(actualMessages.containsAll(expectedMessages));
}

protected List<AirbyteMessage> getAirbyteMessagesForTablesWithQuoting(ConfiguredAirbyteStream streamForTableWithSpaces) {
return getTestMessages()
.stream()
.map(Jsons::clone)
.peek(m -> {
Expand All @@ -481,12 +501,6 @@ void testTablesWithQuoting() throws Exception {
convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt()));
})
.collect(Collectors.toList());
final List<AirbyteMessage> expectedMessages = new ArrayList<>(getTestMessages());
expectedMessages.addAll(secondStreamExpectedMessages);

assertTrue(expectedMessages.size() == actualMessages.size());
assertTrue(expectedMessages.containsAll(actualMessages));
assertTrue(actualMessages.containsAll(expectedMessages));
}

@SuppressWarnings("ResultOfMethodCallIgnored")
Expand Down Expand Up @@ -532,6 +546,17 @@ void testIncrementalStringCheckCursor() throws Exception {
void testIncrementalStringCheckCursorSpaceInColumnName() throws Exception {
final ConfiguredAirbyteStream streamWithSpaces = createTableWithSpaces();

final ArrayList<AirbyteMessage> expectedRecordMessages = getAirbyteMessagesCheckCursorSpaceInColumnName(streamWithSpaces);
incrementalCursorCheck(
COL_LAST_NAME_WITH_SPACE,
COL_LAST_NAME_WITH_SPACE,
"patent",
"vash",
expectedRecordMessages,
streamWithSpaces);
}

protected ArrayList<AirbyteMessage> getAirbyteMessagesCheckCursorSpaceInColumnName(ConfiguredAirbyteStream streamWithSpaces) {
final AirbyteMessage firstMessage = getTestMessages().get(0);
firstMessage.getRecord().setStream(streamWithSpaces.getStream().getName());
((ObjectNode) firstMessage.getRecord().getData()).remove(COL_UPDATED_AT);
Expand All @@ -546,13 +571,7 @@ void testIncrementalStringCheckCursorSpaceInColumnName() throws Exception {

Lists.newArrayList(getTestMessages().get(0), getTestMessages().get(2));

incrementalCursorCheck(
COL_LAST_NAME_WITH_SPACE,
COL_LAST_NAME_WITH_SPACE,
"patent",
"vash",
Lists.newArrayList(firstMessage, secondMessage),
streamWithSpaces);
return Lists.newArrayList(firstMessage, secondMessage);
}

@Test
Expand Down Expand Up @@ -600,14 +619,7 @@ void testReadOneTableIncrementallyTwice() throws Exception {
.filter(r -> r.getType() == Type.STATE).findFirst();
assertTrue(stateAfterFirstSyncOptional.isPresent());

database.execute(connection -> {
connection.createStatement().execute(
String.format("INSERT INTO %s(id, name, updated_at) VALUES (4,'riker', '2006-10-19')",
getFullyQualifiedTableName(TABLE_NAME)));
connection.createStatement().execute(
String.format("INSERT INTO %s(id, name, updated_at) VALUES (5, 'data', '2006-10-19')",
getFullyQualifiedTableName(TABLE_NAME)));
});
executeStatementReadIncrementallyTwice();

final List<AirbyteMessage> actualMessagesSecondSync = MoreIterators
.toList(source.read(config, configuredCatalog,
Expand All @@ -624,6 +636,17 @@ void testReadOneTableIncrementallyTwice() throws Exception {
assertTrue(actualMessagesSecondSync.containsAll(expectedMessages));
}

protected void executeStatementReadIncrementallyTwice() throws SQLException {
database.execute(connection -> {
connection.createStatement().execute(
String.format("INSERT INTO %s(id, name, updated_at) VALUES (4,'riker', '2006-10-19')",
getFullyQualifiedTableName(TABLE_NAME)));
connection.createStatement().execute(
String.format("INSERT INTO %s(id, name, updated_at) VALUES (5, 'data', '2006-10-19')",
getFullyQualifiedTableName(TABLE_NAME)));
});
}

protected List<AirbyteMessage> getExpectedAirbyteMessagesSecondSync(String namespace) {
final List<AirbyteMessage> expectedMessages = new ArrayList<>();
expectedMessages.add(new AirbyteMessage().withType(Type.RECORD)
Expand Down Expand Up @@ -696,16 +719,7 @@ void testReadMultipleTablesIncrementally() throws Exception {

// we know the second streams messages are the same as the first minus the updated at column. so we
// cheat and generate the expected messages off of the first expected messages.
final List<AirbyteMessage> secondStreamExpectedMessages = getTestMessages()
.stream()
.map(Jsons::clone)
.peek(m -> {
m.getRecord().setStream(streamName2);
((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT);
((ObjectNode) m.getRecord().getData()).replace(COL_ID,
convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt()));
})
.collect(Collectors.toList());
final List<AirbyteMessage> secondStreamExpectedMessages = getAirbyteMessagesSecondStreamWithNamespace(streamName2);
final List<AirbyteMessage> expectedMessagesFirstSync = new ArrayList<>(getTestMessages());
expectedMessagesFirstSync.add(new AirbyteMessage()
.withType(Type.STATE)
Expand Down Expand Up @@ -748,6 +762,19 @@ void testReadMultipleTablesIncrementally() throws Exception {
assertTrue(actualMessagesFirstSync.containsAll(expectedMessagesFirstSync));
}

protected List<AirbyteMessage> getAirbyteMessagesSecondStreamWithNamespace(String streamName2) {
return getTestMessages()
.stream()
.map(Jsons::clone)
.peek(m -> {
m.getRecord().setStream(streamName2);
((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT);
((ObjectNode) m.getRecord().getData()).replace(COL_ID,
convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt()));
})
.collect(Collectors.toList());
}

// when initial and final cursor fields are the same.
protected void incrementalCursorCheck(
final String cursorField,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.protocol.models.JsonSchemaType;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand All @@ -30,6 +29,8 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.util.Collections;
import org.postgresql.jdbc.PgResultSetMetaData;
import org.slf4j.Logger;
Expand Down Expand Up @@ -79,15 +80,57 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
}

@Override
protected void setDate(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
try {
Date date = Date.valueOf(value);
preparedStatement.setDate(parameterIndex, date);
} catch (final Exception e) {
throw new RuntimeException(e);
public void setStatementField(final PreparedStatement preparedStatement,
final int parameterIndex,
final JDBCType cursorFieldType,
final String value)
throws SQLException {
switch (cursorFieldType) {

case TIMESTAMP -> setTimestamp(preparedStatement, parameterIndex, value);
case TIMESTAMP_WITH_TIMEZONE -> setTimestampWithTimezone(preparedStatement, parameterIndex, value);
case TIME -> setTime(preparedStatement, parameterIndex, value);
case TIME_WITH_TIMEZONE -> setTimeWithTimezone(preparedStatement, parameterIndex, value);
case DATE -> setDate(preparedStatement, parameterIndex, value);
case BIT -> setBit(preparedStatement, parameterIndex, value);
case BOOLEAN -> setBoolean(preparedStatement, parameterIndex, value);
case TINYINT, SMALLINT -> setShortInt(preparedStatement, parameterIndex, value);
case INTEGER -> setInteger(preparedStatement, parameterIndex, value);
case BIGINT -> setBigInteger(preparedStatement, parameterIndex, value);
case FLOAT, DOUBLE -> setDouble(preparedStatement, parameterIndex, value);
case REAL -> setReal(preparedStatement, parameterIndex, value);
case NUMERIC, DECIMAL -> setDecimal(preparedStatement, parameterIndex, value);
case CHAR, NCHAR, NVARCHAR, VARCHAR, LONGVARCHAR -> setString(preparedStatement, parameterIndex, value);
case BINARY, BLOB -> setBinary(preparedStatement, parameterIndex, value);
// since cursor are expected to be comparable, handle cursor typing strictly and error on
// unrecognized types
default -> throw new IllegalArgumentException(String.format("%s is not supported.", cursorFieldType));
}
}

private void setTimeWithTimezone(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException {
preparedStatement.setObject(parameterIndex, OffsetTime.parse(value));
}

private void setTimestampWithTimezone(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException {
preparedStatement.setObject(parameterIndex, OffsetDateTime.parse(value));
}

@Override
protected void setTimestamp(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException {
preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value));
}

@Override
protected void setTime(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException {
preparedStatement.setObject(parameterIndex, LocalTime.parse(value));
}

@Override
protected void setDate(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
preparedStatement.setObject(parameterIndex, LocalDate.parse(value));
}

@Override
public void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException {
final PgResultSetMetaData metadata = (PgResultSetMetaData) resultSet.getMetaData();
Expand Down
Loading