Skip to content

Commit

Permalink
refactor: Removed redis connection init
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Apr 19, 2024
1 parent 75f369d commit d1865f1
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.springframework.expression.Expression;
import org.springframework.util.Assert;

import com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.lettucemod.api.sync.RediSearchCommands;
import com.redis.lettucemod.search.Field;
import com.redis.lettucemod.search.IndexInfo;
Expand Down Expand Up @@ -83,10 +84,13 @@ private Map<String, Expression> fields() {

private Map<String, Expression> searchIndexFields() {
Map<String, Expression> searchFields = new LinkedHashMap<>();
RediSearchCommands<String, String> commands = getRedisConnection().sync();
IndexInfo info = RedisModulesUtils.indexInfo(commands.ftInfo(searchIndex));
for (Field<String> field : info.getFields()) {
searchFields.put(field.getName(), RiotUtils.parse(expression(field)));
try (StatefulRedisModulesConnection<String, String> connection = RedisModulesUtils
.connection(getRedisClient())) {
RediSearchCommands<String, String> commands = connection.sync();
IndexInfo info = RedisModulesUtils.indexInfo(commands.ftInfo(searchIndex));
for (Field<String> field : info.getFields()) {
searchFields.put(field.getName(), RiotUtils.parse(expression(field)));
}
}
return searchFields;
}
Expand Down
72 changes: 35 additions & 37 deletions connectors/riot-redis/src/main/java/com/redis/riot/redis/Ping.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.redis.riot.redis;

import java.io.PrintWriter;
import java.time.Duration;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
Expand All @@ -14,6 +13,9 @@
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.util.Assert;

import com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.lettucemod.api.sync.RedisModulesCommands;
import com.redis.lettucemod.util.RedisModulesUtils;
import com.redis.riot.core.AbstractRunnable;

import io.lettuce.core.metrics.CommandMetrics.CommandLatency;
Expand All @@ -32,20 +34,11 @@ public class Ping extends AbstractRunnable {
private TimeUnit timeUnit = DEFAULT_TIME_UNIT;
private boolean latencyDistribution;
private double[] percentiles = DEFAULT_PERCENTILES;
private Duration sleep;

public void setOut(PrintWriter out) {
this.out = out;
}

public void setSleep(Duration sleep) {
this.sleep = sleep;
}

public Duration getSleep() {
return sleep;
}

public int getIterations() {
return iterations;
}
Expand Down Expand Up @@ -92,39 +85,44 @@ protected Job job() {
CallableTaskletAdapter tasklet = new CallableTaskletAdapter();
tasklet.setCallable(this::call);
step.setName(getName());
step.setTransactionManager(getJobFactory().getPlatformTransactionManager());
step.setJobRepository(getJobFactory().getJobRepository());
step.setTasklet(tasklet);
return jobBuilder().start(step).build();
}

private RepeatStatus call() {
for (int iteration = 0; iteration < iterations; iteration++) {
LatencyStats stats = new LatencyStats();
for (int index = 0; index < count; index++) {
long startTime = System.nanoTime();
String reply = getRedisConnection().sync().ping();
Assert.isTrue("pong".equalsIgnoreCase(reply), "Invalid PING reply received: " + reply);
stats.recordLatency(System.nanoTime() - startTime);
}
Histogram histogram = stats.getIntervalHistogram();
if (latencyDistribution) {
histogram.outputPercentileDistribution(System.out, (double) timeUnit.toNanos(1));
}
Map<Double, Long> percentileMap = new TreeMap<>();
for (double targetPercentile : percentiles) {
long percentile = toTimeUnit(histogram.getValueAtPercentile(targetPercentile));
percentileMap.put(targetPercentile, percentile);
}
long min = toTimeUnit(histogram.getMinValue());
long max = toTimeUnit(histogram.getMaxValue());
CommandLatency latency = new CommandLatency(min, max, percentileMap);
out.println(latency.toString());
if (sleep != null) {
try {
Thread.sleep(sleep.toMillis());
} catch (InterruptedException e) {
// Restore interrupted state...
Thread.currentThread().interrupt();
try (StatefulRedisModulesConnection<String, String> connection = RedisModulesUtils
.connection(getRedisClient())) {
RedisModulesCommands<String, String> commands = connection.sync();
for (int iteration = 0; iteration < iterations; iteration++) {
LatencyStats stats = new LatencyStats();
for (int index = 0; index < count; index++) {
long startTime = System.nanoTime();
String reply = commands.ping();
Assert.isTrue("pong".equalsIgnoreCase(reply), "Invalid PING reply received: " + reply);
stats.recordLatency(System.nanoTime() - startTime);
}
Histogram histogram = stats.getIntervalHistogram();
if (latencyDistribution) {
histogram.outputPercentileDistribution(System.out, (double) timeUnit.toNanos(1));
}
Map<Double, Long> percentileMap = new TreeMap<>();
for (double targetPercentile : percentiles) {
long percentile = toTimeUnit(histogram.getValueAtPercentile(targetPercentile));
percentileMap.put(targetPercentile, percentile);
}
long min = toTimeUnit(histogram.getMinValue());
long max = toTimeUnit(histogram.getMaxValue());
CommandLatency latency = new CommandLatency(min, max, percentileMap);
out.println(latency.toString());
if (getSleep() != null) {
try {
Thread.sleep(getSleep().toMillis());
} catch (InterruptedException e) {
// Restore interrupted state...
Thread.currentThread().interrupt();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.expression.spel.support.StandardEvaluationContext;

import com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.lettucemod.util.RedisModulesUtils;
import com.redis.riot.core.AbstractExport;
import com.redis.riot.core.RedisClientOptions;
import com.redis.riot.core.RedisWriterOptions;
Expand Down Expand Up @@ -159,8 +161,9 @@ protected <I, O> FaultTolerantStepBuilder<I, O> step(String name, ItemReader<I>
}

private void checkKeyspaceNotificationEnabled() {
try {
String config = getRedisConnection().sync().configGet(CONFIG_NOTIFY_KEYSPACE_EVENTS)
try (StatefulRedisModulesConnection<String, String> connection = RedisModulesUtils
.connection(getRedisClient())) {
String config = connection.sync().configGet(CONFIG_NOTIFY_KEYSPACE_EVENTS)
.getOrDefault(CONFIG_NOTIFY_KEYSPACE_EVENTS, "");
if (!config.contains("K")) {
log.error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import org.springframework.batch.item.support.PassThroughItemProcessor;
import org.springframework.expression.spel.support.StandardEvaluationContext;

import com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.lettucemod.util.RedisModulesUtils;
import com.redis.riot.core.function.DropStreamMessageIdFunction;
import com.redis.riot.core.function.ExpressionFunction;
import com.redis.riot.core.function.KeyValueOperator;
Expand Down Expand Up @@ -43,7 +45,8 @@ protected <K> ItemProcessor<KeyValue<K, Object>, KeyValue<K, Object>> processor(

protected StandardEvaluationContext evaluationContext() {
StandardEvaluationContext evaluationContext = evaluationContextOptions.evaluationContext();
evaluationContext.setVariable(REDIS_VAR, getRedisConnection().sync());
StatefulRedisModulesConnection<String, String> connection = RedisModulesUtils.connection(getRedisClient());
evaluationContext.setVariable(REDIS_VAR, connection.sync());
return evaluationContext;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package com.redis.riot.core;

import com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.lettucemod.util.RedisModulesUtils;

import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.RedisURI;

Expand All @@ -12,16 +9,13 @@ public abstract class AbstractRunnable extends AbstractJobRunnable {

private RedisURI redisURI;
private AbstractRedisClient redisClient;
private StatefulRedisModulesConnection<String, String> redisConnection;

@Override
public void run() {
redisURI = redisClientOptions.redisURI();
try {
redisClient = redisClientOptions.client(redisURI);
redisConnection = RedisModulesUtils.connection(redisClient);
super.run();
redisConnection.close();
} finally {
redisClient.close();
redisClient.getResources().shutdown();
Expand All @@ -44,8 +38,4 @@ protected AbstractRedisClient getRedisClient() {
return redisClient;
}

protected StatefulRedisModulesConnection<String, String> getRedisConnection() {
return redisConnection;
}

}

0 comments on commit d1865f1

Please sign in to comment.