Skip to content

Commit

Permalink
fix: backup files are re-created on every restart (#6348)
Browse files Browse the repository at this point in the history
  • Loading branch information
spena authored Oct 2, 2020
1 parent 25bb352 commit 28b8486
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 169 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.benchmark;

import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.avro.random.generator.Generator;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
Expand Down Expand Up @@ -183,6 +184,7 @@ private static RowGenerator getRowGenerator(final String schemaName) throws IOEx
return new RowGenerator(generator, keyField, Optional.empty());
}

@SuppressFBWarnings("RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
private static Generator getGenerator(final String schemaName) throws IOException {
final Path schemaPath = SCHEMA_DIR.resolve(schemaName + SCHEMA_FILE_SUFFIX);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,79 +15,70 @@

package io.confluent.ksql.rest.server;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.ksql.execution.json.PlanJsonMapper;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
* A file that is used by the backup service to replay command_topic commands.
*/
public class BackupReplayFile implements Closeable {
private static final ObjectMapper MAPPER = PlanJsonMapper.INSTANCE.get();
private static final String KEY_VALUE_SEPARATOR = ":";
public final class BackupReplayFile implements Closeable {
private static final String KEY_VALUE_SEPARATOR_STR = ":";
private static final String NEW_LINE_SEPARATOR_STR = "\n";

private static final byte[] KEY_VALUE_SEPARATOR_BYTES =
KEY_VALUE_SEPARATOR_STR.getBytes(StandardCharsets.UTF_8);
private static final byte[] NEW_LINE_SEPARATOR_BYTES =
NEW_LINE_SEPARATOR_STR.getBytes(StandardCharsets.UTF_8);

private final File file;
private final BufferedWriter writer;
private final FileOutputStream writer;

/**
* Creates a replay file wrapper around {@code file} and opens a writer on it.
*
* @param file the backup file to read from and write to; must not be null
* @throws NullPointerException if {@code file} is null
* @throws KsqlException if the writer cannot be opened on {@code file}
*/
public BackupReplayFile(final File file) {
this.file = Objects.requireNonNull(file, "file");
this.writer = createWriter(file);
}

private static BufferedWriter createWriter(final File file) {
private static FileOutputStream createWriter(final File file) {
try {
return new BufferedWriter(new OutputStreamWriter(
new FileOutputStream(file, true),
StandardCharsets.UTF_8)
);
return new FileOutputStream(file, true);
} catch (final FileNotFoundException e) {
throw new KsqlException(
String.format("Failed to create replay file: %s", file.getAbsolutePath()), e);
String.format("Failed to create/open replay file: %s", file.getAbsolutePath()), e);
}
}

/**
* Returns the absolute path of the underlying replay file.
*
* @return the result of {@code getAbsolutePath()} on the wrapped file
*/
public String getPath() {
return file.getAbsolutePath();
}

public void write(final CommandId commandId, final Command command) throws IOException {
writer.write(MAPPER.writeValueAsString(commandId));
writer.write(KEY_VALUE_SEPARATOR);
writer.write(MAPPER.writeValueAsString(command));
writer.write("\n");
/**
* Writes one record to the replay file as raw bytes and flushes the writer.
*
* <p>The bytes are emitted in this order: the record key, the key/value separator,
* the record value, and the new-line separator.
*
* @param record the record whose key and value bytes are written
* @throws IOException if any of the writes or the flush fails
*/
public void write(final ConsumerRecord<byte[], byte[]> record) throws IOException {
writer.write(record.key());
writer.write(KEY_VALUE_SEPARATOR_BYTES);
// NOTE(review): key and value bytes are written unescaped. Presumably they never
// contain a raw new-line byte (and the key no ':'), since readRecords() splits the
// file by line and at the first separator -- confirm against the serializer.
writer.write(record.value());
writer.write(NEW_LINE_SEPARATOR_BYTES);
writer.flush();
}

public void write(final List<Pair<CommandId, Command>> records) throws IOException {
for (final Pair<CommandId, Command> record : records) {
write(record.left, record.right);
}
}

public List<Pair<CommandId, Command>> readRecords() throws IOException {
final List<Pair<CommandId, Command>> commands = new ArrayList<>();
public List<Pair<byte[], byte[]>> readRecords() throws IOException {
final List<Pair<byte[], byte[]>> commands = new ArrayList<>();
for (final String line : Files.readAllLines(file.toPath(), StandardCharsets.UTF_8)) {
final String commandId = line.substring(0, line.indexOf(KEY_VALUE_SEPARATOR));
final String command = line.substring(line.indexOf(KEY_VALUE_SEPARATOR) + 1);
final String commandId = line.substring(0, line.indexOf(KEY_VALUE_SEPARATOR_STR));
final String command = line.substring(line.indexOf(KEY_VALUE_SEPARATOR_STR) + 1);

commands.add(new Pair<>(
MAPPER.readValue(commandId.getBytes(StandardCharsets.UTF_8), CommandId.class),
MAPPER.readValue(command.getBytes(StandardCharsets.UTF_8), Command.class)
commandId.getBytes(StandardCharsets.UTF_8),
command.getBytes(StandardCharsets.UTF_8)
));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import com.google.common.collect.Lists;
import io.confluent.ksql.rest.server.computation.QueuedCommand;
import io.confluent.ksql.util.KsqlServerException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -87,10 +86,10 @@ public Iterable<ConsumerRecord<byte[], byte[]>> getNewCommands(final Duration ti
for (ConsumerRecord<byte[], byte[]> record : iterable) {
try {
backupRecord(record);
} catch (final KsqlServerException e) {
} catch (final Exception e) {
log.warn("Backup is out of sync with the current command topic. "
+ "Backups will not work until the previous command topic is "
+ "restored or all backup files are deleted.");
+ "restored or all backup files are deleted.", e);
return records;
}
records.add(record);
Expand All @@ -114,10 +113,10 @@ public List<QueuedCommand> getRestoreCommands(final Duration duration) {
for (final ConsumerRecord<byte[], byte[]> record : records) {
try {
backupRecord(record);
} catch (final KsqlServerException e) {
} catch (final Exception e) {
log.warn("Backup is out of sync with the current command topic. "
+ "Backups will not work until the previous command topic is "
+ "restored or all backup files are deleted.");
+ "restored or all backup files are deleted.", e);
return restoreCommands;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.rest.server.computation.InternalTopicSerdes;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.util.Pair;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -57,7 +60,7 @@ public long read() {
private final Ticker ticker;

private BackupReplayFile replayFile;
private List<Pair<CommandId, Command>> latestReplay;
private List<Pair<byte[], byte[]>> latestReplay;
private int latestReplayIdx;
private boolean corruptionDetected;

Expand Down Expand Up @@ -118,44 +121,47 @@ private boolean isRestoring() {
return latestReplayIdx < latestReplay.size();
}

private boolean isRecordInLatestReplay(final ConsumerRecord<CommandId, Command> record) {
final Pair<CommandId, Command> latestReplayRecord = latestReplay.get(latestReplayIdx);
if (record.key().equals(latestReplayRecord.left)
&& record.value().equals(latestReplayRecord.right)) {
private boolean isRecordInLatestReplay(final ConsumerRecord<byte[], byte[]> record) {
final Pair<byte[], byte[]> latestReplayRecord = latestReplay.get(latestReplayIdx);

if (Arrays.equals(record.key(), latestReplayRecord.getLeft())
&& Arrays.equals(record.value(), latestReplayRecord.getRight())) {
latestReplayIdx++;
return true;
}

return false;
}

@Override
public void writeRecord(final ConsumerRecord<byte[], byte[]> record) {
final ConsumerRecord<CommandId, Command> deserializedRecord;
private void throwIfInvalidRecord(final ConsumerRecord<byte[], byte[]> record) {
try {
deserializedRecord = new ConsumerRecord<>(
record.topic(),
record.partition(),
record.offset(),
InternalTopicSerdes.deserializer(CommandId.class)
.deserialize(record.topic(), record.key()),
InternalTopicSerdes.deserializer(Command.class)
.deserialize(record.topic(), record.value())
);
} catch (Exception e) {
LOG.error("Failed to deserialize command topic record when backing it up: {}:{}",
record.key(), record.value());
return;
InternalTopicSerdes.deserializer(CommandId.class).deserialize(record.topic(), record.key());
} catch (final Exception e) {
// Attach the deserialization failure as the cause of the KsqlException. It was
// previously passed as a surplus String.format argument, which is silently ignored,
// so the original stack trace was lost.
throw new KsqlException(String.format(
"Failed to backup record because it cannot deserialize key: %s",
new String(record.key(), StandardCharsets.UTF_8)
), e);
}

try {
InternalTopicSerdes.deserializer(Command.class).deserialize(record.topic(), record.value());
} catch (final Exception e) {
// Attach the deserialization failure as the cause of the KsqlException. It was
// previously passed as a surplus String.format argument, which is silently ignored,
// so the original stack trace was lost.
throw new KsqlException(String.format(
"Failed to backup record because it cannot deserialize value: %s",
new String(record.value(), StandardCharsets.UTF_8)
), e);
}
writeCommandToBackup(deserializedRecord);
}

void writeCommandToBackup(final ConsumerRecord<CommandId, Command> record) {
@Override
public void writeRecord(final ConsumerRecord<byte[], byte[]> record) {
if (corruptionDetected) {
throw new KsqlServerException(
"Failed to write record due to out of sync command topic and backup file: " + record);
}

throwIfInvalidRecord(record);

if (isRestoring()) {
if (isRecordInLatestReplay(record)) {
// Ignore backup because record was already replayed
Expand All @@ -171,7 +177,7 @@ void writeCommandToBackup(final ConsumerRecord<CommandId, Command> record) {
}

try {
replayFile.write(record.key(), record.value());
replayFile.write(record);
} catch (final IOException e) {
LOG.warn("Failed to write to file {}. The command topic backup is not complete. "
+ "Make sure the file exists and has permissions to write. KSQL must be restarted "
Expand Down
Loading

0 comments on commit 28b8486

Please sign in to comment.