Skip to content

Commit

Permalink
Remove inefficient namespace clone. (#21552)
Browse files Browse the repository at this point in the history
Testing shows this is causing ~ 5MB/s of throughput on the platform. This is not needed since we can simply modify the already present Json node instead of a cloned object.

This should help both CPU and GC pressure.
  • Loading branch information
davinchia authored Jan 18, 2023
1 parent e092a4e commit da34c78
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,8 @@ public ConfiguredAirbyteCatalog mapCatalog(final ConfiguredAirbyteCatalog inputC
}

@Override
public AirbyteMessage mapMessage(final AirbyteMessage inputMessage) {
if (inputMessage.getType() == Type.RECORD) {
final AirbyteMessage message = Jsons.clone(inputMessage);
public AirbyteMessage mapMessage(final AirbyteMessage message) {
if (message.getType() == Type.RECORD) {
// Default behavior if namespaceDefinition is not set is to follow SOURCE
if (namespaceDefinition != null) {
if (namespaceDefinition.equals(NamespaceDefinitionType.DESTINATION)) {
Expand All @@ -73,7 +72,7 @@ public AirbyteMessage mapMessage(final AirbyteMessage inputMessage) {
message.getRecord().setStream(transformStreamName(message.getRecord().getStream(), streamPrefix));
return message;
}
return inputMessage;
return message;
}

private static String formatNamespace(final String sourceNamespace, final String namespaceFormat) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.workers.test_utils.AirbyteMessageUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class NamespacingMapperTest {
Expand All @@ -28,14 +29,19 @@ class NamespacingMapperTest {
STREAM_NAME,
INPUT_NAMESPACE,
Field.of(FIELD_NAME, JsonSchemaType.STRING));
private static final AirbyteMessage RECORD_MESSAGE = createRecordMessage();
private AirbyteMessage RECORD_MESSAGE;

private static AirbyteMessage createRecordMessage() {
final AirbyteMessage message = AirbyteMessageUtils.createRecordMessage(STREAM_NAME, FIELD_NAME, BLUE);
message.getRecord().withNamespace(INPUT_NAMESPACE);
return message;
}

@BeforeEach
void setUp() {
RECORD_MESSAGE = createRecordMessage();
}

@Test
void testSourceNamespace() {
final NamespacingMapper mapper = new NamespacingMapper(NamespaceDefinitionType.SOURCE, null, OUTPUT_PREFIX);
Expand All @@ -51,11 +57,12 @@ void testSourceNamespace() {
assertEquals(expectedCatalog, actualCatalog);

final AirbyteMessage originalMessage = Jsons.clone(RECORD_MESSAGE);
assertEquals(originalMessage, RECORD_MESSAGE);

final AirbyteMessage expectedMessage = AirbyteMessageUtils.createRecordMessage(OUTPUT_PREFIX + STREAM_NAME, FIELD_NAME, BLUE);
expectedMessage.getRecord().withNamespace(INPUT_NAMESPACE);
final AirbyteMessage actualMessage = mapper.mapMessage(RECORD_MESSAGE);

assertEquals(originalMessage, RECORD_MESSAGE);
final AirbyteMessage actualMessage = mapper.mapMessage(RECORD_MESSAGE);
assertEquals(expectedMessage, actualMessage);
}

Expand Down Expand Up @@ -100,10 +107,10 @@ void testDestinationNamespace() {
assertEquals(expectedCatalog, actualCatalog);

final AirbyteMessage originalMessage = Jsons.clone(RECORD_MESSAGE);
assertEquals(originalMessage, RECORD_MESSAGE);

final AirbyteMessage expectedMessage = AirbyteMessageUtils.createRecordMessage(OUTPUT_PREFIX + STREAM_NAME, FIELD_NAME, BLUE);
final AirbyteMessage actualMessage = mapper.mapMessage(RECORD_MESSAGE);

assertEquals(originalMessage, RECORD_MESSAGE);
assertEquals(expectedMessage, actualMessage);
}

Expand All @@ -122,11 +129,11 @@ void testCustomFormatWithVariableNamespace() {
assertEquals(expectedCatalog, actualCatalog);

final AirbyteMessage originalMessage = Jsons.clone(RECORD_MESSAGE);
assertEquals(originalMessage, RECORD_MESSAGE);

final AirbyteMessage expectedMessage = AirbyteMessageUtils.createRecordMessage(OUTPUT_PREFIX + STREAM_NAME, FIELD_NAME, BLUE);
expectedMessage.getRecord().withNamespace(expectedNamespace);
final AirbyteMessage actualMessage = mapper.mapMessage(RECORD_MESSAGE);

assertEquals(originalMessage, RECORD_MESSAGE);
assertEquals(expectedMessage, actualMessage);
}

Expand All @@ -145,11 +152,11 @@ void testCustomFormatWithoutVariableNamespace() {
assertEquals(expectedCatalog, actualCatalog);

final AirbyteMessage originalMessage = Jsons.clone(RECORD_MESSAGE);
assertEquals(originalMessage, RECORD_MESSAGE);

final AirbyteMessage expectedMessage = AirbyteMessageUtils.createRecordMessage(OUTPUT_PREFIX + STREAM_NAME, FIELD_NAME, BLUE);
expectedMessage.getRecord().withNamespace(expectedNamespace);
final AirbyteMessage actualMessage = mapper.mapMessage(RECORD_MESSAGE);

assertEquals(originalMessage, RECORD_MESSAGE);
assertEquals(expectedMessage, actualMessage);
}

Expand Down Expand Up @@ -194,13 +201,13 @@ void testEmptyPrefix() {
assertEquals(expectedCatalog, actualCatalog);

final AirbyteMessage originalMessage = Jsons.clone(RECORD_MESSAGE);
assertEquals(originalMessage, RECORD_MESSAGE);

final AirbyteMessage expectedMessage = AirbyteMessageUtils.createRecordMessage(
STREAM_NAME,
FIELD_NAME, BLUE);
expectedMessage.getRecord().withNamespace(INPUT_NAMESPACE);
final AirbyteMessage actualMessage = mapper.mapMessage(RECORD_MESSAGE);

assertEquals(originalMessage, RECORD_MESSAGE);
assertEquals(expectedMessage, actualMessage);
}

Expand Down

0 comments on commit da34c78

Please sign in to comment.