Skip to content

Commit

Permalink
feat: Improved replication status reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Sep 7, 2023
1 parent f5ccaeb commit f24e7a4
Show file tree
Hide file tree
Showing 34 changed files with 218 additions and 164 deletions.
3 changes: 0 additions & 3 deletions connectors/riot-db/riot-db.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ dependencies {
implementation group: 'com.microsoft.sqlserver', name: 'mssql-jdbc', version: mssqlVersion
implementation group: 'com.oracle.ojdbc', name: 'ojdbc8', version: oracleVersion
implementation group: 'org.xerial', name: 'sqlite-jdbc', version: sqliteVersion
testImplementation group: 'com.redis', name: 'spring-batch-redis', version: springBatchRedisVersion, classifier: 'tests'
testImplementation group: 'org.testcontainers', name: 'postgresql', version: testcontainersVersion
testImplementation group: 'org.testcontainers', name: 'oracle-xe', version: testcontainersVersion
}

compileJava {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,18 @@

public class DatabaseExport extends AbstractExport {

private final String sql;

public static final boolean DEFAULT_ASSERT_UPDATES = true;

public static final Pattern DEFAULT_KEY_PATTERN = Pattern.compile("\\w+:(?<id>.+)");

private final String sql;

private DataSourceOptions dataSourceOptions = new DataSourceOptions();

private Pattern keyPattern = DEFAULT_KEY_PATTERN;

private boolean assertUpdates = DEFAULT_ASSERT_UPDATES;

private DataSourceOptions dataSourceOptions = new DataSourceOptions();

public DatabaseExport(AbstractRedisClient client, String sql) {
super(client);
this.sql = sql;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.junit.jupiter.api.Test;
import org.springframework.batch.item.ExecutionContext;

import com.redis.riot.faker.FakerItemReader;
import com.redis.spring.batch.util.BatchUtils;

class FakerReaderTests {
Expand Down
1 change: 1 addition & 0 deletions connectors/riot-file/riot-file.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies {
exclude group: 'javax.annotation', module: 'javax.annotation-api'
}
testImplementation group: 'com.redis', name: 'spring-batch-redis', version: springBatchRedisVersion, classifier: 'tests'
testImplementation 'commons-io:commons-io:2.13.0'
}

compileJava {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@
import org.springframework.core.io.WritableResource;
import org.springframework.util.Assert;

import com.redis.riot.file.resource.AbstractResourceItemWriter;
import com.redis.riot.file.resource.CountingOutputStream;
import com.redis.riot.file.resource.TransactionAwareBufferedWriter;

/**
* Base class for item writers that write data to a file or stream. This class provides common features like restart, force
* sync, append etc. The location of the output file is defined by a {@link Resource} which must represent a writable file.<br>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,13 @@

package com.redis.riot.file.resource;

import java.util.List;

import org.springframework.batch.item.file.transform.LineAggregator;
import org.springframework.core.io.Resource;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

import com.redis.riot.file.resource.AbstractResourceItemWriter;
import com.redis.riot.file.resource.FlatResourceItemWriter;

import java.util.List;

/**
* This class is an item writer that writes data to a file or stream. The writer
* also provides restart. The location of the output file is defined by a
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
package com.redis.riot.file.resource;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

import org.springframework.batch.item.file.FlatFileFooterCallback;
import org.springframework.batch.item.file.FlatFileHeaderCallback;
import org.springframework.batch.item.file.transform.*;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.batch.item.file.transform.FieldExtractor;
import org.springframework.batch.item.file.transform.FormatterLineAggregator;
import org.springframework.batch.item.file.transform.LineAggregator;
import org.springframework.core.io.Resource;
import org.springframework.core.io.WritableResource;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import com.redis.riot.file.resource.AbstractResourceItemWriter;
import com.redis.riot.file.resource.FlatResourceItemWriter;
import com.redis.riot.file.resource.FlatResourceItemWriterBuilder;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

public class FlatResourceItemWriterBuilder<T> {

private WritableResource resource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package com.redis.riot.file.resource;

import java.util.Iterator;
import java.util.List;

import org.springframework.batch.item.json.GsonJsonObjectMarshaller;
import org.springframework.batch.item.json.JacksonJsonObjectMarshaller;
import org.springframework.batch.item.json.JsonObjectMarshaller;
Expand All @@ -24,12 +27,6 @@
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

import com.redis.riot.file.resource.AbstractResourceItemWriter;
import com.redis.riot.file.resource.JsonResourceItemWriter;

import java.util.Iterator;
import java.util.List;

/**
* Item writer that writes data in json format to an output file. The location
* of the output file is defined by a {@link Resource} and must represent a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@
import org.springframework.core.io.WritableResource;
import org.springframework.util.Assert;

import com.redis.riot.file.resource.AbstractResourceItemWriter;
import com.redis.riot.file.resource.JsonResourceItemWriter;
import com.redis.riot.file.resource.JsonResourceItemWriterBuilder;

/**
* Builder for {@link JsonResourceItemWriter}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

import com.redis.riot.file.resource.XmlItemReader;
import com.redis.riot.file.resource.XmlObjectReader;

/**
* {@link ItemStreamReader} implementation that reads XML objects from a {@link Resource} having the following format:
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import com.redis.riot.file.resource.XmlItemReader;
import com.redis.riot.file.resource.XmlItemReaderBuilder;
import com.redis.riot.file.resource.XmlObjectReader;

public class XmlItemReaderBuilder<T> {

private XmlObjectReader<T> xmlObjectReader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.springframework.util.ClassUtils;

import com.fasterxml.jackson.dataformat.xml.XmlMapper;
import com.redis.riot.file.resource.AbstractResourceItemWriter;
import com.redis.riot.file.resource.XmlResourceItemWriter;

/**
* Item writer that writes data in XML format to an output file. The location of the output file is defined by a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@
import org.springframework.core.io.WritableResource;
import org.springframework.util.Assert;

import com.redis.riot.file.resource.XmlResourceItemWriter;
import com.redis.riot.file.resource.XmlResourceItemWriterBuilder;

/**
* Builder for {@link XmlResourceItemWriter}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.springframework.batch.item.UnexpectedInputException;

import com.redis.riot.core.operation.HsetBuilder;
import com.redis.riot.file.FileImport;
import com.redis.spring.batch.test.AbstractTestBase;

abstract class FileTests extends AbstractTestBase {
Expand All @@ -27,6 +26,7 @@ abstract class FileTests extends AbstractTestBase {

private static final String keyspace = "beer";

@SuppressWarnings("unchecked")
@Test
void fileImportJSON() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception {
FileImport executable = new FileImport(client, BEERS_JSON_URL);
Expand All @@ -43,6 +43,7 @@ void fileImportJSON() throws UnexpectedInputException, ParseException, NonTransi
Assertions.assertEquals("Hocus Pocus", beer1.get("name"));
}

@SuppressWarnings("unchecked")
@Test
void fileApiImportCSV() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception {
FileImport executable = new FileImport(client, "https://storage.googleapis.com/jrx/beers.csv");
Expand All @@ -59,6 +60,7 @@ void fileApiImportCSV() throws UnexpectedInputException, ParseException, NonTran

}

@SuppressWarnings("unchecked")
@Test
void fileApiFileExpansion() throws IOException {
Path temp = Files.createTempDirectory("fileExpansion");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,27 @@ private long incrementAndGet(Status status) {
return counts.get(status).incrementAndGet();
}

public long getMissing() {
return getCount(Status.MISSING);
}

public long getType() {
return getCount(Status.TYPE);
}

public long getTtl() {
return getCount(Status.TTL);
}

public long getValue() {
return getCount(Status.VALUE);
}

public long getCount(Status status) {
return counts.get(status).get();
}

public long getTotalCount() {
public long getTotal() {
return counts.values().stream().collect(Collectors.summingLong(AtomicLong::get));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public ExitStatus afterStep(StepExecution stepExecution) {
if (stepExecution.getStatus().isUnsuccessful()) {
return null;
}
if (writer.getTotalCount() == writer.getCount(Status.OK)) {
if (writer.getTotal() == writer.getCount(Status.OK)) {
out.println("Verification completed: all OK");
return ExitStatus.COMPLETED;
}
Expand Down
17 changes: 7 additions & 10 deletions core/riot-core/src/main/java/com/redis/riot/core/Replication.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class Replication extends AbstractExport {

private final AbstractRedisClient targetClient;

private PrintWriter out = new PrintWriter(System.out);
private final PrintWriter out;

private ReplicationMode mode = ReplicationMode.SNAPSHOT;

Expand All @@ -66,6 +66,12 @@ public class Replication extends AbstractExport {

private RedisWriterOptions targetWriterOptions = new RedisWriterOptions();

public Replication(AbstractRedisClient client, AbstractRedisClient targetClient, PrintWriter out) {
super(client);
this.targetClient = targetClient;
this.out = out;
}

public void setTargetReaderOptions(RedisReaderOptions targetReaderOptions) {
this.targetReaderOptions = targetReaderOptions;
}
Expand Down Expand Up @@ -98,15 +104,6 @@ public void setShowDiff(boolean showDiff) {
this.showDiff = showDiff;
}

public Replication(AbstractRedisClient client, AbstractRedisClient targetClient) {
super(client);
this.targetClient = targetClient;
}

public void setOut(PrintWriter out) {
this.out = out;
}

@Override
protected ValueType getValueType() {
return valueType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ public ItemReader<I> getReader() {
return reader;
}

public ItemWriter<O> getWriter() {
return writer;
}

@SuppressWarnings("unchecked")
public StepBuilder<I, O> skippableExceptions(Class<? extends Throwable>... exceptions) {
this.skippableExceptions = Arrays.asList(exceptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.PrintWriter;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -41,6 +42,8 @@ protected static double abv(Map<String, String> beer) {
return Double.parseDouble(beer.get("abv"));
}

private PrintWriter printWriter = new PrintWriter(System.out);

protected void execute(AbstractJobExecutable executable, TestInfo info) {
executable.setName(name(info));
executable.execute();
Expand Down Expand Up @@ -94,7 +97,7 @@ void testMapProcessorFilter(TestInfo info) throws JobExecutionException {
void replicate(TestInfo info) throws Throwable {
generate(info);
Assertions.assertTrue(commands.dbsize() > 0);
Replication replicate = new Replication(client, targetClient);
Replication replicate = new Replication(client, targetClient, printWriter);
replicate.execute();
Assertions.assertTrue(compare(info));
}
Expand All @@ -104,7 +107,7 @@ void keyProcessor(TestInfo info) throws Throwable {
String key1 = "key1";
String value1 = "value1";
commands.set(key1, value1);
Replication replication = new Replication(client, targetClient);
Replication replication = new Replication(client, targetClient, printWriter);
replication.setProcessorOptions(operatorOptions("#{type}:#{key}"));
execute(replication, info);
Assertions.assertEquals(value1, targetCommands.get("string:" + key1));
Expand All @@ -121,7 +124,7 @@ void keyProcessorWithDate(TestInfo info) throws Throwable {
String key1 = "key1";
String value1 = "value1";
commands.set(key1, value1);
Replication replication = new Replication(client, targetClient);
Replication replication = new Replication(client, targetClient, printWriter);
replication.setProcessorOptions(
operatorOptions(String.format("#{#date.parse('%s').getTime()}:#{key}", "2010-05-10T00:00:00.000+0000")));
execute(replication, info);
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ oracleVersion = 19.3.0.0
picocliVersion = 4.7.5
plexusVersion = 4.0.0
progressbarVersion = 0.10.0
springBatchRedisVersion = 3.6.6
springBatchRedisVersion = 3.6.7-SNAPSHOT
sqliteVersion = 3.43.0.0
testcontainersRedisVersion = 1.6.4
testcontainersVersion = 1.19.0
Expand Down
4 changes: 4 additions & 0 deletions plugins/riot/riot.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ dependencies {
implementation 'org.slf4j:slf4j-simple'
testImplementation group: 'com.redis', name: 'spring-batch-redis', version: springBatchRedisVersion, classifier: 'tests'
testImplementation group: 'org.awaitility', name: 'awaitility', version: awaitilityVersion
testImplementation 'org.springframework.boot:spring-boot-autoconfigure'
testImplementation 'org.springframework:spring-jdbc'
testImplementation group: 'org.testcontainers', name: 'postgresql', version: testcontainersVersion
testImplementation group: 'org.testcontainers', name: 'oracle-xe', version: testcontainersVersion
}

bootStartScripts.enabled = false
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.redis.riot.cli;

import java.util.function.Supplier;

import com.redis.riot.core.AbstractExport;
import com.redis.riot.core.AbstractJobExecutable;
import com.redis.riot.core.RedisReaderOptions;
Expand Down Expand Up @@ -38,4 +40,9 @@ protected String taskName(StepBuilder<?, ?> step) {
return "Exporting";
}

@Override
protected Supplier<String> extraMessage(StepBuilder<?, ?> step) {
return null;
}

}
Loading

0 comments on commit f24e7a4

Please sign in to comment.