Skip to content

Commit

Permalink
feat: Added sync alias for replicate command
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Nov 4, 2024
1 parent 6059f27 commit c53b4ed
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 21 deletions.
19 changes: 10 additions & 9 deletions plugins/riot/src/main/java/com/redis/riot/RedisReaderArgs.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ public class RedisReaderArgs {
public static final int DEFAULT_MEMORY_USAGE_SAMPLES = KeyValueRead.DEFAULT_MEM_USAGE_SAMPLES;
public static final long DEFAULT_SCAN_COUNT = 1000;
public static final Duration DEFAULT_FLUSH_INTERVAL = RedisItemReader.DEFAULT_FLUSH_INTERVAL;
public static final int DEFAULT_NOTIFICATION_QUEUE_CAPACITY = RedisItemReader.DEFAULT_NOTIFICATION_QUEUE_CAPACITY;
public static final int DEFAULT_EVENT_QUEUE_CAPACITY = RedisItemReader.DEFAULT_EVENT_QUEUE_CAPACITY;
public static final ReaderMode DEFAULT_MODE = RedisItemReader.DEFAULT_MODE;

@Option(names = "--mode", description = "Source for keys: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE})", paramLabel = "<name>")
private ReaderMode mode = RedisItemReader.DEFAULT_MODE;
private ReaderMode mode = DEFAULT_MODE;

@Option(names = "--key-pattern", description = "Pattern of keys to read (default: *).", paramLabel = "<glob>")
private String keyPattern;
Expand Down Expand Up @@ -65,8 +66,8 @@ public class RedisReaderArgs {
@Option(names = "--idle-timeout", description = "Min duration in seconds to consider reader complete in live mode (default: no timeout).", paramLabel = "<sec>")
private long idleTimeout;

@Option(names = "--event-queue", description = "Capacity of the keyspace notification event queue (default: ${DEFAULT-VALUE}).", paramLabel = "<int>")
private int notificationQueueCapacity = DEFAULT_NOTIFICATION_QUEUE_CAPACITY;
@Option(names = "--event-queue", description = "Capacity of the keyspace notification queue (default: ${DEFAULT-VALUE}).", paramLabel = "<int>")
private int eventQueueCapacity = DEFAULT_EVENT_QUEUE_CAPACITY;

@Option(names = "--read-retry", description = "Max number of times to try failed reads. 0 and 1 both mean no retry (default: ${DEFAULT-VALUE}).", paramLabel = "<int>")
private int retryLimit;
Expand All @@ -89,7 +90,7 @@ public <K> void configure(RedisItemReader<K, ?> reader) {
reader.setKeyPattern(keyPattern);
reader.setKeyType(keyType);
reader.setMode(mode);
reader.setNotificationQueueCapacity(notificationQueueCapacity);
reader.setEventQueueCapacity(eventQueueCapacity);
reader.setPollTimeout(Duration.ofMillis(pollTimeout));
reader.setProcessor(keyProcessor(reader.getCodec(), keyFilterArgs));
reader.setQueueCapacity(queueCapacity);
Expand Down Expand Up @@ -198,12 +199,12 @@ public void setIdleTimeout(long idleTimeout) {
this.idleTimeout = idleTimeout;
}

public int getNotificationQueueCapacity() {
return notificationQueueCapacity;
public int getEventQueueCapacity() {
return eventQueueCapacity;
}

public void setNotificationQueueCapacity(int capacity) {
this.notificationQueueCapacity = capacity;
public void setEventQueueCapacity(int capacity) {
this.eventQueueCapacity = capacity;
}

public ReaderMode getMode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class RedisWriterArgs {
@Option(names = "--wait-timeout", description = "Timeout in millis for WAIT command (default: ${DEFAULT-VALUE}).", paramLabel = "<ms>")
private Duration waitTimeout = DEFAULT_WAIT_TIMEOUT;

@Option(names = "--merge", description = "Merge collection data structures (hash, list, ...) instead of overwriting them.")
@Option(names = "--merge", description = "Merge collection data structures (hash, list, ...) instead of overwriting them. Only used in `--struct` mode.")
private boolean merge;

public <K, V, T> void configure(RedisItemWriter<K, V, T> writer) {
Expand Down
34 changes: 24 additions & 10 deletions plugins/riot/src/main/java/com/redis/riot/Replicate.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,23 @@
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;

@Command(name = "replicate", description = "Replicate a Redis database into another Redis database.")
@Command(name = "replicate", description = "Replicate a Redis database into another Redis database.", aliases = "sync")
public class Replicate extends AbstractReplicateCommand {

public enum Type {
STRUCT, DUMP
}

public static final Type DEFAULT_TYPE = Type.DUMP;
public static final CompareMode DEFAULT_COMPARE_MODE = CompareMode.QUICK;

private static final String COMPARE_STEP_NAME = "compare";
private static final String SCAN_TASK_NAME = "Scanning";
private static final String LIVEONLY_TASK_NAME = "Listening";
private static final String LIVE_TASK_NAME = "Scanning/Listening";

@Option(names = "--struct", description = "Enable data structure-specific replication")
private boolean struct;
@Option(names = "--type", description = "Replication type: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE}).", paramLabel = "<name>")
private Type type = DEFAULT_TYPE;

@ArgGroup(exclusive = false)
private RedisWriterArgs targetRedisWriterArgs = new RedisWriterArgs();
Expand All @@ -38,6 +43,11 @@ public class Replicate extends AbstractReplicateCommand {
@Option(names = "--compare", description = "Compare mode: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE}).", paramLabel = "<mode>")
private CompareMode compareMode = DEFAULT_COMPARE_MODE;

@Option(names = "--struct", description = "Enable data structure-specific replication")
public void setStruct(boolean enable) {
this.type = enable ? Type.STRUCT : Type.DUMP;
}

@Override
protected boolean isQuickCompare() {
return compareMode == CompareMode.QUICK;
Expand Down Expand Up @@ -85,7 +95,7 @@ private boolean shouldCompare() {

@SuppressWarnings({ "unchecked", "rawtypes" })
private RedisItemReader<byte[], byte[]> reader() {
if (struct) {
if (isStruct()) {
log.info("Creating Redis data-structure reader");
return RedisItemReader.struct(ByteArrayCodec.INSTANCE);
}
Expand All @@ -95,14 +105,19 @@ private RedisItemReader<byte[], byte[]> reader() {

@SuppressWarnings({ "unchecked", "rawtypes" })
private RedisItemWriter<byte[], byte[], KeyValue<byte[]>> writer() {
if (struct) {
if (isStruct()) {
log.info("Creating Redis data-structure writer");
return RedisItemWriter.struct(ByteArrayCodec.INSTANCE);
}
log.info("Creating Redis dump writer");
return (RedisItemWriter) RedisItemWriter.dump();
}

@Override
protected boolean isStruct() {
return type == Type.STRUCT;
}

private String taskName(RedisItemReader<?, ?> reader) {
switch (reader.getMode()) {
case SCAN:
Expand All @@ -129,13 +144,12 @@ public void setTargetRedisWriterArgs(RedisWriterArgs redisWriterArgs) {
this.targetRedisWriterArgs = redisWriterArgs;
}

@Override
public boolean isStruct() {
return struct;
public Type getType() {
return type;
}

public void setStruct(boolean type) {
this.struct = type;
public void setType(Type type) {
this.type = type;
}

public boolean isLogKeys() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ private void configure(RedisArgs redisArgs) {

private void configure(RedisReaderArgs redisReaderArgs) {
redisReaderArgs.setIdleTimeout(DEFAULT_IDLE_TIMEOUT_SECONDS);
redisReaderArgs.setNotificationQueueCapacity(DEFAULT_NOTIFICATION_QUEUE_CAPACITY);
redisReaderArgs.setEventQueueCapacity(DEFAULT_EVENT_QUEUE_CAPACITY);
}

@Override
Expand Down

0 comments on commit c53b4ed

Please sign in to comment.