diff --git a/core/riot-file/src/main/java/com/redis/riot/file/AbstractRegistry.java b/core/riot-file/src/main/java/com/redis/riot/file/AbstractRegistry.java deleted file mode 100644 index c4cc68c7..00000000 --- a/core/riot-file/src/main/java/com/redis/riot/file/AbstractRegistry.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.redis.riot.file; - -import java.util.HashSet; -import java.util.Set; - -import org.springframework.core.io.ProtocolResolver; -import org.springframework.core.io.Resource; -import org.springframework.util.MimeType; - -public class AbstractRegistry { - - private ResourceMap resourceMap = new RiotResourceMap(); - private Set protocolResolvers = new HashSet<>(); - - protected MimeType type(Resource resource, FileOptions options) { - if (options.getContentType() == null) { - return MimeType.valueOf(resourceMap.getContentTypeFor(resource)); - } - return options.getContentType(); - } - - public void addProtocolResolver(ProtocolResolver protocolResolver) { - protocolResolvers.add(protocolResolver); - } - - protected Resource resource(String location, FileOptions options) { - RiotResourceLoader resourceLoader = new RiotResourceLoader(); - protocolResolvers.forEach(resourceLoader::addProtocolResolver); - resourceLoader.getS3ProtocolResolver().setClientSupplier(options.getS3Options()::client); - resourceLoader.getGoogleStorageProtocolResolver() - .setStorageSupplier(options.getGoogleStorageOptions()::storage); - return resourceLoader.getResource(location); - } - - public ResourceMap getResourceMap() { - return resourceMap; - } - - public void setResourceMap(ResourceMap resourceMap) { - this.resourceMap = resourceMap; - } - -} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/FileReaderRegistry.java b/core/riot-file/src/main/java/com/redis/riot/file/FileReaderRegistry.java index 07e335c4..0b99b535 100644 --- a/core/riot-file/src/main/java/com/redis/riot/file/FileReaderRegistry.java +++ b/core/riot-file/src/main/java/com/redis/riot/file/FileReaderRegistry.java @@ -1,14 +1,11 @@ package com.redis.riot.file; -import java.io.IOException; import java.util.HashMap; import java.util.Map; -import java.util.zip.GZIPInputStream; -import org.springframework.core.io.Resource; import org.springframework.util.MimeType; -public class FileReaderRegistry extends AbstractRegistry { +public class FileReaderRegistry { private final Map factories = new HashMap<>(); @@ -16,43 +13,19 @@ public void register(MimeType type, ReaderFactory factory) { factories.put(type, factory); } - public FileReaderResult find(String location, ReadOptions options) { - Resource resource = resource(location, options); - MimeType type = type(resource, options); - FileReaderResult reader = new FileReaderResult(); - reader.setResource(resource); - reader.setType(type); - ReaderFactory factory = factories.get(type); - if (factory != null) { - reader.setReader(factory.create(resource, options)); - } - return reader; - } - - @Override - protected Resource resource(String location, FileOptions options) { - Resource resource = super.resource(location, options); - if (options.isGzip() || FileUtils.isGzip(resource.getFilename())) { - GZIPInputStream gzipInputStream; - try { - gzipInputStream = new GZIPInputStream(resource.getInputStream()); - } catch (IOException e) { - throw new RuntimeIOException("Could not create GZip input stream", e); - } - return new NamedInputStreamResource(gzipInputStream, resource.getFilename(), resource.getDescription()); - } - return resource; + public ReaderFactory getReaderFactory(MimeType type) { + return factories.get(type); } public static FileReaderRegistry defaultReaderRegistry() { FileReaderRegistry registry = new FileReaderRegistry(); - registry.register(FileUtils.JSON, new JsonReaderFactory()); - registry.register(FileUtils.JSON_LINES, new JsonLinesReaderFactory()); - registry.register(FileUtils.XML, new XmlReaderFactory()); - registry.register(FileUtils.CSV, new DelimitedReaderFactory(FileOptions.DELIMITER_COMMA)); - registry.register(FileUtils.PSV, new DelimitedReaderFactory(FileOptions.DELIMITER_PIPE)); - registry.register(FileUtils.TSV, new DelimitedReaderFactory(FileOptions.DELIMITER_TAB)); - registry.register(FileUtils.TEXT, new FixedWidthReaderFactory()); + registry.register(ResourceFactory.JSON, new JsonReaderFactory()); + registry.register(ResourceFactory.JSON_LINES, new JsonLinesReaderFactory()); + registry.register(ResourceFactory.XML, new XmlReaderFactory()); + registry.register(ResourceFactory.CSV, new DelimitedReaderFactory(FileOptions.DELIMITER_COMMA)); + registry.register(ResourceFactory.PSV, new DelimitedReaderFactory(FileOptions.DELIMITER_PIPE)); + registry.register(ResourceFactory.TSV, new DelimitedReaderFactory(FileOptions.DELIMITER_TAB)); + registry.register(ResourceFactory.TEXT, new FixedWidthReaderFactory()); return registry; } diff --git a/core/riot-file/src/main/java/com/redis/riot/file/FileUtils.java b/core/riot-file/src/main/java/com/redis/riot/file/FileUtils.java deleted file mode 100644 index d9f1a2b4..00000000 --- a/core/riot-file/src/main/java/com/redis/riot/file/FileUtils.java +++ /dev/null @@ -1,32 +0,0 @@ -package com.redis.riot.file; - -import org.springframework.util.MimeType; -import org.springframework.util.MimeTypeUtils; - -public abstract class FileUtils { - - public static final String GZ_SUFFIX = ".gz"; - - public static final MimeType CSV = new MimeType("text", "csv"); - public static final MimeType PSV = new MimeType("text", "psv"); - public static final MimeType TSV = new MimeType("text", "tsv"); - public static final MimeType TEXT = new MimeType("text", "plain"); - public static final MimeType JSON = MimeTypeUtils.APPLICATION_JSON; - public static final MimeType JSON_LINES = new MimeType("application", "jsonlines"); - public static final MimeType XML = MimeTypeUtils.APPLICATION_XML; - - private FileUtils() { - } - - public static boolean isGzip(String filename) { - return filename.endsWith(GZ_SUFFIX); - } - - public static String stripGzipSuffix(String filename) { - if (isGzip(filename)) { - return filename.substring(0, filename.length() - GZ_SUFFIX.length()); - } - return filename; - } - -} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/FileWriterRegistry.java b/core/riot-file/src/main/java/com/redis/riot/file/FileWriterRegistry.java index ce748f78..c88a3960 100644 --- a/core/riot-file/src/main/java/com/redis/riot/file/FileWriterRegistry.java +++ b/core/riot-file/src/main/java/com/redis/riot/file/FileWriterRegistry.java @@ -1,71 +1,31 @@ package com.redis.riot.file; -import java.io.IOException; import java.util.HashMap; import java.util.Map; -import java.util.zip.GZIPOutputStream; -import org.springframework.core.io.Resource; -import org.springframework.core.io.WritableResource; -import org.springframework.util.Assert; import org.springframework.util.MimeType; -public class FileWriterRegistry extends AbstractRegistry { +public class FileWriterRegistry { private final Map factories = new HashMap<>(); - private ResourceMap resourceMap = new RiotResourceMap(); - - public ResourceMap getResourceMap() { - return resourceMap; - } - - public void setResourceMap(ResourceMap resourceMap) { - this.resourceMap = resourceMap; - } public void register(MimeType type, WriterFactory factory) { factories.put(type, factory); } - public FileWriterResult find(String location, WriteOptions options) { - WritableResource resource = resource(location, options); - MimeType type = type(resource, options); - FileWriterResult result = new FileWriterResult(); - result.setResource(resource); - result.setType(type); - WriterFactory factory = factories.get(type); - if (factory != null) { - result.setWriter(factory.create(resource, options)); - } - return result; - } - - @Override - protected WritableResource resource(String location, FileOptions options) { - Resource resource = super.resource(location, options); - Assert.isInstanceOf(WritableResource.class, resource, "Resource is not writable"); - WritableResource writableResource = (WritableResource) resource; - if (options.isGzip() || FileUtils.isGzip(resource.getFilename())) { - GZIPOutputStream gzipOutputStream; - try { - gzipOutputStream = new GZIPOutputStream(writableResource.getOutputStream()); - } catch (IOException e) { - throw new RuntimeIOException("Could not create GZip output stream", e); - } - return new OutputStreamResource(gzipOutputStream, resource.getFilename(), resource.getDescription()); - } - return writableResource; + public WriterFactory getWriterFactory(MimeType type) { + return factories.get(type); } public static FileWriterRegistry defaultWriterRegistry() { FileWriterRegistry registry = new FileWriterRegistry(); - registry.register(FileUtils.JSON, new JsonWriterFactory()); - registry.register(FileUtils.JSON_LINES, new JsonLinesWriterFactory()); - registry.register(FileUtils.XML, new XmlWriterFactory()); - registry.register(FileUtils.CSV, new DelimitedWriterFactory(FileOptions.DELIMITER_COMMA)); - registry.register(FileUtils.PSV, new DelimitedWriterFactory(FileOptions.DELIMITER_PIPE)); - registry.register(FileUtils.TSV, new DelimitedWriterFactory(FileOptions.DELIMITER_TAB)); - registry.register(FileUtils.TEXT, new FormattedWriterFactory()); + registry.register(ResourceFactory.JSON, new JsonWriterFactory()); + registry.register(ResourceFactory.JSON_LINES, new JsonLinesWriterFactory()); + registry.register(ResourceFactory.XML, new XmlWriterFactory()); + registry.register(ResourceFactory.CSV, new DelimitedWriterFactory(FileOptions.DELIMITER_COMMA)); + registry.register(ResourceFactory.PSV, new DelimitedWriterFactory(FileOptions.DELIMITER_PIPE)); + registry.register(ResourceFactory.TSV, new DelimitedWriterFactory(FileOptions.DELIMITER_TAB)); + registry.register(ResourceFactory.TEXT, new FormattedWriterFactory()); return registry; } diff --git a/core/riot-file/src/main/java/com/redis/riot/file/ResourceFactory.java b/core/riot-file/src/main/java/com/redis/riot/file/ResourceFactory.java new file mode 100644 index 00000000..e85f7f4e --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/ResourceFactory.java @@ -0,0 +1,121 @@ +package com.redis.riot.file; + +import java.io.IOException; +import java.net.FileNameMap; +import java.util.HashSet; +import java.util.Set; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import org.springframework.core.io.ProtocolResolver; +import org.springframework.core.io.Resource; +import org.springframework.core.io.WritableResource; +import org.springframework.util.Assert; +import org.springframework.util.MimeType; +import org.springframework.util.MimeTypeUtils; + +public class ResourceFactory { + + public static final String GZ_SUFFIX = ".gz"; + + public static final MimeType CSV = new MimeType("text", "csv"); + public static final MimeType PSV = new MimeType("text", "psv"); + public static final MimeType TSV = new MimeType("text", "tsv"); + public static final MimeType TEXT = new MimeType("text", "plain"); + public static final MimeType JSON = MimeTypeUtils.APPLICATION_JSON; + public static final MimeType JSON_LINES = new MimeType("application", "jsonlines"); + public static final MimeType XML = MimeTypeUtils.APPLICATION_XML; + + private ResourceMap resourceMap = defaultResourceMap(); + private Set protocolResolvers = new HashSet<>(); + + public MimeType type(Resource resource) { + return MimeType.valueOf(resourceMap.getContentTypeFor(resource)); + } + + public MimeType type(Resource resource, FileOptions options) { + if (options.getContentType() == null) { + return MimeType.valueOf(resourceMap.getContentTypeFor(resource)); + } + return options.getContentType(); + } + + private static class JsonLinesFileNameMap implements FileNameMap { + + public static final String JSONL_SUFFIX = ".jsonl"; + + @Override + public String getContentTypeFor(String fileName) { + if (fileName == null) { + return null; + } + if (fileName.endsWith(JSONL_SUFFIX)) { + return JSON_LINES.toString(); + } + return null; + } + + } + + public static ResourceMap defaultResourceMap() { + RiotResourceMap resourceMap = new RiotResourceMap(); + resourceMap.addFileNameMap(new JsonLinesFileNameMap()); + return resourceMap; + } + + public void addProtocolResolver(ProtocolResolver protocolResolver) { + protocolResolvers.add(protocolResolver); + } + + public Resource resource(String location, FileOptions options) throws IOException { + Resource resource = createResource(location, options); + if (isGzip(resource, options)) { + GZIPInputStream gzipInputStream = new GZIPInputStream(resource.getInputStream()); + return new NamedInputStreamResource(gzipInputStream, resource.getFilename(), resource.getDescription()); + } + return resource; + } + + private boolean isGzip(Resource resource, FileOptions options) { + return options.isGzip() || isGzip(resource.getFilename()); + } + + public WritableResource writableResource(String location, FileOptions options) throws IOException { + Resource resource = createResource(location, options); + Assert.isInstanceOf(WritableResource.class, resource); + if (options.isGzip() || isGzip(resource.getFilename())) { + GZIPOutputStream gzipOutputStream = new GZIPOutputStream(((WritableResource) resource).getOutputStream()); + return new OutputStreamResource(gzipOutputStream, resource.getFilename(), resource.getDescription()); + } + return (WritableResource) resource; + } + + private Resource createResource(String location, FileOptions options) { + RiotResourceLoader resourceLoader = new RiotResourceLoader(); + protocolResolvers.forEach(resourceLoader::addProtocolResolver); + resourceLoader.getS3ProtocolResolver().setClientSupplier(options.getS3Options()::client); + resourceLoader.getGoogleStorageProtocolResolver() + .setStorageSupplier(options.getGoogleStorageOptions()::storage); + return resourceLoader.getResource(location); + } + + public static boolean isGzip(String filename) { + return filename.endsWith(GZ_SUFFIX); + } + + public static String stripGzipSuffix(String filename) { + if (isGzip(filename)) { + return filename.substring(0, filename.length() - GZ_SUFFIX.length()); + } + return filename; + } + + public ResourceMap getResourceMap() { + return resourceMap; + } + + public void setResourceMap(ResourceMap resourceMap) { + this.resourceMap = resourceMap; + } + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/RiotResourceMap.java b/core/riot-file/src/main/java/com/redis/riot/file/RiotResourceMap.java index ee8fae3b..110cc231 100644 --- a/core/riot-file/src/main/java/com/redis/riot/file/RiotResourceMap.java +++ b/core/riot-file/src/main/java/com/redis/riot/file/RiotResourceMap.java @@ -11,18 +11,12 @@ public class RiotResourceMap implements ResourceMap { - private final Set fileNameMaps = defaultFileNameMaps(); + private final Set fileNameMaps = new LinkedHashSet<>(); public void addFileNameMap(FileNameMap map) { fileNameMaps.add(map); } - public static Set defaultFileNameMaps() { - Set maps = new LinkedHashSet<>(); - maps.add(new JsonLinesFileNameMap()); - return maps; - } - @Override public String getContentTypeFor(Resource resource) { String type = null; @@ -40,7 +34,7 @@ public String getContentTypeFor(Resource resource) { } public String getContentTypeFor(String filename) { - String normalizedFilename = FileUtils.stripGzipSuffix(filename); + String normalizedFilename = ResourceFactory.stripGzipSuffix(filename); String type = URLConnection.guessContentTypeFromName(normalizedFilename); if (type != null) { return type; @@ -54,21 +48,4 @@ public String getContentTypeFor(String filename) { throw new IllegalArgumentException("Could not determine type of " + filename); } - private static class JsonLinesFileNameMap implements FileNameMap { - - public static final String JSONL_SUFFIX = ".jsonl"; - - @Override - public String getContentTypeFor(String fileName) { - if (fileName == null) { - return null; - } - if (fileName.endsWith(JSONL_SUFFIX)) { - return FileUtils.JSON_LINES.toString(); - } - return null; - } - - } - } diff --git a/core/riot-file/src/test/java/com/redis/riot/file/ReaderTests.java b/core/riot-file/src/test/java/com/redis/riot/file/ReaderTests.java index ca27d5a7..e3800a10 100644 --- a/core/riot-file/src/test/java/com/redis/riot/file/ReaderTests.java +++ b/core/riot-file/src/test/java/com/redis/riot/file/ReaderTests.java @@ -19,6 +19,7 @@ import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.json.JsonItemReader; import org.springframework.core.io.Resource; +import org.springframework.util.MimeType; import org.springframework.util.StreamUtils; import software.amazon.awssdk.regions.Region; @@ -37,6 +38,7 @@ public class ReaderTests { public static final String JSON_GOOGLE_STORAGE_URL = "gs://riot-bucket-jrx/beers.json"; public static final String JSON_GZ_URL = "http://storage.googleapis.com/jrx/beers.json.gz"; + private final ResourceFactory resourceFactory = new ResourceFactory(); private final FileReaderRegistry registry = FileReaderRegistry.defaultReaderRegistry(); @Test @@ -100,7 +102,9 @@ private void assertRead(String location, Class expectedType, int expectedCoun private void assertRead(String location, ReadOptions options, Class expectedType, int expectedCount) throws Exception { - ItemReader reader = registry.find(location, options).getReader(); + Resource resource = resourceFactory.resource(location, options); + MimeType type = resourceFactory.type(resource); + ItemReader reader = registry.getReaderFactory(type).create(resource, options); Assertions.assertNotNull(reader); List items = readAll(reader); Assertions.assertEquals(expectedCount, items.size()); diff --git a/plugins/riot/src/main/java/com/redis/riot/AbstractFileExport.java b/plugins/riot/src/main/java/com/redis/riot/AbstractFileExport.java index fd85703d..3726acda 100644 --- a/plugins/riot/src/main/java/com/redis/riot/AbstractFileExport.java +++ b/plugins/riot/src/main/java/com/redis/riot/AbstractFileExport.java @@ -1,5 +1,6 @@ package com.redis.riot; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -10,15 +11,19 @@ import org.springframework.batch.item.ExecutionContext; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemStreamException; +import org.springframework.batch.item.ItemWriter; +import org.springframework.core.io.WritableResource; +import org.springframework.util.Assert; import org.springframework.util.MimeType; import com.redis.riot.core.RiotInitializationException; import com.redis.riot.core.Step; -import com.redis.riot.file.FileUtils; import com.redis.riot.file.FileWriterRegistry; -import com.redis.riot.file.FileWriterResult; +import com.redis.riot.file.ResourceFactory; +import com.redis.riot.file.RuntimeIOException; import com.redis.riot.file.StdOutProtocolResolver; import com.redis.riot.file.WriteOptions; +import com.redis.riot.file.WriterFactory; import com.redis.spring.batch.item.redis.RedisItemReader; import com.redis.spring.batch.item.redis.common.KeyValue; @@ -31,7 +36,7 @@ public abstract class AbstractFileExport extends AbstractRedisExportCommand { private Set flatFileTypes = new HashSet<>( - Arrays.asList(FileUtils.CSV, FileUtils.PSV, FileUtils.TSV, FileUtils.TEXT)); + Arrays.asList(ResourceFactory.CSV, ResourceFactory.PSV, ResourceFactory.TSV, ResourceFactory.TEXT)); @Parameters(arity = "0..1", description = "File path or URL. If omitted, export is written to stdout.", paramLabel = "FILE") private String file = StdOutProtocolResolver.DEFAULT_FILENAME; @@ -43,15 +48,27 @@ public abstract class AbstractFileExport extends AbstractRedisExportCommand { private ContentType contentType = ContentType.STRUCT; private FileWriterRegistry writerRegistry; + private ResourceFactory resourceFactory; private WriteOptions writeOptions; @Override protected void initialize() throws RiotInitializationException { super.initialize(); - writerRegistry = FileWriterRegistry.defaultWriterRegistry(); + writerRegistry = writerRegistry(); + resourceFactory = resourceFactory(); writeOptions = writeOptions(); } + protected FileWriterRegistry writerRegistry() { + return FileWriterRegistry.defaultWriterRegistry(); + } + + protected ResourceFactory resourceFactory() { + ResourceFactory factory = new ResourceFactory(); + factory.addProtocolResolver(new StdOutProtocolResolver()); + return factory; + } + private WriteOptions writeOptions() { WriteOptions writeOptions = fileWriterArgs.fileWriterOptions(); writeOptions.setContentType(getFileType()); @@ -72,8 +89,17 @@ public void setFlatFileTypes(MimeType... types) { @SuppressWarnings("unchecked") private Step step() { - FileWriterResult writer = writerRegistry.find(file, writeOptions); - return step(writer.getWriter()).processor(processor(writer.getType())); + WritableResource resource; + try { + resource = resourceFactory.writableResource(file, writeOptions); + } catch (IOException e) { + throw new RuntimeIOException(String.format("Could not create resource from file %s", file), e); + } + MimeType type = resourceFactory.type(resource, writeOptions); + WriterFactory writerFactory = writerRegistry.getWriterFactory(type); + Assert.notNull(writerFactory, String.format("No writer found for file %s", file)); + ItemWriter writer = writerFactory.create(resource, writeOptions); + return step(writer).processor(processor(type)); } @Override diff --git a/plugins/riot/src/main/java/com/redis/riot/AbstractFileImport.java b/plugins/riot/src/main/java/com/redis/riot/AbstractFileImport.java index 799ac579..1c681904 100644 --- a/plugins/riot/src/main/java/com/redis/riot/AbstractFileImport.java +++ b/plugins/riot/src/main/java/com/redis/riot/AbstractFileImport.java @@ -1,18 +1,23 @@ package com.redis.riot; +import java.io.IOException; import java.text.ParseException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Function; import java.util.regex.Pattern; import java.util.stream.Collectors; import org.springframework.batch.core.Job; import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.function.FunctionItemProcessor; +import org.springframework.core.io.Resource; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; import org.springframework.util.MimeType; @@ -22,8 +27,10 @@ import com.redis.riot.core.Step; import com.redis.riot.core.processor.RegexNamedGroupFunction; import com.redis.riot.file.FileReaderRegistry; -import com.redis.riot.file.FileReaderResult; import com.redis.riot.file.ReadOptions; +import com.redis.riot.file.ReaderFactory; +import com.redis.riot.file.ResourceFactory; +import com.redis.riot.file.RuntimeIOException; import com.redis.riot.file.StdInProtocolResolver; import com.redis.riot.function.MapToFieldFunction; import com.redis.spring.batch.item.redis.RedisItemWriter; @@ -38,6 +45,8 @@ public abstract class AbstractFileImport extends AbstractRedisImportCommand { public static final String STDIN_FILENAME = "-"; + private static final Set keyValueTypes = new HashSet<>( + Arrays.asList(ResourceFactory.JSON, ResourceFactory.JSON_LINES, ResourceFactory.XML)); @Parameters(arity = "1..*", description = "Files or URLs to import. Use '-' to read from stdin.", paramLabel = "FILE") private List files; @@ -49,6 +58,7 @@ public abstract class AbstractFileImport extends AbstractRedisImportCommand { private Map regexes = new LinkedHashMap<>(); private FileReaderRegistry readerRegistry; + private ResourceFactory resourceFactory; private ReadOptions readOptions; @Override @@ -56,15 +66,20 @@ protected void initialize() throws RiotInitializationException { super.initialize(); Assert.notEmpty(files, "No file specified"); readerRegistry = readerRegistry(); + resourceFactory = resourceFactory(); readOptions = readOptions(); } - private FileReaderRegistry readerRegistry() { - FileReaderRegistry registry = FileReaderRegistry.defaultReaderRegistry(); + protected FileReaderRegistry readerRegistry() { + return FileReaderRegistry.defaultReaderRegistry(); + } + + protected ResourceFactory resourceFactory() { + ResourceFactory resourceFactory = new ResourceFactory(); StdInProtocolResolver stdInProtocolResolver = new StdInProtocolResolver(); stdInProtocolResolver.setFilename(STDIN_FILENAME); - registry.addProtocolResolver(stdInProtocolResolver); - return registry; + resourceFactory.addProtocolResolver(stdInProtocolResolver); + return resourceFactory; } @Override @@ -73,21 +88,30 @@ protected Job job() { } private Step step(String location) { - FileReaderResult reader = readerRegistry.find(location, readOptions); - Assert.notNull(reader.getReader(), - () -> String.format("No reader found for type %s and file %s", reader.getType(), location)); + Resource resource; + try { + resource = resourceFactory.resource(location, readOptions); + } catch (IOException e) { + throw new RuntimeIOException(String.format("Could not create resource from %s", location), e); + } + MimeType type = resourceFactory.type(resource, readOptions); + ReaderFactory readerFactory = readerRegistry.getReaderFactory(type); + Assert.notNull(readerFactory, () -> String.format("No reader found for file %s", location)); + ItemReader reader = readerFactory.create(resource, readOptions); RedisItemWriter writer = writer(); configureTargetRedisWriter(writer); - Step step = new Step<>(reader.getReader(), writer); + Step step = new Step<>(reader, writer); step.name(location); if (hasOperations()) { step.processor(RiotUtils.processor(processor(), regexProcessor())); + } else { + Assert.isTrue(keyValueTypes.contains(type), "No Redis operation specified"); } step.skip(ParseException.class); step.skip(org.springframework.batch.item.ParseException.class); step.noRetry(ParseException.class); step.noRetry(org.springframework.batch.item.ParseException.class); - step.taskName(String.format("Importing %s", reader.getResource().getFilename())); + step.taskName(String.format("Importing %s", resource.getFilename())); return step; } diff --git a/plugins/riot/src/main/java/com/redis/riot/FileTypeArgs.java b/plugins/riot/src/main/java/com/redis/riot/FileTypeArgs.java index ccf5df49..4556fae7 100644 --- a/plugins/riot/src/main/java/com/redis/riot/FileTypeArgs.java +++ b/plugins/riot/src/main/java/com/redis/riot/FileTypeArgs.java @@ -6,7 +6,7 @@ import org.springframework.util.MimeType; -import com.redis.riot.file.FileUtils; +import com.redis.riot.file.ResourceFactory; import picocli.CommandLine.ITypeConverter; import picocli.CommandLine.Option; @@ -29,13 +29,13 @@ public void setType(MimeType type) { public static Map typeMap() { Map map = new HashMap<>(); - map.put("csv", FileUtils.CSV); - map.put("psv", FileUtils.PSV); - map.put("tsv", FileUtils.TSV); - map.put("fw", FileUtils.TEXT); - map.put("json", FileUtils.JSON); - map.put("jsonl", FileUtils.JSON_LINES); - map.put("xml", FileUtils.XML); + map.put("csv", ResourceFactory.CSV); + map.put("psv", ResourceFactory.PSV); + map.put("tsv", ResourceFactory.TSV); + map.put("fw", ResourceFactory.TEXT); + map.put("json", ResourceFactory.JSON); + map.put("jsonl", ResourceFactory.JSON_LINES); + map.put("xml", ResourceFactory.XML); return map; }