Skip to content

Commit

Permalink
deps: Upgraded spring batch
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Apr 21, 2024
1 parent a357abb commit a50daf6
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 104 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.redis.riot.redis;

import java.util.function.Function;
import java.util.stream.StreamSupport;

import org.slf4j.Logger;
Expand All @@ -9,35 +10,48 @@

import com.redis.spring.batch.reader.KeyComparison;
import com.redis.spring.batch.reader.KeyComparison.Status;
import com.redis.spring.batch.util.BatchUtils;

public class KeyComparisonDiffLogger implements ItemWriteListener<KeyComparison> {
import io.lettuce.core.codec.RedisCodec;

public class KeyComparisonDiffLogger<K> implements ItemWriteListener<KeyComparison<K>> {

private final Logger log = LoggerFactory.getLogger(getClass());

private final Function<K, String> toStringKeyFunction;

public KeyComparisonDiffLogger(RedisCodec<K, ?> codec) {
toStringKeyFunction = BatchUtils.toStringKeyFunction(codec);
}

@Override
public void afterWrite(Chunk<? extends KeyComparison> items) {
public void afterWrite(Chunk<? extends KeyComparison<K>> items) {
StreamSupport.stream(items.spliterator(), false).filter(c -> c.getStatus() != Status.OK).forEach(this::log);
}

public void log(KeyComparison comparison) {
public void log(KeyComparison<K> comparison) {
switch (comparison.getStatus()) {
case MISSING:
log.error("Missing key {}", comparison.getSource().getKey());
log.error("Missing key {}", key(comparison));
break;
case TYPE:
log.error("Type mismatch on key {}. Expected {} but was {}", comparison.getSource().getKey(),
log.error("Type mismatch on key {}. Expected {} but was {}", key(comparison),
comparison.getSource().getType(), comparison.getTarget().getType());
break;
case VALUE:
log.error("Value mismatch on key {}", comparison.getSource().getKey());
log.error("Value mismatch on key {}", key(comparison));
break;
case TTL:
log.error("TTL mismatch on key {}. Expected {} but was {}", comparison.getSource().getKey(),
log.error("TTL mismatch on key {}. Expected {} but was {}", key(comparison),
comparison.getSource().getTtl(), comparison.getTarget().getTtl());
break;
default:
break;
}
}

private String key(KeyComparison<K> comparison) {
return toStringKeyFunction.apply(comparison.getSource().getKey());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,52 +13,52 @@
import com.redis.spring.batch.reader.KeyComparison;
import com.redis.spring.batch.reader.KeyComparison.Status;

public class KeyComparisonStatusCountItemWriter extends AbstractItemStreamItemWriter<KeyComparison> {
public class KeyComparisonStatusCountItemWriter extends AbstractItemStreamItemWriter<KeyComparison<?>> {

private final Map<Status, AtomicLong> counts = Stream.of(Status.values())
.collect(Collectors.toMap(Function.identity(), s -> new AtomicLong()));
private final Map<Status, AtomicLong> counts = Stream.of(Status.values())
.collect(Collectors.toMap(Function.identity(), s -> new AtomicLong()));

private long incrementAndGet(Status status) {
return counts.get(status).incrementAndGet();
}
private long incrementAndGet(Status status) {
return counts.get(status).incrementAndGet();
}

public long getOK() {
return getCount(Status.OK);
}
public long getOK() {
return getCount(Status.OK);
}

public long getMissing() {
return getCount(Status.MISSING);
}
public long getMissing() {
return getCount(Status.MISSING);
}

public long getType() {
return getCount(Status.TYPE);
}
public long getType() {
return getCount(Status.TYPE);
}

public long getTtl() {
return getCount(Status.TTL);
}
public long getTtl() {
return getCount(Status.TTL);
}

public long getValue() {
return getCount(Status.VALUE);
}
public long getValue() {
return getCount(Status.VALUE);
}

public long getCount(Status status) {
return counts.get(status).get();
}
public long getCount(Status status) {
return counts.get(status).get();
}

public List<Long> getCounts(Status... statuses) {
return Stream.of(statuses).map(this::getCount).collect(Collectors.toList());
}
public List<Long> getCounts(Status... statuses) {
return Stream.of(statuses).map(this::getCount).collect(Collectors.toList());
}

public long getTotal() {
return counts.values().stream().collect(Collectors.summingLong(AtomicLong::get));
}
public long getTotal() {
return counts.values().stream().collect(Collectors.summingLong(AtomicLong::get));
}

@Override
public void write(Chunk<? extends KeyComparison> items) {
for (KeyComparison comparison : items) {
incrementAndGet(comparison.getStatus());
}
}
@Override
public void write(Chunk<? extends KeyComparison<?>> items) {
for (KeyComparison<?> comparison : items) {
incrementAndGet(comparison.getStatus());
}
}

}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.redis.riot.redis;

import java.time.Duration;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -12,6 +13,7 @@
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
Expand All @@ -21,6 +23,7 @@
import com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.lettucemod.util.RedisModulesUtils;
import com.redis.riot.core.AbstractExport;
import com.redis.riot.core.LoggingWriteListener;
import com.redis.riot.core.RedisClientOptions;
import com.redis.riot.core.RedisWriterOptions;
import com.redis.spring.batch.KeyValue;
Expand All @@ -31,6 +34,7 @@
import com.redis.spring.batch.reader.KeyComparatorOptions;
import com.redis.spring.batch.reader.KeyComparatorOptions.StreamMessageIdPolicy;
import com.redis.spring.batch.reader.KeyComparisonItemReader;
import com.redis.spring.batch.util.BatchUtils;

import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.ReadFrom;
Expand Down Expand Up @@ -69,7 +73,7 @@ public class Replication extends AbstractExport {
protected boolean isStruct() {
return type == ReplicationType.STRUCT;
}

@Override
public void execute() throws Exception {
try {
Expand Down Expand Up @@ -100,6 +104,10 @@ protected Job job() {
liveReader.setMode(ReaderMode.LIVE);
FlushingStepBuilder<KeyValue<byte[], Object>, KeyValue<byte[], Object>> liveStep = flushingStep(
step(STEP_LIVE, liveReader, writer()).processor(processor));
if (log.isInfoEnabled()) {
addLoggingWriteListener(scanStep);
addLoggingWriteListener(liveStep);
}
KeyComparisonStatusCountItemWriter compareWriter = new KeyComparisonStatusCountItemWriter();
TaskletStep compareStep = step(STEP_COMPARE, comparisonReader(), compareWriter).build();
switch (mode) {
Expand Down Expand Up @@ -130,6 +138,20 @@ protected Job job() {
}
}

private static final Function<byte[], String> toString = BatchUtils.toStringKeyFunction(ByteArrayCodec.INSTANCE);

private void addLoggingWriteListener(SimpleStepBuilder<KeyValue<byte[], Object>, KeyValue<byte[], Object>> step) {
step.listener(new LoggingWriteListener<>(this::log));
}

private void log(Chunk<? extends KeyValue<byte[], Object>> chunk) {
chunk.getItems().stream().forEach(this::log);
}

private void log(KeyValue<byte[], Object> keyValue) {
log.info("Wrote {} {}", keyValue.getType().getCode(), toString.apply(keyValue.getKey()));
}

private FlowBuilder<SimpleFlow> flow(String name) {
return new FlowBuilder<>(name(name));
}
Expand All @@ -141,21 +163,11 @@ private boolean shouldCompare() {
@Override
protected <I, O> FaultTolerantStepBuilder<I, O> step(String name, ItemReader<I> reader, ItemWriter<O> writer) {
FaultTolerantStepBuilder<I, O> step = super.step(name, reader, writer);
switch (name) {
case STEP_COMPARE:
if (STEP_COMPARE.equals(name)) {
if (showDiffs) {
step.listener(new KeyComparisonDiffLogger());
step.listener(new KeyComparisonDiffLogger<>(ByteArrayCodec.INSTANCE));
}
step.listener(new KeyComparisonSummaryLogger((KeyComparisonStatusCountItemWriter) writer));
break;
case STEP_LIVE:
case STEP_SCAN:
if (log.isDebugEnabled()) {
step.listener(new KeyValueWriteListener<>(ByteArrayCodec.INSTANCE, log));
}
break;
default:
break;
}
return step;
}
Expand All @@ -166,8 +178,7 @@ private void checkKeyspaceNotificationEnabled() {
String config = connection.sync().configGet(CONFIG_NOTIFY_KEYSPACE_EVENTS)
.getOrDefault(CONFIG_NOTIFY_KEYSPACE_EVENTS, "");
if (!config.contains("K")) {
log.error(
"Keyspace notifications not property configured ({}={}). Make sure it contains at least \"K\".",
log.error("Keyspace notifications not property configured ({}={}). Use the string KEA to enable them.",
CONFIG_NOTIFY_KEYSPACE_EVENTS, config);
}
} catch (RedisException e) {
Expand All @@ -190,9 +201,10 @@ private RedisItemReader<byte[], byte[], KeyValue<byte[], Object>> createReader()
return (RedisItemReader) RedisItemReader.dump();
}

private KeyComparisonItemReader comparisonReader() {
KeyComparisonItemReader reader = compareMode == CompareMode.FULL ? RedisItemReader.compare()
: RedisItemReader.compareQuick();
private KeyComparisonItemReader<byte[], byte[]> comparisonReader() {
KeyComparisonItemReader<byte[], byte[]> reader = compareMode == CompareMode.FULL
? RedisItemReader.compare(ByteArrayCodec.INSTANCE)
: RedisItemReader.compareQuick(ByteArrayCodec.INSTANCE);
configureReader(reader);
reader.setClient(getRedisClient());
reader.setTargetClient(targetRedisClient);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.redis.riot.redis;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.function.Predicate;
Expand Down Expand Up @@ -100,9 +101,27 @@ void keyProcessorWithDate(TestInfo info) throws Throwable {
}

@Test
void binaryKeySnapshotReplication(TestInfo info) throws Exception {
void binaryKeyValueSnapshotReplicationType(TestInfo info) throws Exception {
byte[] key = Hex.decode("aced0005");
byte[] value = "value".getBytes();
byte[] value = Hex.decode("aced0004");
Map<byte[], byte[]> hash = new HashMap<>();
hash.put(key, value);
StatefulRedisModulesConnection<byte[], byte[]> connection = RedisModulesUtils.connection(redisClient,
ByteArrayCodec.INSTANCE);
StatefulRedisModulesConnection<byte[], byte[]> targetConnection = RedisModulesUtils
.connection(targetRedisClient, ByteArrayCodec.INSTANCE);
connection.sync().hset(key, hash);
Replication replication = new Replication();
replication.setCompareMode(CompareMode.NONE);
replication.setType(ReplicationType.STRUCT);
execute(replication, info);
Assertions.assertArrayEquals(connection.sync().hget(key, key), targetConnection.sync().hget(key, key));
}

@Test
void binaryKeyValueSnapshotReplication(TestInfo info) throws Exception {
byte[] key = Hex.decode("aced0005");
byte[] value = Hex.decode("aced0004");
StatefulRedisModulesConnection<byte[], byte[]> connection = RedisModulesUtils.connection(redisClient,
ByteArrayCodec.INSTANCE);
StatefulRedisModulesConnection<byte[], byte[]> targetConnection = RedisModulesUtils
Expand All @@ -117,7 +136,7 @@ void binaryKeySnapshotReplication(TestInfo info) throws Exception {
@Test
void binaryKeyLiveReplication(TestInfo info) throws Exception {
byte[] key = Hex.decode("aced0005");
byte[] value = "value".getBytes();
byte[] value = Hex.decode("aced0004");
StatefulRedisModulesConnection<byte[], byte[]> connection = RedisModulesUtils.connection(redisClient,
ByteArrayCodec.INSTANCE);
StatefulRedisModulesConnection<byte[], byte[]> targetConnection = RedisModulesUtils
Expand Down

0 comments on commit a50daf6

Please sign in to comment.