From 28b84867d7e55b6ee2e27a692f42760d27fb5aca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Pe=C3=B1a?= Date: Fri, 2 Oct 2020 14:44:20 -0500 Subject: [PATCH] fix: backup files are re-created on every restart (#6348) --- .../ksql/benchmark/SerdeBenchmark.java | 2 + .../ksql/rest/server/BackupReplayFile.java | 57 +++----- .../ksql/rest/server/CommandTopic.java | 9 +- .../rest/server/CommandTopicBackupImpl.java | 54 ++++--- .../rest/server/BackupReplayFileTest.java | 98 ++++++------- .../server/CommandTopicBackupImplTest.java | 138 +++++++++++------- 6 files changed, 189 insertions(+), 169 deletions(-) diff --git a/ksqldb-benchmark/src/main/java/io/confluent/ksql/benchmark/SerdeBenchmark.java b/ksqldb-benchmark/src/main/java/io/confluent/ksql/benchmark/SerdeBenchmark.java index afea797c714e..aa745a294019 100644 --- a/ksqldb-benchmark/src/main/java/io/confluent/ksql/benchmark/SerdeBenchmark.java +++ b/ksqldb-benchmark/src/main/java/io/confluent/ksql/benchmark/SerdeBenchmark.java @@ -16,6 +16,7 @@ package io.confluent.ksql.benchmark; import com.google.common.collect.ImmutableMap; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.confluent.avro.random.generator.Generator; import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; @@ -183,6 +184,7 @@ private static RowGenerator getRowGenerator(final String schemaName) throws IOEx return new RowGenerator(generator, keyField, Optional.empty()); } + @SuppressFBWarnings("RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE") private static Generator getGenerator(final String schemaName) throws IOException { final Path schemaPath = SCHEMA_DIR.resolve(schemaName + SCHEMA_FILE_SUFFIX); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/BackupReplayFile.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/BackupReplayFile.java index 08fb1c16dcaa..d4a3095af3b5 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/BackupReplayFile.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/BackupReplayFile.java @@ -15,49 +15,46 @@ package io.confluent.ksql.rest.server; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.confluent.ksql.execution.json.PlanJsonMapper; -import io.confluent.ksql.rest.entity.CommandId; -import io.confluent.ksql.rest.server.computation.Command; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.Pair; -import java.io.BufferedWriter; import java.io.Closeable; import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; -import java.io.OutputStreamWriter; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import org.apache.kafka.clients.consumer.ConsumerRecord; /** * A file that is used by the backup service to replay command_topic commands. 
*/ -public class BackupReplayFile implements Closeable { - private static final ObjectMapper MAPPER = PlanJsonMapper.INSTANCE.get(); - private static final String KEY_VALUE_SEPARATOR = ":"; +public final class BackupReplayFile implements Closeable { + private static final String KEY_VALUE_SEPARATOR_STR = ":"; + private static final String NEW_LINE_SEPARATOR_STR = "\n"; + + private static final byte[] KEY_VALUE_SEPARATOR_BYTES = + KEY_VALUE_SEPARATOR_STR.getBytes(StandardCharsets.UTF_8); + private static final byte[] NEW_LINE_SEPARATOR_BYTES = + NEW_LINE_SEPARATOR_STR.getBytes(StandardCharsets.UTF_8); private final File file; - private final BufferedWriter writer; + private final FileOutputStream writer; public BackupReplayFile(final File file) { this.file = Objects.requireNonNull(file, "file"); this.writer = createWriter(file); } - private static BufferedWriter createWriter(final File file) { + private static FileOutputStream createWriter(final File file) { try { - return new BufferedWriter(new OutputStreamWriter( - new FileOutputStream(file, true), - StandardCharsets.UTF_8) - ); + return new FileOutputStream(file, true); } catch (final FileNotFoundException e) { throw new KsqlException( - String.format("Failed to create replay file: %s", file.getAbsolutePath()), e); + String.format("Failed to create/open replay file: %s", file.getAbsolutePath()), e); } } @@ -65,29 +62,23 @@ public String getPath() { return file.getAbsolutePath(); } - public void write(final CommandId commandId, final Command command) throws IOException { - writer.write(MAPPER.writeValueAsString(commandId)); - writer.write(KEY_VALUE_SEPARATOR); - writer.write(MAPPER.writeValueAsString(command)); - writer.write("\n"); + public void write(final ConsumerRecord record) throws IOException { + writer.write(record.key()); + writer.write(KEY_VALUE_SEPARATOR_BYTES); + writer.write(record.value()); + writer.write(NEW_LINE_SEPARATOR_BYTES); writer.flush(); } - public void write(final List> records) throws IOException { - for (final Pair record : records) { - write(record.left, record.right); - } - } - - public List> readRecords() throws IOException { - final List> commands = new ArrayList<>(); + public List> readRecords() throws IOException { + final List> commands = new ArrayList<>(); for (final String line : Files.readAllLines(file.toPath(), StandardCharsets.UTF_8)) { - final String commandId = line.substring(0, line.indexOf(KEY_VALUE_SEPARATOR)); - final String command = line.substring(line.indexOf(KEY_VALUE_SEPARATOR) + 1); + final String commandId = line.substring(0, line.indexOf(KEY_VALUE_SEPARATOR_STR)); + final String command = line.substring(line.indexOf(KEY_VALUE_SEPARATOR_STR) + 1); commands.add(new Pair<>( - MAPPER.readValue(commandId.getBytes(StandardCharsets.UTF_8), CommandId.class), - MAPPER.readValue(command.getBytes(StandardCharsets.UTF_8), Command.class) + commandId.getBytes(StandardCharsets.UTF_8), + command.getBytes(StandardCharsets.UTF_8) )); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java index e74bceb58296..d86a313b1a76 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java @@ -17,7 +17,6 @@ import com.google.common.collect.Lists; import io.confluent.ksql.rest.server.computation.QueuedCommand; -import io.confluent.ksql.util.KsqlServerException; import java.time.Duration; 
import java.util.ArrayList; import java.util.Collections; @@ -87,10 +86,10 @@ public Iterable> getNewCommands(final Duration ti for (ConsumerRecord record : iterable) { try { backupRecord(record); - } catch (final KsqlServerException e) { + } catch (final Exception e) { log.warn("Backup is out of sync with the current command topic. " + "Backups will not work until the previous command topic is " - + "restored or all backup files are deleted."); + + "restored or all backup files are deleted.", e); return records; } records.add(record); @@ -114,10 +113,10 @@ public List getRestoreCommands(final Duration duration) { for (final ConsumerRecord record : records) { try { backupRecord(record); - } catch (final KsqlServerException e) { + } catch (final Exception e) { log.warn("Backup is out of sync with the current command topic. " + "Backups will not work until the previous command topic is " - + "restored or all backup files are deleted."); + + "restored or all backup files are deleted.", e); return restoreCommands; } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackupImpl.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackupImpl.java index 5e8cd8539473..492842e3041c 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackupImpl.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackupImpl.java @@ -21,13 +21,16 @@ import io.confluent.ksql.rest.server.computation.Command; import io.confluent.ksql.rest.server.computation.InternalTopicSerdes; import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlServerException; import io.confluent.ksql.util.Pair; import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.nio.file.attribute.PosixFilePermissions; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -57,7 +60,7 @@ public long read() { private final Ticker ticker; private BackupReplayFile replayFile; - private List> latestReplay; + private List> latestReplay; private int latestReplayIdx; private boolean corruptionDetected; @@ -118,10 +121,11 @@ private boolean isRestoring() { return latestReplayIdx < latestReplay.size(); } - private boolean isRecordInLatestReplay(final ConsumerRecord record) { - final Pair latestReplayRecord = latestReplay.get(latestReplayIdx); - if (record.key().equals(latestReplayRecord.left) - && record.value().equals(latestReplayRecord.right)) { + private boolean isRecordInLatestReplay(final ConsumerRecord record) { + final Pair latestReplayRecord = latestReplay.get(latestReplayIdx); + + if (Arrays.equals(record.key(), latestReplayRecord.getLeft()) + && Arrays.equals(record.value(), latestReplayRecord.getRight())) { latestReplayIdx++; return true; } @@ -129,33 +133,35 @@ private boolean isRecordInLatestReplay(final ConsumerRecord return false; } - @Override - public void writeRecord(final ConsumerRecord record) { - final ConsumerRecord deserializedRecord; + private void throwIfInvalidRecord(final ConsumerRecord record) { try { - deserializedRecord = new ConsumerRecord<>( - record.topic(), - record.partition(), - record.offset(), - InternalTopicSerdes.deserializer(CommandId.class) - .deserialize(record.topic(), record.key()), - InternalTopicSerdes.deserializer(Command.class) - .deserialize(record.topic(), record.value()) - ); - 
} catch (Exception e) { - LOG.error("Failed to deserialize command topic record when backing it up: {}:{}", - record.key(), record.value()); - return; + InternalTopicSerdes.deserializer(CommandId.class).deserialize(record.topic(), record.key()); + } catch (final Exception e) { + throw new KsqlException(String.format( + "Failed to backup record because it cannot deserialize key: %s", + new String(record.key(), StandardCharsets.UTF_8), e + )); + } + + try { + InternalTopicSerdes.deserializer(Command.class).deserialize(record.topic(), record.value()); + } catch (final Exception e) { + throw new KsqlException(String.format( + "Failed to backup record because it cannot deserialize value: %s", + new String(record.value(), StandardCharsets.UTF_8), e + )); } - writeCommandToBackup(deserializedRecord); } - void writeCommandToBackup(final ConsumerRecord record) { + @Override + public void writeRecord(final ConsumerRecord record) { if (corruptionDetected) { throw new KsqlServerException( "Failed to write record due to out of sync command topic and backup file: " + record); } + throwIfInvalidRecord(record); + if (isRestoring()) { if (isRecordInLatestReplay(record)) { // Ignore backup because record was already replayed @@ -171,7 +177,7 @@ void writeCommandToBackup(final ConsumerRecord record) { } try { - replayFile.write(record.key(), record.value()); + replayFile.write(record); } catch (final IOException e) { LOG.warn("Failed to write to file {}. The command topic backup is not complete. " + "Make sure the file exists and has permissions to write. KSQL must be restarted " diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/BackupReplayFileTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/BackupReplayFileTest.java index a9066f79326b..49ee95bccf20 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/BackupReplayFileTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/BackupReplayFileTest.java @@ -17,17 +17,16 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; -import io.confluent.ksql.rest.entity.CommandId; -import io.confluent.ksql.rest.server.computation.Command; import io.confluent.ksql.util.Pair; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; -import java.util.Arrays; import java.util.List; -import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -65,54 +64,30 @@ public void shouldGetFilePath() { @Test public void shouldWriteRecord() throws IOException { // Given - final Pair record = newStreamRecord("stream1"); + final ConsumerRecord record = newStreamRecord("stream1"); // When - replayFile.write(record.left, record.right); + replayFile.write(record); // Then final List commands = Files.readAllLines(internalReplayFile.toPath()); assertThat(commands.size(), is(1)); assertThat(commands.get(0), is( "\"stream/stream1/create\"" + KEY_VALUE_SEPARATOR - + "{\"statement\":\"CREATE STREAM stream1 (id INT) WITH (kafka_topic='stream1')\"}" - )); - } - - @Test - public void shouldWriteListOfRecords() throws IOException { - // Given - final Pair record1 = newStreamRecord("stream1"); - final Pair record2 = newStreamRecord("stream2"); - - // When - replayFile.write(Arrays.asList(record1, record2)); - - // Then - final List commands 
= Files.readAllLines(internalReplayFile.toPath()); - assertThat(commands.size(), is(2)); - assertThat(commands.get(0), is( - "\"stream/stream1/create\"" + KEY_VALUE_SEPARATOR - + "{\"statement\":\"CREATE STREAM stream1 (id INT) WITH (kafka_topic='stream1')\"}" - )); - assertThat(commands.get(1), is( - "\"stream/stream2/create\"" + KEY_VALUE_SEPARATOR - + "{\"statement\":\"CREATE STREAM stream2 (id INT) WITH (kafka_topic='stream2')\"}" + + "{\"statement\":\"CREATE STREAM stream1 (id INT) WITH (kafka_topic='stream1')\"" + + ",\"streamsProperties\":{},\"originalProperties\":{},\"plan\":null}" )); } @Test public void shouldWriteRecordWithNewLineCharacterInCommand() throws IOException { // Given - final CommandId commandId1 = new CommandId( - CommandId.Type.STREAM, "stream1", CommandId.Action.CREATE); - final Command command1 = new Command( - "CREATE STREAM stream1 (id INT, f\n1 INT) WITH (kafka_topic='topic1)", - Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty() - ); + final String commandId = buildKey("stream1"); + final String command = + "{\"statement\":\"CREATE STREAM stream1 (id INT, f\\n1 INT) WITH (kafka_topic='topic1')\"}"; // When - replayFile.write(commandId1, command1); + replayFile.write(newStreamRecord(commandId, command)); // Then final List commands = Files.readAllLines(internalReplayFile.toPath()); @@ -120,7 +95,7 @@ public void shouldWriteRecordWithNewLineCharacterInCommand() throws IOException assertThat(commands.get(0), is( "\"stream/stream1/create\"" + KEY_VALUE_SEPARATOR + "{\"statement\":" - + "\"CREATE STREAM stream1 (id INT, f\\n1 INT) WITH (kafka_topic='topic1)\"}" + + "\"CREATE STREAM stream1 (id INT, f\\n1 INT) WITH (kafka_topic='topic1')\"}" )); } @@ -136,37 +111,52 @@ public void shouldBeEmptyWhenReadAllCommandsFromEmptyFile() throws IOException { @Test public void shouldReadCommands() throws IOException { // Given - final Pair record1 = newStreamRecord("stream1"); - final Pair record2 = newStreamRecord("stream2"); + final ConsumerRecord record1 = newStreamRecord("stream1"); + final ConsumerRecord record2 = newStreamRecord("stream2"); Files.write(internalReplayFile.toPath(), String.format("%s%s%s%n%s%s%s", "\"stream/stream1/create\"", KEY_VALUE_SEPARATOR, - "{\"statement\":\"CREATE STREAM stream1 (id INT) WITH (kafka_topic='stream1')\"}", + "{\"statement\":\"CREATE STREAM stream1 (id INT) WITH (kafka_topic='stream1')\"," + + "\"streamsProperties\":{},\"originalProperties\":{},\"plan\":null}", "\"stream/stream2/create\"", KEY_VALUE_SEPARATOR, - "{\"statement\":\"CREATE STREAM stream2 (id INT) WITH (kafka_topic='stream2')\"}" + "{\"statement\":\"CREATE STREAM stream2 (id INT) WITH (kafka_topic='stream2')\"," + + "\"streamsProperties\":{},\"originalProperties\":{},\"plan\":null}" ).getBytes(StandardCharsets.UTF_8)); // When - final List> commands = replayFile.readRecords(); + final List> commands = replayFile.readRecords(); // Then assertThat(commands.size(), is(2)); - assertThat(commands.get(0).left, is(record1.left)); - assertThat(commands.get(0).right, is(record1.right)); - assertThat(commands.get(1).left, is(record2.left)); - assertThat(commands.get(1).right, is(record2.right)); + assertThat(commands.get(0).left, is(record1.key())); + assertThat(commands.get(0).right, is(record1.value())); + assertThat(commands.get(1).left, is(record2.key())); + assertThat(commands.get(1).right, is(record2.value())); + } + + private ConsumerRecord newStreamRecord(final String streamName) { + return newStreamRecord(buildKey(streamName), buildValue(streamName)); 
} - private Pair newStreamRecord(final String streamName) { - final CommandId commandId = new CommandId( - CommandId.Type.STREAM, streamName, CommandId.Action.CREATE); - final Command command = new Command( - String.format("CREATE STREAM %s (id INT) WITH (kafka_topic='%s')", streamName, streamName), - Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty() - ); + @SuppressWarnings("unchecked") + private ConsumerRecord newStreamRecord(final String key, final String value) { + final ConsumerRecord consumerRecord = mock(ConsumerRecord.class); + + when(consumerRecord.key()).thenReturn(key.getBytes(StandardCharsets.UTF_8)); + when(consumerRecord.value()).thenReturn(value.getBytes(StandardCharsets.UTF_8)); + + return consumerRecord; + } + + private String buildKey(final String streamName) { + return String.format("\"stream/%s/create\"", streamName); + } - return new Pair<>(commandId, command); + private String buildValue(final String streamName) { + return String.format("{\"statement\":\"CREATE STREAM %s (id INT) WITH (kafka_topic='%s')\"," + + "\"streamsProperties\":{},\"originalProperties\":{},\"plan\":null}", + streamName, streamName); } } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicBackupImplTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicBackupImplTest.java index 266021bbf746..91e27947dd4a 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicBackupImplTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicBackupImplTest.java @@ -19,21 +19,21 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import com.google.common.base.Ticker; -import io.confluent.ksql.rest.entity.CommandId; -import io.confluent.ksql.rest.server.computation.Command; +import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlServerException; import io.confluent.ksql.util.Pair; import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.attribute.PosixFilePermissions; import java.util.List; -import java.util.Optional; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.Before; import org.junit.Rule; @@ -47,9 +47,9 @@ public class CommandTopicBackupImplTest { private static final String COMMAND_TOPIC_NAME = "command_topic"; - private Pair command1 = newStreamRecord("stream1"); - private Pair command2 = newStreamRecord("stream2"); - private Pair command3 = newStreamRecord("stream3"); + private ConsumerRecord command1 = newStreamRecord("stream1"); + private ConsumerRecord command2 = newStreamRecord("stream2"); + private ConsumerRecord command3 = newStreamRecord("stream3"); @Mock private Ticker ticker; @@ -65,15 +65,27 @@ public void setup() { backupLocation.getRoot().getAbsolutePath(), COMMAND_TOPIC_NAME, ticker); } - private Pair newStreamRecord(final String streamName) { - final CommandId commandId = new CommandId( - CommandId.Type.STREAM, streamName, CommandId.Action.CREATE); - final Command command = new Command( - String.format("CREATE STREAM %s (id INT) WITH (kafka_topic='%s", streamName, streamName), - Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty() - ); + private ConsumerRecord newStreamRecord(final 
String streamName) { + return newStreamRecord(buildKey(streamName), buildValue(streamName)); + } + + @SuppressWarnings("unchecked") + private ConsumerRecord newStreamRecord(final String key, final String value) { + final ConsumerRecord consumerRecord = mock(ConsumerRecord.class); + + when(consumerRecord.key()).thenReturn(key.getBytes(StandardCharsets.UTF_8)); + when(consumerRecord.value()).thenReturn(value.getBytes(StandardCharsets.UTF_8)); + + return consumerRecord; + } + + private String buildKey(final String streamName) { + return String.format("\"stream/%s/create\"", streamName); + } - return new Pair<>(commandId, command); + private String buildValue(final String streamName) { + return String.format("{\"statement\":\"CREATE STREAM %s (id INT) WITH (kafka_topic='%s')\"}", + streamName, streamName); } @Test @@ -146,42 +158,73 @@ public void shouldCreateBackupLocationWhenDoesNotExist() throws IOException { assertThat(Files.exists(dir), is(true)); } + @Test + public void shouldThrowWhenRecordIsNotValidCommandId() { + // Given + commandTopicBackup.initialize(); + + // When + final Exception e = assertThrows( + KsqlException.class, + () -> commandTopicBackup.writeRecord(new ConsumerRecord<>( + "topic1", 0, 0, + "stream/a/create/invalid".getBytes(StandardCharsets.UTF_8), command1.value()))); + + // Then + assertThat(e.getMessage(), containsString( + "Failed to backup record because it cannot deserialize key: stream/a/create/invalid")); + } + + @Test + public void shouldThrowWhenRecordIsNotValidCommand() { + // Given + commandTopicBackup.initialize(); + + // When + final Exception e = assertThrows( + KsqlException.class, + () -> commandTopicBackup.writeRecord(new ConsumerRecord<>( + "topic1", 0, 0, command1.key(), + "my command".getBytes(StandardCharsets.UTF_8)))); + + // Then + assertThat(e.getMessage(), containsString( + "Failed to backup record because it cannot deserialize value: my command")); + } + @Test public void shouldWriteCommandToBackupToReplayFile() throws IOException { // Given commandTopicBackup.initialize(); // When - final ConsumerRecord record = newConsumerRecord(command1); - commandTopicBackup.writeCommandToBackup(record); + commandTopicBackup.writeRecord(command1); // Then - final List> commands = - commandTopicBackup.getReplayFile().readRecords(); + final List> commands = commandTopicBackup.getReplayFile().readRecords(); assertThat(commands.size(), is(1)); - assertThat(commands.get(0).left, is(command1.left)); - assertThat(commands.get(0).right, is(command1.right)); + assertThat(commands.get(0).left, is(command1.key())); + assertThat(commands.get(0).right, is(command1.value())); } @Test public void shouldIgnoreRecordPreviouslyReplayed() throws IOException { // Given - final ConsumerRecord record = newConsumerRecord(command1); commandTopicBackup.initialize(); - commandTopicBackup.writeCommandToBackup(record); + commandTopicBackup.writeRecord(command1); final BackupReplayFile previousReplayFile = commandTopicBackup.getReplayFile(); // When // A 2nd initialize call will open the latest backup and read the previous replayed commands commandTopicBackup.initialize(); - commandTopicBackup.writeCommandToBackup(record); + commandTopicBackup.writeRecord(command1); final BackupReplayFile currentReplayFile = commandTopicBackup.getReplayFile(); // Then - final List> commands = currentReplayFile.readRecords(); + final List> commands = currentReplayFile.readRecords(); assertThat(commands.size(), is(1)); - assertThat(commands.get(0).left, is(command1.left)); - 
assertThat(commands.get(0).right, is(command1.right)); + assertThat(commands.get(0).left, is(command1.key())); + assertThat(commands.get(0).right, is(command1.value())); assertThat(currentReplayFile.getPath(), is(previousReplayFile.getPath())); } @@ -190,18 +233,16 @@ public void shouldNotCreateNewReplayFileIfNewRecordsDoNotMatchPreviousBackups() // Given commandTopicBackup = new CommandTopicBackupImpl( backupLocation.getRoot().getAbsolutePath(), COMMAND_TOPIC_NAME, ticker); - final ConsumerRecord record1 = newConsumerRecord(command1); commandTopicBackup.initialize(); - commandTopicBackup.writeCommandToBackup(record1); + commandTopicBackup.writeRecord(command1); final BackupReplayFile previousReplayFile = commandTopicBackup.getReplayFile(); // When // A 2nd initialize call will open the latest backup and read the previous replayed commands commandTopicBackup.initialize(); - final ConsumerRecord record2 = newConsumerRecord(command2); // The write command will conflicts with what's already in the backup file try { - commandTopicBackup.writeCommandToBackup(record2); + commandTopicBackup.writeRecord(command2); assertThat(true, is(false)); } catch (final KsqlServerException e) { // This is expected so we do nothing @@ -212,7 +253,7 @@ public void shouldNotCreateNewReplayFileIfNewRecordsDoNotMatchPreviousBackups() assertThat(currentReplayFile.getPath(), is(previousReplayFile.getPath())); assertThat(commandTopicBackup.commandTopicCorruption(), is(true)); try { - commandTopicBackup.writeCommandToBackup(record2); + commandTopicBackup.writeRecord(command2); assertThat(true, is(false)); } catch (final KsqlServerException e) { // This is expected so we do nothing @@ -222,22 +263,19 @@ public void shouldNotCreateNewReplayFileIfNewRecordsDoNotMatchPreviousBackups() @Test public void shouldWritePreviousReplayedRecordsAlreadyChecked() throws IOException { // Given - final ConsumerRecord record1 = newConsumerRecord(command1); - final ConsumerRecord record2 = newConsumerRecord(command2); commandTopicBackup.initialize(); - commandTopicBackup.writeCommandToBackup(record1); - commandTopicBackup.writeCommandToBackup(record2); + commandTopicBackup.writeRecord(command1); + commandTopicBackup.writeRecord(command2); final BackupReplayFile previousReplayFile = commandTopicBackup.getReplayFile(); // When // A 2nd initialize call will open the latest backup and read the previous replayed commands commandTopicBackup.initialize(); // command1 is ignored because it was previously replayed - commandTopicBackup.writeCommandToBackup(record1); + commandTopicBackup.writeRecord(command1); // The write command will conflicts with what's already in the backup file - final ConsumerRecord record3 = newConsumerRecord(command3); try { - commandTopicBackup.writeCommandToBackup(record3); + commandTopicBackup.writeRecord(command3); assertThat(true, is(false)); } catch (final KsqlServerException e) { // This is expected so we do nothing @@ -245,20 +283,20 @@ public void shouldWritePreviousReplayedRecordsAlreadyChecked() throws IOExceptio final BackupReplayFile currentReplayFile = commandTopicBackup.getReplayFile(); // Then - List> commands = previousReplayFile.readRecords(); + List> commands = previousReplayFile.readRecords(); assertThat(commands.size(), is(2)); - assertThat(commands.get(0).left, is(command1.left)); - assertThat(commands.get(0).right, is(command1.right)); - assertThat(commands.get(1).left, is(command2.left)); - assertThat(commands.get(1).right, is(command2.right)); + assertThat(commands.get(0).left, is(command1.key())); 
+ assertThat(commands.get(0).right, is(command1.value())); + assertThat(commands.get(1).left, is(command2.key())); + assertThat(commands.get(1).right, is(command2.value())); // the backup file should be the same and the contents shouldn't have been modified commands = currentReplayFile.readRecords(); assertThat(commands.size(), is(2)); - assertThat(commands.get(0).left, is(command1.left)); - assertThat(commands.get(0).right, is(command1.right)); - assertThat(commands.get(1).left, is(command2.left)); - assertThat(commands.get(1).right, is(command2.right)); + assertThat(commands.get(0).left, is(command1.key())); + assertThat(commands.get(0).right, is(command1.value())); + assertThat(commands.get(1).left, is(command2.key())); + assertThat(commands.get(1).right, is(command2.value())); assertThat(currentReplayFile.getPath(), is(previousReplayFile.getPath())); assertThat(commandTopicBackup.commandTopicCorruption(), is(true)); } @@ -335,10 +373,4 @@ public void shouldOpenReplayFileAndIgnoreFileWithInvalidTimestamp() throws IOExc "%s/backup_command_topic_111", backupLocation.getRoot().getAbsolutePath() ))); } - - private ConsumerRecord newConsumerRecord( - final Pair record - ) { - return new ConsumerRecord<>("topic", 0, 0, record.left, record.right); - } }
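The core change in this patch is that BackupReplayFile now appends the raw key and value bytes of each command-topic ConsumerRecord (the key, a ':' separator, the value, then a newline, flushed after every write through a FileOutputStream opened in append mode) instead of re-serializing deserialized CommandId/Command objects, and CommandTopicBackupImpl compares replayed entries against incoming records with Arrays.equals on those bytes. The standalone sketch below is a minimal illustration of that append-and-compare pattern under assumed names (ReplayFileSketch, Entry, a local file); it is not the ksqlDB implementation from the diff above.

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Minimal sketch of the raw-bytes backup approach used by this patch. The class and
// method names are assumptions for illustration; only the pattern (append
// key ':' value '\n' in append mode, flush, compare bytes with Arrays.equals)
// mirrors the change above.
public final class ReplayFileSketch {
  private static final byte[] SEPARATOR = ":".getBytes(StandardCharsets.UTF_8);
  private static final byte[] NEW_LINE = "\n".getBytes(StandardCharsets.UTF_8);

  // Simple key/value holder for entries read back from the replay file.
  static final class Entry {
    final byte[] key;
    final byte[] value;

    Entry(final byte[] key, final byte[] value) {
      this.key = key;
      this.value = value;
    }
  }

  private final File file;
  private final FileOutputStream out;

  ReplayFileSketch(final File file) throws IOException {
    this.file = file;
    // Append mode keeps the existing backup across restarts instead of truncating it.
    this.out = new FileOutputStream(file, true);
  }

  // Appends the record's raw key and value bytes, separated by ':' and terminated by '\n'.
  void write(final byte[] key, final byte[] value) throws IOException {
    out.write(key);
    out.write(SEPARATOR);
    out.write(value);
    out.write(NEW_LINE);
    out.flush();
  }

  // Reads entries back, splitting each line at the first ':' (as the patch does).
  List<Entry> read() throws IOException {
    final List<Entry> entries = new ArrayList<>();
    for (final String line : Files.readAllLines(file.toPath(), StandardCharsets.UTF_8)) {
      final int idx = line.indexOf(':');
      entries.add(new Entry(
          line.substring(0, idx).getBytes(StandardCharsets.UTF_8),
          line.substring(idx + 1).getBytes(StandardCharsets.UTF_8)));
    }
    return entries;
  }

  // A replayed entry matches an incoming record only if key and value are byte-identical,
  // which is why the backup stores raw bytes rather than re-serialized JSON.
  static boolean matches(final Entry replayed, final byte[] key, final byte[] value) {
    return Arrays.equals(replayed.key, key) && Arrays.equals(replayed.value, value);
  }
}

Because both the file contents and the comparison operate on the exact bytes read from the command topic, a record replayed from the backup matches the same record re-read from the topic byte-for-byte, which is what lets a restarted server reuse the existing backup file rather than re-create it.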