Skip to content

Commit

Permalink
[hibernate#1436] WIP add fail test
Browse files Browse the repository at this point in the history
  • Loading branch information
namjug-kim committed Jan 9, 2023
1 parent c96add1 commit 6d9bbe6
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 19 deletions.
3 changes: 3 additions & 0 deletions hibernate-reactive-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ dependencies {
testImplementation "io.vertx:vertx-mssql-client:${vertxVersion}"
testImplementation "io.vertx:vertx-oracle-client:${vertxVersion}"

// Metrics
testImplementation "io.vertx:vertx-micrometer-metrics:${vertxVersion}"

// Optional dependency of vertx-pg-client, essential when connecting via SASL SCRAM
testImplementation 'com.ongres.scram:client:2.1'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@
*/
package org.hibernate.reactive;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.persistence.criteria.CriteriaQuery;

import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.RunTestOnContext;
import io.vertx.ext.unit.junit.Timeout;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.micrometer.MicrometerMetricsOptions;
import org.hibernate.SessionFactory;
import org.hibernate.boot.registry.StandardServiceRegistry;
import org.hibernate.boot.registry.StandardServiceRegistryBuilder;
Expand All @@ -28,22 +32,19 @@
import org.hibernate.reactive.stage.Stage;
import org.hibernate.reactive.testing.SessionFactoryManager;
import org.hibernate.tool.schema.spi.SchemaManagementTool;

import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.runner.RunWith;

import io.smallrye.mutiny.Uni;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.RunTestOnContext;
import io.vertx.ext.unit.junit.Timeout;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import javax.persistence.criteria.CriteriaQuery;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static org.hibernate.reactive.containers.DatabaseConfiguration.dbType;
import static org.hibernate.reactive.util.impl.CompletionStages.loop;
Expand Down Expand Up @@ -95,7 +96,9 @@ public static void setDefaultProperties(Configuration configuration) {

@ClassRule
public static RunTestOnContext vertxContextRule = new RunTestOnContext( () -> {
VertxOptions options = new VertxOptions();
Metrics.addRegistry(new SimpleMeterRegistry());
VertxOptions options = new VertxOptions().setMetricsOptions(new MicrometerMetricsOptions()
.setEnabled(true));
options.setBlockedThreadCheckInterval( 5 );
options.setBlockedThreadCheckIntervalUnit( TimeUnit.MINUTES );
return Vertx.vertx( options );
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: Apache-2.0
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive;

import io.micrometer.core.instrument.Metrics;
import io.smallrye.mutiny.subscription.Cancellable;
import io.vertx.ext.unit.TestContext;
import org.jboss.logging.Logger;
import org.junit.Test;

import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.stream.Stream;

import static java.util.Arrays.stream;

public class CancelSignalTest extends BaseReactiveTest {
private static final Logger LOG = Logger.getLogger( CancelSignalTest.class );

@Override
protected Collection<Class<?>> annotatedEntities() {
return List.of(GuineaPig.class);
}

@Test
public void cleanupConnectionWhenCancelSignal(TestContext context) {
int executeSize = 10;
ExecutorService withSessionExecutor = Executors.newFixedThreadPool(executeSize);
ExecutorService cancelExecutor = Executors.newFixedThreadPool(executeSize);
CountDownLatch firstSessionWaiter = new CountDownLatch(1);
Queue<Cancellable> cancellableQueue = new ConcurrentLinkedQueue<>();

CompletableFuture[] withSessionFutures = stream(new int[executeSize])
.mapToObj(i ->
CompletableFuture.runAsync(
() -> {
CountDownLatch countDownLatch = new CountDownLatch(1);
String name = Thread.currentThread().getName();
Cancellable cancellable = getMutinySessionFactory().withSession(s -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
firstSessionWaiter.countDown();
return s.find(GuineaPig.class, 1);
})
.onTermination().invoke(countDownLatch::countDown)
.subscribe()
.with(item -> LOG.debug("end withSession."));
cancellableQueue.add(cancellable);
try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
},
withSessionExecutor
)
)
.toArray(CompletableFuture[]::new);

CompletableFuture[] cancelFutures = stream(new int[executeSize])
.mapToObj(i ->
CompletableFuture.runAsync(
() -> {
try {
firstSessionWaiter.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

Cancellable cancellable = cancellableQueue.poll();
cancellable.cancel();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
},
cancelExecutor
)
)
.toArray(CompletableFuture[]::new);

CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(
Stream.concat(stream(withSessionFutures), stream(cancelFutures)).toArray(CompletableFuture[]::new)
);

test(context, voidCompletableFuture
.thenAccept(x ->{
context.assertEquals(Metrics.globalRegistry.find("vertx.sql.queue.pending").gauge().value(), 0.0);
})
);

}

@Entity(name = "GuineaPig")
@Table(name = "Pig")
public static class GuineaPig {
@Id
private Integer id;
private String name;

public GuineaPig() {
}

public GuineaPig(Integer id, String name) {
this.id = id;
this.name = name;
}

public Integer getId() {
return id;
}

public void setId(Integer id) {
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

@Override
public String toString() {
return id + ": " + name;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
GuineaPig guineaPig = (GuineaPig) o;
return Objects.equals(name, guineaPig.name);
}

@Override
public int hashCode() {
return Objects.hash(name);
}
}
}

0 comments on commit 6d9bbe6

Please sign in to comment.