Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transactional handling for Debezium PG CDC #81

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ Similar to the state, Southpaw is built around Kafka for the log store. The topi

* jackson.serde.class - The full class name of the deserialized object created by the JacksonSerde class
* key.serde.class - The full name of the serde class for the record key
* poll.timeout - The Kafka consumer poll() timeout in milliseconds
* topic.class - The full class name of the class used by the topic
* topic.name - The name of the topic (not the entity name for this topic!)
* value.serde.class - The full name of the serde class for the record value
Expand Down Expand Up @@ -224,7 +223,6 @@ Similar to the state, Southpaw is built around Kafka for the log store. The topi
client.id: "southpaw"
enable.auto.commit: false
key.serde.class: "com.jwplayer.southpaw.serde.AvroSerde"
poll.timeout: 100
schema.registry.url: "http://my-schema-registry:8081"
topic.class: "com.jwplayer.southpaw.topic.KafkaTopic"
value.serde.class: "com.jwplayer.southpaw.serde.AvroSerde"
Expand Down
1 change: 0 additions & 1 deletion conf/example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ topics:
group.id: "southpaw"
enable.auto.commit: false
key.serde.class: "com.jwplayer.southpaw.serde.AvroSerde"
poll.timeout: 100
schema.registry.url: "http://schema_registry_url:80"
topic.class: "com.jwplayer.southpaw.topic.KafkaTopic"
value.serde.class: "com.jwplayer.southpaw.serde.AvroSerde"
Expand Down
392 changes: 298 additions & 94 deletions src/main/java/com/jwplayer/southpaw/Southpaw.java

Large diffs are not rendered by default.

23 changes: 12 additions & 11 deletions src/main/java/com/jwplayer/southpaw/filter/BaseFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@
*/
package com.jwplayer.southpaw.filter;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jmx.JmxReporter;
import com.jwplayer.southpaw.record.BaseRecord;

import java.util.List;
import java.util.Map;
import java.util.Objects;


/**
* Base class for filtering input records. These records are not recorded in the state and are not included
Expand Down Expand Up @@ -79,11 +80,11 @@ public BaseFilter() { }

/**
* Used to inform how a record should be handled:
*
*
* UPDATE: Do not filter (no op) by advancing offset and updating state, output record produced.
* SKIP: Skip record and advance offset, don't update state, output record not produced.
* DELETE: Delete record by advancing offset and updating state, output record produced.
*
*
*/
public enum FilterMode { UPDATE, SKIP, DELETE }

