Skip to content

Commit

Permalink
Name all threads. (#5397)
Browse files Browse the repository at this point in the history
* Name all threads.
* Update server thread name, add test.
* Fix websocket tests (race condition on sending close before last message)
  • Loading branch information
tomas-langer authored Nov 14, 2022
1 parent 3831fbb commit 980696f
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public void close() {

private void run() {
this.thread = Thread.currentThread();
this.thread.setName("[" + socket().socketId() + " " + socket().childSocketId() + "]");
try {
while (run) {
CompositeBufferData toWrite = BufferData.createComposite(writeQueue.take()); // wait if the queue is empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public boolean cancel(boolean mayInterruptIfRunning) {
}
};
Future<?> future = executor.get().submit(() -> {
Thread thread = Thread.currentThread();
thread.setName(thread.getName() + ": async");
try {
T t = supplier.get();
result.complete(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ public final class FaultTolerance {
private static final AtomicReference<Config> CONFIG = new AtomicReference<>(Config.empty());

static {
EXECUTOR.set(LazyValue.create(Executors.newVirtualThreadPerTaskExecutor()));
EXECUTOR.set(LazyValue.create(() -> Executors.newThreadPerTaskExecutor(Thread.ofVirtual()
.name("helidon-ft-", 0)
.factory())));
}

private FaultTolerance() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.endsWith;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.is;

Expand Down Expand Up @@ -49,6 +51,16 @@ void testCustomExecutorBuilder() {
assertThat(thread.isVirtual(), is(true));
}

@Test
void testThreadName() throws Exception {
String threadName = Async.create()
.invoke(() -> Thread.currentThread().getName())
.get(10, TimeUnit.SECONDS);

assertThat(threadName, startsWith("helidon-ft-"));
assertThat(threadName, endsWith(": async"));
}

private Thread testAsync(Async async) {
try {
CompletableFuture<Thread> cf = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.nima.tests.integration.server;

import io.helidon.nima.testing.junit5.webserver.ServerTest;
import io.helidon.nima.testing.junit5.webserver.SetUpRoute;
import io.helidon.nima.webclient.http1.Http1Client;
import io.helidon.nima.webserver.http.HttpRules;

import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.collection.IsArrayWithSize.arrayWithSize;

@ServerTest
class ThreadNameTest {
private final Http1Client client;

ThreadNameTest(Http1Client client) {
this.client = client;
}

@SetUpRoute
static void routing(HttpRules rules) {
rules.get("/", (req, res) -> {
String socketId = req.serverSocketId();
String clientSocketId = req.socketId();
boolean isVirtual = Thread.currentThread().isVirtual();

res.send(isVirtual + ":" + socketId + ":" + clientSocketId + ":" + Thread.currentThread().getName());
});
}

@Test
void testName() {
String message = client.get()
.request(String.class);
String[] parts = message.split(":");
// the parts must be 4 long
assertThat("Response should have four parts: isVirtual,serverSocketId,connSocketId,threadName", parts, arrayWithSize(4));
assertThat("Thread must be virtual", parts[0], is("true"));
String serverSocket = parts[1];
String clientSocket = parts[2];

assertThat(parts[3], is("[" + serverSocket + " " + clientSocket + "] Nima socket"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ void testOnce() throws Exception {
.get(5, TimeUnit.SECONDS);
ws.request(10);

ws.sendText("Hello", true);
ws.sendClose(CloseCodes.NORMAL_CLOSE, "normal");
ws.sendText("Hello", true).get(5, TimeUnit.SECONDS);
ws.sendClose(CloseCodes.NORMAL_CLOSE, "normal").get(5, TimeUnit.SECONDS);

List<String> results = listener.getResults();
assertThat(results, contains("Hello"));
Expand All @@ -101,9 +101,9 @@ void testMulti() throws Exception {
.get(5, TimeUnit.SECONDS);
ws.request(10);

ws.sendText("First", true);
ws.sendText("Second", true);
ws.sendClose(CloseCodes.NORMAL_CLOSE, "normal");
ws.sendText("First", true).get(5, TimeUnit.SECONDS);
ws.sendText("Second", true).get(5, TimeUnit.SECONDS);
ws.sendClose(CloseCodes.NORMAL_CLOSE, "normal").get(5, TimeUnit.SECONDS);
assertThat(listener.getResults(), contains("First", "Second"));
}

Expand All @@ -116,10 +116,10 @@ void testFragmentedAndMulti() throws Exception {
.get(5, TimeUnit.SECONDS);
ws.request(10);

ws.sendText("First", false);
ws.sendText("Second", true);
ws.sendText("Third", true);
ws.sendClose(CloseCodes.NORMAL_CLOSE, "normal");
ws.sendText("First", false).get(5, TimeUnit.SECONDS);
ws.sendText("Second", true).get(5, TimeUnit.SECONDS);
ws.sendText("Third", true).get(5, TimeUnit.SECONDS);
ws.sendClose(CloseCodes.NORMAL_CLOSE, "normal").get(5, TimeUnit.SECONDS);

assertThat(listener.getResults(), contains("FirstSecond", "Third"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class ConnectionHandler implements Runnable {

@Override
public final void run() {
Thread.currentThread().setName("[" + socket.socketId() + " " + socket.childSocketId() + "] Nima socket");
if (LOGGER.isLoggable(DEBUG)) {
ctx.log(LOGGER, DEBUG, "accepted socket from %s", socket.remotePeer().host());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,18 +162,18 @@ public boolean hasTls(String socketName) {
}

private void stopIt() {
parallel(ServerListener::stop);
parallel("stop", ServerListener::stop);
running.set(false);

LOGGER.log(System.Logger.Level.INFO, "Níma server stopped all channels.");
}

private void startIt() {
long now = System.currentTimeMillis();
boolean result = parallel(ServerListener::start);
boolean result = parallel("start", ServerListener::start);
if (!result) {
LOGGER.log(System.Logger.Level.ERROR, "Níma server failed to start, shutting down");
parallel(ServerListener::stop);
parallel("stop", ServerListener::stop);
if (startFutures != null) {
startFutures.forEach(future -> future.future().cancel(true));
}
Expand Down Expand Up @@ -202,13 +202,16 @@ private void startIt() {
}

// return false if anything fails
private boolean parallel(Consumer<ServerListener> task) {
private boolean parallel(String taskName, Consumer<ServerListener> task) {
boolean result = true;

List<ListenerFuture> futures = new LinkedList<>();

for (ServerListener listener : listeners.values()) {
futures.add(new ListenerFuture(listener, executorService.submit(() -> task.accept(listener))));
futures.add(new ListenerFuture(listener, executorService.submit(() -> {
Thread.currentThread().setName(taskName + " " + listener);
task.accept(listener);
})));
}
for (ListenerFuture listenerFuture : futures) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ class ServerListener {
this.contentEncodingContext = contentEncodingContext;
}

@Override
public String toString() {
return socketName + " (" + configuredAddress + ")";
}

int port() {
return connectedPort;
}
Expand Down

0 comments on commit 980696f

Please sign in to comment.