Skip to content

Commit

Permalink
refactor!: Major refactoring to simplify codebase
Browse files Browse the repository at this point in the history
  • Loading branch information
Julien Ruaux committed May 25, 2023
1 parent fffaa00 commit bdbd2ac
Show file tree
Hide file tree
Showing 200 changed files with 1,875 additions and 1,869 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ jobs:
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
runs-on: ${{ matrix.os }}
if: startsWith(github.event.head_commit.message, 'Releasing version') != true
steps:
- uses: actions/checkout@v3

Expand Down
5 changes: 3 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,10 @@ subprojects { subproj ->

dependencies {
compileOnly group: 'com.google.code.findbugs', name: 'jsr305', version: jsr305Version
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
testImplementation group: 'commons-io', name: 'commons-io', version: commonsIoVersion
testImplementation(group: 'com.redis.testcontainers', name: 'testcontainers-redis-junit', version: testcontainersRedisVersion) {
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
testImplementation group: 'org.testcontainers', name: 'junit-jupiter', version: testcontainersVersion
testImplementation(group: 'com.redis.testcontainers', name: 'testcontainers-redis', version: testcontainersRedisVersion) {
exclude group: 'com.redis', module: 'lettucemod'
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class FakerItemReader extends AbstractItemCountingItemStreamItemReader<Ma
private final Generator<Map<String, Object>> generator;

public FakerItemReader(Generator<Map<String, Object>> generator) {
setName(ClassUtils.getShortName(FakerItemReader.class));
setName(ClassUtils.getShortName(getClass()));
Assert.notNull(generator, "A generator is required");
setMaxItemCount(count);
this.generator = generator;
Expand Down
13 changes: 11 additions & 2 deletions core/riot-core/src/main/java/com/redis/riot/core/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
Expand All @@ -25,12 +27,18 @@
import com.redis.riot.core.resource.XmlItemReaderBuilder;
import com.redis.riot.core.resource.XmlObjectReader;

public interface FileUtils {
public class FileUtils {

private static Logger log = Logger.getLogger(FileUtils.class.getName());

public static final String GS_URI_PREFIX = "gs://";
public static final String S3_URI_PREFIX = "s3://";

public static final Pattern EXTENSION_PATTERN = Pattern.compile("(?i)\\.(?<extension>\\w+)(?:\\.(?<gz>gz))?$");

private FileUtils() {

}

public static boolean isGzip(String file) {
return extensionGroup(file, "gz").isPresent();
Expand Down Expand Up @@ -117,7 +125,8 @@ public static List<Path> expand(Path path) {
stream.iterator().forEachRemaining(paths::add);
return paths;
} catch (IOException e) {
throw new RuntimeException("Could not expand file " + path, e);
log.severe("Could not expand path " + path);
return Collections.emptyList();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package com.redis.riot.core;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.redis.spring.batch.common.DataStructure;
import com.redis.spring.batch.reader.KeyComparison;

import io.lettuce.core.ScoredValue;
import io.lettuce.core.StreamMessage;

@SuppressWarnings("unchecked")
public class KeyComparisonLogger {

/**
* Represents a failed index search.
*
*/
public static final int INDEX_NOT_FOUND = -1;

private final Logger log;

public KeyComparisonLogger() {
this(Logger.getLogger(KeyComparisonLogger.class.getName()));
}

public KeyComparisonLogger(Logger logger) {
this.log = logger;
}

public void log(KeyComparison comparison) {
switch (comparison.getStatus()) {
case MISSING:
log.log(Level.WARNING, "Missing key {0}", comparison.getSource().getKey());
break;
case TTL:
log.log(Level.WARNING, "TTL mismatch for key {0}: {1} <> {2}",
new Object[] { comparison.getSource().getKey(), comparison.getSource().getTtl(),
comparison.getTarget().getTtl() });
break;
case TYPE:
log.log(Level.WARNING, "Type mismatch for key {0}: {1} <> {2}",
new Object[] { comparison.getSource().getKey(), comparison.getSource().getType(),
comparison.getTarget().getType() });
break;
case VALUE:
switch (comparison.getSource().getType()) {
case DataStructure.SET:
showSetDiff(comparison);
break;
case DataStructure.LIST:
showListDiff(comparison);
break;
case DataStructure.ZSET:
showSortedSetDiff(comparison);
break;
case DataStructure.STREAM:
showStreamDiff(comparison);
break;
case DataStructure.STRING:
case DataStructure.JSON:
showStringDiff(comparison);
break;
case DataStructure.HASH:
showHashDiff(comparison);
break;
case DataStructure.TIMESERIES:
showListDiff(comparison);
break;
default:
log.log(Level.WARNING, "Value mismatch for key '{}'", comparison.getSource().getKey());
break;
}
break;
case OK:
break;
}
}

private void showHashDiff(KeyComparison comparison) {
Map<String, String> sourceHash = (Map<String, String>) comparison.getSource().getValue();
Map<String, String> targetHash = (Map<String, String>) comparison.getTarget().getValue();
Map<String, String> diff = new HashMap<>();
diff.putAll(sourceHash);
diff.putAll(targetHash);
diff.entrySet()
.removeAll(sourceHash.size() <= targetHash.size() ? sourceHash.entrySet() : targetHash.entrySet());
log.log(Level.WARNING, "Value mismatch for hash {0} on fields: {1}",
new Object[] { comparison.getSource().getKey(), diff.keySet() });
}

private void showStringDiff(KeyComparison comparison) {
String sourceString = (String) comparison.getSource().getValue();
String targetString = (String) comparison.getTarget().getValue();
int diffIndex = indexOfDifference(sourceString, targetString);
log.log(Level.WARNING, "Value mismatch for string {0} at offset {1}",
new Object[] { comparison.getSource().getKey(), diffIndex });
}

/**
* <p>
* Compares two CharSequences, and returns the index at which the CharSequences
* begin to differ.
* </p>
*
* <p>
* For example, {@code indexOfDifference("i am a machine", "i am a robot") -> 7}
* </p>
*
* <pre>
* StringUtils.indexOfDifference(null, null) = -1
* StringUtils.indexOfDifference("", "") = -1
* StringUtils.indexOfDifference("", "abc") = 0
* StringUtils.indexOfDifference("abc", "") = 0
* StringUtils.indexOfDifference("abc", "abc") = -1
* StringUtils.indexOfDifference("ab", "abxyz") = 2
* StringUtils.indexOfDifference("abcde", "abxyz") = 2
* StringUtils.indexOfDifference("abcde", "xyz") = 0
* </pre>
*
* @param cs1 the first CharSequence, may be null
* @param cs2 the second CharSequence, may be null
* @return the index where cs1 and cs2 begin to differ; -1 if they are equal
* @since 2.0
* @since 3.0 Changed signature from indexOfDifference(String, String) to
* indexOfDifference(CharSequence, CharSequence)
*/
private static int indexOfDifference(final CharSequence cs1, final CharSequence cs2) {
if (cs1 == cs2) {
return INDEX_NOT_FOUND;
}
if (cs1 == null || cs2 == null) {
return 0;
}
int i;
for (i = 0; i < cs1.length() && i < cs2.length(); ++i) {
if (cs1.charAt(i) != cs2.charAt(i)) {
break;
}
}
if (i < cs2.length() || i < cs1.length()) {
return i;
}
return INDEX_NOT_FOUND;
}

private void showListDiff(KeyComparison comparison) {
List<?> sourceList = (List<?>) comparison.getSource().getValue();
List<?> targetList = (List<?>) comparison.getTarget().getValue();
if (sourceList.size() != targetList.size()) {
log.log(Level.WARNING, "Size mismatch for {0} {1}: {2} <> {3}",
new Object[] { comparison.getSource().getType(), comparison.getSource().getKey(), sourceList.size(),
targetList.size() });
return;
}
List<Integer> diff = new ArrayList<>();
for (int index = 0; index < sourceList.size(); index++) {
if (!sourceList.get(index).equals(targetList.get(index))) {
diff.add(index);
}
}
log.log(Level.WARNING, "Value mismatch for {0} {1} at indexes {2}",
new Object[] { comparison.getSource().getType(), comparison.getSource().getKey(), diff });
}

private void showSetDiff(KeyComparison comparison) {
Set<String> sourceSet = (Set<String>) comparison.getSource().getValue();
Set<String> targetSet = (Set<String>) comparison.getTarget().getValue();
Set<String> missing = new HashSet<>(sourceSet);
missing.removeAll(targetSet);
Set<String> extra = new HashSet<>(targetSet);
extra.removeAll(sourceSet);
log.log(Level.WARNING, "Value mismatch for set {0}: {1} <> {2}",
new Object[] { comparison.getSource().getKey(), missing, extra });
}

private void showSortedSetDiff(KeyComparison comparison) {
List<ScoredValue<String>> sourceList = (List<ScoredValue<String>>) comparison.getSource().getValue();
List<ScoredValue<String>> targetList = (List<ScoredValue<String>>) comparison.getTarget().getValue();
List<ScoredValue<String>> missing = new ArrayList<>(sourceList);
missing.removeAll(targetList);
List<ScoredValue<String>> extra = new ArrayList<>(targetList);
extra.removeAll(sourceList);
log.log(Level.WARNING, "Value mismatch for sorted set {0}: {1} <> {2}",
new Object[] { comparison.getSource().getKey(), print(missing), print(extra) });
}

private List<String> print(List<ScoredValue<String>> list) {
return list.stream().map(v -> v.getValue() + "@" + v.getScore()).collect(Collectors.toList());
}

private void showStreamDiff(KeyComparison comparison) {
List<StreamMessage<String, String>> sourceMessages = (List<StreamMessage<String, String>>) comparison
.getSource().getValue();
List<StreamMessage<String, String>> targetMessages = (List<StreamMessage<String, String>>) comparison
.getTarget().getValue();
List<StreamMessage<String, String>> missing = new ArrayList<>(sourceMessages);
missing.removeAll(targetMessages);
List<StreamMessage<String, String>> extra = new ArrayList<>(targetMessages);
extra.removeAll(sourceMessages);
log.log(Level.WARNING, "Value mismatch for stream {0}: {1} <> {2}",
new Object[] { comparison.getSource().getKey(),
missing.stream().map(StreamMessage::getId).collect(Collectors.toList()),
extra.stream().map(StreamMessage::getId).collect(Collectors.toList()) });
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.redis.riot.core;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.ItemStreamSupport;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

import com.redis.spring.batch.common.DelegatingItemStreamSupport;
import com.redis.spring.batch.reader.PollableItemReader;

public class ThrottledItemReader<T> extends DelegatingItemStreamSupport
implements ItemStreamReader<T>, PollableItemReader<T> {

private final ItemReader<T> delegate;
private final long sleep;

public ThrottledItemReader(ItemReader<T> delegate, Duration sleepDuration) {
super(delegate);
setName(ClassUtils.getShortName(getClass()));
Assert.notNull(delegate, "Reader delegate must not be null");
Assert.notNull(sleepDuration, "Sleep duration must not be null");
Assert.isTrue(!sleepDuration.isNegative() && !sleepDuration.isZero(),
"Sleep duration must be strictly positive");
this.delegate = delegate;
this.sleep = sleepDuration.toMillis();
}

@Override
public void setName(String name) {
super.setName(name);
if (delegate instanceof ItemStreamSupport) {
((ItemStreamSupport) delegate).setName(name);
}
}

@Override
public T read() throws Exception {
sleep();
return delegate.read();
}

@Override
public T poll(long timeout, TimeUnit unit) throws InterruptedException, PollingException {
sleep();
return ((PollableItemReader<T>) delegate).poll(timeout, unit);
}

private void sleep() throws InterruptedException {
Thread.sleep(sleep);
}

}

This file was deleted.

Loading

0 comments on commit bdbd2ac

Please sign in to comment.