Skip to content

Commit

Permalink
🐛 Source Relational DB : remove unicode nulls from cursors (#13854)
Browse files Browse the repository at this point in the history
* Make sure that cursor value doesn't contain unicode null.

* Test that cursors doesn't contain unicode null

* incr version
  • Loading branch information
DoNotPanicUA authored Jun 17, 2022
1 parent 074adef commit 6ba511c
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-relational-db

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.3.0
LABEL io.airbyte.version=0.3.1
LABEL io.airbyte.name=airbyte/source-relational-db
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,17 @@ public StateDecoratingIterator(final Iterator<AirbyteMessage> messageIterator,
stateManager.setIsCdc(false);
}

private String getCursorCandidate(final AirbyteMessage message) {
String cursorCandidate = message.getRecord().getData().get(cursorField).asText();
return (cursorCandidate != null ? cursorCandidate.replaceAll("\u0000", "") : null);
}

@Override
protected AirbyteMessage computeNext() {
if (messageIterator.hasNext()) {
final AirbyteMessage message = messageIterator.next();
if (message.getRecord().getData().hasNonNull(cursorField)) {
final String cursorCandidate = message.getRecord().getData().get(cursorField).asText();
final String cursorCandidate = getCursorCandidate(message);
if (IncrementalUtils.compareCursors(maxCursor, cursorCandidate, cursorType) < 0) {
maxCursor = cursorCandidate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ class StateDecoratingIteratorTest {
.withRecord(new AirbyteRecordMessage()
.withData(Jsons.jsonNode(ImmutableMap.of(UUID_FIELD_NAME, "def"))));

private static final AirbyteMessage RECORD_MESSAGE3 = new AirbyteMessage()
.withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withData(Jsons.jsonNode(ImmutableMap.of(UUID_FIELD_NAME, "abc\u0000"))));

private static Iterator<AirbyteMessage> messageIterator;
private StateManager stateManager;
private AirbyteStateMessage stateMessage;
Expand Down Expand Up @@ -130,4 +135,22 @@ void testEmptyStream() {
assertFalse(iterator.hasNext());
}

@Test
void testUnicodeNull() {
messageIterator = MoreIterators.of(RECORD_MESSAGE3);
when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, "abc")).thenReturn(stateMessage);

final StateDecoratingIterator iterator = new StateDecoratingIterator(
messageIterator,
stateManager,
NAME_NAMESPACE_PAIR,
UUID_FIELD_NAME,
null,
JsonSchemaPrimitive.STRING);

assertEquals(RECORD_MESSAGE3, iterator.next());
assertEquals(stateMessage, iterator.next().getState());
assertFalse(iterator.hasNext());
}

}

0 comments on commit 6ba511c

Please sign in to comment.