From 1c4c71c7ebd1da8336db9757dde29d505141ca3b Mon Sep 17 00:00:00 2001 From: Julien Ruaux Date: Thu, 8 Sep 2022 20:55:31 -0700 Subject: [PATCH] deps: Upgraded to spring batch redis 3.0.0 --- .../com/redis/riot/db/DataSourceOptions.java | 8 +- .../redis/riot/db/DatabaseExportCommand.java | 6 +- .../riot/db/PostgresIntegrationTests.java | 2 +- .../riot/file/DumpFileImportCommand.java | 8 +- .../com/redis/riot/file/DumpFileOptions.java | 14 +- .../redis/riot/file/FileExportCommand.java | 2 +- .../redis/riot/file/FileExportOptions.java | 8 +- .../redis/riot/file/FileImportCommand.java | 54 ++-- .../redis/riot/file/FileImportOptions.java | 18 +- .../java/com/redis/riot/file/FileOptions.java | 4 +- .../java/com/redis/riot/file/GcsOptions.java | 6 +- .../java/com/redis/riot/file/S3Options.java | 39 +-- .../redis/riot/file/FileIntegrationTests.java | 2 +- .../redis/riot/file/TestXmlItemWriter.java | 4 +- .../gen/DataStructureGeneratorCommand.java | 13 +- .../gen/DataStructureGeneratorOptions.java | 28 +- .../redis/riot/gen/FakerGeneratorCommand.java | 6 +- .../redis/riot/gen/FakerGeneratorOptions.java | 6 +- .../com/redis/riot/gen/FakerReaderTests.java | 2 +- .../riot/redis/AbstractRedisCommand.java | 12 +- .../riot/redis/AbstractReplicateCommand.java | 109 +++---- .../riot/redis/AbstractTargetCommand.java | 132 +++++--- .../com/redis/riot/redis/CompareOptions.java | 2 +- .../com/redis/riot/redis/LatencyCommand.java | 8 +- .../redis/riot/redis/ReplicateCommand.java | 27 +- .../redis/riot/redis/ReplicateDSCommand.java | 26 +- .../redis/riot/redis/ReplicationOptions.java | 25 +- .../riot/redis/TargetCommandContext.java | 33 +- .../riot/redis/RedisIntegrationTests.java | 4 +- .../test/resources/replicate-key-processor | 2 +- .../riot/stream/StreamExportCommand.java | 18 +- .../riot/stream/StreamImportCommand.java | 9 +- .../com/redis/riot/AbstractExportCommand.java | 15 +- .../com/redis/riot/AbstractImportCommand.java | 52 +++- .../com/redis/riot/AbstractJobCommand.java | 12 +- .../redis/riot/AbstractTransferCommand.java | 30 +- .../redis/riot/DoubleRangeTypeConverter.java | 2 +- .../redis/riot/FlushingTransferOptions.java | 34 +-- .../main/java/com/redis/riot/HelpOptions.java | 2 +- .../com/redis/riot/IntRangeTypeConverter.java | 2 +- .../com/redis/riot/JobCommandContext.java | 62 +++- .../redis/riot/KeyValueProcessorOptions.java | 27 -- .../java/com/redis/riot/LoggingOptions.java | 44 +-- .../com/redis/riot/MapProcessorOptions.java | 75 +---- .../java/com/redis/riot/ProgressMonitor.java | 2 +- .../redis/riot/ProgressMonitorOptions.java | 4 +- .../java/com/redis/riot/RedisOptions.java | 287 ++++++++++++------ .../com/redis/riot/RedisReaderOptions.java | 126 +++----- .../com/redis/riot/RedisWriterOptions.java | 40 +-- .../src/main/java/com/redis/riot/RiotApp.java | 6 +- .../java/com/redis/riot/StepSkipPolicy.java | 23 ++ .../java/com/redis/riot/TransferOptions.java | 18 +- .../processor/AbstractKeyValueProcessor.java | 23 -- ....java => DataStructureToMapProcessor.java} | 15 +- .../riot/processor/KeyValueKeyProcessor.java | 27 -- .../riot/processor/KeyValueProcessor.java | 32 ++ .../riot/processor/KeyValueTTLProcessor.java | 23 -- .../redis/riot/processor/SpelProcessor.java | 48 +-- .../redis/riot/redis/CollectionOptions.java | 4 +- .../com/redis/riot/redis/EvalOptions.java | 8 +- .../com/redis/riot/redis/ExpireOptions.java | 4 +- .../redis/riot/redis/FilteringOptions.java | 4 +- .../com/redis/riot/redis/GeoaddOptions.java | 4 +- .../com/redis/riot/redis/JsonSetCommand.java | 2 +- .../java/com/redis/riot/redis/KeyOptions.java | 4 +- .../redis/riot/redis/RedisCommandOptions.java | 6 +- .../java/com/redis/riot/redis/SetOptions.java | 6 +- .../com/redis/riot/redis/SugaddOptions.java | 10 +- .../com/redis/riot/redis/TsAddOptions.java | 4 +- .../com/redis/riot/redis/XaddOptions.java | 4 +- .../com/redis/riot/redis/ZaddOptions.java | 4 +- .../riot/convert/RangeTypeConverterTests.java | 4 +- core/riot-test/riot-test.gradle | 1 - .../riot/AbstractRiotIntegrationTests.java | 50 ++- gradle.properties | 10 +- 75 files changed, 955 insertions(+), 842 deletions(-) delete mode 100644 core/riot-core/src/main/java/com/redis/riot/KeyValueProcessorOptions.java create mode 100644 core/riot-core/src/main/java/com/redis/riot/StepSkipPolicy.java delete mode 100644 core/riot-core/src/main/java/com/redis/riot/processor/AbstractKeyValueProcessor.java rename core/riot-core/src/main/java/com/redis/riot/processor/{DataStructureItemProcessor.java => DataStructureToMapProcessor.java} (87%) delete mode 100644 core/riot-core/src/main/java/com/redis/riot/processor/KeyValueKeyProcessor.java create mode 100644 core/riot-core/src/main/java/com/redis/riot/processor/KeyValueProcessor.java delete mode 100644 core/riot-core/src/main/java/com/redis/riot/processor/KeyValueTTLProcessor.java diff --git a/connectors/riot-db/src/main/java/com/redis/riot/db/DataSourceOptions.java b/connectors/riot-db/src/main/java/com/redis/riot/db/DataSourceOptions.java index aa49240f3..8e2749bb2 100644 --- a/connectors/riot-db/src/main/java/com/redis/riot/db/DataSourceOptions.java +++ b/connectors/riot-db/src/main/java/com/redis/riot/db/DataSourceOptions.java @@ -7,13 +7,13 @@ public class DataSourceOptions { - @Option(names = "--driver", description = "Fully qualified name of the JDBC driver", paramLabel = "") + @Option(names = "--driver", description = "Fully qualified name of the JDBC driver.", paramLabel = "") private String driver; - @Option(names = "--url", required = true, description = "JDBC URL to connect to the database", paramLabel = "") + @Option(names = "--url", required = true, description = "JDBC URL to connect to the database.", paramLabel = "") private String url; - @Option(names = "--username", description = "Login username of the database", paramLabel = "") + @Option(names = "--username", description = "Login username of the database.", paramLabel = "") private String username; - @Option(names = "--password", arity = "0..1", interactive = true, description = "Login password of the database", paramLabel = "") + @Option(names = "--password", arity = "0..1", interactive = true, description = "Login password of the database.", paramLabel = "") private String password; public String getDriver() { diff --git a/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseExportCommand.java b/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseExportCommand.java index 9b1990ac0..9e4e5ae20 100644 --- a/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseExportCommand.java +++ b/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseExportCommand.java @@ -15,8 +15,8 @@ import com.redis.riot.AbstractExportCommand; import com.redis.riot.JobCommandContext; -import com.redis.riot.processor.DataStructureItemProcessor; -import com.redis.spring.batch.DataStructure; +import com.redis.riot.processor.DataStructureToMapProcessor; +import com.redis.spring.batch.common.DataStructure; import picocli.CommandLine.Command; import picocli.CommandLine.Mixin; @@ -58,7 +58,7 @@ protected Job job(JobCommandContext context) throws SQLException { builder.assertUpdates(exportOptions.isAssertUpdates()); JdbcBatchItemWriter> writer = builder.build(); writer.afterPropertiesSet(); - ItemProcessor, Map> processor = DataStructureItemProcessor + ItemProcessor, Map> processor = DataStructureToMapProcessor .of(exportOptions.getKeyRegex()); String task = String.format("Exporting to %s", dbName); return job(context, NAME, step(context, NAME, reader(context), processor, writer), task); diff --git a/connectors/riot-db/src/test/java/com/redis/riot/db/PostgresIntegrationTests.java b/connectors/riot-db/src/test/java/com/redis/riot/db/PostgresIntegrationTests.java index c03b057a7..e1fa73ef8 100644 --- a/connectors/riot-db/src/test/java/com/redis/riot/db/PostgresIntegrationTests.java +++ b/connectors/riot-db/src/test/java/com/redis/riot/db/PostgresIntegrationTests.java @@ -28,7 +28,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectReader; import com.redis.lettucemod.api.sync.RedisModulesCommands; -import com.redis.spring.batch.DataStructure.Type; +import com.redis.spring.batch.common.DataStructure.Type; import com.redis.spring.batch.reader.DataStructureGeneratorItemReader; import com.redis.testcontainers.junit.RedisTestContext; import com.redis.testcontainers.junit.RedisTestContextsSource; diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/DumpFileImportCommand.java b/connectors/riot-file/src/main/java/com/redis/riot/file/DumpFileImportCommand.java index 1d8a943c9..6a41c2d1e 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/DumpFileImportCommand.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/DumpFileImportCommand.java @@ -21,9 +21,9 @@ import com.redis.riot.ProgressMonitor; import com.redis.riot.RedisWriterOptions; import com.redis.riot.file.resource.XmlItemReader; -import com.redis.spring.batch.DataStructure; -import com.redis.spring.batch.DataStructure.Type; import com.redis.spring.batch.RedisItemWriter; +import com.redis.spring.batch.common.DataStructure; +import com.redis.spring.batch.common.DataStructure.Type; import io.lettuce.core.ScoredValue; import io.lettuce.core.StreamMessage; @@ -72,8 +72,8 @@ protected Job job(JobCommandContext context) throws Exception { reader.setName(name); ProgressMonitor monitor = progressMonitor().task("Importing " + expandedFile).build(); DataStructureProcessor processor = new DataStructureProcessor(); - RedisItemWriter> writer = writerOptions - .configure(RedisItemWriter.dataStructure(context.getRedisClient())).build(); + RedisItemWriter> writer = RedisItemWriter + .dataStructure(context.pool()).options(writerOptions.writerOptions()).build(); steps.add(step(step(context, name, reader, processor, writer), monitor).build()); } } diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/DumpFileOptions.java b/connectors/riot-file/src/main/java/com/redis/riot/file/DumpFileOptions.java index fc272cd98..90a600371 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/DumpFileOptions.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/DumpFileOptions.java @@ -8,7 +8,7 @@ public class DumpFileOptions extends FileOptions { - @Option(names = { "-t", "--filetype" }, description = "File type: ${COMPLETION-CANDIDATES}", paramLabel = "") + @Option(names = { "-t", "--filetype" }, description = "File type: ${COMPLETION-CANDIDATES}.", paramLabel = "") protected Optional type = Optional.empty(); public Optional getType() { @@ -30,18 +30,18 @@ public DumpFileType type(Resource resource) { return type.get(); } Optional extension = FileUtils.extension(resource); - if (extension.isEmpty()) { - throw new UnknownFileTypeException("Unknown file extension"); - } - switch (extension.get()) { + return type(extension.orElseThrow(() -> new UnknownFileTypeException("Unknown file extension"))); + } + + private DumpFileType type(FileExtension extension) { + switch (extension) { case XML: return DumpFileType.XML; case JSON: return DumpFileType.JSON; default: - throw new UnsupportedOperationException("Unsupported file extension: " + extension.get()); + throw new UnsupportedOperationException("Unsupported file extension: " + extension); } - } } diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/FileExportCommand.java b/connectors/riot-file/src/main/java/com/redis/riot/file/FileExportCommand.java index f17307661..9fe1bd134 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/FileExportCommand.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/FileExportCommand.java @@ -11,7 +11,7 @@ import com.redis.riot.JobCommandContext; import com.redis.riot.file.resource.JsonResourceItemWriterBuilder; import com.redis.riot.file.resource.XmlResourceItemWriterBuilder; -import com.redis.spring.batch.DataStructure; +import com.redis.spring.batch.common.DataStructure; import picocli.CommandLine.ArgGroup; import picocli.CommandLine.Command; diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/FileExportOptions.java b/connectors/riot-file/src/main/java/com/redis/riot/file/FileExportOptions.java index 1054948ff..425a4e2b4 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/FileExportOptions.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/FileExportOptions.java @@ -9,13 +9,13 @@ public class FileExportOptions extends DumpFileOptions { public static final String DEFAULT_ELEMENT_NAME = "record"; public static final String DEFAULT_ROOT_NAME = "root"; - @Option(names = "--append", description = "Append to file if it exists") + @Option(names = "--append", description = "Append to file if it exists.") private boolean append; - @Option(names = "--root", description = "XML root element tag name (default: ${DEFAULT-VALUE})", paramLabel = "") + @Option(names = "--root", description = "XML root element tag name (default: ${DEFAULT-VALUE}).", paramLabel = "") private String rootName = DEFAULT_ROOT_NAME; - @Option(names = "--element", description = "XML element tag name (default: ${DEFAULT-VALUE})", paramLabel = "") + @Option(names = "--element", description = "XML element tag name (default: ${DEFAULT-VALUE}).", paramLabel = "") private String elementName = DEFAULT_ELEMENT_NAME; - @Option(names = "--line-sep", description = "String to separate lines (default: system default)", paramLabel = "") + @Option(names = "--line-sep", description = "String to separate lines (default: system default).", paramLabel = "") private String lineSeparator = AbstractFileItemWriter.DEFAULT_LINE_SEPARATOR; public boolean isAppend() { diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/FileImportCommand.java b/connectors/riot-file/src/main/java/com/redis/riot/file/FileImportCommand.java index bdcccaffb..75956c684 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/FileImportCommand.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/FileImportCommand.java @@ -6,7 +6,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.logging.Level; import java.util.logging.Logger; @@ -107,39 +106,13 @@ private Resource resource(String file) throws IOException { return options.inputResource(file); } - private FileType type(Optional extension) { - Optional type = options.getType(); - if (type.isPresent()) { - return type.get(); - } - if (extension.isPresent()) { - switch (extension.get()) { - case FW: - return FileType.FIXED; - case JSON: - return FileType.JSON; - case XML: - return FileType.XML; - case CSV: - case PSV: - case TSV: - return FileType.DELIMITED; - default: - throw new UnknownFileTypeException("Unknown file extension: " + extension); - } - } - throw new UnknownFileTypeException("Could not determine file type"); - } - @SuppressWarnings({ "unchecked", "rawtypes" }) private AbstractItemStreamItemReader> reader(Resource resource) { - Optional extension = FileUtils.extension(resource); - FileType type = type(extension); + FileType type = options.getType().orElseGet(() -> type(resource)); switch (type) { case DELIMITED: DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); - options.getDelimiter().ifPresentOrElse(tokenizer::setDelimiter, - () -> tokenizer.setDelimiter(delimiter(extension.get()))); + tokenizer.setDelimiter(options.getDelimiter().orElseGet(() -> delimiter(extension(resource)))); tokenizer.setQuoteCharacter(options.getQuoteCharacter()); if (!ObjectUtils.isEmpty(options.getIncludedFields())) { tokenizer.setIncludedFields(options.getIncludedFields()); @@ -169,6 +142,29 @@ private AbstractItemStreamItemReader> reader(Resource resour } } + private FileType type(Resource resource) { + FileExtension extension = extension(resource); + switch (extension) { + case FW: + return FileType.FIXED; + case JSON: + return FileType.JSON; + case XML: + return FileType.XML; + case CSV: + case PSV: + case TSV: + return FileType.DELIMITED; + default: + throw new UnknownFileTypeException("Unknown file extension: " + extension); + } + } + + private FileExtension extension(Resource resource) { + return FileUtils.extension(resource) + .orElseThrow(() -> new UnknownFileTypeException("Could not determine file type")); + } + private String delimiter(FileExtension extension) { switch (extension) { case CSV: diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/FileImportOptions.java b/connectors/riot-file/src/main/java/com/redis/riot/file/FileImportOptions.java index 30c834f3c..728d2beee 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/FileImportOptions.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/FileImportOptions.java @@ -17,23 +17,23 @@ public enum FileType { public static final String DEFAULT_CONTINUATION_STRING = "\\"; - @Option(names = { "-t", "--filetype" }, description = "File type: ${COMPLETION-CANDIDATES}", paramLabel = "") + @Option(names = { "-t", "--filetype" }, description = "File type: ${COMPLETION-CANDIDATES}.", paramLabel = "") private Optional type = Optional.empty(); - @Option(names = "--fields", arity = "1..*", description = "Delimited/FW field names", paramLabel = "") + @Option(names = "--fields", arity = "1..*", description = "Delimited/FW field names.", paramLabel = "") private List names = new ArrayList<>(); - @Option(names = { "-h", "--header" }, description = "Delimited/FW first line contains field names") + @Option(names = { "-h", "--header" }, description = "Delimited/FW first line contains field names.") private boolean header; - @Option(names = "--delimiter", description = "Delimiter character", paramLabel = "") + @Option(names = "--delimiter", description = "Delimiter character.", paramLabel = "") private Optional delimiter = Optional.empty(); - @Option(names = "--skip", description = "Delimited/FW lines to skip at start", paramLabel = "") + @Option(names = "--skip", description = "Delimited/FW lines to skip at start.", paramLabel = "") private Optional linesToSkip = Optional.empty(); - @Option(names = "--include", arity = "1..*", description = "Delimited/FW field indices to include (0-based)", paramLabel = "") + @Option(names = "--include", arity = "1..*", description = "Delimited/FW field indices to include (0-based).", paramLabel = "") private int[] includedFields; - @Option(names = "--ranges", arity = "1..*", description = "Fixed-width column ranges", paramLabel = "") + @Option(names = "--ranges", arity = "1..*", description = "Fixed-width column ranges.", paramLabel = "") private List columnRanges = new ArrayList<>(); - @Option(names = "--quote", description = "Escape character for delimited files (default: ${DEFAULT-VALUE})", paramLabel = "") + @Option(names = "--quote", description = "Escape character for delimited files (default: ${DEFAULT-VALUE}).", paramLabel = "") private Character quoteCharacter = DelimitedLineTokenizer.DEFAULT_QUOTE_CHARACTER; - @Option(names = "--cont", description = "Line continuation string (default: ${DEFAULT-VALUE})", paramLabel = "") + @Option(names = "--cont", description = "Line continuation string (default: ${DEFAULT-VALUE}).", paramLabel = "") private String continuationString = DEFAULT_CONTINUATION_STRING; public FileImportOptions() { diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/FileOptions.java b/connectors/riot-file/src/main/java/com/redis/riot/file/FileOptions.java index d83c74e70..27f34c340 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/FileOptions.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/FileOptions.java @@ -23,9 +23,9 @@ public class FileOptions { public static final Charset DEFAULT_ENCODING = Charset.defaultCharset(); - @Option(names = "--encoding", description = "File encoding (default: ${DEFAULT-VALUE})", paramLabel = "") + @Option(names = "--encoding", description = "File encoding (default: ${DEFAULT-VALUE}).", paramLabel = "") protected Charset encoding = DEFAULT_ENCODING; - @Option(names = { "-z", "--gzip" }, description = "File is gzip compressed") + @Option(names = { "-z", "--gzip" }, description = "File is gzip compressed.") protected boolean gzip; @ArgGroup(exclusive = false, heading = "Amazon Simple Storage Service options%n") protected S3Options s3 = new S3Options(); diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/GcsOptions.java b/connectors/riot-file/src/main/java/com/redis/riot/file/GcsOptions.java index 49bbef427..0bb466507 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/GcsOptions.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/GcsOptions.java @@ -20,11 +20,11 @@ public class GcsOptions { - @Option(names = "--gcs-key-file", description = "GCS private key (e.g. /usr/local/key.json)", paramLabel = "") + @Option(names = "--gcs-key-file", description = "GCS private key (e.g. /usr/local/key.json).", paramLabel = "") private Optional credentials = Optional.empty(); - @Option(names = "--gcs-project", description = "GCP project id", paramLabel = "") + @Option(names = "--gcs-project", description = "GCP project id.", paramLabel = "") private Optional projectId = Optional.empty(); - @Option(names = "--gcs-key", arity = "0..1", interactive = true, description = "GCS Base64 encoded key", paramLabel = "") + @Option(names = "--gcs-key", arity = "0..1", interactive = true, description = "GCS Base64 encoded key.", paramLabel = "") private Optional encodedKey = Optional.empty(); public void setCredentials(File credentials) { diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/S3Options.java b/connectors/riot-file/src/main/java/com/redis/riot/file/S3Options.java index 6d1e250a6..610c14025 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/S3Options.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/S3Options.java @@ -5,10 +5,8 @@ import org.springframework.cloud.aws.core.io.s3.SimpleStorageProtocolResolver; import org.springframework.core.io.DefaultResourceLoader; import org.springframework.core.io.Resource; -import org.springframework.util.Assert; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3Client; @@ -18,11 +16,11 @@ public class S3Options { - @Option(names = "--s3-access", description = "Access key", paramLabel = "") + @Option(names = "--s3-access", description = "Access key.", paramLabel = "") private Optional accessKey = Optional.empty(); - @Option(names = "--s3-secret", arity = "0..1", interactive = true, description = "Secret key", paramLabel = "") + @Option(names = "--s3-secret", arity = "0..1", interactive = true, description = "Secret key.", paramLabel = "") private Optional secretKey = Optional.empty(); - @Option(names = "--s3-region", description = "AWS region", paramLabel = "") + @Option(names = "--s3-region", description = "AWS region.", paramLabel = "") private Optional region = Optional.empty(); public void setAccessKey(String accessKey) { @@ -40,10 +38,9 @@ public void setRegion(String region) { public Resource resource(String location) { AmazonS3ClientBuilder clientBuilder = AmazonS3Client.builder(); region.ifPresent(clientBuilder::withRegion); - if (accessKey.isPresent()) { - Assert.isTrue(secretKey.isPresent(), "Secret key is missing"); - clientBuilder.withCredentials(new SimpleAWSCredentialsProvider(accessKey.get(), secretKey.get())); - } + accessKey.ifPresent( + a -> clientBuilder.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(a, + secretKey.orElseThrow(() -> new IllegalArgumentException("Secret key is missing")))))); SimpleStorageProtocolResolver resolver = new SimpleStorageProtocolResolver() { @Override public AmazonS3 getAmazonS3() { @@ -54,28 +51,6 @@ public AmazonS3 getAmazonS3() { return resolver.resolve(location, new DefaultResourceLoader()); } - private static class SimpleAWSCredentialsProvider implements AWSCredentialsProvider { - - private final String accessKey; - private final String secretKey; - - public SimpleAWSCredentialsProvider(String accessKey, String secretKey) { - this.accessKey = accessKey; - this.secretKey = secretKey; - } - - @Override - public AWSCredentials getCredentials() { - return new BasicAWSCredentials(accessKey, secretKey); - } - - @Override - public void refresh() { - // do nothing - } - - } - @Override public String toString() { return "S3Options [accessKey=" + accessKey + ", secretKey=" + secretKey + ", region=" + region + "]"; diff --git a/connectors/riot-file/src/test/java/com/redis/riot/file/FileIntegrationTests.java b/connectors/riot-file/src/test/java/com/redis/riot/file/FileIntegrationTests.java index 5e3c993c6..ad16769e3 100644 --- a/connectors/riot-file/src/test/java/com/redis/riot/file/FileIntegrationTests.java +++ b/connectors/riot-file/src/test/java/com/redis/riot/file/FileIntegrationTests.java @@ -32,7 +32,7 @@ import com.redis.riot.file.resource.XmlItemReaderBuilder; import com.redis.riot.file.resource.XmlObjectReader; import com.redis.riot.redis.HsetCommand; -import com.redis.spring.batch.DataStructure; +import com.redis.spring.batch.common.DataStructure; import com.redis.testcontainers.junit.RedisTestContext; import com.redis.testcontainers.junit.RedisTestContextsSource; diff --git a/connectors/riot-file/src/test/java/com/redis/riot/file/TestXmlItemWriter.java b/connectors/riot-file/src/test/java/com/redis/riot/file/TestXmlItemWriter.java index f59175a4a..5c19ce3e3 100644 --- a/connectors/riot-file/src/test/java/com/redis/riot/file/TestXmlItemWriter.java +++ b/connectors/riot-file/src/test/java/com/redis/riot/file/TestXmlItemWriter.java @@ -16,8 +16,8 @@ import com.fasterxml.jackson.dataformat.xml.XmlMapper; import com.redis.riot.file.resource.XmlResourceItemWriter; import com.redis.riot.file.resource.XmlResourceItemWriterBuilder; -import com.redis.spring.batch.DataStructure; -import com.redis.spring.batch.DataStructure.Type; +import com.redis.spring.batch.common.DataStructure; +import com.redis.spring.batch.common.DataStructure.Type; class TestXmlItemWriter { diff --git a/connectors/riot-gen/src/main/java/com/redis/riot/gen/DataStructureGeneratorCommand.java b/connectors/riot-gen/src/main/java/com/redis/riot/gen/DataStructureGeneratorCommand.java index 7fced651f..e444e5e0f 100644 --- a/connectors/riot-gen/src/main/java/com/redis/riot/gen/DataStructureGeneratorCommand.java +++ b/connectors/riot-gen/src/main/java/com/redis/riot/gen/DataStructureGeneratorCommand.java @@ -11,9 +11,9 @@ import com.redis.riot.JobCommandContext; import com.redis.riot.ProgressMonitor; import com.redis.riot.RedisWriterOptions; -import com.redis.spring.batch.DataStructure; -import com.redis.spring.batch.DataStructure.Type; import com.redis.spring.batch.RedisItemWriter; +import com.redis.spring.batch.common.DataStructure; +import com.redis.spring.batch.common.DataStructure.Type; import com.redis.spring.batch.reader.DataStructureGeneratorItemReader; import picocli.CommandLine.ArgGroup; @@ -35,8 +35,8 @@ public class DataStructureGeneratorCommand extends AbstractTransferCommand { @Override protected Job job(JobCommandContext context) throws Exception { - RedisItemWriter> writer = writerOptions - .configure(RedisItemWriter.dataStructure(context.getRedisClient())).build(); + RedisItemWriter> writer = RedisItemWriter.dataStructure(context.pool()) + .options(writerOptions.writerOptions()).build(); log.log(Level.FINE, "Creating random data structure reader with {0}", options); ProgressMonitor monitor = options.configure(progressMonitor()).task("Generating").build(); return job(context, NAME, step(context, NAME, reader(), null, writer), monitor); @@ -53,10 +53,9 @@ private ItemReader> reader() { .zsetScore(options.getZsetScore()).hashSize(options.getHashSize()) .hashFieldSize(options.getHashFieldSize()).jsonFieldCount(options.getJsonSize()) .jsonFieldSize(options.getJsonFieldSize()); - options.getTimeseriesStartTime().ifPresent(t -> reader.timeseriesStartTime(t.toEpochMilli())); - options.getExpiration().ifPresent(reader::expiration); + options.configureReader(reader); Optional sleep = options.getSleep(); - if (sleep.isPresent() && sleep.get() > 0) { + if (sleep.isPresent()) { return new ThrottledItemReader<>(reader.build(), sleep.get()); } return reader.build(); diff --git a/connectors/riot-gen/src/main/java/com/redis/riot/gen/DataStructureGeneratorOptions.java b/connectors/riot-gen/src/main/java/com/redis/riot/gen/DataStructureGeneratorOptions.java index 46157056b..3dfbcba07 100644 --- a/connectors/riot-gen/src/main/java/com/redis/riot/gen/DataStructureGeneratorOptions.java +++ b/connectors/riot-gen/src/main/java/com/redis/riot/gen/DataStructureGeneratorOptions.java @@ -1,16 +1,30 @@ package com.redis.riot.gen; -import static com.redis.spring.batch.reader.DataStructureGeneratorItemReader.*; +import static com.redis.spring.batch.reader.DataStructureGeneratorItemReader.DEFAULT_HASH_FIELD_SIZE; +import static com.redis.spring.batch.reader.DataStructureGeneratorItemReader.DEFAULT_HASH_SIZE; +import static com.redis.spring.batch.reader.DataStructureGeneratorItemReader.DEFAULT_JSON_FIELD_COUNT; +import static com.redis.spring.batch.reader.DataStructureGeneratorItemReader.DEFAULT_JSON_FIELD_SIZE; +import static com.redis.spring.batch.reader.DataStructureGeneratorItemReader.DEFAULT_KEYSPACE; +import static com.redis.spring.batch.reader.DataStructureGeneratorItemReader.DEFAULT_LIST_SIZE; +import static com.redis.spring.batch.reader.DataStructureGeneratorItemReader.DEFAULT_SET_SIZE; +import static com.redis.spring.batch.reader.DataStructureGeneratorItemReader.DEFAULT_STREAM_FIELD_COUNT; +import static com.redis.spring.batch.reader.DataStructureGeneratorItemReader.DEFAULT_STREAM_FIELD_SIZE; +import static com.redis.spring.batch.reader.DataStructureGeneratorItemReader.DEFAULT_STREAM_SIZE; +import static com.redis.spring.batch.reader.DataStructureGeneratorItemReader.DEFAULT_STRING_SIZE; +import static com.redis.spring.batch.reader.DataStructureGeneratorItemReader.DEFAULT_TIMESERIES_SIZE; +import static com.redis.spring.batch.reader.DataStructureGeneratorItemReader.DEFAULT_ZSET_SCORE; +import static com.redis.spring.batch.reader.DataStructureGeneratorItemReader.DEFAULT_ZSET_SIZE; import java.time.Instant; import java.util.Arrays; import java.util.List; import java.util.Optional; -import com.redis.spring.batch.DataStructure.Type; +import com.redis.spring.batch.common.DataStructure.Type; +import com.redis.spring.batch.common.DoubleRange; +import com.redis.spring.batch.common.IntRange; import com.redis.spring.batch.reader.DataStructureGeneratorItemReader; -import com.redis.spring.batch.support.DoubleRange; -import com.redis.spring.batch.support.IntRange; +import com.redis.spring.batch.reader.DataStructureGeneratorItemReader.Builder; import picocli.CommandLine.Option; @@ -202,4 +216,10 @@ public String toString() { + ", start=" + start + ", count=" + count + "]"; } + public Builder configureReader(Builder reader) { + timeseriesStartTime.ifPresent(t -> reader.timeseriesStartTime(t.toEpochMilli())); + expiration.ifPresent(reader::expiration); + return reader; + } + } diff --git a/connectors/riot-gen/src/main/java/com/redis/riot/gen/FakerGeneratorCommand.java b/connectors/riot-gen/src/main/java/com/redis/riot/gen/FakerGeneratorCommand.java index 7c6a78d56..dbc48137d 100644 --- a/connectors/riot-gen/src/main/java/com/redis/riot/gen/FakerGeneratorCommand.java +++ b/connectors/riot-gen/src/main/java/com/redis/riot/gen/FakerGeneratorCommand.java @@ -10,14 +10,13 @@ import org.springframework.batch.core.step.builder.SimpleStepBuilder; import org.springframework.batch.item.ItemReader; -import com.redis.lettucemod.RedisModulesUtils; import com.redis.lettucemod.api.StatefulRedisModulesConnection; import com.redis.lettucemod.api.sync.RediSearchCommands; import com.redis.lettucemod.search.Field; import com.redis.lettucemod.search.IndexInfo; +import com.redis.lettucemod.util.RedisModulesUtils; import com.redis.riot.AbstractImportCommand; import com.redis.riot.JobCommandContext; -import com.redis.riot.RedisOptions; import picocli.CommandLine.Command; import picocli.CommandLine.Mixin; @@ -51,8 +50,7 @@ private ItemReader> reader(JobCommandContext context) { } private void addFieldsFromIndex(JobCommandContext context, String index, Map fields) { - try (StatefulRedisModulesConnection connection = RedisOptions - .connect(context.getRedisClient())) { + try (StatefulRedisModulesConnection connection = context.connection()) { RediSearchCommands commands = connection.sync(); IndexInfo info = RedisModulesUtils.indexInfo(commands.ftInfo(index)); for (Field field : info.getFields()) { diff --git a/connectors/riot-gen/src/main/java/com/redis/riot/gen/FakerGeneratorOptions.java b/connectors/riot-gen/src/main/java/com/redis/riot/gen/FakerGeneratorOptions.java index 511222f53..6313b14cf 100644 --- a/connectors/riot-gen/src/main/java/com/redis/riot/gen/FakerGeneratorOptions.java +++ b/connectors/riot-gen/src/main/java/com/redis/riot/gen/FakerGeneratorOptions.java @@ -14,11 +14,11 @@ public class FakerGeneratorOptions extends GeneratorOptions { @Parameters(arity = "0..*", description = "SpEL expressions in the form field1=\"exp\" field2=\"exp\"...", paramLabel = "SPEL") private Map fields = new LinkedHashMap<>(); - @Option(names = "--infer", description = "Introspect given RediSearch index to infer Faker fields", paramLabel = "") + @Option(names = "--infer", description = "Introspect given RediSearch index to infer Faker fields.", paramLabel = "") private Optional redisearchIndex = Optional.empty(); - @Option(names = "--locale", description = "Faker locale (default: ${DEFAULT-VALUE})", paramLabel = "") + @Option(names = "--locale", description = "Faker locale (default: ${DEFAULT-VALUE}).", paramLabel = "") private Locale locale = Locale.ENGLISH; - @Option(names = "--metadata", description = "Include metadata (index, partition)") + @Option(names = "--metadata", description = "Include metadata (index, partition).") private boolean includeMetadata; public Map getFields() { diff --git a/connectors/riot-gen/src/test/java/com/redis/riot/gen/FakerReaderTests.java b/connectors/riot-gen/src/test/java/com/redis/riot/gen/FakerReaderTests.java index 05eb6622e..92f401c67 100644 --- a/connectors/riot-gen/src/test/java/com/redis/riot/gen/FakerReaderTests.java +++ b/connectors/riot-gen/src/test/java/com/redis/riot/gen/FakerReaderTests.java @@ -22,7 +22,7 @@ import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; -import com.redis.spring.batch.support.JobRunner; +import com.redis.spring.batch.common.JobRunner; @SpringBootTest(classes = FakerReaderTestApplication.class) @RunWith(SpringRunner.class) diff --git a/connectors/riot-redis/src/main/java/com/redis/riot/redis/AbstractRedisCommand.java b/connectors/riot-redis/src/main/java/com/redis/riot/redis/AbstractRedisCommand.java index 772e161e7..8dbd71858 100644 --- a/connectors/riot-redis/src/main/java/com/redis/riot/redis/AbstractRedisCommand.java +++ b/connectors/riot-redis/src/main/java/com/redis/riot/redis/AbstractRedisCommand.java @@ -10,9 +10,7 @@ import com.redis.lettucemod.api.sync.RedisModulesCommands; import com.redis.riot.AbstractJobCommand; import com.redis.riot.JobCommandContext; -import com.redis.riot.RedisOptions; -import io.lettuce.core.AbstractRedisClient; import picocli.CommandLine.Command; @Command @@ -21,21 +19,21 @@ public abstract class AbstractRedisCommand extends AbstractJobCommand { @Override protected Job job(JobCommandContext context) throws Exception { String name = name(); - RedisCommandTasklet tasklet = new RedisCommandTasklet(context.getRedisClient()); + RedisCommandTasklet tasklet = new RedisCommandTasklet(context); return job(context, name, tasklet).build(); } private class RedisCommandTasklet implements Tasklet { - private final AbstractRedisClient redisClient; + private final JobCommandContext context; - public RedisCommandTasklet(AbstractRedisClient redisClient) { - this.redisClient = redisClient; + public RedisCommandTasklet(JobCommandContext context) { + this.context = context; } @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { - try (StatefulRedisModulesConnection connection = RedisOptions.connect(redisClient)) { + try (StatefulRedisModulesConnection connection = context.connection()) { AbstractRedisCommand.this.execute(connection.sync()); return RepeatStatus.FINISHED; } diff --git a/connectors/riot-redis/src/main/java/com/redis/riot/redis/AbstractReplicateCommand.java b/connectors/riot-redis/src/main/java/com/redis/riot/redis/AbstractReplicateCommand.java index 96a3298e9..c99be7f7f 100644 --- a/connectors/riot-redis/src/main/java/com/redis/riot/redis/AbstractReplicateCommand.java +++ b/connectors/riot-redis/src/main/java/com/redis/riot/redis/AbstractReplicateCommand.java @@ -12,44 +12,46 @@ import org.springframework.batch.core.job.builder.JobFlowBuilder; import org.springframework.batch.core.job.builder.SimpleJobBuilder; import org.springframework.batch.core.job.flow.support.SimpleFlow; -import org.springframework.batch.core.step.builder.SimpleStepBuilder; import org.springframework.batch.core.step.tasklet.TaskletStep; import org.springframework.batch.item.ItemProcessor; import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.expression.EvaluationContext; import org.springframework.expression.Expression; -import org.springframework.expression.common.TemplateParserContext; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.expression.spel.support.StandardEvaluationContext; import com.redis.riot.FlushingTransferOptions; import com.redis.riot.JobCommandContext; -import com.redis.riot.KeyValueProcessorOptions; import com.redis.riot.ProgressMonitor; import com.redis.riot.RedisWriterOptions; import com.redis.riot.processor.CompositeItemStreamItemProcessor; -import com.redis.riot.processor.KeyValueKeyProcessor; -import com.redis.riot.processor.KeyValueTTLProcessor; -import com.redis.spring.batch.KeyValue; +import com.redis.riot.processor.KeyValueProcessor; import com.redis.spring.batch.RedisItemReader; import com.redis.spring.batch.RedisItemWriter; -import com.redis.spring.batch.RedisScanSizeEstimator; +import com.redis.spring.batch.common.KeyValue; +import com.redis.spring.batch.reader.LiveReaderOptions; +import com.redis.spring.batch.reader.LiveRedisItemReader; +import com.redis.spring.batch.reader.QueueOptions; +import com.redis.spring.batch.reader.ScanReaderOptions; +import com.redis.spring.batch.reader.ScanSizeEstimator; +import com.redis.spring.batch.step.FlushingSimpleStepBuilder; +import com.redis.spring.batch.writer.WriterOptions; import com.redis.spring.batch.writer.operation.Noop; import io.lettuce.core.codec.ByteArrayCodec; import picocli.CommandLine.ArgGroup; import picocli.CommandLine.Mixin; -public abstract class AbstractReplicateCommand> extends AbstractTargetCommand { +public abstract class AbstractReplicateCommand> extends AbstractTargetCommand { private static final Logger log = Logger.getLogger(AbstractReplicateCommand.class.getName()); @Mixin private FlushingTransferOptions flushingTransferOptions = new FlushingTransferOptions(); + @Mixin private ReplicationOptions replicationOptions = new ReplicationOptions(); - @Mixin - private KeyValueProcessorOptions processorOptions = new KeyValueProcessorOptions(); + @ArgGroup(exclusive = false, heading = "Writer options%n") private RedisWriterOptions writerOptions = new RedisWriterOptions(); @@ -61,29 +63,24 @@ public ReplicationOptions getReplicationOptions() { return replicationOptions; } - public KeyValueProcessorOptions getProcessorOptions() { - return processorOptions; - } - public RedisWriterOptions getWriterOptions() { return writerOptions; } @Override - protected Job job(JobCommandContext context) { + protected Job job(JobCommandContext jobCommandContext) { + TargetCommandContext context = (TargetCommandContext) jobCommandContext; switch (replicationOptions.getMode()) { case LIVE: - SimpleFlow liveFlow = new FlowBuilder("live-replication-live-flow") - .start(liveReplicationStep(context)).build(); - SimpleFlow scanFlow = new FlowBuilder("live-replication-scan-flow").start(scanStep(context)) - .build(); - SimpleFlow replicationFlow = new FlowBuilder("live-replication-flow") + SimpleFlow liveFlow = new FlowBuilder("live-flow").start(liveStep(context)).build(); + SimpleFlow scanFlow = new FlowBuilder("scan-flow").start(scanStep(context)).build(); + SimpleFlow replicationFlow = new FlowBuilder("replication-flow") .split(new SimpleAsyncTaskExecutor()).add(liveFlow, scanFlow).build(); JobFlowBuilder liveJob = context.job("live-replication").start(replicationFlow); optionalVerificationStep(context).ifPresent(liveJob::next); return liveJob.build().build(); case LIVEONLY: - return job(context, "liveonly-replication", this::liveReplicationStep); + return job(context, "liveonly-replication", this::liveStep); case SNAPSHOT: return job(context, "snapshot-replication", this::scanStep); default: @@ -91,18 +88,18 @@ protected Job job(JobCommandContext context) { } } - private Job job(JobCommandContext context, String name, Function step) { + private Job job(TargetCommandContext context, String name, Function step) { SimpleJobBuilder job = context.job(name).start(step.apply(context)); optionalVerificationStep(context).ifPresent(job::next); return job.build(); } - protected Optional optionalVerificationStep(JobCommandContext context) { + protected Optional optionalVerificationStep(TargetCommandContext context) { if (replicationOptions.isVerify()) { if (writerOptions.isDryRun()) { return Optional.empty(); } - if (processorOptions.getKeyProcessor().isPresent()) { + if (replicationOptions.getKeyProcessor().isPresent()) { // Verification cannot be done if a processor is set log.warning("Key processor enabled, verification will be skipped"); return Optional.empty(); @@ -112,53 +109,59 @@ protected Optional optionalVerificationStep(JobCommandContext context) { return Optional.empty(); } - private TaskletStep scanStep(JobCommandContext context) { - RedisItemReader reader = reader(context, "scan-reader").build(); - RedisItemWriter writer = createWriter(context).build(); - RedisScanSizeEstimator estimator = estimator(context).build(); + private TaskletStep scanStep(TargetCommandContext context) { + RedisItemReader reader = scanReader(context, readerOptions.readerOptions()); + reader.setName("scan-reader"); + RedisItemWriter writer = checkWriter(context, writerOptions.writerOptions()); + ScanSizeEstimator estimator = estimator(context); ProgressMonitor monitor = progressMonitor().task("Scanning").initialMax(estimator::execute).build(); return step(step(context, "snapshot-replication", reader, processor(context), writer), monitor).build(); } - private TaskletStep liveReplicationStep(JobCommandContext context) { - RedisItemReader reader = flushingTransferOptions.configure(reader(context, "redis-live-reader") - .live().notificationQueueCapacity(replicationOptions.getNotificationQueueCapacity()) - .database(context.getRedisOptions().uri().getDatabase())).build(); - RedisItemWriter writer = createWriter(context).build(); - SimpleStepBuilder step = flushingTransferOptions - .configure(step(context, "live-replication", reader, processor(context), writer)); + private TaskletStep liveStep(TargetCommandContext context) { + LiveRedisItemReader reader = liveReader(context, readerOptions.getMatch(), liveReaderOptions()); + reader.setName("live-reader"); + RedisItemWriter writer = checkWriter(context, writerOptions.writerOptions()); + FlushingSimpleStepBuilder step = new FlushingSimpleStepBuilder<>( + step(context, "live-replication", reader, processor(context), writer)) + .options(flushingTransferOptions.flushingOptions()); ProgressMonitor monitor = progressMonitor().task("Listening").build(); return step(step, monitor).build(); } - private RedisItemWriter.Builder createWriter(JobCommandContext context) { + private LiveReaderOptions liveReaderOptions() { + return LiveReaderOptions.builder(readerOptions.readerOptions()) + .flushingOptions(flushingTransferOptions.flushingOptions()) + .notificationQueueOptions( + QueueOptions.builder().capacity(replicationOptions.getNotificationQueueCapacity()).build()) + .build(); + } + + private RedisItemWriter checkWriter(TargetCommandContext context, WriterOptions options) { if (writerOptions.isDryRun()) { - return RedisItemWriter.operation(((TargetCommandContext) context).getTargetRedisClient(), - ByteArrayCodec.INSTANCE, new Noop<>()); + return RedisItemWriter.operation(context.targetPool(ByteArrayCodec.INSTANCE), new Noop()) + .options(options).build(); } - return writerOptions.configure(writer((TargetCommandContext) context)); + return writer(context, options); } - private RedisItemReader.Builder reader(JobCommandContext context, String name) { - return readerOptions.configure(reader(context)).name(name); - } + protected abstract RedisItemReader scanReader(JobCommandContext context, ScanReaderOptions options); - protected abstract RedisItemWriter.Builder writer(TargetCommandContext context); + protected abstract LiveRedisItemReader liveReader(JobCommandContext context, String keyPattern, + LiveReaderOptions options); - protected abstract RedisItemReader.Builder reader(JobCommandContext context); + protected abstract RedisItemWriter writer(TargetCommandContext context, WriterOptions options); - private ItemProcessor processor(JobCommandContext context) { + private ItemProcessor processor(TargetCommandContext context) { SpelExpressionParser parser = new SpelExpressionParser(); - List, ? extends KeyValue>> processors = new ArrayList<>(); - processorOptions.getKeyProcessor().ifPresent(p -> { + List> processors = new ArrayList<>(); + replicationOptions.getKeyProcessor().ifPresent(p -> { EvaluationContext evaluationContext = new StandardEvaluationContext(); - evaluationContext.setVariable("src", context.getRedisOptions().uri()); - evaluationContext.setVariable("dest", ((TargetCommandContext) context).getTargetRedisOptions().uri()); - Expression expression = parser.parseExpression(p, new TemplateParserContext()); - processors.add(new KeyValueKeyProcessor<>(expression, evaluationContext)); + evaluationContext.setVariable("src", context.getRedisURI()); + evaluationContext.setVariable("dest", context.getTargetRedisURI()); + Expression expression = parser.parseExpression(p); + processors.add(new KeyValueProcessor<>(expression, evaluationContext)); }); - processorOptions.getTtlProcessor().ifPresent(p -> processors - .add(new KeyValueTTLProcessor<>(parser.parseExpression(p), new StandardEvaluationContext()))); return CompositeItemStreamItemProcessor.delegates(processors.toArray(ItemProcessor[]::new)); } diff --git a/connectors/riot-redis/src/main/java/com/redis/riot/redis/AbstractTargetCommand.java b/connectors/riot-redis/src/main/java/com/redis/riot/redis/AbstractTargetCommand.java index 78e5ab2df..ff694b5c4 100644 --- a/connectors/riot-redis/src/main/java/com/redis/riot/redis/AbstractTargetCommand.java +++ b/connectors/riot-redis/src/main/java/com/redis/riot/redis/AbstractTargetCommand.java @@ -1,9 +1,11 @@ package com.redis.riot.redis; +import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; import org.springframework.batch.core.ExitStatus; +import org.springframework.batch.core.ItemWriteListener; import org.springframework.batch.core.Step; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.listener.StepExecutionListenerSupport; @@ -15,13 +17,14 @@ import com.redis.riot.ProgressMonitor.Style; import com.redis.riot.RedisOptions; import com.redis.riot.RedisReaderOptions; -import com.redis.spring.batch.DataStructure; import com.redis.spring.batch.RedisItemReader; -import com.redis.spring.batch.RedisScanSizeEstimator; -import com.redis.spring.batch.compare.KeyComparisonItemWriter; -import com.redis.spring.batch.compare.KeyComparisonLogger; -import com.redis.spring.batch.compare.KeyComparisonResults; -import com.redis.spring.batch.support.JobRunner; +import com.redis.spring.batch.common.JobRunner; +import com.redis.spring.batch.reader.KeyComparison; +import com.redis.spring.batch.reader.KeyComparison.Status; +import com.redis.spring.batch.reader.ScanSizeEstimator; +import com.redis.spring.batch.writer.KeyComparisonCountItemWriter; +import com.redis.spring.batch.writer.KeyComparisonCountItemWriter.Results; +import com.redis.spring.batch.writer.KeyComparisonLogger; import picocli.CommandLine.ArgGroup; import picocli.CommandLine.Mixin; @@ -36,8 +39,10 @@ public abstract class AbstractTargetCommand extends AbstractTransferCommand { @ArgGroup(exclusive = false, heading = "Target Redis connection options%n") private RedisOptions targetRedisOptions = new RedisOptions(); + @ArgGroup(exclusive = false, heading = "Reader options%n") protected RedisReaderOptions readerOptions = new RedisReaderOptions(); + @Mixin private CompareOptions compareOptions = new CompareOptions(); @@ -58,60 +63,87 @@ protected JobCommandContext context(JobRunner jobRunner, RedisOptions redisOptio return new TargetCommandContext(jobRunner, redisOptions, targetRedisOptions); } - @Override - protected RedisScanSizeEstimator.Builder estimator(JobCommandContext context) { - return super.estimator(context).match(readerOptions.getScanMatch()).sampleSize(readerOptions.getSampleSize()) - .type(readerOptions.getScanType()); + protected ScanSizeEstimator estimator(JobCommandContext context) { + return ScanSizeEstimator.builder(context.pool()).options(readerOptions.estimatorOptions()).build(); } - protected Step verificationStep(JobCommandContext context) { - RedisItemReader> sourceReader = readerOptions - .configure(RedisItemReader.dataStructure(context.getRedisClient())).jobRunner(context.getJobRunner()) - .build(); - RedisItemReader> targetReader = readerOptions - .configure(RedisItemReader.dataStructure(((TargetCommandContext) context).getTargetRedisClient())) - .build(); + private static class KeyComparisonWriteListener implements ItemWriteListener> { + + private final KeyComparisonLogger logger; + + public KeyComparisonWriteListener(KeyComparisonLogger logger) { + this.logger = logger; + } + + @Override + public void onWriteError(Exception exception, List> items) { + // do nothing + } + + @Override + public void beforeWrite(List> items) { + // do nothing + } + + @Override + public void afterWrite(List> items) { + items.forEach(logger::log); + } + } + + private class KeyComparisonStepListener extends StepExecutionListenerSupport { + + private final KeyComparisonCountItemWriter writer; + + public KeyComparisonStepListener(KeyComparisonCountItemWriter writer) { + this.writer = writer; + } + + @Override + public ExitStatus afterStep(StepExecution stepExecution) { + if (stepExecution.getStatus().isUnsuccessful()) { + return null; + } + Results results = writer.getResults(); + if (results.getTotalCount() == results.getCount(Status.OK)) { + log.info("Verification completed - all OK"); + return ExitStatus.COMPLETED; + } + try { + Thread.sleep(progressOptions.getUpdateIntervalMillis()); + } catch (InterruptedException e) { + log.fine("Verification interrupted"); + Thread.currentThread().interrupt(); + return ExitStatus.STOPPED; + } + log.log(Level.WARNING, "Verification failed: OK={0} Missing={1} Values={2} TTLs={3} Types={4}", + new Object[] { results.getCount(Status.OK), results.getCount(Status.MISSING), + results.getCount(Status.VALUE), results.getCount(Status.TTL), + results.getCount(Status.TYPE) }); + return new ExitStatus(ExitStatus.FAILED.getExitCode(), "Verification failed"); + } + } + + protected Step verificationStep(TargetCommandContext context) { log.log(Level.FINE, "Creating key comparator with TTL tolerance of {0} seconds", compareOptions.getTtlTolerance()); - KeyComparisonItemWriter writer = KeyComparisonItemWriter.valueReader(targetReader.getValueReader()) - .tolerance(compareOptions.getTtlToleranceDuration()).build(); + RedisItemReader> reader = RedisItemReader.comparator(context.getJobRunner(), + context.pool(), context.targetPool(), compareOptions.getTtlToleranceDuration()).build(); + KeyComparisonCountItemWriter writer = new KeyComparisonCountItemWriter<>(); + SimpleStepBuilder, KeyComparison> step = step(context, VERIFICATION_NAME, reader, + null, writer); if (compareOptions.isShowDiffs()) { - writer.addListener(new KeyComparisonLogger(java.util.logging.Logger.getLogger(getClass().getName()))); + step.listener(new KeyComparisonWriteListener(new KeyComparisonLogger(log))); } - SimpleStepBuilder, DataStructure> step = step(context, VERIFICATION_NAME, - sourceReader, null, writer); - step.listener(new StepExecutionListenerSupport() { - @Override - public ExitStatus afterStep(StepExecution stepExecution) { - if (stepExecution.getStatus().isUnsuccessful()) { - return null; - } - if (writer.getResults().isOK()) { - log.info("Verification completed - all OK"); - return ExitStatus.COMPLETED; - } - try { - Thread.sleep(progressOptions.getUpdateIntervalMillis()); - } catch (InterruptedException e) { - log.fine("Verification interrupted"); - Thread.currentThread().interrupt(); - return ExitStatus.STOPPED; - } - KeyComparisonResults results = writer.getResults(); - log.log(Level.WARNING, "Verification failed: OK={0} Missing={1} Values={2} TTLs={3} Types={4}", - new Object[] { results.getOK(), results.getMissing(), results.getValue(), results.getTTL(), - results.getType() }); - return new ExitStatus(ExitStatus.FAILED.getExitCode(), "Verification failed"); - } - }); - ProgressMonitor monitor = progressMonitor().task("Verifying").initialMax(estimator(context).build()::execute) + step.listener(new KeyComparisonStepListener(writer)); + ProgressMonitor monitor = progressMonitor().task("Verifying").initialMax(estimator(context)::execute) .extraMessage(() -> extraMessage(writer.getResults())).build(); return step(step, monitor).build(); } - private String extraMessage(KeyComparisonResults results) { - return String.format(extraMessageFormat(), results.getMissing(), results.getType(), results.getValue(), - results.getTTL()); + private String extraMessage(Results results) { + return String.format(extraMessageFormat(), results.getCount(Status.MISSING), results.getCount(Status.TYPE), + results.getCount(Status.VALUE), results.getCount(Status.TTL)); } private String extraMessageFormat() { diff --git a/connectors/riot-redis/src/main/java/com/redis/riot/redis/CompareOptions.java b/connectors/riot-redis/src/main/java/com/redis/riot/redis/CompareOptions.java index 8a96e042e..caad33707 100644 --- a/connectors/riot-redis/src/main/java/com/redis/riot/redis/CompareOptions.java +++ b/connectors/riot-redis/src/main/java/com/redis/riot/redis/CompareOptions.java @@ -10,7 +10,7 @@ public class CompareOptions { @Option(names = "--ttl-tolerance", description = "Max TTL difference to use for dataset verification (default: ${DEFAULT-VALUE}).", paramLabel = "") private long ttlTolerance = DEFAULT_TTL_TOLERANCE_IN_SECONDS; - @Option(names = "--show-diffs", description = "Print details of key mismatches during dataset verification") + @Option(names = "--show-diffs", description = "Print details of key mismatches during dataset verification.") private boolean showDiffs; public Duration getTtlToleranceDuration() { diff --git a/connectors/riot-redis/src/main/java/com/redis/riot/redis/LatencyCommand.java b/connectors/riot-redis/src/main/java/com/redis/riot/redis/LatencyCommand.java index cb8981832..91ec2bdaf 100644 --- a/connectors/riot-redis/src/main/java/com/redis/riot/redis/LatencyCommand.java +++ b/connectors/riot-redis/src/main/java/com/redis/riot/redis/LatencyCommand.java @@ -21,13 +21,13 @@ public class LatencyCommand extends AbstractRedisCommand { private static final Logger log = Logger.getLogger(LatencyCommand.class.getName()); - @Option(names = "--iterations", description = "Number of latency tests (default: ${DEFAULT-VALUE})", paramLabel = "") + @Option(names = "--iterations", description = "Number of latency tests (default: ${DEFAULT-VALUE}).", paramLabel = "") private int iterations = 1000; - @Option(names = "--sleep", description = "Sleep duration between calls (default: ${DEFAULT-VALUE})", paramLabel = "") + @Option(names = "--sleep", description = "Sleep duration between calls (default: ${DEFAULT-VALUE}).", paramLabel = "") private long sleep = 1; - @Option(names = "--unit", description = "Latency unit (default: ${DEFAULT-VALUE})", paramLabel = "") + @Option(names = "--unit", description = "Latency unit (default: ${DEFAULT-VALUE}).", paramLabel = "") private TimeUnit unit = TimeUnit.MILLISECONDS; - @Option(names = "--show-distribution", description = "Show latency distribution") + @Option(names = "--show-distribution", description = "Show latency distribution.") private boolean showDistribution = false; @Override diff --git a/connectors/riot-redis/src/main/java/com/redis/riot/redis/ReplicateCommand.java b/connectors/riot-redis/src/main/java/com/redis/riot/redis/ReplicateCommand.java index 2f7966768..37b2ebebc 100644 --- a/connectors/riot-redis/src/main/java/com/redis/riot/redis/ReplicateCommand.java +++ b/connectors/riot-redis/src/main/java/com/redis/riot/redis/ReplicateCommand.java @@ -1,24 +1,39 @@ package com.redis.riot.redis; import com.redis.riot.JobCommandContext; -import com.redis.spring.batch.KeyValue; import com.redis.spring.batch.RedisItemReader; import com.redis.spring.batch.RedisItemWriter; +import com.redis.spring.batch.common.KeyDump; +import com.redis.spring.batch.reader.LiveReaderOptions; +import com.redis.spring.batch.reader.LiveRedisItemReader; +import com.redis.spring.batch.reader.ScanReaderOptions; +import com.redis.spring.batch.writer.WriterOptions; import io.lettuce.core.codec.ByteArrayCodec; import picocli.CommandLine.Command; @Command(name = "replicate", description = "Replicate a source Redis DB to a target Redis DB using DUMP+RESTORE") -public class ReplicateCommand extends AbstractReplicateCommand> { +public class ReplicateCommand extends AbstractReplicateCommand> { @Override - protected RedisItemWriter.Builder> writer(TargetCommandContext context) { - return RedisItemWriter.keyDump(context.getTargetRedisClient(), ByteArrayCodec.INSTANCE); + protected LiveRedisItemReader> liveReader(JobCommandContext context, String keyPattern, + LiveReaderOptions options) { + return RedisItemReader.liveKeyDump(context.pool(ByteArrayCodec.INSTANCE), context.getJobRunner(), + context.pubSubConnection(ByteArrayCodec.INSTANCE), ByteArrayCodec.INSTANCE, + context.getRedisURI().getDatabase(), keyPattern).options(options).build(); } @Override - protected RedisItemReader.Builder> reader(JobCommandContext context) { - return RedisItemReader.keyDump(context.getRedisClient(), ByteArrayCodec.INSTANCE); + protected RedisItemReader> scanReader(JobCommandContext context, + ScanReaderOptions options) { + return RedisItemReader.keyDump(context.pool(ByteArrayCodec.INSTANCE), context.getJobRunner()).options(options) + .build(); + } + + @Override + protected RedisItemWriter> writer(TargetCommandContext context, + WriterOptions options) { + return RedisItemWriter.keyDump(context.targetPool(ByteArrayCodec.INSTANCE)).options(options).build(); } } diff --git a/connectors/riot-redis/src/main/java/com/redis/riot/redis/ReplicateDSCommand.java b/connectors/riot-redis/src/main/java/com/redis/riot/redis/ReplicateDSCommand.java index 93741f28a..639b7a34f 100644 --- a/connectors/riot-redis/src/main/java/com/redis/riot/redis/ReplicateDSCommand.java +++ b/connectors/riot-redis/src/main/java/com/redis/riot/redis/ReplicateDSCommand.java @@ -1,9 +1,13 @@ package com.redis.riot.redis; import com.redis.riot.JobCommandContext; -import com.redis.spring.batch.DataStructure; import com.redis.spring.batch.RedisItemReader; import com.redis.spring.batch.RedisItemWriter; +import com.redis.spring.batch.common.DataStructure; +import com.redis.spring.batch.reader.LiveReaderOptions; +import com.redis.spring.batch.reader.LiveRedisItemReader; +import com.redis.spring.batch.reader.ScanReaderOptions; +import com.redis.spring.batch.writer.WriterOptions; import com.redis.spring.batch.writer.operation.Xadd; import io.lettuce.core.codec.ByteArrayCodec; @@ -13,13 +17,25 @@ public class ReplicateDSCommand extends AbstractReplicateCommand> { @Override - protected RedisItemWriter.Builder> writer(TargetCommandContext context) { - return RedisItemWriter.dataStructure(context.getTargetRedisClient(), ByteArrayCodec.INSTANCE, Xadd.identity()); + protected LiveRedisItemReader> liveReader(JobCommandContext context, + String keyPattern, LiveReaderOptions options) { + return RedisItemReader.liveDataStructure(context.pool(ByteArrayCodec.INSTANCE), context.getJobRunner(), + context.pubSubConnection(ByteArrayCodec.INSTANCE), ByteArrayCodec.INSTANCE, + context.getRedisURI().getDatabase(), keyPattern).options(options).build(); } @Override - protected RedisItemReader.Builder> reader(JobCommandContext context) { - return RedisItemReader.dataStructure(context.getRedisClient(), ByteArrayCodec.INSTANCE); + protected RedisItemReader> scanReader(JobCommandContext context, + ScanReaderOptions options) { + return RedisItemReader.dataStructure(context.pool(ByteArrayCodec.INSTANCE), context.getJobRunner()) + .options(options).build(); + } + + @Override + protected RedisItemWriter> writer(TargetCommandContext context, + WriterOptions options) { + return RedisItemWriter.dataStructure(context.targetPool(ByteArrayCodec.INSTANCE), Xadd.identity()) + .options(options).build(); } } diff --git a/connectors/riot-redis/src/main/java/com/redis/riot/redis/ReplicationOptions.java b/connectors/riot-redis/src/main/java/com/redis/riot/redis/ReplicationOptions.java index 832035d19..3d7136591 100644 --- a/connectors/riot-redis/src/main/java/com/redis/riot/redis/ReplicationOptions.java +++ b/connectors/riot-redis/src/main/java/com/redis/riot/redis/ReplicationOptions.java @@ -1,6 +1,8 @@ package com.redis.riot.redis; -import com.redis.spring.batch.reader.AbstractKeyspaceNotificationItemReader; +import java.util.Optional; + +import com.redis.spring.batch.reader.QueueOptions; import picocli.CommandLine.Option; @@ -10,13 +12,18 @@ public enum ReplicationMode { SNAPSHOT, LIVE, LIVEONLY } - @Option(names = "--mode", description = "Replication mode: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE})", paramLabel = "") + @Option(names = "--mode", description = "Replication mode: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE}).", paramLabel = "") private ReplicationMode mode = ReplicationMode.SNAPSHOT; - @Option(names = "--event-queue", description = "Capacity of the keyspace notification event queue (default: ${DEFAULT-VALUE})", paramLabel = "") - private int notificationQueueCapacity = AbstractKeyspaceNotificationItemReader.DEFAULT_QUEUE_CAPACITY; - @Option(names = "--no-verify", description = "Verify target against source dataset after replication. True by default", negatable = true) + + @Option(names = "--event-queue", description = "Capacity of the keyspace notification event queue (default: ${DEFAULT-VALUE}).", paramLabel = "") + private int notificationQueueCapacity = QueueOptions.DEFAULT_CAPACITY; + + @Option(names = "--no-verify", description = "Verify target against source dataset after replication (default: true).", negatable = true) private boolean verify = true; + @Option(names = "--key-process", description = "SpEL expression to transform each key.", paramLabel = "") + private Optional keyProcessor = Optional.empty(); + public ReplicationMode getMode() { return mode; } @@ -41,4 +48,12 @@ public void setVerify(boolean verify) { this.verify = verify; } + public Optional getKeyProcessor() { + return keyProcessor; + } + + public void setKeyProcessor(Optional keyProcessor) { + this.keyProcessor = keyProcessor; + } + } diff --git a/connectors/riot-redis/src/main/java/com/redis/riot/redis/TargetCommandContext.java b/connectors/riot-redis/src/main/java/com/redis/riot/redis/TargetCommandContext.java index 1c7d639db..da56b76df 100644 --- a/connectors/riot-redis/src/main/java/com/redis/riot/redis/TargetCommandContext.java +++ b/connectors/riot-redis/src/main/java/com/redis/riot/redis/TargetCommandContext.java @@ -1,20 +1,37 @@ package com.redis.riot.redis; +import org.apache.commons.pool2.impl.GenericObjectPool; + +import com.redis.lettucemod.util.RedisClientBuilder; import com.redis.riot.JobCommandContext; import com.redis.riot.RedisOptions; -import com.redis.spring.batch.support.JobRunner; +import com.redis.spring.batch.common.JobRunner; import io.lettuce.core.AbstractRedisClient; +import io.lettuce.core.RedisURI; +import io.lettuce.core.api.StatefulConnection; +import io.lettuce.core.codec.RedisCodec; +import io.lettuce.core.codec.StringCodec; public class TargetCommandContext extends JobCommandContext { private final RedisOptions targetRedisOptions; private final AbstractRedisClient targetRedisClient; + private final RedisURI targetRedisURI; public TargetCommandContext(JobRunner jobRunner, RedisOptions redisOptions, RedisOptions targetRedisOptions) { super(jobRunner, redisOptions); this.targetRedisOptions = targetRedisOptions; - this.targetRedisClient = targetRedisOptions.client(); + RedisClientBuilder clientBuilder = RedisClientBuilder.create(targetRedisOptions.redisClientOptions()); + this.targetRedisURI = clientBuilder.uri(); + this.targetRedisClient = clientBuilder.client(); + } + + @Override + public void close() throws Exception { + targetRedisClient.shutdown(); + targetRedisClient.getResources().shutdown(); + super.close(); } public RedisOptions getTargetRedisOptions() { @@ -25,4 +42,16 @@ public AbstractRedisClient getTargetRedisClient() { return targetRedisClient; } + public RedisURI getTargetRedisURI() { + return targetRedisURI; + } + + public GenericObjectPool> targetPool() { + return targetPool(StringCodec.UTF8); + } + + public GenericObjectPool> targetPool(RedisCodec codec) { + return pool(targetRedisClient, codec, targetRedisOptions); + } + } diff --git a/connectors/riot-redis/src/test/java/com/redis/riot/redis/RedisIntegrationTests.java b/connectors/riot-redis/src/test/java/com/redis/riot/redis/RedisIntegrationTests.java index 832bedb39..49f9edeb6 100644 --- a/connectors/riot-redis/src/test/java/com/redis/riot/redis/RedisIntegrationTests.java +++ b/connectors/riot-redis/src/test/java/com/redis/riot/redis/RedisIntegrationTests.java @@ -30,7 +30,7 @@ class RedisIntegrationTests extends AbstractRiotIntegrationTests { private final Logger log = LoggerFactory.getLogger(getClass()); - private static final Duration IDLE_TIMEOUT = Duration.ofSeconds(10); + private static final Duration IDLE_TIMEOUT = Duration.ofSeconds(3); private final RedisContainer targetRedis = new RedisContainer( RedisContainer.DEFAULT_IMAGE_NAME.withTag(RedisContainer.DEFAULT_TAG)); @@ -124,7 +124,7 @@ private void runLiveReplication(String filename, RedisTestContext source) throws ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); executor.schedule(() -> { DataStructureGeneratorItemReader reader = DataStructureGeneratorItemReader.builder().currentItemCount(3000) - .maxItemCount(2000).build(); + .maxItemCount(5000).build(); try { generate(1, reader, source); } catch (Exception e) { diff --git a/connectors/riot-redis/src/test/resources/replicate-key-processor b/connectors/riot-redis/src/test/resources/replicate-key-processor index 1941a563a..b3ad7e120 100644 --- a/connectors/riot-redis/src/test/resources/replicate-key-processor +++ b/connectors/riot-redis/src/test/resources/replicate-key-processor @@ -1 +1 @@ -riot-redis -h source -p 6379 replicate -h target -p 6380 --batch 10 --key-process="#{#src.database}:#{key}" \ No newline at end of file +riot-redis -h source -p 6379 replicate -h target -p 6380 --batch 10 --key-process="#src.database+':'+key" \ No newline at end of file diff --git a/connectors/riot-stream/src/main/java/com/redis/riot/stream/StreamExportCommand.java b/connectors/riot-stream/src/main/java/com/redis/riot/stream/StreamExportCommand.java index 585002903..20b88b901 100644 --- a/connectors/riot-stream/src/main/java/com/redis/riot/stream/StreamExportCommand.java +++ b/connectors/riot-stream/src/main/java/com/redis/riot/stream/StreamExportCommand.java @@ -10,7 +10,6 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.batch.core.Job; import org.springframework.batch.core.job.builder.SimpleJobBuilder; -import org.springframework.batch.core.step.builder.SimpleStepBuilder; import org.springframework.batch.core.step.tasklet.TaskletStep; import org.springframework.batch.item.ItemProcessor; import org.springframework.core.convert.converter.Converter; @@ -27,7 +26,10 @@ import com.redis.riot.stream.processor.JsonProducerProcessor; import com.redis.spring.batch.RedisItemReader; import com.redis.spring.batch.reader.StreamItemReader; +import com.redis.spring.batch.reader.StreamReaderOptions; +import com.redis.spring.batch.step.FlushingSimpleStepBuilder; +import io.lettuce.core.Consumer; import io.lettuce.core.StreamMessage; import picocli.CommandLine.Command; import picocli.CommandLine.Mixin; @@ -40,14 +42,19 @@ public class StreamExportCommand extends AbstractTransferCommand { private static final Logger log = Logger.getLogger(StreamExportCommand.class.getName()); private static final String NAME = "stream-export"; + @Mixin private FlushingTransferOptions flushingTransferOptions = new FlushingTransferOptions(); + @Parameters(arity = "0..*", description = "One ore more streams to read from", paramLabel = "STREAM") private List streams; + @Mixin private KafkaOptions options = new KafkaOptions(); + @Option(names = "--offset", description = "XREAD offset (default: ${DEFAULT-VALUE})", paramLabel = "") private String offset = "0-0"; + @Option(names = "--topic", description = "Target topic key (default: same as stream)", paramLabel = "") private Optional topic = Optional.empty(); @@ -100,9 +107,12 @@ protected Job job(JobCommandContext context) throws Exception { private TaskletStep step(JobCommandContext context, String stream) { String name = stream + "-" + NAME; - StreamItemReader reader = RedisItemReader.stream(context.getRedisClient(), stream).build(); - SimpleStepBuilder, ProducerRecord> step = flushingTransferOptions - .configure(step(context, name, reader, processor(), writer())); + StreamItemReader reader = RedisItemReader + .stream(context.pool(), stream, Consumer.from(options.getGroupId(), "consumer1")) + .options(StreamReaderOptions.builder().offset(offset).build()).build(); + FlushingSimpleStepBuilder, ProducerRecord> step = new FlushingSimpleStepBuilder<>( + step(context, name, reader, processor(), writer())); + step.options(flushingTransferOptions.flushingOptions()); return step(step, progressMonitor().task("Exporting from " + stream).build()).build(); } diff --git a/connectors/riot-stream/src/main/java/com/redis/riot/stream/StreamImportCommand.java b/connectors/riot-stream/src/main/java/com/redis/riot/stream/StreamImportCommand.java index 12b9162b5..0bffb4954 100644 --- a/connectors/riot-stream/src/main/java/com/redis/riot/stream/StreamImportCommand.java +++ b/connectors/riot-stream/src/main/java/com/redis/riot/stream/StreamImportCommand.java @@ -11,7 +11,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.batch.core.Job; import org.springframework.batch.core.job.builder.SimpleJobBuilder; -import org.springframework.batch.core.step.builder.SimpleStepBuilder; import org.springframework.batch.core.step.tasklet.TaskletStep; import org.springframework.batch.item.ItemWriter; import org.springframework.core.convert.converter.Converter; @@ -27,6 +26,7 @@ import com.redis.riot.stream.kafka.KafkaItemReader; import com.redis.riot.stream.kafka.KafkaItemReaderBuilder; import com.redis.spring.batch.RedisItemWriter; +import com.redis.spring.batch.step.FlushingSimpleStepBuilder; import com.redis.spring.batch.writer.operation.Xadd; import com.redis.spring.batch.writer.operation.Xadd.Builder; @@ -133,8 +133,9 @@ private TaskletStep topicImportStep(JobCommandContext context, String topic) { String name = topic + "-" + NAME; KafkaItemReader reader = new KafkaItemReaderBuilder().partitions(0) .consumerProperties(consumerProperties).partitions(0).name(topic).saveState(false).topic(topic).build(); - SimpleStepBuilder, ConsumerRecord> step = flushingTransferOptions - .configure(step(context, name, reader, null, writer(context))); + FlushingSimpleStepBuilder, ConsumerRecord> step = new FlushingSimpleStepBuilder<>( + step(context, name, reader, null, writer(context))); + step.options(flushingTransferOptions.flushingOptions()); ProgressMonitor monitor = progressMonitor().task("Importing from " + topic).build(); return step(step, monitor).build(); } @@ -142,7 +143,7 @@ private TaskletStep topicImportStep(JobCommandContext context, String topic) { private ItemWriter> writer(JobCommandContext context) { Builder> xadd = Xadd.key(keyConverter()).body(bodyConverter()); xAddArgs().ifPresent(xadd::args); - return writerOptions.configure(RedisItemWriter.operation(context.getRedisClient(), xadd.build())).build(); + return RedisItemWriter.operation(context.pool(), xadd.build()).options(writerOptions.writerOptions()).build(); } private Converter, Map> bodyConverter() { diff --git a/core/riot-core/src/main/java/com/redis/riot/AbstractExportCommand.java b/core/riot-core/src/main/java/com/redis/riot/AbstractExportCommand.java index 83dd07012..e224ce94c 100644 --- a/core/riot-core/src/main/java/com/redis/riot/AbstractExportCommand.java +++ b/core/riot-core/src/main/java/com/redis/riot/AbstractExportCommand.java @@ -1,25 +1,30 @@ package com.redis.riot; +import org.apache.commons.pool2.impl.GenericObjectPool; import org.springframework.batch.core.Job; import org.springframework.batch.core.step.builder.SimpleStepBuilder; -import com.redis.spring.batch.DataStructure; import com.redis.spring.batch.RedisItemReader; +import com.redis.spring.batch.common.DataStructure; +import com.redis.spring.batch.reader.ScanSizeEstimator; +import io.lettuce.core.api.StatefulConnection; import picocli.CommandLine.ArgGroup; public abstract class AbstractExportCommand extends AbstractTransferCommand { @ArgGroup(exclusive = false, heading = "Reader options%n") - private RedisReaderOptions redisReaderOptions = new RedisReaderOptions(); + private RedisReaderOptions readerArgs = new RedisReaderOptions(); protected RedisItemReader> reader(JobCommandContext context) { - return redisReaderOptions.configure(RedisItemReader.dataStructure(context.getRedisClient())).build(); + GenericObjectPool> pool = context.pool(); + return RedisItemReader.dataStructure(pool, context.getJobRunner()).options(readerArgs.readerOptions()).build(); } protected Job job(JobCommandContext context, String name, SimpleStepBuilder step, String task) { - ProgressMonitor monitor = progressMonitor().task(task).initialMax(estimator(context).build()::execute).build(); - return super.job(context, name, step, monitor); + ScanSizeEstimator estimator = ScanSizeEstimator.builder(context.pool()).options(readerArgs.estimatorOptions()) + .build(); + return super.job(context, name, step, progressMonitor().task(task).initialMax(estimator::execute).build()); } } diff --git a/core/riot-core/src/main/java/com/redis/riot/AbstractImportCommand.java b/core/riot-core/src/main/java/com/redis/riot/AbstractImportCommand.java index ac84b501b..22b4d9c59 100644 --- a/core/riot-core/src/main/java/com/redis/riot/AbstractImportCommand.java +++ b/core/riot-core/src/main/java/com/redis/riot/AbstractImportCommand.java @@ -1,6 +1,10 @@ package com.redis.riot; +import java.lang.reflect.Method; +import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -11,8 +15,17 @@ import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.support.CompositeItemWriter; +import org.springframework.core.convert.converter.Converter; +import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.util.Assert; +import com.redis.lettucemod.util.GeoLocation; +import com.redis.riot.convert.RegexNamedGroupsExtractor; +import com.redis.riot.processor.CompositeItemStreamItemProcessor; +import com.redis.riot.processor.FilteringProcessor; +import com.redis.riot.processor.MapAccessor; +import com.redis.riot.processor.MapProcessor; +import com.redis.riot.processor.SpelProcessor; import com.redis.riot.redis.EvalCommand; import com.redis.riot.redis.ExpireCommand; import com.redis.riot.redis.GeoaddCommand; @@ -28,6 +41,7 @@ import com.redis.riot.redis.XaddCommand; import com.redis.riot.redis.ZaddCommand; import com.redis.spring.batch.RedisItemWriter; +import com.redis.spring.batch.writer.Operation; import picocli.CommandLine.ArgGroup; import picocli.CommandLine.Command; @@ -42,7 +56,7 @@ public abstract class AbstractImportCommand extends AbstractTransferCommand { private MapProcessorOptions processorOptions = new MapProcessorOptions(); @ArgGroup(exclusive = false, heading = "Writer options%n") - private RedisWriterOptions writerOptions = new RedisWriterOptions(); + private RedisWriterOptions writerArgs = new RedisWriterOptions(); /** * Initialized manually during command parsing @@ -58,7 +72,33 @@ public void setRedisCommands(List>> redisCo } protected ItemProcessor, Map> processor(JobCommandContext context) { - return processorOptions.processor(context.getRedisClient()); + List, Map>> processors = new ArrayList<>(); + if (processorOptions.hasSpelFields()) { + StandardEvaluationContext evaluationContext = new StandardEvaluationContext(); + evaluationContext.setVariable("date", new SimpleDateFormat(processorOptions.getDateFormat())); + evaluationContext.setVariable("redis", context.connection().sync()); + if (processorOptions.hasVariables()) { + processorOptions.getVariables() + .forEach((k, v) -> evaluationContext.setVariable(k, v.getValue(evaluationContext))); + } + try { + Method geoMethod = GeoLocation.class.getDeclaredMethod("toString", String.class, String.class); + evaluationContext.registerFunction("geo", geoMethod); + } catch (Exception e) { + throw new UnsupportedOperationException("Could not register geo function", e); + } + evaluationContext.setPropertyAccessors(Collections.singletonList(new MapAccessor())); + processors.add(new SpelProcessor(evaluationContext, processorOptions.getSpelFields())); + } + if (processorOptions.hasRegexes()) { + Map>> fields = new LinkedHashMap<>(); + processorOptions.getRegexes().forEach((f, r) -> fields.put(f, RegexNamedGroupsExtractor.of(r))); + processors.add(new MapProcessor(fields)); + } + if (processorOptions.hasFilters()) { + processors.add(new FilteringProcessor(processorOptions.getFilters())); + } + return CompositeItemStreamItemProcessor.delegates(processors.toArray(ItemProcessor[]::new)); } @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -66,8 +106,7 @@ protected ItemWriter> writer(JobCommandContext context) { Assert.notNull(redisCommands, "RedisCommands not set"); Assert.isTrue(!redisCommands.isEmpty(), "No Redis command specified"); List>> writers = redisCommands.stream().map(OperationCommand::operation) - .map(o -> writerOptions.configure(RedisItemWriter.operation(context.getRedisClient(), o)).build()) - .collect(Collectors.toList()); + .map(o -> writer(context, o)).collect(Collectors.toList()); if (writers.size() == 1) { return writers.get(0); } @@ -76,6 +115,11 @@ protected ItemWriter> writer(JobCommandContext context) { return writer; } + private ItemWriter> writer(JobCommandContext context, + Operation> operation) { + return RedisItemWriter.operation(context.pool(), operation).options(writerArgs.writerOptions()).build(); + } + protected Job job(JobCommandContext context, String name, ItemReader> reader, ProgressMonitor monitor) { return job(context, name, step(context, name, reader), monitor); diff --git a/core/riot-core/src/main/java/com/redis/riot/AbstractJobCommand.java b/core/riot-core/src/main/java/com/redis/riot/AbstractJobCommand.java index e0edfd4bc..ab1557531 100644 --- a/core/riot-core/src/main/java/com/redis/riot/AbstractJobCommand.java +++ b/core/riot-core/src/main/java/com/redis/riot/AbstractJobCommand.java @@ -8,7 +8,7 @@ import org.springframework.batch.core.job.builder.SimpleJobBuilder; import org.springframework.batch.core.step.tasklet.Tasklet; -import com.redis.spring.batch.support.JobRunner; +import com.redis.spring.batch.common.JobRunner; import picocli.CommandLine.Command; import picocli.CommandLine.Mixin; @@ -33,11 +33,13 @@ public void setApp(RiotApp app) { @Override public Integer call() throws Exception { JobRunner jobRunner = JobRunner.inMemory(); - JobExecution execution = jobRunner.run(job(context(jobRunner, app.getRedisOptions()))); - if (execution.getStatus().isUnsuccessful()) { - return 1; + try (JobCommandContext context = context(jobRunner, app.getRedisOptions())) { + JobExecution execution = jobRunner.run(job(context)); + if (execution.getStatus().isUnsuccessful()) { + return 1; + } + return 0; } - return 0; } protected JobCommandContext context(JobRunner jobRunner, RedisOptions redisOptions) { diff --git a/core/riot-core/src/main/java/com/redis/riot/AbstractTransferCommand.java b/core/riot-core/src/main/java/com/redis/riot/AbstractTransferCommand.java index 0d647ed57..a70435cfa 100644 --- a/core/riot-core/src/main/java/com/redis/riot/AbstractTransferCommand.java +++ b/core/riot-core/src/main/java/com/redis/riot/AbstractTransferCommand.java @@ -10,9 +10,7 @@ import org.springframework.batch.core.StepExecutionListener; import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder; import org.springframework.batch.core.step.builder.SimpleStepBuilder; -import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy; import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy; -import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy; import org.springframework.batch.core.step.skip.SkipPolicy; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; @@ -22,8 +20,6 @@ import org.springframework.core.task.SyncTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import com.redis.spring.batch.RedisScanSizeEstimator; -import com.redis.spring.batch.RedisScanSizeEstimator.Builder; import com.redis.spring.batch.reader.PollableItemReader; import io.lettuce.core.RedisCommandExecutionException; @@ -37,10 +33,6 @@ public abstract class AbstractTransferCommand extends AbstractJobCommand { @Mixin protected ProgressMonitorOptions progressOptions = new ProgressMonitorOptions(); - protected Builder estimator(JobCommandContext context) { - return RedisScanSizeEstimator.client(context.getRedisClient()); - } - protected SimpleStepBuilder step(JobCommandContext context, String name, ItemReader reader, ItemProcessor processor, ItemWriter writer) { return context.step(name).chunk(transferOptions.getChunkSize()).reader(synchronize(reader)) @@ -73,7 +65,14 @@ protected FaultTolerantStepBuilder step(SimpleStepBuilder ste step.listener((StepExecutionListener) monitor); step.listener((ItemWriteListener) monitor); } - return step.faultTolerant().skipPolicy(skipPolicy(transferOptions.getSkipPolicy())); + SkipPolicy skipPolicy = transferOptions.getSkipPolicy().getSkipPolicy(); + if (skipPolicy instanceof LimitCheckingItemSkipPolicy) { + LimitCheckingItemSkipPolicy limitSkipPolicy = (LimitCheckingItemSkipPolicy) skipPolicy; + limitSkipPolicy.setSkippableExceptionMap( + Stream.of(RedisCommandExecutionException.class, RedisCommandTimeoutException.class, + TimeoutException.class).collect(Collectors.toMap(t -> t, t -> true))); + } + return step.faultTolerant().skipPolicy(transferOptions.getSkipPolicy().getSkipPolicy()); } private ItemReader synchronize(ItemReader reader) { @@ -96,17 +95,4 @@ protected FaultTolerantStepBuilder faultTolerant(SimpleStepBuilder< return step.faultTolerant(); } - private SkipPolicy skipPolicy(TransferOptions.SkipPolicy policy) { - switch (policy) { - case ALWAYS: - return new AlwaysSkipItemSkipPolicy(); - case NEVER: - return new NeverSkipItemSkipPolicy(); - default: - return new LimitCheckingItemSkipPolicy(transferOptions.getSkipLimit(), - Stream.of(RedisCommandExecutionException.class, RedisCommandTimeoutException.class, - TimeoutException.class).collect(Collectors.toMap(t -> t, t -> true))); - } - } - } diff --git a/core/riot-core/src/main/java/com/redis/riot/DoubleRangeTypeConverter.java b/core/riot-core/src/main/java/com/redis/riot/DoubleRangeTypeConverter.java index 8e3f73bf6..fd8c2ca56 100644 --- a/core/riot-core/src/main/java/com/redis/riot/DoubleRangeTypeConverter.java +++ b/core/riot-core/src/main/java/com/redis/riot/DoubleRangeTypeConverter.java @@ -1,6 +1,6 @@ package com.redis.riot; -import com.redis.spring.batch.support.DoubleRange; +import com.redis.spring.batch.common.DoubleRange; import picocli.CommandLine.ITypeConverter; diff --git a/core/riot-core/src/main/java/com/redis/riot/FlushingTransferOptions.java b/core/riot-core/src/main/java/com/redis/riot/FlushingTransferOptions.java index 864eb8b10..68027c5e2 100644 --- a/core/riot-core/src/main/java/com/redis/riot/FlushingTransferOptions.java +++ b/core/riot-core/src/main/java/com/redis/riot/FlushingTransferOptions.java @@ -3,20 +3,17 @@ import java.time.Duration; import java.util.Optional; -import org.springframework.batch.core.step.builder.SimpleStepBuilder; import org.springframework.util.Assert; -import com.redis.spring.batch.KeyValue; -import com.redis.spring.batch.reader.LiveRedisItemReader; -import com.redis.spring.batch.step.FlushingSimpleStepBuilder; +import com.redis.spring.batch.step.FlushingOptions; import picocli.CommandLine.Option; public class FlushingTransferOptions { - @Option(names = "--flush-interval", description = "Max duration between flushes (default: ${DEFAULT-VALUE})", paramLabel = "") - private long flushInterval = 50; - @Option(names = "--idle-timeout", description = "Min duration of inactivity to consider transfer complete", paramLabel = "") + @Option(names = "--flush-interval", description = "Max duration between flushes (default: ${DEFAULT-VALUE}).", paramLabel = "") + private long flushInterval = FlushingOptions.DEFAULT_FLUSHING_INTERVAL.toMillis(); + @Option(names = "--idle-timeout", description = "Min duration of inactivity to consider transfer complete.", paramLabel = "") private Optional idleTimeout = Optional.empty(); public void setFlushInterval(Duration flushInterval) { @@ -29,27 +26,14 @@ public void setIdleTimeout(Duration idleTimeoutDuration) { this.idleTimeout = Optional.of(idleTimeoutDuration.toMillis()); } - private Duration getFlushInterval() { - return Duration.ofMillis(flushInterval); - } - - public FlushingSimpleStepBuilder configure(SimpleStepBuilder step) { - FlushingSimpleStepBuilder builder = new FlushingSimpleStepBuilder<>(step) - .flushingInterval(getFlushInterval()); - idleTimeout.ifPresent(t -> builder.idleTimeout(Duration.ofMillis(t))); - return builder; - } - - public > LiveRedisItemReader.Builder configure( - LiveRedisItemReader.Builder reader) { - reader.flushingInterval(getFlushInterval()); - idleTimeout.ifPresent(t -> reader.idleTimeout(Duration.ofMillis(t))); - return reader; - } - @Override public String toString() { return "FlushingTransferOptions [flushInterval=" + flushInterval + ", idleTimeout=" + idleTimeout + "]"; } + public FlushingOptions flushingOptions() { + return FlushingOptions.builder().interval(Duration.ofMillis(flushInterval)) + .timeout(idleTimeout.map(Duration::ofMillis)).build(); + } + } diff --git a/core/riot-core/src/main/java/com/redis/riot/HelpOptions.java b/core/riot-core/src/main/java/com/redis/riot/HelpOptions.java index 6f1ebc155..2e45c82da 100644 --- a/core/riot-core/src/main/java/com/redis/riot/HelpOptions.java +++ b/core/riot-core/src/main/java/com/redis/riot/HelpOptions.java @@ -4,7 +4,7 @@ public final class HelpOptions { - @Option(names = { "-H", "--help" }, usageHelp = true, description = "Show this help message and exit") + @Option(names = { "-H", "--help" }, usageHelp = true, description = "Show this help message and exit.") private boolean helpRequested; } diff --git a/core/riot-core/src/main/java/com/redis/riot/IntRangeTypeConverter.java b/core/riot-core/src/main/java/com/redis/riot/IntRangeTypeConverter.java index 37c23133f..362cdffb5 100644 --- a/core/riot-core/src/main/java/com/redis/riot/IntRangeTypeConverter.java +++ b/core/riot-core/src/main/java/com/redis/riot/IntRangeTypeConverter.java @@ -1,6 +1,6 @@ package com.redis.riot; -import com.redis.spring.batch.support.IntRange; +import com.redis.spring.batch.common.IntRange; import picocli.CommandLine.ITypeConverter; diff --git a/core/riot-core/src/main/java/com/redis/riot/JobCommandContext.java b/core/riot-core/src/main/java/com/redis/riot/JobCommandContext.java index a6cc87cd8..54c90034e 100644 --- a/core/riot-core/src/main/java/com/redis/riot/JobCommandContext.java +++ b/core/riot-core/src/main/java/com/redis/riot/JobCommandContext.java @@ -1,22 +1,36 @@ package com.redis.riot; +import org.apache.commons.pool2.impl.GenericObjectPool; import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.step.builder.StepBuilder; -import com.redis.spring.batch.support.JobRunner; +import com.redis.lettucemod.RedisModulesClient; +import com.redis.lettucemod.api.StatefulRedisModulesConnection; +import com.redis.lettucemod.cluster.RedisModulesClusterClient; +import com.redis.lettucemod.util.RedisClientBuilder; +import com.redis.spring.batch.common.JobRunner; +import com.redis.spring.batch.common.RedisConnectionPoolBuilder; import io.lettuce.core.AbstractRedisClient; +import io.lettuce.core.RedisURI; +import io.lettuce.core.api.StatefulConnection; +import io.lettuce.core.codec.RedisCodec; +import io.lettuce.core.codec.StringCodec; +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; public class JobCommandContext implements AutoCloseable { private final JobRunner jobRunner; private final RedisOptions redisOptions; private final AbstractRedisClient redisClient; + private final RedisURI redisURI; public JobCommandContext(JobRunner jobRunner, RedisOptions redisOptions) { this.jobRunner = jobRunner; this.redisOptions = redisOptions; - this.redisClient = redisOptions.client(); + RedisClientBuilder builder = RedisClientBuilder.create(redisOptions.redisClientOptions()); + this.redisURI = builder.uri(); + this.redisClient = builder.client(); } public JobRunner getJobRunner() { @@ -31,6 +45,10 @@ public AbstractRedisClient getRedisClient() { return redisClient; } + public RedisURI getRedisURI() { + return redisURI; + } + @Override public void close() throws Exception { redisClient.shutdown(); @@ -45,4 +63,44 @@ public StepBuilder step(String name) { return jobRunner.step(name); } + public StatefulRedisModulesConnection connection() { + return connection(redisClient); + } + + protected StatefulRedisModulesConnection connection(AbstractRedisClient client) { + if (client instanceof RedisModulesClusterClient) { + return ((RedisModulesClusterClient) client).connect(); + } + return ((RedisModulesClient) client).connect(); + } + + public GenericObjectPool> pool() { + return pool(StringCodec.UTF8); + } + + public GenericObjectPool> pool(RedisCodec codec) { + return pool(redisOptions, codec); + } + + protected GenericObjectPool> pool(RedisOptions options, RedisCodec codec) { + return pool(redisClient, codec, options); + } + + protected static GenericObjectPool> pool(AbstractRedisClient client, RedisCodec codec, + RedisOptions options) { + return RedisConnectionPoolBuilder.create(options.poolOptions()).pool(client, codec); + } + + public StatefulRedisPubSubConnection pubSubConnection(RedisCodec codec) { + return pubSubConnection(redisClient, codec); + } + + public StatefulRedisPubSubConnection pubSubConnection(AbstractRedisClient client, + RedisCodec codec) { + if (client instanceof RedisModulesClusterClient) { + return ((RedisModulesClusterClient) client).connectPubSub(codec); + } + return ((RedisModulesClient) client).connectPubSub(codec); + } + } diff --git a/core/riot-core/src/main/java/com/redis/riot/KeyValueProcessorOptions.java b/core/riot-core/src/main/java/com/redis/riot/KeyValueProcessorOptions.java deleted file mode 100644 index d1d5ac9e4..000000000 --- a/core/riot-core/src/main/java/com/redis/riot/KeyValueProcessorOptions.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.redis.riot; - -import java.util.Optional; - -import picocli.CommandLine.Option; - -public class KeyValueProcessorOptions { - - @Option(names = "--key-process", description = "SpEL expression to transform each key", paramLabel = "") - private Optional keyProcessor = Optional.empty(); - @Option(names = "--ttl-process", description = "SpEL expression to transform each key TTL", paramLabel = "") - private Optional ttlProcessor = Optional.empty(); - - public Optional getKeyProcessor() { - return keyProcessor; - } - - public Optional getTtlProcessor() { - return ttlProcessor; - } - - @Override - public String toString() { - return "KeyValueProcessorOptions [keyProcessor=" + keyProcessor + ", ttlProcessor=" + ttlProcessor + "]"; - } - -} diff --git a/core/riot-core/src/main/java/com/redis/riot/LoggingOptions.java b/core/riot-core/src/main/java/com/redis/riot/LoggingOptions.java index 9c8bf645e..b6b896df1 100644 --- a/core/riot-core/src/main/java/com/redis/riot/LoggingOptions.java +++ b/core/riot-core/src/main/java/com/redis/riot/LoggingOptions.java @@ -37,7 +37,7 @@ public class LoggingOptions { @Option(names = "--stacktrace", description = "Print out the stacktrace for all exceptions.") private boolean stacktrace; - public Level getLogLevel() { + public Level logLevel() { if (debug) { return Level.FINE; } @@ -61,7 +61,27 @@ public void configure() { handler.setLevel(Level.ALL); handler.setFormatter(stacktrace || debug ? new StackTraceOneLineLogFormat() : new OneLineLogFormat()); activeLogger.addHandler(handler); - Logger.getLogger(ROOT_LOGGER).setLevel(getLogLevel()); + Logger.getLogger(ROOT_LOGGER).setLevel(logLevel()); + } + + public void setDebug(boolean debug) { + this.debug = debug; + } + + public void setInfo(boolean info) { + this.info = info; + } + + public void setQuiet(boolean quiet) { + this.quiet = quiet; + } + + public void setWarning(boolean warning) { + this.warning = warning; + } + + public void setStacktrace(boolean stacktrace) { + this.stacktrace = stacktrace; } static class OneLineLogFormat extends Formatter { @@ -109,24 +129,4 @@ private String stackTrace(LogRecord logRecord) { } } - public void setDebug(boolean debug) { - this.debug = debug; - } - - public void setInfo(boolean info) { - this.info = info; - } - - public void setQuiet(boolean quiet) { - this.quiet = quiet; - } - - public void setWarning(boolean warning) { - this.warning = warning; - } - - public void setStacktrace(boolean stacktrace) { - this.stacktrace = stacktrace; - } - } diff --git a/core/riot-core/src/main/java/com/redis/riot/MapProcessorOptions.java b/core/riot-core/src/main/java/com/redis/riot/MapProcessorOptions.java index 48c2e0c6c..d0ab2632e 100644 --- a/core/riot-core/src/main/java/com/redis/riot/MapProcessorOptions.java +++ b/core/riot-core/src/main/java/com/redis/riot/MapProcessorOptions.java @@ -1,39 +1,16 @@ package com.redis.riot; -import java.lang.reflect.Method; import java.text.SimpleDateFormat; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; -import java.util.Map.Entry; -import java.util.logging.Level; -import java.util.logging.Logger; -import org.springframework.batch.item.ItemProcessor; -import org.springframework.core.convert.converter.Converter; -import org.springframework.expression.EvaluationContext; import org.springframework.expression.Expression; -import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.util.ObjectUtils; -import com.redis.lettucemod.RedisModulesUtils.GeoLocation; -import com.redis.riot.convert.RegexNamedGroupsExtractor; -import com.redis.riot.processor.CompositeItemStreamItemProcessor; -import com.redis.riot.processor.FilteringProcessor; -import com.redis.riot.processor.MapAccessor; -import com.redis.riot.processor.MapProcessor; -import com.redis.riot.processor.SpelProcessor; - -import io.lettuce.core.AbstractRedisClient; import picocli.CommandLine.Option; public class MapProcessorOptions { - private static final Logger log = Logger.getLogger(MapProcessorOptions.class.getName()); - @Option(arity = "1..*", names = "--process", description = "SpEL expressions in the form field1=\"exp\" field2=\"exp\"...", paramLabel = "") private Map spelFields; @Option(arity = "1..*", names = "--var", description = "Register a variable in the SpEL processor context.", paramLabel = "") @@ -85,46 +62,26 @@ public void setRegexes(Map regexes) { this.regexes = regexes; } - public ItemProcessor, Map> processor(AbstractRedisClient redisClient) { - List, Map>> processors = new ArrayList<>(); - if (!ObjectUtils.isEmpty(spelFields)) { - StandardEvaluationContext context = new StandardEvaluationContext(); - context.setVariable("date", dateFormat); - processors.add(new SpelProcessor(redisClient, context(), spelFields)); - } - if (!ObjectUtils.isEmpty(regexes)) { - Map>> fields = new LinkedHashMap<>(); - regexes.forEach((f, r) -> fields.put(f, RegexNamedGroupsExtractor.of(r))); - processors.add(new MapProcessor(fields)); - } - if (!ObjectUtils.isEmpty(filters)) { - processors.add(new FilteringProcessor(filters)); - } - return CompositeItemStreamItemProcessor.delegates(processors.toArray(ItemProcessor[]::new)); - } - - private EvaluationContext context() { - StandardEvaluationContext context = new StandardEvaluationContext(); - context.setVariable("date", new SimpleDateFormat(dateFormat)); - if (variables != null) { - for (Entry variable : variables.entrySet()) { - context.setVariable(variable.getKey(), variable.getValue().getValue(context)); - } - } - try { - Method geoMethod = GeoLocation.class.getDeclaredMethod("toString", String.class, String.class); - context.registerFunction("geo", geoMethod); - } catch (Exception e) { - log.log(Level.WARNING, "Could not register geo function", e); - } - context.setPropertyAccessors(Collections.singletonList(new MapAccessor())); - return context; - } - @Override public String toString() { return "MapProcessorOptions [spelFields=" + spelFields + ", variables=" + variables + ", dateFormat=" + dateFormat + ", filters=" + Arrays.toString(filters) + ", regexes=" + regexes + "]"; } + public boolean hasRegexes() { + return !ObjectUtils.isEmpty(regexes); + } + + public boolean hasFilters() { + return !ObjectUtils.isEmpty(filters); + } + + public boolean hasVariables() { + return !ObjectUtils.isEmpty(variables); + } + + public boolean hasSpelFields() { + return !ObjectUtils.isEmpty(spelFields); + } + } diff --git a/core/riot-core/src/main/java/com/redis/riot/ProgressMonitor.java b/core/riot-core/src/main/java/com/redis/riot/ProgressMonitor.java index 89df4dc0c..5a36421ad 100644 --- a/core/riot-core/src/main/java/com/redis/riot/ProgressMonitor.java +++ b/core/riot-core/src/main/java/com/redis/riot/ProgressMonitor.java @@ -12,7 +12,7 @@ import org.springframework.batch.core.StepExecutionListener; import org.springframework.util.Assert; -import com.redis.spring.batch.support.Utils; +import com.redis.spring.batch.common.Utils; import me.tongfei.progressbar.ProgressBar; import me.tongfei.progressbar.ProgressBarBuilder; diff --git a/core/riot-core/src/main/java/com/redis/riot/ProgressMonitorOptions.java b/core/riot-core/src/main/java/com/redis/riot/ProgressMonitorOptions.java index e092b7547..6adb8a0b8 100644 --- a/core/riot-core/src/main/java/com/redis/riot/ProgressMonitorOptions.java +++ b/core/riot-core/src/main/java/com/redis/riot/ProgressMonitorOptions.java @@ -9,9 +9,9 @@ public class ProgressMonitorOptions { - @Option(names = "--progress", description = "Style of progress bar: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE})", paramLabel = "