Skip to content

Commit

Permalink
[pulsar-io] KCA: properly handle KeyValue that getNativeObject() retu…
Browse files Browse the repository at this point in the history
…rns: corrected type + support for KeyValue<GenericRecord, GenericRecord> (apache#15025)

(cherry picked from commit d76b5d4)
(cherry picked from commit 515a9bf)
  • Loading branch information
dlg99 authored and nicoloboschi committed Jun 9, 2022
1 parent f397983 commit 5595d58
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
Expand Down Expand Up @@ -261,12 +262,8 @@ protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {

Object nativeObject = sourceRecord.getValue().getNativeObject();

if (nativeObject instanceof org.apache.pulsar.common.schema.KeyValue) {
org.apache.pulsar.common.schema.KeyValue kv = (org.apache.pulsar.common.schema.KeyValue) nativeObject;
key = KafkaConnectData.getKafkaConnectData(kv.getKey(), keySchema);
value = KafkaConnectData.getKafkaConnectData(kv.getValue(), valueSchema);
} else if (nativeObject instanceof org.apache.pulsar.io.core.KeyValue) {
org.apache.pulsar.io.core.KeyValue kv = (org.apache.pulsar.io.core.KeyValue) nativeObject;
if (nativeObject instanceof KeyValue) {
KeyValue kv = (KeyValue) nativeObject;
key = KafkaConnectData.getKafkaConnectData(kv.getKey(), keySchema);
value = KafkaConnectData.getKafkaConnectData(kv.getValue(), valueSchema);
} else if (nativeObject != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.pulsar.client.api.schema.GenericRecord;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
import org.apache.pulsar.client.util.MessageIdUtils;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.io.core.SinkContext;
Expand Down Expand Up @@ -604,17 +605,6 @@ public void unknownRecordSchemaTest() throws Exception {
sink.close();
}

@Test
public void coreKeyValueSchemaTest() throws Exception {
org.apache.pulsar.io.core.KeyValue<Integer, String> kv =
new org.apache.pulsar.io.core.KeyValue<>(11, "value");
SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(Schema.INT32, Schema.STRING), 11, "INT32", "value", "STRING");
String val = (String) sinkRecord.value();
Assert.assertEquals(val, "value");
int key = (int) sinkRecord.key();
Assert.assertEquals(key, 11);
}

@Test
public void schemaKeyValueSchemaTest() throws Exception {
org.apache.pulsar.common.schema.KeyValue<Integer, String> kv =
Expand Down Expand Up @@ -653,9 +643,7 @@ public void schemaKeyValueAvroSchemaTest() throws Exception {
// integer is coming back from ObjectMapper
expectedValue.put("field3", 100);

org.apache.pulsar.common.schema.KeyValue<GenericRecord, GenericRecord> kv =
new org.apache.pulsar.common.schema.KeyValue<>(
getGenericRecord(key, pulsarAvroSchema),
KeyValue<GenericRecord, GenericRecord> kv = new KeyValue<>(getGenericRecord(key, pulsarAvroSchema),
getGenericRecord(value, pulsarAvroSchema));

SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(pulsarAvroSchema, pulsarAvroSchema),
Expand Down

0 comments on commit 5595d58

Please sign in to comment.