From 5e95637560ecd68f81dcb658d732983b845e0fe8 Mon Sep 17 00:00:00 2001 From: Julien Ruaux Date: Wed, 22 Sep 2021 19:23:56 -0700 Subject: [PATCH] Changed default skip policy from `always` to `limit`. Fixes #67 --- connectors/riot-file/src/test/resources/import-bad | 2 +- connectors/riot-file/src/test/resources/import-geoadd | 2 +- .../src/main/java/com/redis/riot/RiotStepBuilder.java | 8 +++----- .../src/main/java/com/redis/riot/TransferOptions.java | 4 ++-- gradle.properties | 2 +- 5 files changed, 8 insertions(+), 10 deletions(-) diff --git a/connectors/riot-file/src/test/resources/import-bad b/connectors/riot-file/src/test/resources/import-bad index 34339dd14..554d107ed 100755 --- a/connectors/riot-file/src/test/resources/import-bad +++ b/connectors/riot-file/src/test/resources/import-bad @@ -1 +1 @@ -riot-file import http://developer.redis.com/riot/bad.psv --header --delimiter "|" hset --keyspace bad --keys id \ No newline at end of file +riot-file import http://developer.redis.com/riot/bad.psv --header --delimiter "|" --quote "'" hset --keyspace bad --keys id \ No newline at end of file diff --git a/connectors/riot-file/src/test/resources/import-geoadd b/connectors/riot-file/src/test/resources/import-geoadd index bbf7f1dac..dd1605df1 100755 --- a/connectors/riot-file/src/test/resources/import-geoadd +++ b/connectors/riot-file/src/test/resources/import-geoadd @@ -1 +1 @@ -riot-file import http://developer.redis.com/riot/airports.csv --header --skip-limit 1 geoadd --keyspace airportgeo --members AirportID --lon Longitude --lat Latitude \ No newline at end of file +riot-file import http://developer.redis.com/riot/airports.csv --header --skip-limit 3 geoadd --keyspace airportgeo --members AirportID --lon Longitude --lat Latitude \ No newline at end of file diff --git a/core/riot-core/src/main/java/com/redis/riot/RiotStepBuilder.java b/core/riot-core/src/main/java/com/redis/riot/RiotStepBuilder.java index 2826dd783..4967096c9 100644 --- a/core/riot-core/src/main/java/com/redis/riot/RiotStepBuilder.java +++ b/core/riot-core/src/main/java/com/redis/riot/RiotStepBuilder.java @@ -1,7 +1,5 @@ package com.redis.riot; -import io.lettuce.core.RedisCommandExecutionException; -import io.lettuce.core.RedisCommandTimeoutException; import lombok.Setter; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; @@ -18,12 +16,12 @@ import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.redis.support.FlushingStepBuilder; +import org.springframework.batch.item.redis.support.KeyValueItemReader; import org.springframework.core.task.SyncTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.time.Duration; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeoutException; import java.util.function.Supplier; @Slf4j @@ -61,7 +59,7 @@ public FaultTolerantStepBuilder build() { step.listener((StepExecutionListener) monitor); step.listener((ItemWriteListener) monitor); } - FaultTolerantStepBuilder ftStep = faultTolerant(step).skipPolicy(skipPolicy(options.getSkipPolicy())).skipLimit(options.getSkipLimit()).skip(RedisCommandExecutionException.class).skip(RedisCommandTimeoutException.class).skip(TimeoutException.class); + FaultTolerantStepBuilder ftStep = faultTolerant(step).skipPolicy(skipPolicy(options.getSkipPolicy())); if (options.getThreads() > 1) { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(options.getThreads()); @@ -95,7 +93,7 @@ private SkipPolicy skipPolicy(TransferOptions.SkipPolicy policy) { case NEVER: return new NeverSkipItemSkipPolicy(); default: - return new LimitCheckingItemSkipPolicy(); + return new LimitCheckingItemSkipPolicy(options.getSkipLimit(), KeyValueItemReader.KeyValueItemReaderBuilder.DEFAULT_SKIPPABLE_EXCEPTIONS); } } diff --git a/core/riot-core/src/main/java/com/redis/riot/TransferOptions.java b/core/riot-core/src/main/java/com/redis/riot/TransferOptions.java index 1b4f3b1b1..dc7c359c7 100644 --- a/core/riot-core/src/main/java/com/redis/riot/TransferOptions.java +++ b/core/riot-core/src/main/java/com/redis/riot/TransferOptions.java @@ -23,8 +23,8 @@ public enum SkipPolicy { @CommandLine.Option(names = {"-b", "--batch"}, description = "Number of items in each batch (default: ${DEFAULT-VALUE})", paramLabel = "") private int chunkSize = 50; @CommandLine.Option(names="--skip-policy", description = "Policy to determine if some processing should be skipped: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE})", paramLabel = "") - private SkipPolicy skipPolicy = SkipPolicy.ALWAYS; + private SkipPolicy skipPolicy = SkipPolicy.LIMIT; @CommandLine.Option(names = "--skip-limit", description = "For LIMIT policy, max number of failed items to skip before considering the transfer has failed (default: ${DEFAULT-VALUE})", paramLabel = "") - private int skipLimit = 10; + private int skipLimit = 3; } diff --git a/gradle.properties b/gradle.properties index f5ac402fe..d2e51fd30 100644 --- a/gradle.properties +++ b/gradle.properties @@ -4,7 +4,7 @@ targetCompatibility=1.8 avroVersion=1.10.2 awsVersion=2.2.6.RELEASE -batchRedisVersion=2.18.4 +batchRedisVersion=2.18.6 bootPluginVersion=2.5.4 commonsIoVersion=2.11.0 db2Version=11.5.6.0