Skip to content

Commit

Permalink
NIFI-12344 PutKudu Operation should accept Debezium Types
Browse files Browse the repository at this point in the history
This closes #8004

Signed-off-by: David Handermann <[email protected]>
  • Loading branch information
pvillard31 authored and exceptionfactory committed Nov 10, 2023
1 parent 279084d commit eb7d49c
Showing 1 changed file with 14 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ public class PutKudu extends AbstractKuduProcessor {
.name("Operation RecordPath")
.displayName("Operation RecordPath")
.description("If specified, this property denotes a RecordPath that will be evaluated against each incoming Record in order to determine the Kudu Operation Type. When evaluated, the " +
"RecordPath must evaluate to one of hte valid Kudu Operation Types, or the incoming FlowFile will be routed to failure. If this property is specified, the <Kudu Operation Type> property" +
" will be ignored.")
"RecordPath must evaluate to one of the valid Kudu Operation Types (Debezium style operation types are also supported: \"r\" and \"c\" for INSERT, \"u\" for UPDATE, and \"d\" for "
+ "DELETE), or the incoming FlowFile will be routed to failure. If this property is specified, the <Kudu Operation Type> property will be ignored.")
.required(false)
.addValidator(new RecordPathValidator())
.expressionLanguageSupported(NONE)
Expand Down Expand Up @@ -688,7 +688,18 @@ public OperationType apply(final Record record) {

final String resultValue = String.valueOf(resultList.get(0).getValue());
try {
return OperationType.valueOf(resultValue.toUpperCase());
// Support Operation Type character values from Debezium
switch (resultValue) {
case "c":
case "r":
return OperationType.INSERT;
case "u":
return OperationType.UPDATE;
case "d":
return OperationType.DELETE;
default:
return OperationType.valueOf(resultValue.toUpperCase());
}
} catch (final IllegalArgumentException iae) {
throw new ProcessException("Evaluated RecordPath " + recordPath.getPath() + " against Record to determine Kudu Operation Type but found invalid value: " + resultValue);
}
Expand Down

0 comments on commit eb7d49c

Please sign in to comment.