Expand All @@ -100,10 +101,10 @@ public void configure(Map<String, Object> config) {
*
* @param entity - The entity of the given record
* @param record - The record to filter
* @param oldRecord - The previously seen record state (may be null)
* @param oldRecordSupplier - The Supplier of the previously seen record state (may be null)
* @return FilterMode - Describes how to handle the input record
*/
protected FilterMode customFilter(String entity, BaseRecord record, BaseRecord oldRecord) {
protected FilterMode customFilter(String entity, BaseRecord record, Supplier<BaseRecord> oldRecordSupplier) {
return FilterMode.UPDATE;
}

Expand Down Expand Up @@ -140,18 +141,18 @@ protected boolean isEqual(BaseRecord record, BaseRecord oldRecord, List<String>

/**
* Determines if the given record should be filtered based on its entity and previous entity state.
*
*
* @param entity - The entity of the given record
* @param record - The record to filter
* @param oldRecord - The previously seen record state (may be null)
* @return FilterMode - Describes how to handle the input record
*/
public FilterMode filter(String entity, BaseRecord record, BaseRecord oldRecord) {
public FilterMode filter(String entity, BaseRecord record, Supplier<BaseRecord> lookup) {
FilterMode mode;
if (record == null || record.isEmpty()) {
mode = FilterMode.DELETE;
} else {
mode = customFilter(entity, record, oldRecord);
mode = customFilter(entity, record, lookup);
}

metrics.getMeter(entity, mode).mark(1);
Expand Down
20 changes: 18 additions & 2 deletions src/main/java/com/jwplayer/southpaw/record/BaseRecord.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,20 @@
*/
package com.jwplayer.southpaw.record;

import com.jwplayer.southpaw.util.ByteArray;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import com.jwplayer.southpaw.util.ByteArray;


/**
* A record abstraction for standardizing and exposing some basic functionality.
*/
public abstract class BaseRecord {

private Map<String, ?> metadata;

/**
* Accessor for a particular field in this record
* @param fieldName - The name of the field to get
Expand Down Expand Up @@ -78,7 +81,20 @@ public ByteArray toByteArray() {
* Gives a friendly string representation of the object. Particularly useful for debugging in Intellij.
* @return The string representation of this object.
*/
@Override
public String toString() {
return toMap().toString();
}

/**
* Metadata associated with the record to be used in initial processing decisions.
* @return
*/
public Map<String, ?> getMetadata() {
return metadata;
}

public void setMetadata(Map<String, ?> metadata) {
this.metadata = metadata;
}
}
6 changes: 3 additions & 3 deletions src/main/java/com/jwplayer/southpaw/record/MapRecord.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
*/
package com.jwplayer.southpaw.record;

import com.google.common.collect.ImmutableMap;

import java.util.HashMap;
import java.util.Map;

import com.google.common.collect.ImmutableMap;


/**
* Wrapper record for a simple map object
Expand Down Expand Up @@ -57,6 +57,6 @@ public boolean isEmpty() {
@Override
public Map<String, ?> toMap() {
if(internalRecord == null) return ImmutableMap.of();
return ImmutableMap.copyOf(internalRecord);
return internalRecord;
}
}
78 changes: 78 additions & 0 deletions src/main/java/com/jwplayer/southpaw/serde/DebeziumJsonSerde.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.jwplayer.southpaw.serde;

import java.util.Collections;
import java.util.Map;

import org.apache.commons.lang.NotImplementedException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

import com.jwplayer.southpaw.record.BaseRecord;
import com.jwplayer.southpaw.record.MapRecord;

import io.confluent.kafka.serializers.KafkaJsonDeserializer;

/**
* Combines key/value parsing. This is really not desirable, but follows the pattern of the other Serde classes.
* It would be better to have separate key and value
* - key can be a simple type or BaseRecord, and value can be BaseRecord.
*
* Alternatively this could be implemented using JsonNode and a new {@link BaseRecord} type.
*/
public class DebeziumJsonSerde implements BaseSerde<BaseRecord> {

KafkaJsonDeserializer<Map<String, ?>> internalDeserializer = new KafkaJsonDeserializer<>();
boolean isKey = false;

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
internalDeserializer.configure(configs, isKey);
this.isKey = isKey;
}

@Override
public void close() {
internalDeserializer.close();
}

@Override
public Serializer<BaseRecord> serializer() {
throw new NotImplementedException();
}

@Override
public Deserializer<BaseRecord> deserializer() {
return new Deserializer<BaseRecord>() {

@Override
public BaseRecord deserialize(String topic, byte[] data) {
Map<String, ?> envelope = internalDeserializer.deserialize(topic, data);
if (envelope == null) {
return null; //tombstone
}
if (isKey || envelope.get("source") == null) {
if (envelope instanceof Map) {
return new MapRecord(envelope);
}
//provide a wrapper for a non-object type
return new MapRecord(Collections.singletonMap("id", envelope));
}
Map<String, ?> after = (Map<String, ?>) envelope.get("after");
MapRecord result = new MapRecord(after);
result.setMetadata(envelope);
return result;
}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
throw new NotImplementedException();
}

@Override
public void close() {
throw new NotImplementedException();
}
};
}

}
22 changes: 14 additions & 8 deletions src/main/java/com/jwplayer/southpaw/topic/BaseTopic.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@
*/
package com.jwplayer.southpaw.topic;

import com.jwplayer.southpaw.util.ByteArray;
import com.jwplayer.southpaw.topic.TopicConfig;
import com.jwplayer.southpaw.filter.BaseFilter;
import com.jwplayer.southpaw.state.BaseState;
import com.jwplayer.southpaw.metric.Metrics;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serde;

import java.util.Iterator;
import com.jwplayer.southpaw.filter.BaseFilter;
import com.jwplayer.southpaw.metric.Metrics;
import com.jwplayer.southpaw.state.BaseState;
import com.jwplayer.southpaw.util.ByteArray;


/**
Expand Down Expand Up @@ -178,7 +175,7 @@ public BaseState getState() {
* Reads records in topic based on the current offset.
* @return The list of records read.
*/
public abstract Iterator<ConsumerRecord<K, V>> readNext();
public abstract ConsumerRecordIterator<K, V> readNext();

/**
* Resets the current offset to the beginning of the topic.
Expand All @@ -189,6 +186,7 @@ public BaseState getState() {
* Gives a nicely formatted string representation of this object. Useful for the Intellij debugger.
* @return Formatted string representation of this object
*/
@Override
public String toString() {
return String.format(
"{shortName=%s,topicName=%s,currentOffset=%s,keySerde=%s,valueSerde=%s}",
Expand All @@ -206,4 +204,12 @@ public String toString() {
* @param value - The serialized value.
*/
public abstract void write(K key, V value);

/**
* Name of the table referenced in transaction metadata
* @return
*/
public String getTableName() {
return null;
}
}
6 changes: 2 additions & 4 deletions src/main/java/com/jwplayer/southpaw/topic/BlackHoleTopic.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@
*/
package com.jwplayer.southpaw.topic;

import com.jwplayer.southpaw.util.ByteArray;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.commons.lang.NotImplementedException;

import java.util.Iterator;
import com.jwplayer.southpaw.util.ByteArray;


public class BlackHoleTopic<K, V> extends BaseTopic<K, V> {
Expand Down Expand Up @@ -49,7 +47,7 @@ public V readByPK(ByteArray primaryKey) {
}

@Override
public Iterator<ConsumerRecord<K, V>> readNext() {
public ConsumerRecordIterator<K, V> readNext() {
throw new NotImplementedException();
}

Expand Down
10 changes: 4 additions & 6 deletions src/main/java/com/jwplayer/southpaw/topic/ConsoleTopic.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@
*/
package com.jwplayer.southpaw.topic;

import com.jwplayer.southpaw.record.BaseRecord;
import com.jwplayer.southpaw.util.ByteArray;
import com.jwplayer.southpaw.filter.BaseFilter.FilterMode;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.commons.lang.NotImplementedException;

import java.util.Iterator;
import com.jwplayer.southpaw.filter.BaseFilter.FilterMode;
import com.jwplayer.southpaw.record.BaseRecord;
import com.jwplayer.southpaw.util.ByteArray;


public class ConsoleTopic<K, V> extends BaseTopic<K, V> {
Expand Down Expand Up @@ -51,7 +49,7 @@ public V readByPK(ByteArray primaryKey) {
}

@Override
public Iterator<ConsumerRecord<K, V>> readNext() {
public ConsumerRecordIterator<K, V> readNext() {
throw new NotImplementedException();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.jwplayer.southpaw.topic;

import java.util.Iterator;

import org.apache.kafka.clients.consumer.ConsumerRecord;

public interface ConsumerRecordIterator<K, V> extends Iterator<ConsumerRecord<K, V>>{

int getApproximateCount();

V peekValue();

}
Loading