Skip to content

Commit

Permalink
Add FULL_MESSAGE_IN_JSON_EXPAND_VALUE message format to Kinesis sink (#…
Browse files Browse the repository at this point in the history
…52)

* Add FULL_MESSAGE_IN_JSON_EXPAND_VALUE message format to Kinesis sink

* Add doc for FULL_MESSAGE_IN_JSON_EXPAND_VALUE format for Kinesis sink

* Fix missing dependencies

* Rename record value field to payload
to prevent confusion with KeyValue value field

* Add option to flatten JSON with FULL_MESSAGE_IN_JSON_EXPAND_VALUE format

(cherry picked from commit a2e12a8)
  • Loading branch information
cbornet authored and nicoloboschi committed Apr 6, 2022
1 parent 8fba14d commit 1063026
Show file tree
Hide file tree
Showing 10 changed files with 754 additions and 123 deletions.
19 changes: 19 additions & 0 deletions pulsar-io/kinesis/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-functions-instance</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
Expand All @@ -71,6 +78,18 @@
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>

<dependency>
<groupId>com.github.wnameless.json</groupId>
<artifactId>json-flattener</artifactId>
<version>0.13.0</version>
</dependency>

<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,17 @@
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration.ThreadingModel;
import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;

import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -47,6 +50,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.aws.AbstractAwsConnector;
import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
Expand All @@ -72,25 +76,25 @@
* which accepts json-map of credentials in awsCredentialPluginParam
* eg: awsCredentialPluginParam = {"accessKey":"my-access-key","secretKey":"my-secret-key"}
* 5. <b>awsCredentialPluginParam:</b> json-parameters to initialize {@link AwsCredentialProviderPlugin}
* 6. messageFormat: enum:["ONLY_RAW_PAYLOAD","FULL_MESSAGE_IN_JSON"]
* 6. messageFormat: enum:["ONLY_RAW_PAYLOAD","FULL_MESSAGE_IN_JSON","FULL_MESSAGE_IN_FB","FULL_MESSAGE_IN_JSON_EXPAND_VALUE"]
* a. ONLY_RAW_PAYLOAD: publishes raw payload to stream
* b. FULL_MESSAGE_IN_JSON: publish full message (encryptionCtx + properties + payload) in json format
* json-schema:
* {"type":"object","properties":{"encryptionCtx":{"type":"object","properties":{"metadata":{"type":"object","additionalProperties":{"type":"string"}},"uncompressedMessageSize":{"type":"integer"},"keysMetadataMap":{"type":"object","additionalProperties":{"type":"object","additionalProperties":{"type":"string"}}},"keysMapBase64":{"type":"object","additionalProperties":{"type":"string"}},"encParamBase64":{"type":"string"},"compressionType":{"type":"string","enum":["NONE","LZ4","ZLIB"]},"batchSize":{"type":"integer"},"algorithm":{"type":"string"}}},"payloadBase64":{"type":"string"},"properties":{"type":"object","additionalProperties":{"type":"string"}}}}
* Example:
* {"payloadBase64":"cGF5bG9hZA==","properties":{"prop1":"value"},"encryptionCtx":{"keysMapBase64":{"key1":"dGVzdDE=","key2":"dGVzdDI="},"keysMetadataMap":{"key1":{"ckms":"cmks-1","version":"v1"},"key2":{"ckms":"cmks-2","version":"v2"}},"metadata":{"ckms":"cmks-1","version":"v1"},"encParamBase64":"cGFyYW0=","algorithm":"algo","compressionType":"LZ4","uncompressedMessageSize":10,"batchSize":10}}
* c. FULL_MESSAGE_IN_FB: publish full message (encryptionCtx + properties + payload) in flatbuffer format
* d. FULL_MESSAGE_IN_JSON_EXPAND_VALUE: publish full message (topicName + key + payload + properties + eventTime) in JSON format, using the record schema to expand the payload into JSON.
* </pre>
*
*
*
*/
@Connector(
name = "kinesis",
type = IOType.SINK,
help = "A sink connector that copies messages from Pulsar to Kinesis",
configClass = KinesisSinkConfig.class
)
public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
public class KinesisSink extends AbstractAwsConnector implements Sink<GenericObject> {

private static final Logger LOG = LoggerFactory.getLogger(KinesisSink.class);

Expand All @@ -101,6 +105,7 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
private static final int maxPartitionedKeyLength = 256;
private SinkContext sinkContext;
private ScheduledExecutorService scheduledExecutor;
private ObjectMapper objectMapper;
//
private static final int FALSE = 0;
private static final int TRUE = 1;
Expand All @@ -120,7 +125,7 @@ private void sendUserRecord(ProducerSendCallback producerSendCallback) {
}

@Override
public void write(Record<byte[]> record) throws Exception {
public void write(Record<GenericObject> record) throws Exception {
// kpl-thread captures publish-failure. fail the publish on main pulsar-io-thread to maintain the ordering
if (kinesisSinkConfig.isRetainOrdering() && previousPublishFailed == TRUE) {
LOG.warn("Skip acking message to retain ordering with previous failed message {}-{}", this.streamName,
Expand All @@ -132,18 +137,19 @@ public void write(Record<byte[]> record) throws Exception {
? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
: partitionedKey; // partitionedKey Length must be at least one, and at most 256
ByteBuffer data = createKinesisMessage(kinesisSinkConfig.getMessageFormat(), record);
int size = data.remaining();
sendUserRecord(ProducerSendCallback.create(this, record, System.nanoTime(), partitionedKey, data));
if (sinkContext != null) {
sinkContext.recordMetric(METRICS_TOTAL_INCOMING, 1);
sinkContext.recordMetric(METRICS_TOTAL_INCOMING_BYTES, data.array().length);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Published message to kinesis stream {} with size {}", streamName, record.getValue().length);
LOG.debug("Published message to kinesis stream {} with size {}", streamName, size);
}
}

@Override
public void close() throws IOException {
public void close() {
if (kinesisProducer != null) {
kinesisProducer.flush();
kinesisProducer.destroy();
Expand Down Expand Up @@ -184,14 +190,18 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc

this.streamName = kinesisSinkConfig.getAwsKinesisStreamName();
this.kinesisProducer = new KinesisProducer(kinesisConfig);
this.objectMapper = new ObjectMapper();
if (kinesisSinkConfig.isJsonIncludeNonNulls()) {
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
}
IS_PUBLISH_FAILED.set(this, FALSE);

LOG.info("Kinesis sink started. {}", (ReflectionToStringBuilder.toString(kinesisConfig, ToStringStyle.SHORT_PREFIX_STYLE)));
}

private static final class ProducerSendCallback implements FutureCallback<UserRecordResult> {

private Record<byte[]> resultContext;
private Record<GenericObject> resultContext;
private long startTime = 0;
private final Handle<ProducerSendCallback> recyclerHandle;
private KinesisSink kinesisSink;
Expand All @@ -203,7 +213,8 @@ private ProducerSendCallback(Handle<ProducerSendCallback> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}

static ProducerSendCallback create(KinesisSink kinesisSink, Record<byte[]> resultContext, long startTime, String partitionedKey, ByteBuffer data) {
static ProducerSendCallback create(KinesisSink kinesisSink, Record<GenericObject> resultContext, long startTime,
String partitionedKey, ByteBuffer data) {
ProducerSendCallback sendCallback = RECYCLER.get();
sendCallback.resultContext = resultContext;
sendCallback.kinesisSink = kinesisSink;
Expand Down Expand Up @@ -284,14 +295,21 @@ public void onFailure(Throwable exception) {
}
}

public static ByteBuffer createKinesisMessage(MessageFormat msgFormat, Record<byte[]> record) {
if (MessageFormat.FULL_MESSAGE_IN_JSON.equals(msgFormat)) {
return ByteBuffer.wrap(Utils.serializeRecordToJson(record).getBytes());
} else if (MessageFormat.FULL_MESSAGE_IN_FB.equals(msgFormat)) {
return Utils.serializeRecordToFlatBuffer(record);
} else {
// send raw-message
return ByteBuffer.wrap(record.getValue());
/**
 * Builds the bytes that are published to Kinesis for a single record, according to the
 * requested message format.
 *
 * <ul>
 *   <li>{@code FULL_MESSAGE_IN_JSON}: UTF-8 bytes of {@code Utils.serializeRecordToJson(record)}.</li>
 *   <li>{@code FULL_MESSAGE_IN_FB}: the buffer returned by
 *       {@code Utils.serializeRecordToFlatBuffer(record)}.</li>
 *   <li>{@code FULL_MESSAGE_IN_JSON_EXPAND_VALUE}: UTF-8 bytes of
 *       {@code Utils.serializeRecordToJsonExpandingValue(...)}, called with this sink's
 *       {@code ObjectMapper} and the configured {@code jsonFlatten} flag.</li>
 *   <li>Any other format, or {@code null}: the raw data obtained from
 *       {@code Utils.getMessage(record).getData()}.</li>
 * </ul>
 *
 * @param msgFormat the format to serialize with; {@code null} is treated as
 *                  {@code ONLY_RAW_PAYLOAD}
 * @param record    the record to serialize
 * @return a buffer holding the serialized message
 * @throws JsonProcessingException if JSON serialization of the record fails
 */
public ByteBuffer createKinesisMessage(MessageFormat msgFormat, Record<GenericObject> record)
        throws JsonProcessingException {
    // A switch on a null enum throws NullPointerException. Fall back to the raw payload instead,
    // which is what the earlier equals()-based implementation did for a null format.
    final MessageFormat format = msgFormat != null ? msgFormat : MessageFormat.ONLY_RAW_PAYLOAD;
    switch (format) {
        case FULL_MESSAGE_IN_JSON:
            return ByteBuffer.wrap(Utils.serializeRecordToJson(record).getBytes(StandardCharsets.UTF_8));
        case FULL_MESSAGE_IN_FB:
            return Utils.serializeRecordToFlatBuffer(record);
        case FULL_MESSAGE_IN_JSON_EXPAND_VALUE:
            return ByteBuffer.wrap(
                    Utils.serializeRecordToJsonExpandingValue(objectMapper, record,
                                    kinesisSinkConfig.isJsonFlatten())
                            .getBytes(StandardCharsets.UTF_8));
        default:
            // send raw-message
            return ByteBuffer.wrap(Utils.getMessage(record).getData());
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,38 @@ public class KinesisSinkConfig extends BaseKinesisConfig implements Serializable
+ " #\n"
+ " # Kinesis sink creates a flatbuffer serialized paylaod with pulsar message payload, \n"
+ " # properties and encryptionCtx, and publishes flatbuffer payload into the configured kinesis stream."
+ " #\n"
+ " # - FULL_MESSAGE_IN_JSON_EXPAND_VALUE \n"
+ " #\n"
+ " # Kinesis sink sends a JSON structure containing the record topic name, key, payload, properties and event time.\n"
+ " # The record schema is used to convert the value to JSON."
)
private MessageFormat messageFormat = MessageFormat.ONLY_RAW_PAYLOAD; // default : ONLY_RAW_PAYLOAD

/**
 * Controls null handling for the JSON produced with
 * {@code MessageFormat.FULL_MESSAGE_IN_JSON_EXPAND_VALUE}.
 *
 * When this flag is true, {@code KinesisSink.open} configures its {@code ObjectMapper} with
 * {@code JsonInclude.Include.NON_NULL}; when false the mapper keeps its default inclusion setting.
 * Defaults to {@code true}.
 */
@FieldDoc(
required = false,
defaultValue = "true",
help = "Value that indicates that only properties with non-null values are to be included when using "
+ "MessageFormat.FULL_MESSAGE_IN_JSON_EXPAND_VALUE."
)
private boolean jsonIncludeNonNulls = true;

/**
 * Whether the JSON produced with {@code MessageFormat.FULL_MESSAGE_IN_JSON_EXPAND_VALUE} is flattened.
 *
 * {@code KinesisSink.createKinesisMessage} passes this flag to
 * {@code Utils.serializeRecordToJsonExpandingValue}; it is not read for the other message formats
 * in that method. Defaults to {@code false}.
 */
@FieldDoc(
defaultValue = "false",
help = "When set to true and the message format is FULL_MESSAGE_IN_JSON_EXPAND_VALUE the output JSON will be flattened."
)
private boolean jsonFlatten = false;

/**
 * Whether the sink should preserve message ordering when publishing to Kinesis.
 *
 * {@code KinesisSink.write} checks this flag together with its "previous publish failed" state
 * before publishing a record, and logs that acking is skipped when both hold.
 * NOTE(review): the rest of that branch is not visible here — confirm the exact failure handling.
 * Defaults to {@code false}.
 */
@FieldDoc(
defaultValue = "false",
help = "A flag to tell Pulsar IO to retain ordering when moving messages from Pulsar to Kinesis")
private boolean retainOrdering = false;

@FieldDoc(
required = false,
defaultValue = "100",
help = "The initial delay(in milliseconds) between retries.")
private long retryInitialDelayInMillis = 100;

@FieldDoc(
required = false,
defaultValue = "60000",
help = "The maximum delay(in milliseconds) between retries.")
private long retryMaxDelayInMillis = 60000;
Expand Down Expand Up @@ -101,7 +116,21 @@ public static enum MessageFormat {
/**
* Kinesis sink sends message serialized in flat-buffer.
*/
FULL_MESSAGE_IN_FB;
FULL_MESSAGE_IN_FB,
/**
* Kinesis sink sends a JSON structure containing the record topic name, key, payload, properties and event time.
* The record schema is used to convert the value to JSON.
*
* Example for primitive schema:
* {"topicName":"my-topic","key":"message-key","payload":"message-value","properties":{"prop-key":"prop-value"},"eventTime":1648502845803}
*
* Example for AVRO or JSON schema:
* {"topicName":"my-topic","key":"message-key","payload":{"c":"1","d":1,"e":{"a":"a"}},"properties":{"prop-key":"prop-value"},"eventTime":1648502845803}
*
* Example for KeyValue schema:
* {"topicName":"my-topic","key":"message-key","payload":{"value":{"c":"1","d":1,"e":{"a":"a"}},"key":{"a":"1","b":1}},"properties":{"prop-key":"prop-value"},"eventTime":1648502845803}
*/
FULL_MESSAGE_IN_JSON_EXPAND_VALUE
}

}
Loading

0 comments on commit 1063026

Please sign in to comment.