Skip to content

Commit

Permalink
refactor: Streamlined commands and added logging
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Jul 10, 2023
1 parent c2f3560 commit 92002ec
Show file tree
Hide file tree
Showing 53 changed files with 1,421 additions and 1,348 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.redis.riot.cli.common;
package com.redis.riot.core;

import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -11,14 +11,14 @@
import com.redis.spring.batch.writer.KeyComparisonCountItemWriter;
import com.redis.spring.batch.writer.KeyComparisonCountItemWriter.Results;

public class CompareStepListener extends StepExecutionListenerSupport {

private static final Logger log = Logger.getLogger(CompareStepListener.class.getName());
public class CompareStepExecutionListener extends StepExecutionListenerSupport {

private final Logger log;
private final KeyComparisonCountItemWriter writer;

public CompareStepListener(KeyComparisonCountItemWriter writer) {
public CompareStepExecutionListener(KeyComparisonCountItemWriter writer, Logger logger) {
this.writer = writer;
this.log = logger;
}

@Override
Expand All @@ -28,12 +28,17 @@ public ExitStatus afterStep(StepExecution stepExecution) {
}
Results results = writer.getResults();
if (results.getTotalCount() == results.getCount(Status.OK)) {
log.info("Verification completed - all OK");
log.info("Verification completed: all OK");
return ExitStatus.COMPLETED;
}
log.log(Level.SEVERE, "Verification failed: {0} ok, {1} missing, {4} type, {2} value, {3} ttl",
new Object[] { results.getCount(Status.OK), results.getCount(Status.MISSING),
results.getCount(Status.VALUE), results.getCount(Status.TTL), results.getCount(Status.TYPE) });
severe("Verification failed:\n OK: {1}\n Missing: {2}\n Type: {3}\n Value: {4}\n TTL: {5}\n Total: {0}",
results.getTotalCount(), results.getCount(Status.OK), results.getCount(Status.MISSING),
results.getCount(Status.TYPE), results.getCount(Status.VALUE), results.getCount(Status.TTL));
return new ExitStatus(ExitStatus.FAILED.getExitCode(), "Verification failed");
}

private void severe(String msg, Object... params) {
log.log(Level.SEVERE, msg, params);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,25 @@ public class FakerItemReader extends AbstractItemCountingItemStreamItemReader<Ma
private boolean includeMetadata;

private EvaluationContext context;
private int maxItemCount = Integer.MAX_VALUE;

public FakerItemReader() {
setName(ClassUtils.getShortName(getClass()));
}

@Override
public void setMaxItemCount(int count) {
super.setMaxItemCount(count);
this.maxItemCount = count;
}

public int size() {
if (maxItemCount == Integer.MAX_VALUE) {
return -1;
}
return maxItemCount - getCurrentItemCount();
}

public FakerItemReader withIndexRange(IntRange range) {
this.indexRange = range;
return this;
Expand Down
53 changes: 21 additions & 32 deletions core/riot-core/src/main/java/com/redis/riot/core/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.springframework.batch.item.json.JacksonJsonObjectReader;
import org.springframework.batch.item.json.JsonItemReader;
Expand All @@ -27,29 +26,22 @@
import com.redis.riot.core.resource.XmlItemReaderBuilder;
import com.redis.riot.core.resource.XmlObjectReader;

public class FileUtils {

private static Logger log = Logger.getLogger(FileUtils.class.getName());
public interface FileUtils {

public static final String GS_URI_PREFIX = "gs://";
public static final String S3_URI_PREFIX = "s3://";

public static final Pattern EXTENSION_PATTERN = Pattern.compile("(?i)\\.(?<extension>\\w+)(?:\\.(?<gz>gz))?$");

private FileUtils() {

}

public static boolean isGzip(String file) {
static boolean isGzip(String file) {
return extensionGroup(file, "gz").isPresent();
}

public static FileExtension extension(Resource resource) {
static FileExtension extension(Resource resource) {
return extensionGroup(resource.getFilename(), "extension")
.orElseThrow(() -> new UnknownFileTypeException("Could not determine file extension"));
}

public static Optional<FileExtension> extensionGroup(String file, String group) {
static Optional<FileExtension> extensionGroup(String file, String group) {
Matcher matcher = EXTENSION_PATTERN.matcher(file);
if (matcher.find()) {
String extensionString = matcher.group(group);
Expand All @@ -66,23 +58,23 @@ public static Optional<FileExtension> extensionGroup(String file, String group)
return Optional.empty();
}

public static boolean isFile(String file) {
static boolean isFile(String file) {
return !(isGcs(file) || isS3(file) || ResourceUtils.isUrl(file) || isConsole(file));
}

public static boolean isConsole(String file) {
static boolean isConsole(String file) {
return "-".equals(file);
}

public static boolean isS3(String file) {
static boolean isS3(String file) {
return file.startsWith(S3_URI_PREFIX);
}

public static boolean isGcs(String file) {
static boolean isGcs(String file) {
return file.startsWith(GS_URI_PREFIX);
}

public static <T> JsonItemReader<T> jsonReader(Resource resource, Class<T> clazz) {
static <T> JsonItemReader<T> jsonReader(Resource resource, Class<T> clazz) {
JsonItemReaderBuilder<T> jsonReaderBuilder = new JsonItemReaderBuilder<>();
jsonReaderBuilder.name(resource.getFilename() + "-json-file-reader");
jsonReaderBuilder.resource(resource);
Expand All @@ -92,7 +84,7 @@ public static <T> JsonItemReader<T> jsonReader(Resource resource, Class<T> clazz
return jsonReaderBuilder.build();
}

public static <T> XmlItemReader<T> xmlReader(Resource resource, Class<T> clazz) {
static <T> XmlItemReader<T> xmlReader(Resource resource, Class<T> clazz) {
XmlItemReaderBuilder<T> xmlReaderBuilder = new XmlItemReaderBuilder<>();
xmlReaderBuilder.name(resource.getFilename() + "-xml-file-reader");
xmlReaderBuilder.resource(resource);
Expand All @@ -108,14 +100,14 @@ public static <T> XmlItemReader<T> xmlReader(Resource resource, Class<T> clazz)
* @return List of file
* @throws IOException
*/
public static Stream<String> expand(String filename) {
static Stream<String> expand(String filename) throws IOException {
if (isFile(filename)) {
return expand(Paths.get(filename)).stream().map(Path::toString);
}
return Stream.of(filename);
}

public static List<Path> expand(Path path) {
static List<Path> expand(Path path) throws IOException {
if (Files.exists(path) || path.getParent() == null || !Files.exists(path.getParent())) {
return Arrays.asList(path);
}
Expand All @@ -124,20 +116,17 @@ public static List<Path> expand(Path path) {
List<Path> paths = new ArrayList<>();
stream.iterator().forEachRemaining(paths::add);
return paths;
} catch (IOException e) {
log.severe("Could not expand path " + path);
return Collections.emptyList();
}
}

public static class UnknownFileTypeException extends RuntimeException {

private static final long serialVersionUID = 1L;

public UnknownFileTypeException(String message) {
super(message);
}

static Stream<String> expandAll(Iterable<String> files) {
return StreamSupport.stream(files.spliterator(), false).flatMap(f -> {
try {
return FileUtils.expand(f);
} catch (IOException e) {
throw new RuntimeIOException(e);
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,12 @@
import com.redis.spring.batch.reader.KeyComparison;
import com.redis.spring.batch.reader.KeyComparison.Status;

public class KeyComparisonLogger implements ItemWriteListener<KeyComparison> {
public class KeyComparisonDiffLogger implements ItemWriteListener<KeyComparison> {

private final Logger log;

public KeyComparisonLogger() {
this(Logger.getLogger(KeyComparisonLogger.class.getName()));
}

public KeyComparisonLogger(Logger logger) {
public KeyComparisonDiffLogger(Logger logger) {
this.log = logger;

}

@Override
public void onWriteError(Exception exception, List<? extends KeyComparison> items) {
// do nothing
}

@Override
Expand All @@ -37,6 +27,11 @@ public void afterWrite(List<? extends KeyComparison> items) {
items.stream().filter(c -> c.getStatus() != Status.OK).forEach(this::log);
}

@Override
public void onWriteError(Exception exception, List<? extends KeyComparison> items) {
// do nothing
}

public void log(KeyComparison comparison) {
switch (comparison.getStatus()) {
case MISSING:
Expand All @@ -59,7 +54,6 @@ public void log(KeyComparison comparison) {
}

private void log(String msg, Object... params) {
log.log(Level.SEVERE, msg + "\n", params);
log.log(Level.SEVERE, msg, params);
}

}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.redis.riot.cli.common;
package com.redis.riot.core;

public class RuntimeIOException extends RuntimeException {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.redis.riot.core;

public class UnknownFileTypeException extends RuntimeException {

private static final long serialVersionUID = 1L;

public UnknownFileTypeException(String message) {
super(message);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.redis.riot.core.logging;

import java.util.logging.Level;

public class RiotLevel extends Level {

/**
* Custom level between INFO and WARN
*/
public static final Level LIFECYCLE = new RiotLevel("LIFECYCLE", 850);

private static final long serialVersionUID = 1L;

public RiotLevel(String name, int value) {
super(name, value);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.redis.riot.core.logging;

import java.util.logging.Formatter;
import java.util.logging.LogRecord;

import org.springframework.core.NestedExceptionUtils;

public class SingleLineFormatter extends Formatter {

@Override
public String format(LogRecord logRecord) {
String message = formatMessage(logRecord);
if (logRecord.getThrown() != null) {
Throwable rootCause = NestedExceptionUtils.getRootCause(logRecord.getThrown());
if (rootCause != null && rootCause.getMessage() != null) {
return String.format("%s: %s%n", message, rootCause.getMessage());
}
}
return String.format("%s%n", message);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.redis.riot.core.logging;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.util.TimeZone;
import java.util.logging.Formatter;
import java.util.logging.LogRecord;

public class StackTraceSingleLineFormatter extends Formatter {

private final DateTimeFormatter d = new DateTimeFormatterBuilder().appendValue(ChronoField.HOUR_OF_DAY, 2)
.appendLiteral(':').appendValue(ChronoField.MINUTE_OF_HOUR, 2).optionalStart().appendLiteral(':')
.appendValue(ChronoField.SECOND_OF_MINUTE, 2).optionalStart()
.appendFraction(ChronoField.NANO_OF_SECOND, 3, 3, true).toFormatter();

private final ZoneId offset = TimeZone.getDefault().toZoneId();

@Override
public String format(LogRecord logRecord) {
String message = formatMessage(logRecord);
ZonedDateTime time = Instant.ofEpochMilli(logRecord.getMillis()).atZone(offset);
if (logRecord.getThrown() == null) {
return String.format("%s %s %s\t: %s%n", time.format(d), logRecord.getLevel().getLocalizedName(),
logRecord.getLoggerName(), message);
}
return String.format("%s %s %s\t: %s%n%s%n", time.format(d), logRecord.getLevel().getLocalizedName(),
logRecord.getLoggerName(), message, stackTrace(logRecord));
}

private String stackTrace(LogRecord logRecord) {
StringWriter sw = new StringWriter(4096);
PrintWriter pw = new PrintWriter(sw);
logRecord.getThrown().printStackTrace(pw);
return sw.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@

import org.springframework.batch.item.ItemProcessor;

import com.redis.spring.batch.common.DataStructure;
import com.redis.spring.batch.common.KeyValue;

import io.lettuce.core.ScoredValue;
import io.lettuce.core.StreamMessage;

public class DataStructureProcessor implements ItemProcessor<DataStructure<String>, DataStructure<String>> {
public class DataStructureProcessor implements ItemProcessor<KeyValue<String>, KeyValue<String>> {

@SuppressWarnings("unchecked")
@Override
public DataStructure<String> process(DataStructure<String> item) throws Exception {
public KeyValue<String> process(KeyValue<String> item) throws Exception {
if (item.getType() == null) {
return item;
}
if (DataStructure.ZSET.equals(item.getType())) {
if (KeyValue.ZSET.equals(item.getType())) {
Collection<Map<String, Object>> zset = (Collection<Map<String, Object>>) item.getValue();
Collection<ScoredValue<String>> values = new ArrayList<>(zset.size());
for (Map<String, Object> map : zset) {
Expand All @@ -30,7 +30,7 @@ public DataStructure<String> process(DataStructure<String> item) throws Exceptio
item.setValue(values);
return item;
}
if (DataStructure.STREAM.equals(item.getType())) {
if (KeyValue.STREAM.equals(item.getType())) {
Collection<Map<String, Object>> stream = (Collection<Map<String, Object>>) item.getValue();
Collection<StreamMessage<String, String>> messages = new ArrayList<>(stream.size());
for (Map<String, Object> message : stream) {
Expand Down
Loading

0 comments on commit 92002ec

Please sign in to comment.