Skip to content

Commit

Permalink
Shut down Lettuce clients after job completed. Fixes #68
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Sep 23, 2021
1 parent 5e95637 commit 0c45e79
Show file tree
Hide file tree
Showing 26 changed files with 227 additions and 268 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.redis.riot.file;

import com.redis.riot.AbstractTransferCommand;
import com.redis.riot.RedisOptions;
import com.redis.riot.RiotStepBuilder;
import lombok.Data;
import lombok.EqualsAndHashCode;
Expand Down Expand Up @@ -64,11 +63,7 @@ private DumpFileType fileType(String file) {
}

private ItemWriter<DataStructure> writer() {
RedisOptions redisOptions = getRedisOptions();
if (redisOptions.isCluster()) {
return DataStructureItemWriter.client(redisOptions.clusterClient()).poolConfig(redisOptions.poolConfig()).build();
}
return DataStructureItemWriter.client(redisOptions.client()).poolConfig(redisOptions.poolConfig()).build();
return new DataStructureItemWriter.DataStructureItemWriterBuilder(getRedisOptions().client()).poolConfig(getRedisOptions().poolConfig()).build();
}

protected AbstractItemStreamItemReader<DataStructure> reader(DumpFileType fileType, Resource resource) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package com.redis.riot.gen;

import com.redis.lettucemod.RedisModulesClient;
import com.redis.lettucemod.Utils;
import com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.lettucemod.api.search.Field;
import com.redis.lettucemod.api.search.IndexInfo;
import com.redis.lettucemod.api.sync.RediSearchCommands;
import com.redis.riot.AbstractImportCommand;
import com.redis.riot.MapProcessorOptions;
import com.redis.riot.RedisOptions;
import com.redis.riot.RiotStepBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
Expand Down Expand Up @@ -74,15 +72,12 @@ private String expression(Field field) {

private Map<String, String> fieldsFromIndex(String index) {
Map<String, String> fields = new LinkedHashMap<>();
RedisModulesClient client = RedisModulesClient.create(getRedisOptions().uris().get(0));
try (StatefulRedisModulesConnection<String, String> connection = client.connect()) {
try (StatefulRedisModulesConnection<String, String> connection = getRedisOptions().connect()) {
RediSearchCommands<String, String> commands = connection.sync();
IndexInfo info = Utils.indexInfo(commands.indexInfo(index));
for (Field field : info.getFields()) {
fields.put(field.getName(), expression(field));
}
} finally {
RedisOptions.shutdown(client);
}
return fields;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ public void genFakerIndexIntrospection() throws Exception {
Document<String, String> doc1 = results.get(0);
Assertions.assertNotNull(doc1.get(FIELD_ABV));
connection.close();
RedisOptions.shutdown(modulesClient);
modulesClient.shutdown();
modulesClient.getResources().shutdown();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.redis.riot.redis;

import com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.lettucemod.api.sync.RedisModulesCommands;
import com.redis.riot.AbstractRiotCommand;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.util.ClassUtils;
import picocli.CommandLine.Command;

@Command
public abstract class AbstractRedisCommandCommand extends AbstractRiotCommand {

@Override
protected Flow flow(StepBuilderFactory stepBuilderFactory) {

return flow(stepBuilderFactory.get(ClassUtils.getShortName(getClass()) + "-step").tasklet((contribution, chunkContext) -> {
try (StatefulRedisModulesConnection<String, String> connection = getRedisOptions().connect()) {
RedisModulesCommands<String, String> commands = connection.sync();
execute(commands);
return RepeatStatus.FINISHED;
}
}).build());
}

protected abstract void execute(RedisModulesCommands<String, String> commands) throws InterruptedException;


}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.redis.riot.redis;

import com.redis.riot.FlushingTransferOptions;
import com.redis.riot.KeyValueProcessorOptions;
import com.redis.riot.RedisOptions;
import com.redis.riot.RiotStepBuilder;
Expand All @@ -26,6 +27,8 @@
@Slf4j
public abstract class AbstractReplicateCommand<T extends KeyValue<?>> extends AbstractTargetCommand {

@CommandLine.Mixin
private FlushingTransferOptions flushingTransferOptions = new FlushingTransferOptions();
@SuppressWarnings("unused")
@CommandLine.Mixin
private ReplicationOptions replicationOptions = new ReplicationOptions();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package com.redis.riot.redis;

import com.redis.riot.*;
import com.redis.riot.AbstractTransferCommand;
import com.redis.riot.RedisOptions;
import com.redis.riot.RedisReaderOptions;
import com.redis.riot.RiotStepBuilder;
import com.redis.riot.TransferOptions;
import io.lettuce.core.AbstractRedisClient;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.flow.Flow;
Expand All @@ -24,7 +30,7 @@
@Data
@EqualsAndHashCode(callSuper = true)
@Slf4j
public abstract class AbstractTargetCommand extends AbstractFlushingTransferCommand {
public abstract class AbstractTargetCommand extends AbstractTransferCommand {

private static final String ASCII_COMPARE_MESSAGE_FORMAT = ">%,d T%,d ≠%,d ⧗%,d <%,d";
private static final String COLORFUL_COMPARE_MESSAGE_FORMAT = "\u001b[31m>%,d \u001b[33mT%,d \u001b[35m≠%,d \u001b[36m⧗%,d\u001b[0m";
Expand All @@ -36,6 +42,12 @@ public abstract class AbstractTargetCommand extends AbstractFlushingTransferComm
@CommandLine.Mixin
private CompareOptions compareOptions = new CompareOptions();

@Override
public void afterJob(JobExecution jobExecution) {
targetRedisOptions.shutdown();
super.afterJob(jobExecution);
}

protected void initialMax(RiotStepBuilder<?, ?> step) {
step.initialMax(readerOptions.initialMaxSupplier(getRedisOptions()));
}
Expand Down Expand Up @@ -93,18 +105,12 @@ private String extraMessageFormat() {
}

protected KeyValueItemReader<DataStructure> dataStructureReader() {
RedisOptions redisOptions = getRedisOptions();
if (redisOptions.isCluster()) {
return readerOptions.configure(DataStructureItemReader.client(redisOptions.clusterClient()).poolConfig(redisOptions.poolConfig())).build();
}
return readerOptions.configure(DataStructureItemReader.client(redisOptions.client()).poolConfig(redisOptions.poolConfig())).build();
AbstractRedisClient client = getRedisOptions().client();
return readerOptions.configure(new DataStructureItemReader.DataStructureItemReaderBuilder(client, new DataStructureValueReader.DataStructureValueReaderBuilder(client).build()).poolConfig(getRedisOptions().poolConfig())).build();
}

protected DataStructureValueReader targetDataStructureValueReader() {
if (targetRedisOptions.isCluster()) {
return DataStructureValueReader.client(targetRedisOptions.clusterClient()).poolConfig(targetRedisOptions.poolConfig()).build();
}
return DataStructureValueReader.client(targetRedisOptions.client()).poolConfig(targetRedisOptions.poolConfig()).build();
return new DataStructureValueReader.DataStructureValueReaderBuilder(targetRedisOptions.client()).poolConfig(targetRedisOptions.poolConfig()).build();
}

}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package com.redis.riot.redis;

import io.lettuce.core.api.sync.BaseRedisCommands;
import io.lettuce.core.api.sync.RedisServerCommands;
import com.redis.lettucemod.api.sync.RedisModulesCommands;
import picocli.CommandLine.Command;

@Command(name = "info", description = "Display INFO command output")
public class InfoCommand extends AbstractUtilityCommand {
public class InfoCommand extends AbstractRedisCommandCommand {

@Override
@SuppressWarnings("unchecked")
protected void execute(BaseRedisCommands<String, String> commands) {
System.out.println(((RedisServerCommands<String, String>) commands).info());
protected void execute(RedisModulesCommands<String, String> commands) {
System.out.println(commands.info());
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.redis.riot.redis;

import io.lettuce.core.api.sync.BaseRedisCommands;
import com.redis.lettucemod.api.sync.RedisModulesCommands;
import io.lettuce.core.metrics.CommandMetrics;
import io.lettuce.core.metrics.DefaultCommandLatencyCollectorOptions;
import org.HdrHistogram.Histogram;
Expand All @@ -13,44 +13,40 @@
import java.util.concurrent.TimeUnit;

@Command(name = "latency", description = "Calculate latency stats")
public class LatencyCommand extends AbstractUtilityCommand {
public class LatencyCommand extends AbstractRedisCommandCommand {

@Option(names = "--iterations", description = "Number of latency tests (default: ${DEFAULT-VALUE})", paramLabel = "<count>")
private int iterations = 1000;
@Option(names = "--sleep", description = "Sleep duration between calls (default: ${DEFAULT-VALUE})", paramLabel = "<ms>")
private long sleep = 1;
@Option(names = "--unit", description = "Latency unit (default: ${DEFAULT-VALUE})", paramLabel = "<unit>")
private TimeUnit unit = TimeUnit.MILLISECONDS;
@Option(names = "--show-distribution", description = "Show latency distribution")
private boolean showDistribution = false;
@Option(names = "--iterations", description = "Number of latency tests (default: ${DEFAULT-VALUE})", paramLabel = "<count>")
private int iterations = 1000;
@Option(names = "--sleep", description = "Sleep duration between calls (default: ${DEFAULT-VALUE})", paramLabel = "<ms>")
private long sleep = 1;
@Option(names = "--unit", description = "Latency unit (default: ${DEFAULT-VALUE})", paramLabel = "<unit>")
private TimeUnit unit = TimeUnit.MILLISECONDS;
@Option(names = "--show-distribution", description = "Show latency distribution")
private boolean showDistribution = false;

@Override
protected void execute(BaseRedisCommands<String, String> commands) {
LatencyStats stats = new LatencyStats();
for (int index = 0; index < iterations; index++) {
long startTime = System.nanoTime();
commands.ping();
stats.recordLatency(System.nanoTime() - startTime);
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
return;
}
}
Histogram histogram = stats.getIntervalHistogram();
if (showDistribution) {
histogram.outputPercentileDistribution(System.out, 1000000.0);
} else {
DefaultCommandLatencyCollectorOptions options = DefaultCommandLatencyCollectorOptions.create();
Map<Double, Long> percentiles = new TreeMap<>();
for (double targetPercentile : options.targetPercentiles()) {
percentiles.put(targetPercentile,
unit.convert(histogram.getValueAtPercentile(targetPercentile), TimeUnit.NANOSECONDS));
}
CommandMetrics.CommandLatency latency = new CommandMetrics.CommandLatency(
unit.convert(histogram.getMinValue(), TimeUnit.NANOSECONDS),
unit.convert(histogram.getMaxValue(), TimeUnit.NANOSECONDS), percentiles);
System.out.println(latency);
}
}
@Override
protected void execute(RedisModulesCommands<String, String> commands) throws InterruptedException {
LatencyStats stats = new LatencyStats();
for (int index = 0; index < iterations; index++) {
long startTime = System.nanoTime();
commands.ping();
stats.recordLatency(System.nanoTime() - startTime);
Thread.sleep(sleep);
}
Histogram histogram = stats.getIntervalHistogram();
if (showDistribution) {
histogram.outputPercentileDistribution(System.out, 1000000.0);
} else {
DefaultCommandLatencyCollectorOptions options = DefaultCommandLatencyCollectorOptions.create();
Map<Double, Long> percentiles = new TreeMap<>();
for (double targetPercentile : options.targetPercentiles()) {
percentiles.put(targetPercentile,
unit.convert(histogram.getValueAtPercentile(targetPercentile), TimeUnit.NANOSECONDS));
}
CommandMetrics.CommandLatency latency = new CommandMetrics.CommandLatency(
unit.convert(histogram.getMinValue(), TimeUnit.NANOSECONDS),
unit.convert(histogram.getMaxValue(), TimeUnit.NANOSECONDS), percentiles);
System.out.println(latency);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.redis.riot.redis;

import io.lettuce.core.api.sync.BaseRedisCommands;
import com.redis.lettucemod.api.sync.RedisModulesCommands;
import picocli.CommandLine.Command;

@Command(name = "ping", description = "Execute PING command")
public class PingCommand extends AbstractUtilityCommand {
public class PingCommand extends AbstractRedisCommandCommand {

@Override
protected void execute(BaseRedisCommands<String, String> commands) {
protected void execute(RedisModulesCommands<String, String> commands) {
System.out.println("Received ping reply: " + commands.ping());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package com.redis.riot.redis;

import com.redis.riot.RedisOptions;
import io.lettuce.core.AbstractRedisClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.redis.DataStructureItemReader;
import org.springframework.batch.item.redis.DataStructureItemWriter;
import org.springframework.batch.item.redis.support.DataStructure;
import org.springframework.batch.item.redis.support.DataStructureValueReader;
import org.springframework.batch.item.redis.support.PollableItemReader;
import picocli.CommandLine;

Expand All @@ -21,17 +23,12 @@ protected ItemReader<DataStructure> reader(RedisOptions redisOptions) {

@Override
protected PollableItemReader<DataStructure> liveReader(RedisOptions redisOptions) {
if (redisOptions.isCluster()) {
return configure(DataStructureItemReader.client(redisOptions.clusterClient()).live()).build();
}
return configure(DataStructureItemReader.client(redisOptions.client()).live()).build();
AbstractRedisClient client = redisOptions.client();
return configure(new DataStructureItemReader.DataStructureItemReaderBuilder(client, new DataStructureValueReader.DataStructureValueReaderBuilder(client).build()).live()).build();
}

@Override
protected ItemWriter<DataStructure> writer(RedisOptions redisOptions) {
if (redisOptions.isCluster()) {
return DataStructureItemWriter.client(redisOptions.clusterClient()).poolConfig(redisOptions.poolConfig()).build();
}
return DataStructureItemWriter.client(redisOptions.client()).poolConfig(redisOptions.poolConfig()).build();
return new DataStructureItemWriter.DataStructureItemWriterBuilder(redisOptions.client()).poolConfig(redisOptions.poolConfig()).build();
}
}
Loading

0 comments on commit 0c45e79

Please sign in to comment.