From 27ecbd5ababd708e232e806ef664e1e1ca888204 Mon Sep 17 00:00:00 2001
From: Mario Molina <mmolimar@gmail.com>
Date: Fri, 5 Mar 2021 10:10:15 -0600
Subject: [PATCH 1/2] fix: prevent IOB when printing key/value with an empty
 string

---
 .../resources/streaming/RecordFormatter.java       | 14 +++++++++-----
 .../resources/streaming/RecordFormatterTest.java   |  8 ++++++--
 2 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatter.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatter.java
index 438e33c9834d..0b8be2168bbd 100644
--- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatter.java
+++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatter.java
@@ -109,11 +109,15 @@ public List<String> format(final Iterable<ConsumerRecord<Bytes, Bytes>> records)
 
     final List<String> formatted = formatRecords(records);
 
-    final boolean sameKeyFormatChanged = activeKeyFormat
-        .equals(keyDeserializers.getPossibleFormats().get(0));
-
-    final boolean sameValueFormatChanged = activeValueFormat
-        .equals(valueDeserializers.getPossibleFormats().get(0));
+    final boolean sameKeyFormatChanged = keyDeserializers.getPossibleFormats().stream()
+        .map(activeKeyFormat::equals)
+        .findFirst()
+        .orElse(false);
+
+    final boolean sameValueFormatChanged = valueDeserializers.getPossibleFormats().stream()
+        .map(activeValueFormat::equals)
+        .findFirst()
+        .orElse(false);
 
     if (sameKeyFormatChanged && sameValueFormatChanged) {
       return formatted;
diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatterTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatterTest.java
index 8809c734b7f3..56ddc86ee415 100644
--- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatterTest.java
+++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatterTest.java
@@ -1274,7 +1274,8 @@ public void shouldReprocessBatchIfLikelyKeyFormatChanges() {
       final Iterable<ConsumerRecord<Bytes, Bytes>> records = consumerRecords(
           // Key that is same size as  BIGINT / DOUBLE:
           consumerRecord(Bytes.wrap("Die Hard".getBytes(UTF_8)), null),
-          consumerRecord(Bytes.wrap("Key that's clearly a string".getBytes(UTF_8)), null)
+          consumerRecord(Bytes.wrap("Key that's clearly a string".getBytes(UTF_8)), null),
+          consumerRecord(Bytes.wrap("".getBytes(UTF_8)), null)
       );
 
       // When:
@@ -1282,6 +1283,7 @@ public void shouldReprocessBatchIfLikelyKeyFormatChanges() {
 
       // Then:
       assertThat(formatted.get(0), containsString("Die Hard"));
+      assertThat(formatted.get(1), containsString("Key that's clearly a string"));
     }
 
     @Test
@@ -1290,7 +1292,8 @@ public void shouldReprocessBatchIfLikelyValueFormatChanges() {
       final Iterable<ConsumerRecord<Bytes, Bytes>> records = consumerRecords(
           // Value that is same size as  BIGINT / DOUBLE:
           consumerRecord(null, Bytes.wrap("Die Hard".getBytes(UTF_8))),
-          consumerRecord(null, Bytes.wrap("Key that's clearly a string".getBytes(UTF_8)))
+          consumerRecord(null, Bytes.wrap("Value that's clearly a string".getBytes(UTF_8))),
+          consumerRecord(null, Bytes.wrap("".getBytes(UTF_8)))
       );
 
       // When:
@@ -1298,6 +1301,7 @@ public void shouldReprocessBatchIfLikelyValueFormatChanges() {
 
       // Then:
       assertThat(formatted.get(0), containsString("Die Hard"));
+      assertThat(formatted.get(1), containsString("Value that's clearly a string"));
     }
 
 

From 35d003f14de06239563d9788fd9992428e680920 Mon Sep 17 00:00:00 2001
From: Mario Molina <mmolimar@gmail.com>
Date: Tue, 9 Mar 2021 12:27:07 -0600
Subject: [PATCH 2/2] fix: updating pattern

---
 .../resources/streaming/RecordFormatter.java   | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatter.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatter.java
index 0b8be2168bbd..865c68f13cc3 100644
--- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatter.java
+++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatter.java
@@ -109,15 +109,15 @@ public List<String> format(final Iterable<ConsumerRecord<Bytes, Bytes>> records)
 
     final List<String> formatted = formatRecords(records);
 
-    final boolean sameKeyFormatChanged = keyDeserializers.getPossibleFormats().stream()
-        .map(activeKeyFormat::equals)
-        .findFirst()
-        .orElse(false);
-
-    final boolean sameValueFormatChanged = valueDeserializers.getPossibleFormats().stream()
-        .map(activeValueFormat::equals)
-        .findFirst()
-        .orElse(false);
+    final boolean sameKeyFormatChanged = keyDeserializers
+        .getPossibleFormats()
+        .stream()
+        .anyMatch(activeKeyFormat::equals);
+
+    final boolean sameValueFormatChanged = valueDeserializers
+        .getPossibleFormats()
+        .stream()
+        .anyMatch(activeValueFormat::equals);
 
     if (sameKeyFormatChanged && sameValueFormatChanged) {
       return formatted;