diff --git a/connectors/riot-redis/src/main/java/com/redis/riot/redis/KeyComparisonDiffLogger.java b/connectors/riot-redis/src/main/java/com/redis/riot/redis/KeyComparisonDiffLogger.java index 65abcc2dd..a8877aa51 100644 --- a/connectors/riot-redis/src/main/java/com/redis/riot/redis/KeyComparisonDiffLogger.java +++ b/connectors/riot-redis/src/main/java/com/redis/riot/redis/KeyComparisonDiffLogger.java @@ -1,5 +1,6 @@ package com.redis.riot.redis; +import java.util.function.Function; import java.util.stream.StreamSupport; import org.slf4j.Logger; @@ -9,30 +10,39 @@ import com.redis.spring.batch.reader.KeyComparison; import com.redis.spring.batch.reader.KeyComparison.Status; +import com.redis.spring.batch.util.BatchUtils; -public class KeyComparisonDiffLogger implements ItemWriteListener { +import io.lettuce.core.codec.RedisCodec; + +public class KeyComparisonDiffLogger implements ItemWriteListener> { private final Logger log = LoggerFactory.getLogger(getClass()); + private final Function toStringKeyFunction; + + public KeyComparisonDiffLogger(RedisCodec codec) { + toStringKeyFunction = BatchUtils.toStringKeyFunction(codec); + } + @Override - public void afterWrite(Chunk items) { + public void afterWrite(Chunk> items) { StreamSupport.stream(items.spliterator(), false).filter(c -> c.getStatus() != Status.OK).forEach(this::log); } - public void log(KeyComparison comparison) { + public void log(KeyComparison comparison) { switch (comparison.getStatus()) { case MISSING: - log.error("Missing key {}", comparison.getSource().getKey()); + log.error("Missing key {}", key(comparison)); break; case TYPE: - log.error("Type mismatch on key {}. Expected {} but was {}", comparison.getSource().getKey(), + log.error("Type mismatch on key {}. Expected {} but was {}", key(comparison), comparison.getSource().getType(), comparison.getTarget().getType()); break; case VALUE: - log.error("Value mismatch on key {}", comparison.getSource().getKey()); + log.error("Value mismatch on key {}", key(comparison)); break; case TTL: - log.error("TTL mismatch on key {}. Expected {} but was {}", comparison.getSource().getKey(), + log.error("TTL mismatch on key {}. Expected {} but was {}", key(comparison), comparison.getSource().getTtl(), comparison.getTarget().getTtl()); break; default: @@ -40,4 +50,8 @@ public void log(KeyComparison comparison) { } } + private String key(KeyComparison comparison) { + return toStringKeyFunction.apply(comparison.getSource().getKey()); + } + } diff --git a/connectors/riot-redis/src/main/java/com/redis/riot/redis/KeyComparisonStatusCountItemWriter.java b/connectors/riot-redis/src/main/java/com/redis/riot/redis/KeyComparisonStatusCountItemWriter.java index 814edf26b..54f747894 100644 --- a/connectors/riot-redis/src/main/java/com/redis/riot/redis/KeyComparisonStatusCountItemWriter.java +++ b/connectors/riot-redis/src/main/java/com/redis/riot/redis/KeyComparisonStatusCountItemWriter.java @@ -13,52 +13,52 @@ import com.redis.spring.batch.reader.KeyComparison; import com.redis.spring.batch.reader.KeyComparison.Status; -public class KeyComparisonStatusCountItemWriter extends AbstractItemStreamItemWriter { +public class KeyComparisonStatusCountItemWriter extends AbstractItemStreamItemWriter> { - private final Map counts = Stream.of(Status.values()) - .collect(Collectors.toMap(Function.identity(), s -> new AtomicLong())); + private final Map counts = Stream.of(Status.values()) + .collect(Collectors.toMap(Function.identity(), s -> new AtomicLong())); - private long incrementAndGet(Status status) { - return counts.get(status).incrementAndGet(); - } + private long incrementAndGet(Status status) { + return counts.get(status).incrementAndGet(); + } - public long getOK() { - return getCount(Status.OK); - } + public long getOK() { + return getCount(Status.OK); + } - public long getMissing() { - return getCount(Status.MISSING); - } + public long getMissing() { + return getCount(Status.MISSING); + } - public long getType() { - return getCount(Status.TYPE); - } + public long getType() { + return getCount(Status.TYPE); + } - public long getTtl() { - return getCount(Status.TTL); - } + public long getTtl() { + return getCount(Status.TTL); + } - public long getValue() { - return getCount(Status.VALUE); - } + public long getValue() { + return getCount(Status.VALUE); + } - public long getCount(Status status) { - return counts.get(status).get(); - } + public long getCount(Status status) { + return counts.get(status).get(); + } - public List getCounts(Status... statuses) { - return Stream.of(statuses).map(this::getCount).collect(Collectors.toList()); - } + public List getCounts(Status... statuses) { + return Stream.of(statuses).map(this::getCount).collect(Collectors.toList()); + } - public long getTotal() { - return counts.values().stream().collect(Collectors.summingLong(AtomicLong::get)); - } + public long getTotal() { + return counts.values().stream().collect(Collectors.summingLong(AtomicLong::get)); + } - @Override - public void write(Chunk items) { - for (KeyComparison comparison : items) { - incrementAndGet(comparison.getStatus()); - } - } + @Override + public void write(Chunk> items) { + for (KeyComparison comparison : items) { + incrementAndGet(comparison.getStatus()); + } + } } diff --git a/connectors/riot-redis/src/main/java/com/redis/riot/redis/KeyValueWriteListener.java b/connectors/riot-redis/src/main/java/com/redis/riot/redis/KeyValueWriteListener.java deleted file mode 100644 index 614561485..000000000 --- a/connectors/riot-redis/src/main/java/com/redis/riot/redis/KeyValueWriteListener.java +++ /dev/null @@ -1,40 +0,0 @@ -package com.redis.riot.redis; - -import java.util.List; -import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; - -import org.slf4j.Logger; -import org.springframework.batch.core.ItemWriteListener; -import org.springframework.batch.item.Chunk; - -import com.redis.spring.batch.KeyValue; -import com.redis.spring.batch.util.BatchUtils; - -import io.lettuce.core.codec.RedisCodec; - -public class KeyValueWriteListener> implements ItemWriteListener { - - private final Logger log; - private final Function toStringKeyFunction; - - public KeyValueWriteListener(RedisCodec codec, Logger log) { - this.toStringKeyFunction = BatchUtils.toStringKeyFunction(codec); - this.log = log; - } - - @Override - public void beforeWrite(Chunk items) { - log.debug("Writing keys {}", keys(items)); - } - - private List keys(Chunk items) { - return StreamSupport.stream(items.spliterator(), false).map(this::toStringKey).collect(Collectors.toList()); - } - - private String toStringKey(T item) { - return toStringKeyFunction.apply(item.getKey()); - } - -} diff --git a/connectors/riot-redis/src/main/java/com/redis/riot/redis/Replication.java b/connectors/riot-redis/src/main/java/com/redis/riot/redis/Replication.java index a1b3c8c43..424b6151d 100644 --- a/connectors/riot-redis/src/main/java/com/redis/riot/redis/Replication.java +++ b/connectors/riot-redis/src/main/java/com/redis/riot/redis/Replication.java @@ -1,6 +1,7 @@ package com.redis.riot.redis; import java.time.Duration; +import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -12,6 +13,7 @@ import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder; import org.springframework.batch.core.step.builder.SimpleStepBuilder; import org.springframework.batch.core.step.tasklet.TaskletStep; +import org.springframework.batch.item.Chunk; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; @@ -21,6 +23,7 @@ import com.redis.lettucemod.api.StatefulRedisModulesConnection; import com.redis.lettucemod.util.RedisModulesUtils; import com.redis.riot.core.AbstractExport; +import com.redis.riot.core.LoggingWriteListener; import com.redis.riot.core.RedisClientOptions; import com.redis.riot.core.RedisWriterOptions; import com.redis.spring.batch.KeyValue; @@ -31,6 +34,7 @@ import com.redis.spring.batch.reader.KeyComparatorOptions; import com.redis.spring.batch.reader.KeyComparatorOptions.StreamMessageIdPolicy; import com.redis.spring.batch.reader.KeyComparisonItemReader; +import com.redis.spring.batch.util.BatchUtils; import io.lettuce.core.AbstractRedisClient; import io.lettuce.core.ReadFrom; @@ -69,7 +73,7 @@ public class Replication extends AbstractExport { protected boolean isStruct() { return type == ReplicationType.STRUCT; } - + @Override public void execute() throws Exception { try { @@ -100,6 +104,10 @@ protected Job job() { liveReader.setMode(ReaderMode.LIVE); FlushingStepBuilder, KeyValue> liveStep = flushingStep( step(STEP_LIVE, liveReader, writer()).processor(processor)); + if (log.isInfoEnabled()) { + addLoggingWriteListener(scanStep); + addLoggingWriteListener(liveStep); + } KeyComparisonStatusCountItemWriter compareWriter = new KeyComparisonStatusCountItemWriter(); TaskletStep compareStep = step(STEP_COMPARE, comparisonReader(), compareWriter).build(); switch (mode) { @@ -130,6 +138,20 @@ protected Job job() { } } + private static final Function toString = BatchUtils.toStringKeyFunction(ByteArrayCodec.INSTANCE); + + private void addLoggingWriteListener(SimpleStepBuilder, KeyValue> step) { + step.listener(new LoggingWriteListener<>(this::log)); + } + + private void log(Chunk> chunk) { + chunk.getItems().stream().forEach(this::log); + } + + private void log(KeyValue keyValue) { + log.info("Wrote {} {}", keyValue.getType().getCode(), toString.apply(keyValue.getKey())); + } + private FlowBuilder flow(String name) { return new FlowBuilder<>(name(name)); } @@ -141,21 +163,11 @@ private boolean shouldCompare() { @Override protected FaultTolerantStepBuilder step(String name, ItemReader reader, ItemWriter writer) { FaultTolerantStepBuilder step = super.step(name, reader, writer); - switch (name) { - case STEP_COMPARE: + if (STEP_COMPARE.equals(name)) { if (showDiffs) { - step.listener(new KeyComparisonDiffLogger()); + step.listener(new KeyComparisonDiffLogger<>(ByteArrayCodec.INSTANCE)); } step.listener(new KeyComparisonSummaryLogger((KeyComparisonStatusCountItemWriter) writer)); - break; - case STEP_LIVE: - case STEP_SCAN: - if (log.isDebugEnabled()) { - step.listener(new KeyValueWriteListener<>(ByteArrayCodec.INSTANCE, log)); - } - break; - default: - break; } return step; } @@ -166,8 +178,7 @@ private void checkKeyspaceNotificationEnabled() { String config = connection.sync().configGet(CONFIG_NOTIFY_KEYSPACE_EVENTS) .getOrDefault(CONFIG_NOTIFY_KEYSPACE_EVENTS, ""); if (!config.contains("K")) { - log.error( - "Keyspace notifications not property configured ({}={}). Make sure it contains at least \"K\".", + log.error("Keyspace notifications not property configured ({}={}). Use the string KEA to enable them.", CONFIG_NOTIFY_KEYSPACE_EVENTS, config); } } catch (RedisException e) { @@ -190,9 +201,10 @@ private RedisItemReader> createReader() return (RedisItemReader) RedisItemReader.dump(); } - private KeyComparisonItemReader comparisonReader() { - KeyComparisonItemReader reader = compareMode == CompareMode.FULL ? RedisItemReader.compare() - : RedisItemReader.compareQuick(); + private KeyComparisonItemReader comparisonReader() { + KeyComparisonItemReader reader = compareMode == CompareMode.FULL + ? RedisItemReader.compare(ByteArrayCodec.INSTANCE) + : RedisItemReader.compareQuick(ByteArrayCodec.INSTANCE); configureReader(reader); reader.setClient(getRedisClient()); reader.setTargetClient(targetRedisClient); diff --git a/connectors/riot-redis/src/test/java/com/redis/riot/redis/ReplicationTests.java b/connectors/riot-redis/src/test/java/com/redis/riot/redis/ReplicationTests.java index 218848034..89d8b856c 100644 --- a/connectors/riot-redis/src/test/java/com/redis/riot/redis/ReplicationTests.java +++ b/connectors/riot-redis/src/test/java/com/redis/riot/redis/ReplicationTests.java @@ -1,5 +1,6 @@ package com.redis.riot.redis; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executors; import java.util.function.Predicate; @@ -100,9 +101,27 @@ void keyProcessorWithDate(TestInfo info) throws Throwable { } @Test - void binaryKeySnapshotReplication(TestInfo info) throws Exception { + void binaryKeyValueSnapshotReplicationType(TestInfo info) throws Exception { byte[] key = Hex.decode("aced0005"); - byte[] value = "value".getBytes(); + byte[] value = Hex.decode("aced0004"); + Map hash = new HashMap<>(); + hash.put(key, value); + StatefulRedisModulesConnection connection = RedisModulesUtils.connection(redisClient, + ByteArrayCodec.INSTANCE); + StatefulRedisModulesConnection targetConnection = RedisModulesUtils + .connection(targetRedisClient, ByteArrayCodec.INSTANCE); + connection.sync().hset(key, hash); + Replication replication = new Replication(); + replication.setCompareMode(CompareMode.NONE); + replication.setType(ReplicationType.STRUCT); + execute(replication, info); + Assertions.assertArrayEquals(connection.sync().hget(key, key), targetConnection.sync().hget(key, key)); + } + + @Test + void binaryKeyValueSnapshotReplication(TestInfo info) throws Exception { + byte[] key = Hex.decode("aced0005"); + byte[] value = Hex.decode("aced0004"); StatefulRedisModulesConnection connection = RedisModulesUtils.connection(redisClient, ByteArrayCodec.INSTANCE); StatefulRedisModulesConnection targetConnection = RedisModulesUtils @@ -117,7 +136,7 @@ void binaryKeySnapshotReplication(TestInfo info) throws Exception { @Test void binaryKeyLiveReplication(TestInfo info) throws Exception { byte[] key = Hex.decode("aced0005"); - byte[] value = "value".getBytes(); + byte[] value = Hex.decode("aced0004"); StatefulRedisModulesConnection connection = RedisModulesUtils.connection(redisClient, ByteArrayCodec.INSTANCE); StatefulRedisModulesConnection targetConnection = RedisModulesUtils