deps: Upgraded to spring batch redis 3.0.0
Julien Ruaux committed Sep 9, 2022
1 parent b618273 commit 1c4c71c
Showing 75 changed files with 955 additions and 842 deletions.
@@ -7,13 +7,13 @@

public class DataSourceOptions {

@Option(names = "--driver", description = "Fully qualified name of the JDBC driver", paramLabel = "<class>")
@Option(names = "--driver", description = "Fully qualified name of the JDBC driver.", paramLabel = "<class>")
private String driver;
@Option(names = "--url", required = true, description = "JDBC URL to connect to the database", paramLabel = "<string>")
@Option(names = "--url", required = true, description = "JDBC URL to connect to the database.", paramLabel = "<string>")
private String url;
@Option(names = "--username", description = "Login username of the database", paramLabel = "<string>")
@Option(names = "--username", description = "Login username of the database.", paramLabel = "<string>")
private String username;
@Option(names = "--password", arity = "0..1", interactive = true, description = "Login password of the database", paramLabel = "<pwd>")
@Option(names = "--password", arity = "0..1", interactive = true, description = "Login password of the database.", paramLabel = "<pwd>")
private String password;

public String getDriver() {
@@ -15,8 +15,8 @@

import com.redis.riot.AbstractExportCommand;
import com.redis.riot.JobCommandContext;
import com.redis.riot.processor.DataStructureItemProcessor;
import com.redis.spring.batch.DataStructure;
import com.redis.riot.processor.DataStructureToMapProcessor;
import com.redis.spring.batch.common.DataStructure;

import picocli.CommandLine.Command;
import picocli.CommandLine.Mixin;
@@ -58,7 +58,7 @@ protected Job job(JobCommandContext context) throws SQLException {
builder.assertUpdates(exportOptions.isAssertUpdates());
JdbcBatchItemWriter<Map<String, Object>> writer = builder.build();
writer.afterPropertiesSet();
ItemProcessor<DataStructure<String>, Map<String, Object>> processor = DataStructureItemProcessor
ItemProcessor<DataStructure<String>, Map<String, Object>> processor = DataStructureToMapProcessor
.of(exportOptions.getKeyRegex());
String task = String.format("Exporting to %s", dbName);
return job(context, NAME, step(context, NAME, reader(context), processor, writer), task);
@@ -28,7 +28,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.redis.lettucemod.api.sync.RedisModulesCommands;
import com.redis.spring.batch.DataStructure.Type;
import com.redis.spring.batch.common.DataStructure.Type;
import com.redis.spring.batch.reader.DataStructureGeneratorItemReader;
import com.redis.testcontainers.junit.RedisTestContext;
import com.redis.testcontainers.junit.RedisTestContextsSource;
@@ -21,9 +21,9 @@
import com.redis.riot.ProgressMonitor;
import com.redis.riot.RedisWriterOptions;
import com.redis.riot.file.resource.XmlItemReader;
import com.redis.spring.batch.DataStructure;
import com.redis.spring.batch.DataStructure.Type;
import com.redis.spring.batch.RedisItemWriter;
import com.redis.spring.batch.common.DataStructure;
import com.redis.spring.batch.common.DataStructure.Type;

import io.lettuce.core.ScoredValue;
import io.lettuce.core.StreamMessage;
@@ -72,8 +72,8 @@ protected Job job(JobCommandContext context) throws Exception {
reader.setName(name);
ProgressMonitor monitor = progressMonitor().task("Importing " + expandedFile).build();
DataStructureProcessor processor = new DataStructureProcessor();
RedisItemWriter<String, String, DataStructure<String>> writer = writerOptions
.configure(RedisItemWriter.dataStructure(context.getRedisClient())).build();
RedisItemWriter<String, String, DataStructure<String>> writer = RedisItemWriter
.dataStructure(context.pool()).options(writerOptions.writerOptions()).build();
steps.add(step(step(context, name, reader, processor, writer), monitor).build());
}
}
@@ -8,7 +8,7 @@

public class DumpFileOptions extends FileOptions {

@Option(names = { "-t", "--filetype" }, description = "File type: ${COMPLETION-CANDIDATES}", paramLabel = "<type>")
@Option(names = { "-t", "--filetype" }, description = "File type: ${COMPLETION-CANDIDATES}.", paramLabel = "<type>")
protected Optional<DumpFileType> type = Optional.empty();

public Optional<DumpFileType> getType() {
@@ -30,18 +30,18 @@ public DumpFileType type(Resource resource) {
return type.get();
}
Optional<FileExtension> extension = FileUtils.extension(resource);
if (extension.isEmpty()) {
throw new UnknownFileTypeException("Unknown file extension");
}
switch (extension.get()) {
return type(extension.orElseThrow(() -> new UnknownFileTypeException("Unknown file extension")));
}

private DumpFileType type(FileExtension extension) {
switch (extension) {
case XML:
return DumpFileType.XML;
case JSON:
return DumpFileType.JSON;
default:
throw new UnsupportedOperationException("Unsupported file extension: " + extension.get());
throw new UnsupportedOperationException("Unsupported file extension: " + extension);
}

}

}
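
As an aside, the pattern applied in this hunk — collapsing an isEmpty()/get() check into orElseThrow and moving the switch into a helper that takes the unwrapped value — can be sketched in plain JDK terms. The enum values and exception below are illustrative stand-ins, not the project's actual types:

import java.util.Optional;

class ExtensionTypeSketch {

	enum Extension { JSON, XML, CSV }
	enum DumpType { JSON, XML }

	// Unwrap the Optional once, failing fast if it is empty, then dispatch on the plain value.
	DumpType resolve(Optional<Extension> extension) {
		return type(extension.orElseThrow(() -> new IllegalStateException("Unknown file extension")));
	}

	DumpType type(Extension extension) {
		switch (extension) {
		case JSON:
			return DumpType.JSON;
		case XML:
			return DumpType.XML;
		default:
			throw new UnsupportedOperationException("Unsupported file extension: " + extension);
		}
	}
}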
@@ -11,7 +11,7 @@
import com.redis.riot.JobCommandContext;
import com.redis.riot.file.resource.JsonResourceItemWriterBuilder;
import com.redis.riot.file.resource.XmlResourceItemWriterBuilder;
import com.redis.spring.batch.DataStructure;
import com.redis.spring.batch.common.DataStructure;

import picocli.CommandLine.ArgGroup;
import picocli.CommandLine.Command;
@@ -9,13 +9,13 @@ public class FileExportOptions extends DumpFileOptions {
public static final String DEFAULT_ELEMENT_NAME = "record";
public static final String DEFAULT_ROOT_NAME = "root";

@Option(names = "--append", description = "Append to file if it exists")
@Option(names = "--append", description = "Append to file if it exists.")
private boolean append;
@Option(names = "--root", description = "XML root element tag name (default: ${DEFAULT-VALUE})", paramLabel = "<string>")
@Option(names = "--root", description = "XML root element tag name (default: ${DEFAULT-VALUE}).", paramLabel = "<string>")
private String rootName = DEFAULT_ROOT_NAME;
@Option(names = "--element", description = "XML element tag name (default: ${DEFAULT-VALUE})", paramLabel = "<string>")
@Option(names = "--element", description = "XML element tag name (default: ${DEFAULT-VALUE}).", paramLabel = "<string>")
private String elementName = DEFAULT_ELEMENT_NAME;
@Option(names = "--line-sep", description = "String to separate lines (default: system default)", paramLabel = "<string>")
@Option(names = "--line-sep", description = "String to separate lines (default: system default).", paramLabel = "<string>")
private String lineSeparator = AbstractFileItemWriter.DEFAULT_LINE_SEPARATOR;

public boolean isAppend() {
@@ -6,7 +6,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;

@@ -107,39 +106,13 @@ private Resource resource(String file) throws IOException {
return options.inputResource(file);
}

private FileType type(Optional<FileExtension> extension) {
Optional<FileType> type = options.getType();
if (type.isPresent()) {
return type.get();
}
if (extension.isPresent()) {
switch (extension.get()) {
case FW:
return FileType.FIXED;
case JSON:
return FileType.JSON;
case XML:
return FileType.XML;
case CSV:
case PSV:
case TSV:
return FileType.DELIMITED;
default:
throw new UnknownFileTypeException("Unknown file extension: " + extension);
}
}
throw new UnknownFileTypeException("Could not determine file type");
}

@SuppressWarnings({ "unchecked", "rawtypes" })
private AbstractItemStreamItemReader<Map<String, Object>> reader(Resource resource) {
Optional<FileExtension> extension = FileUtils.extension(resource);
FileType type = type(extension);
FileType type = options.getType().orElseGet(() -> type(resource));
switch (type) {
case DELIMITED:
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
options.getDelimiter().ifPresentOrElse(tokenizer::setDelimiter,
() -> tokenizer.setDelimiter(delimiter(extension.get())));
tokenizer.setDelimiter(options.getDelimiter().orElseGet(() -> delimiter(extension(resource))));
tokenizer.setQuoteCharacter(options.getQuoteCharacter());
if (!ObjectUtils.isEmpty(options.getIncludedFields())) {
tokenizer.setIncludedFields(options.getIncludedFields());
@@ -169,6 +142,29 @@ private AbstractItemStreamItemReader<Map<String, Object>> reader(Resource resour
}
}

private FileType type(Resource resource) {
FileExtension extension = extension(resource);
switch (extension) {
case FW:
return FileType.FIXED;
case JSON:
return FileType.JSON;
case XML:
return FileType.XML;
case CSV:
case PSV:
case TSV:
return FileType.DELIMITED;
default:
throw new UnknownFileTypeException("Unknown file extension: " + extension);
}
}

private FileExtension extension(Resource resource) {
return FileUtils.extension(resource)
.orElseThrow(() -> new UnknownFileTypeException("Could not determine file type"));
}

private String delimiter(FileExtension extension) {
switch (extension) {
case CSV:
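
A similar Optional idiom appears in the tokenizer setup above: the explicit delimiter, when given, wins, and otherwise a default is derived lazily from the file extension. A self-contained sketch using Spring Batch's DelimitedLineTokenizer; the extension-to-delimiter mapping below is illustrative, not the project's delimiter(FileExtension) helper:

import java.util.Optional;

import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;

class TokenizerSketch {

	// The configured delimiter takes precedence; the extension-based default is only computed when needed.
	DelimitedLineTokenizer tokenizer(Optional<String> delimiter, String extension) {
		DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
		tokenizer.setDelimiter(delimiter.orElseGet(() -> defaultDelimiter(extension)));
		return tokenizer;
	}

	private String defaultDelimiter(String extension) {
		switch (extension) {
		case "psv":
			return "|";
		case "tsv":
			return "\t";
		default:
			return DelimitedLineTokenizer.DELIMITER_COMMA;
		}
	}
}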
@@ -17,23 +17,23 @@ public enum FileType {

public static final String DEFAULT_CONTINUATION_STRING = "\\";

@Option(names = { "-t", "--filetype" }, description = "File type: ${COMPLETION-CANDIDATES}", paramLabel = "<type>")
@Option(names = { "-t", "--filetype" }, description = "File type: ${COMPLETION-CANDIDATES}.", paramLabel = "<type>")
private Optional<FileType> type = Optional.empty();
@Option(names = "--fields", arity = "1..*", description = "Delimited/FW field names", paramLabel = "<names>")
@Option(names = "--fields", arity = "1..*", description = "Delimited/FW field names.", paramLabel = "<names>")
private List<String> names = new ArrayList<>();
@Option(names = { "-h", "--header" }, description = "Delimited/FW first line contains field names")
@Option(names = { "-h", "--header" }, description = "Delimited/FW first line contains field names.")
private boolean header;
@Option(names = "--delimiter", description = "Delimiter character", paramLabel = "<string>")
@Option(names = "--delimiter", description = "Delimiter character.", paramLabel = "<string>")
private Optional<String> delimiter = Optional.empty();
@Option(names = "--skip", description = "Delimited/FW lines to skip at start", paramLabel = "<count>")
@Option(names = "--skip", description = "Delimited/FW lines to skip at start.", paramLabel = "<count>")
private Optional<Integer> linesToSkip = Optional.empty();
@Option(names = "--include", arity = "1..*", description = "Delimited/FW field indices to include (0-based)", paramLabel = "<index>")
@Option(names = "--include", arity = "1..*", description = "Delimited/FW field indices to include (0-based).", paramLabel = "<index>")
private int[] includedFields;
@Option(names = "--ranges", arity = "1..*", description = "Fixed-width column ranges", paramLabel = "<string>")
@Option(names = "--ranges", arity = "1..*", description = "Fixed-width column ranges.", paramLabel = "<string>")
private List<String> columnRanges = new ArrayList<>();
@Option(names = "--quote", description = "Escape character for delimited files (default: ${DEFAULT-VALUE})", paramLabel = "<char>")
@Option(names = "--quote", description = "Escape character for delimited files (default: ${DEFAULT-VALUE}).", paramLabel = "<char>")
private Character quoteCharacter = DelimitedLineTokenizer.DEFAULT_QUOTE_CHARACTER;
@Option(names = "--cont", description = "Line continuation string (default: ${DEFAULT-VALUE})", paramLabel = "<string>")
@Option(names = "--cont", description = "Line continuation string (default: ${DEFAULT-VALUE}).", paramLabel = "<string>")
private String continuationString = DEFAULT_CONTINUATION_STRING;

public FileImportOptions() {
@@ -23,9 +23,9 @@ public class FileOptions {

public static final Charset DEFAULT_ENCODING = Charset.defaultCharset();

@Option(names = "--encoding", description = "File encoding (default: ${DEFAULT-VALUE})", paramLabel = "<charset>")
@Option(names = "--encoding", description = "File encoding (default: ${DEFAULT-VALUE}).", paramLabel = "<charset>")
protected Charset encoding = DEFAULT_ENCODING;
@Option(names = { "-z", "--gzip" }, description = "File is gzip compressed")
@Option(names = { "-z", "--gzip" }, description = "File is gzip compressed.")
protected boolean gzip;
@ArgGroup(exclusive = false, heading = "Amazon Simple Storage Service options%n")
protected S3Options s3 = new S3Options();
@@ -20,11 +20,11 @@

public class GcsOptions {

@Option(names = "--gcs-key-file", description = "GCS private key (e.g. /usr/local/key.json)", paramLabel = "<file>")
@Option(names = "--gcs-key-file", description = "GCS private key (e.g. /usr/local/key.json).", paramLabel = "<file>")
private Optional<File> credentials = Optional.empty();
@Option(names = "--gcs-project", description = "GCP project id", paramLabel = "<id>")
@Option(names = "--gcs-project", description = "GCP project id.", paramLabel = "<id>")
private Optional<String> projectId = Optional.empty();
@Option(names = "--gcs-key", arity = "0..1", interactive = true, description = "GCS Base64 encoded key", paramLabel = "<key>")
@Option(names = "--gcs-key", arity = "0..1", interactive = true, description = "GCS Base64 encoded key.", paramLabel = "<key>")
private Optional<String> encodedKey = Optional.empty();

public void setCredentials(File credentials) {
@@ -5,10 +5,8 @@
import org.springframework.cloud.aws.core.io.s3.SimpleStorageProtocolResolver;
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.core.io.Resource;
import org.springframework.util.Assert;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
@@ -18,11 +16,11 @@

public class S3Options {

@Option(names = "--s3-access", description = "Access key", paramLabel = "<key>")
@Option(names = "--s3-access", description = "Access key.", paramLabel = "<key>")
private Optional<String> accessKey = Optional.empty();
@Option(names = "--s3-secret", arity = "0..1", interactive = true, description = "Secret key", paramLabel = "<key>")
@Option(names = "--s3-secret", arity = "0..1", interactive = true, description = "Secret key.", paramLabel = "<key>")
private Optional<String> secretKey = Optional.empty();
@Option(names = "--s3-region", description = "AWS region", paramLabel = "<name>")
@Option(names = "--s3-region", description = "AWS region.", paramLabel = "<name>")
private Optional<String> region = Optional.empty();

public void setAccessKey(String accessKey) {
@@ -40,10 +38,9 @@ public void setRegion(String region) {
public Resource resource(String location) {
AmazonS3ClientBuilder clientBuilder = AmazonS3Client.builder();
region.ifPresent(clientBuilder::withRegion);
if (accessKey.isPresent()) {
Assert.isTrue(secretKey.isPresent(), "Secret key is missing");
clientBuilder.withCredentials(new SimpleAWSCredentialsProvider(accessKey.get(), secretKey.get()));
}
accessKey.ifPresent(
a -> clientBuilder.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(a,
secretKey.orElseThrow(() -> new IllegalArgumentException("Secret key is missing"))))));
SimpleStorageProtocolResolver resolver = new SimpleStorageProtocolResolver() {
@Override
public AmazonS3 getAmazonS3() {
@@ -54,28 +51,6 @@ public AmazonS3 getAmazonS3() {
return resolver.resolve(location, new DefaultResourceLoader());
}

private static class SimpleAWSCredentialsProvider implements AWSCredentialsProvider {

private final String accessKey;
private final String secretKey;

public SimpleAWSCredentialsProvider(String accessKey, String secretKey) {
this.accessKey = accessKey;
this.secretKey = secretKey;
}

@Override
public AWSCredentials getCredentials() {
return new BasicAWSCredentials(accessKey, secretKey);
}

@Override
public void refresh() {
// do nothing
}

}

@Override
public String toString() {
return "S3Options [accessKey=" + accessKey + ", secretKey=" + secretKey + ", region=" + region + "]";
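
The removed SimpleAWSCredentialsProvider is replaced by the SDK's own static provider. A minimal, self-contained sketch of that construction; the option handling and SimpleStorageProtocolResolver wiring from the class above are omitted, and the parameter names are illustrative:

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

class S3ClientSketch {

	// AWSStaticCredentialsProvider wraps a fixed access/secret key pair, which is all the
	// hand-rolled provider did; withRegion and withCredentials are standard builder calls.
	AmazonS3 client(String accessKey, String secretKey, String region) {
		return AmazonS3ClientBuilder.standard()
				.withRegion(region)
				.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)))
				.build();
	}
}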
@@ -32,7 +32,7 @@
import com.redis.riot.file.resource.XmlItemReaderBuilder;
import com.redis.riot.file.resource.XmlObjectReader;
import com.redis.riot.redis.HsetCommand;
import com.redis.spring.batch.DataStructure;
import com.redis.spring.batch.common.DataStructure;
import com.redis.testcontainers.junit.RedisTestContext;
import com.redis.testcontainers.junit.RedisTestContextsSource;

@@ -16,8 +16,8 @@
import com.fasterxml.jackson.dataformat.xml.XmlMapper;
import com.redis.riot.file.resource.XmlResourceItemWriter;
import com.redis.riot.file.resource.XmlResourceItemWriterBuilder;
import com.redis.spring.batch.DataStructure;
import com.redis.spring.batch.DataStructure.Type;
import com.redis.spring.batch.common.DataStructure;
import com.redis.spring.batch.common.DataStructure.Type;

class TestXmlItemWriter {

@@ -11,9 +11,9 @@
import com.redis.riot.JobCommandContext;
import com.redis.riot.ProgressMonitor;
import com.redis.riot.RedisWriterOptions;
import com.redis.spring.batch.DataStructure;
import com.redis.spring.batch.DataStructure.Type;
import com.redis.spring.batch.RedisItemWriter;
import com.redis.spring.batch.common.DataStructure;
import com.redis.spring.batch.common.DataStructure.Type;
import com.redis.spring.batch.reader.DataStructureGeneratorItemReader;

import picocli.CommandLine.ArgGroup;
@@ -35,8 +35,8 @@ public class DataStructureGeneratorCommand extends AbstractTransferCommand {

@Override
protected Job job(JobCommandContext context) throws Exception {
RedisItemWriter<String, String, DataStructure<String>> writer = writerOptions
.configure(RedisItemWriter.dataStructure(context.getRedisClient())).build();
RedisItemWriter<String, String, DataStructure<String>> writer = RedisItemWriter.dataStructure(context.pool())
.options(writerOptions.writerOptions()).build();
log.log(Level.FINE, "Creating random data structure reader with {0}", options);
ProgressMonitor monitor = options.configure(progressMonitor()).task("Generating").build();
return job(context, NAME, step(context, NAME, reader(), null, writer), monitor);
@@ -53,10 +53,9 @@ private ItemReader<DataStructure<String>> reader() {
.zsetScore(options.getZsetScore()).hashSize(options.getHashSize())
.hashFieldSize(options.getHashFieldSize()).jsonFieldCount(options.getJsonSize())
.jsonFieldSize(options.getJsonFieldSize());
options.getTimeseriesStartTime().ifPresent(t -> reader.timeseriesStartTime(t.toEpochMilli()));
options.getExpiration().ifPresent(reader::expiration);
options.configureReader(reader);
Optional<Long> sleep = options.getSleep();
if (sleep.isPresent() && sleep.get() > 0) {
if (sleep.isPresent()) {
return new ThrottledItemReader<>(reader.build(), sleep.get());
}
return reader.build();
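
ThrottledItemReader is the project's own wrapper; as a rough, illustrative stand-in (the real class may differ), a delegate reader that pauses between reads looks roughly like this:

import org.springframework.batch.item.ItemReader;

class ThrottledReaderSketch<T> implements ItemReader<T> {

	private final ItemReader<T> delegate;
	private final long sleepMillis;

	ThrottledReaderSketch(ItemReader<T> delegate, long sleepMillis) {
		this.delegate = delegate;
		this.sleepMillis = sleepMillis;
	}

	// Sleep before each read so overall throughput is capped by the delegate plus the pause.
	@Override
	public T read() throws Exception {
		Thread.sleep(sleepMillis);
		return delegate.read();
	}
}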
