Skip to content

Commit

Permalink
implementing debezium transactional handling
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Jan 7, 2021
1 parent 936ac5d commit 1d25a25
Show file tree
Hide file tree
Showing 7 changed files with 302 additions and 78 deletions.
323 changes: 248 additions & 75 deletions src/main/java/com/jwplayer/southpaw/Southpaw.java

Large diffs are not rendered by default.

20 changes: 18 additions & 2 deletions src/main/java/com/jwplayer/southpaw/record/BaseRecord.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,20 @@
*/
package com.jwplayer.southpaw.record;

import com.jwplayer.southpaw.util.ByteArray;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import com.jwplayer.southpaw.util.ByteArray;


/**
* A record abstraction for standardizing and exposing some basic functionality.
*/
public abstract class BaseRecord {

private Map<String, ?> metadata;

/**
* Accessor for a particular field in this record
* @param fieldName - The name of the field to get
Expand Down Expand Up @@ -78,7 +81,20 @@ public ByteArray toByteArray() {
* Gives a friendly string representation of the object. Particularly useful for debugging in Intellij.
* @return The string representation of this object.
*/
@Override
public String toString() {
return toMap().toString();
}

/**
* Metadata associated with the record to be used in initial processing decisions.
* @return
*/
public Map<String, ?> getMetadata() {
return metadata;
}

public void setMetadata(Map<String, ?> metadata) {
this.metadata = metadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public BaseRecord deserialize(String topic, byte[] data) {
}
Map<String, ?> after = (Map<String, ?>) envelope.get("after");
MapRecord result = new MapRecord(after);
//result.setMetadata(envelope);
result.setMetadata(envelope);
return result;
}

Expand Down
8 changes: 8 additions & 0 deletions src/main/java/com/jwplayer/southpaw/topic/BaseTopic.java
Original file line number Diff line number Diff line change
Expand Up @@ -204,4 +204,12 @@ public String toString() {
* @param value - The serialized value.
*/
public abstract void write(K key, V value);

/**
* Name of the table referenced in transaction metadata
* @return
*/
public String getTableName() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ public interface ConsumerRecordIterator<K, V> extends Iterator<ConsumerRecord<K,

int getApproximateCount();

V peekValue();

}
5 changes: 5 additions & 0 deletions src/main/java/com/jwplayer/southpaw/topic/InMemoryTopic.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ public boolean hasNext() {
public ConsumerRecord<K, V> next() {
return retVal.get(index++);
}

@Override
public V peekValue() {
return retVal.get(index).value();
}
};
}

Expand Down
20 changes: 20 additions & 0 deletions src/main/java/com/jwplayer/southpaw/topic/KafkaTopic.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@
*/
public class KafkaTopic<K, V> extends BaseTopic<K, V> {
public static final long END_OFFSET_REFRESH_MS_DEFAULT = 60000;

public static final String PERSISTENT = "persistent";
public static final boolean PERSISTENT_DEFAULT = true;
public static final String TABLE_NAME = "table.name";
/**
* Le Logger
*/
Expand Down Expand Up @@ -100,6 +102,11 @@ public int getApproximateCount() {
return approximateCount;
}

@Override
public V peekValue() {
return nextValue;
}

/**
* Internal helper to obtain and stage the next non-skipped record
*
Expand Down Expand Up @@ -296,8 +303,15 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
* The callback for Kafka producer writes
*/
private final Callback producerCallback = new KafkaProducerCallback();
/**
* If the values for this topic should be saved
*/
private boolean persistent;
private TopicPartition topicPartition;
/**
* The table name associated with this topic if transactional
*/
private String tableName;

@Override
public void commit() {
Expand Down Expand Up @@ -352,6 +366,7 @@ public void configure(TopicConfig<K, V> topicConfig) {
}

this.persistent = (Boolean)spConfig.getOrDefault(PERSISTENT, PERSISTENT_DEFAULT);
this.tableName = (String)spConfig.getOrDefault(TABLE_NAME, null);
}

@Override
Expand Down Expand Up @@ -443,4 +458,9 @@ private void checkCallbackExceptions() throws RuntimeException {
public void setPollTimeout(long pollTimeout) {
this.pollTimeout = pollTimeout;
}

@Override
public String getTableName() {
return this.tableName;
}
}

0 comments on commit 1d25a25

Please sign in to comment.