diff --git a/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java b/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java index 7d4c5de871d8bd..d46b66bc167e6a 100644 --- a/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java +++ b/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java @@ -42,6 +42,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; @@ -838,8 +839,42 @@ private void handleNewSessionRequest(SessionRequest sessionRequest) { } } - sessionQueue.complete(reqId, response); + // 'complete' will return 'true' if the session has not timed out during the creation + // process: it's still a valid session as it can be used by the client + boolean isSessionValid = sessionQueue.complete(reqId, response); + // If the session request has timed out, tell the Node to remove the session, so that does + // not stall + if (!isSessionValid && response.isRight()) { + LOG.log( + Debug.getDebugLogLevel(), + "Session for request {0} has been created but it has timed out, stopping it to avoid" + + " stalled browser", + reqId.toString()); + URI nodeURI = response.right().getSession().getUri(); + Node node = getNodeFromURI(nodeURI); + if (node != null) { + node.stop(response.right().getSession().getId()); + } + } } } } + + protected Node getNodeFromURI(URI uri) { + Lock readLock = this.lock.readLock(); + readLock.lock(); + try { + Optional nodeStatus = + model.getSnapshot().stream() + .filter(node -> node.getExternalUri().equals(uri)) + .findFirst(); + if (nodeStatus.isPresent()) { + return nodes.get(nodeStatus.get().getNodeId()); + } else { + return null; + } + } finally { + readLock.unlock(); + } + } } diff --git a/java/src/org/openqa/selenium/grid/sessionqueue/NewSessionQueue.java b/java/src/org/openqa/selenium/grid/sessionqueue/NewSessionQueue.java index 172716585f5fc4..aab89d92f945db 100644 --- a/java/src/org/openqa/selenium/grid/sessionqueue/NewSessionQueue.java +++ b/java/src/org/openqa/selenium/grid/sessionqueue/NewSessionQueue.java @@ -111,7 +111,7 @@ private RequestId requestIdFrom(Map params) { public abstract List getNextAvailable(Map stereotypes); - public abstract void complete( + public abstract boolean complete( RequestId reqId, Either result); public abstract int clearQueue(); diff --git a/java/src/org/openqa/selenium/grid/sessionqueue/SessionCreated.java b/java/src/org/openqa/selenium/grid/sessionqueue/SessionCreated.java index 3b01b5b1110fe7..674797d0aa2f36 100644 --- a/java/src/org/openqa/selenium/grid/sessionqueue/SessionCreated.java +++ b/java/src/org/openqa/selenium/grid/sessionqueue/SessionCreated.java @@ -17,6 +17,8 @@ package org.openqa.selenium.grid.sessionqueue; +import static java.util.Collections.singletonMap; +import static org.openqa.selenium.remote.http.Contents.asJson; import static org.openqa.selenium.remote.tracing.HttpTracing.newSpanAsChildOf; import static org.openqa.selenium.remote.tracing.Tags.HTTP_REQUEST; import static org.openqa.selenium.remote.tracing.Tags.HTTP_RESPONSE; @@ -50,9 +52,14 @@ public HttpResponse execute(HttpRequest req) throws UncheckedIOException { HTTP_REQUEST.accept(span, req); CreateSessionResponse response = Contents.fromJson(req, CreateSessionResponse.class); - queue.complete(requestId, Either.right(response)); - HttpResponse res = new HttpResponse(); + // 'complete' will return 'true' if the session has not timed out during the creation process: + // it's still a valid session as it can be used by the client + boolean isSessionValid = queue.complete(requestId, Either.right(response)); + + HttpResponse res = + new HttpResponse().setContent(asJson(singletonMap("value", isSessionValid))); + HTTP_RESPONSE.accept(span, res); return res; } diff --git a/java/src/org/openqa/selenium/grid/sessionqueue/SessionNotCreated.java b/java/src/org/openqa/selenium/grid/sessionqueue/SessionNotCreated.java index 63b7c7dac83173..7a8d4e8181d89e 100644 --- a/java/src/org/openqa/selenium/grid/sessionqueue/SessionNotCreated.java +++ b/java/src/org/openqa/selenium/grid/sessionqueue/SessionNotCreated.java @@ -17,6 +17,8 @@ package org.openqa.selenium.grid.sessionqueue; +import static java.util.Collections.singletonMap; +import static org.openqa.selenium.remote.http.Contents.asJson; import static org.openqa.selenium.remote.tracing.HttpTracing.newSpanAsChildOf; import static org.openqa.selenium.remote.tracing.Tags.HTTP_REQUEST; import static org.openqa.selenium.remote.tracing.Tags.HTTP_RESPONSE; @@ -52,9 +54,13 @@ public HttpResponse execute(HttpRequest req) throws UncheckedIOException { String message = Contents.fromJson(req, String.class); SessionNotCreatedException exception = new SessionNotCreatedException(message); - queue.complete(requestId, Either.left(exception)); - HttpResponse res = new HttpResponse(); + // 'complete' will return 'true' if the session has not timed out during the creation process: + // it's still a valid session as it can be used by the client + boolean isSessionValid = queue.complete(requestId, Either.left(exception)); + + HttpResponse res = + new HttpResponse().setContent(asJson(singletonMap("value", isSessionValid))); HTTP_RESPONSE.accept(span, res); return res; } diff --git a/java/src/org/openqa/selenium/grid/sessionqueue/local/LocalNewSessionQueue.java b/java/src/org/openqa/selenium/grid/sessionqueue/local/LocalNewSessionQueue.java index a26222971643cb..d673cc342de95f 100644 --- a/java/src/org/openqa/selenium/grid/sessionqueue/local/LocalNewSessionQueue.java +++ b/java/src/org/openqa/selenium/grid/sessionqueue/local/LocalNewSessionQueue.java @@ -214,9 +214,6 @@ public HttpResponse addToQueue(SessionRequest request) { try { boolean sessionCreated = data.latch.await(requestTimeout.toMillis(), MILLISECONDS); - if (!(sessionCreated || isRequestInQueue(request.getRequestId()))) { - sessionCreated = data.latch.await(5000, MILLISECONDS); - } if (sessionCreated) { result = data.result; @@ -293,6 +290,12 @@ public boolean retryAddToQueue(SessionRequest request) { if (!requests.containsKey(request.getRequestId())) { return false; } + if (isTimedOut(Instant.now(), requests.get(request.getRequestId()))) { + // as we try to re-add a session request that has already expired, force session timeout + failDueToTimeout(request.getRequestId()); + // return true to avoid handleNewSessionRequest to call 'complete' an other time + return true; + } if (queue.contains(request)) { // No need to re-add this @@ -330,19 +333,6 @@ public Optional remove(RequestId reqId) { } } - private boolean isRequestInQueue(RequestId requestId) { - Lock readLock = lock.readLock(); - readLock.lock(); - - try { - Optional result = - queue.stream().filter(req -> req.getRequestId().equals(requestId)).findAny(); - return result.isPresent(); - } finally { - readLock.unlock(); - } - } - @Override public List getNextAvailable(Map stereotypes) { Require.nonNull("Stereotypes", stereotypes); @@ -378,8 +368,9 @@ public List getNextAvailable(Map stereotypes } } + /** Returns true if the session is still valid (not timed out) */ @Override - public void complete( + public boolean complete( RequestId reqId, Either result) { Require.nonNull("New session request", reqId); Require.nonNull("Result", result); @@ -388,6 +379,7 @@ public void complete( Lock readLock = lock.readLock(); readLock.lock(); Data data; + boolean isSessionTimedOut = false; try { data = requests.get(reqId); } finally { @@ -395,7 +387,9 @@ public void complete( } if (data == null) { - return; + return false; + } else { + isSessionTimedOut = isTimedOut(Instant.now(), data); } Lock writeLock = lock.writeLock(); @@ -409,6 +403,8 @@ public void complete( } data.setResult(result); + + return !isSessionTimedOut; } } diff --git a/java/src/org/openqa/selenium/grid/sessionqueue/remote/RemoteNewSessionQueue.java b/java/src/org/openqa/selenium/grid/sessionqueue/remote/RemoteNewSessionQueue.java index e80f0287433d49..f36cdd6bbfa72a 100644 --- a/java/src/org/openqa/selenium/grid/sessionqueue/remote/RemoteNewSessionQueue.java +++ b/java/src/org/openqa/selenium/grid/sessionqueue/remote/RemoteNewSessionQueue.java @@ -154,7 +154,7 @@ public List getNextAvailable(Map stereotypes } @Override - public void complete( + public boolean complete( RequestId reqId, Either result) { Require.nonNull("Request ID", reqId); Require.nonNull("Result", result); @@ -171,7 +171,8 @@ public void complete( } HttpTracing.inject(tracer, tracer.getCurrentContext(), upstream); - client.with(addSecret).execute(upstream); + HttpResponse response = client.with(addSecret).execute(upstream); + return Values.get(response, Boolean.class); } @Override diff --git a/java/test/org/openqa/selenium/grid/router/SessionQueueGridWithTimeoutTest.java b/java/test/org/openqa/selenium/grid/router/SessionQueueGridWithTimeoutTest.java new file mode 100644 index 00000000000000..455749debff964 --- /dev/null +++ b/java/test/org/openqa/selenium/grid/router/SessionQueueGridWithTimeoutTest.java @@ -0,0 +1,207 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.router; + +import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.fail; +import static org.openqa.selenium.remote.http.Contents.asJson; +import static org.openqa.selenium.remote.http.HttpMethod.POST; + +import com.google.common.collect.ImmutableMap; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeoutException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.openqa.selenium.Capabilities; +import org.openqa.selenium.ImmutableCapabilities; +import org.openqa.selenium.events.EventBus; +import org.openqa.selenium.events.local.GuavaEventBus; +import org.openqa.selenium.grid.config.MapConfig; +import org.openqa.selenium.grid.data.DefaultSlotMatcher; +import org.openqa.selenium.grid.data.Session; +import org.openqa.selenium.grid.distributor.Distributor; +import org.openqa.selenium.grid.distributor.local.LocalDistributor; +import org.openqa.selenium.grid.distributor.selector.DefaultSlotSelector; +import org.openqa.selenium.grid.node.local.LocalNode; +import org.openqa.selenium.grid.security.Secret; +import org.openqa.selenium.grid.server.BaseServerOptions; +import org.openqa.selenium.grid.server.Server; +import org.openqa.selenium.grid.sessionmap.SessionMap; +import org.openqa.selenium.grid.sessionmap.local.LocalSessionMap; +import org.openqa.selenium.grid.sessionqueue.NewSessionQueue; +import org.openqa.selenium.grid.sessionqueue.local.LocalNewSessionQueue; +import org.openqa.selenium.grid.testing.PassthroughHttpClient; +import org.openqa.selenium.grid.testing.TestSessionFactory; +import org.openqa.selenium.grid.web.CombinedHandler; +import org.openqa.selenium.grid.web.RoutableHttpClientFactory; +import org.openqa.selenium.net.PortProber; +import org.openqa.selenium.netty.server.NettyServer; +import org.openqa.selenium.remote.http.HttpClient; +import org.openqa.selenium.remote.http.HttpHandler; +import org.openqa.selenium.remote.http.HttpRequest; +import org.openqa.selenium.remote.http.HttpResponse; +import org.openqa.selenium.remote.tracing.DefaultTestTracer; +import org.openqa.selenium.remote.tracing.Tracer; +import org.openqa.selenium.support.ui.FluentWait; + +class SessionQueueGridWithTimeoutTest { + private static final Capabilities CAPS = new ImmutableCapabilities("browserName", "cheese"); + private HttpClient.Factory clientFactory; + private Secret registrationSecret; + private Server server; + private EventBus bus; + private LocalNode localNode; + + private static Server createServer(HttpHandler handler) { + return new NettyServer( + new BaseServerOptions( + new MapConfig( + ImmutableMap.of("server", ImmutableMap.of("port", PortProber.findFreePort())))), + handler); + } + + @BeforeEach + public void setup() throws URISyntaxException, MalformedURLException { + Tracer tracer = DefaultTestTracer.createTracer(); + bus = new GuavaEventBus(); + int nodePort = PortProber.findFreePort(); + URI nodeUri = new URI("http://localhost:" + nodePort); + CombinedHandler handler = new CombinedHandler(); + clientFactory = + new RoutableHttpClientFactory(nodeUri.toURL(), handler, HttpClient.Factory.createDefault()); + + registrationSecret = new Secret("cheese"); + + SessionMap sessions = new LocalSessionMap(tracer, bus); + NewSessionQueue queue = + new LocalNewSessionQueue( + tracer, + new DefaultSlotMatcher(), + Duration.ofSeconds(1), + Duration.ofSeconds(5), // low timeout to allow simulating it + registrationSecret, + 5); + handler.addHandler(queue); + + localNode = + LocalNode.builder(tracer, bus, nodeUri, nodeUri, registrationSecret) + .add( + CAPS, + new TestSessionFactory( + (id, caps) -> { + try { + Thread.sleep(6000); // simulate a session that takes long to create + } catch (Exception e) { + } + return new Session( + id, nodeUri, new ImmutableCapabilities(), caps, Instant.now()); + })) + .maximumConcurrentSessions(1) + .build(); + handler.addHandler(localNode); + + Distributor distributor = + new LocalDistributor( + tracer, + bus, + new PassthroughHttpClient.Factory(localNode), + sessions, + queue, + new DefaultSlotSelector(), + registrationSecret, + Duration.ofMinutes(5), + false, + Duration.ofSeconds(5), + Runtime.getRuntime().availableProcessors(), + new DefaultSlotMatcher()); + handler.addHandler(distributor); + + distributor.add(localNode); + + Router router = new Router(tracer, clientFactory, sessions, queue, distributor); + + server = createServer(router); + server.start(); + } + + @Test + void shouldBeAbleToDeleteTimedoutSessions() { + ImmutableMap caps = ImmutableMap.of("browserName", "cheese"); + ExecutorService fixedThreadPoolService = Executors.newFixedThreadPool(1); + ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + + try { + Callable sessionCreationTask = () -> createSession(caps); + List> futureList = + fixedThreadPoolService.invokeAll(Arrays.asList(sessionCreationTask)); + + for (Future future : futureList) { + HttpResponse httpResponse = future.get(10, SECONDS); + assertThat(httpResponse.getStatus()).isEqualTo(HTTP_INTERNAL_ERROR); + // session is creating, so a slot is used + assertThat(localNode.getUsedSlots()).isEqualTo(1); + + // session has been destroyed on node as it's not used + new FluentWait<>(localNode) + .withTimeout(Duration.ofSeconds(7)) + .until(node -> node.getUsedSlots() == 0); + } + + } catch (InterruptedException e) { + fail("Unable to create session. Thread Interrupted"); + } catch (ExecutionException e) { + fail("Unable to create session due to execution exception."); + } catch (TimeoutException e) { + fail("Unable to create session. Timeout occurred."); + } finally { + fixedThreadPoolService.shutdownNow(); + scheduler.shutdownNow(); + } + } + + @AfterEach + public void stopServer() { + bus.close(); + server.stop(); + } + + private HttpResponse createSession(ImmutableMap caps) { + HttpRequest request = new HttpRequest(POST, "/session"); + request.setContent( + asJson(ImmutableMap.of("capabilities", ImmutableMap.of("alwaysMatch", caps)))); + + try (HttpClient client = clientFactory.createClient(server.getUrl())) { + return client.execute(request); + } + } +} diff --git a/java/test/org/openqa/selenium/grid/sessionqueue/local/LocalNewSessionQueueTest.java b/java/test/org/openqa/selenium/grid/sessionqueue/local/LocalNewSessionQueueTest.java index 9feb1a7d80235c..b099243066e26c 100644 --- a/java/test/org/openqa/selenium/grid/sessionqueue/local/LocalNewSessionQueueTest.java +++ b/java/test/org/openqa/selenium/grid/sessionqueue/local/LocalNewSessionQueueTest.java @@ -20,6 +20,7 @@ import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR; import static java.net.HttpURLConnection.HTTP_OK; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; @@ -35,6 +36,7 @@ import java.net.URISyntaxException; import java.time.Duration; import java.time.Instant; +import java.time.LocalDateTime; import java.util.HashMap; import java.util.LinkedHashSet; import java.util.List; @@ -43,6 +45,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -166,6 +169,98 @@ private void waitUntilAddedToQueue(SessionRequest request) { sessionRequestCapability.getRequestId().equals(r.getRequestId()))); } + @ParameterizedTest + @MethodSource("data") + void testCompleteWithCreatedSession(Supplier supplier) throws InterruptedException { + setup(supplier); + + AtomicBoolean isCompleted = new AtomicBoolean(false); + CountDownLatch latch = new CountDownLatch(1); + + new Thread( + () -> { + waitUntilAddedToQueue(sessionRequest); + + Capabilities capabilities = new ImmutableCapabilities("browserName", "chrome"); + SessionId sessionId = new SessionId("123"); + Session session = + new Session( + sessionId, + URI.create("https://example.com"), + CAPS, + capabilities, + Instant.now()); + CreateSessionResponse sessionResponse = + new CreateSessionResponse( + session, + JSON.toJson( + ImmutableMap.of( + "value", + ImmutableMap.of( + "sessionId", sessionId, + "capabilities", capabilities))) + .getBytes(UTF_8)); + + isCompleted.set( + queue.complete(sessionRequest.getRequestId(), Either.right(sessionResponse))); + latch.countDown(); + }) + .start(); + + HttpResponse httpResponse = queue.addToQueue(sessionRequest); + + assertThat(latch.await(1000, MILLISECONDS)).isTrue(); + assertThat(isCompleted.get()).isTrue(); + } + + @ParameterizedTest + @MethodSource("data") + void testCompleteWithSessionInTimeout(Supplier supplier) throws InterruptedException { + setup(supplier); + + AtomicBoolean isCompleted = new AtomicBoolean(false); + CountDownLatch latch = new CountDownLatch(1); + + new Thread( + () -> { + waitUntilAddedToQueue(sessionRequest); + try { + Thread.sleep(5500); // simulate session long to create + } catch (InterruptedException ignore) { + } + Capabilities capabilities = new ImmutableCapabilities("browserName", "chrome"); + SessionId sessionId = new SessionId("123"); + Session session = + new Session( + sessionId, + URI.create("https://example.com"), + CAPS, + capabilities, + Instant.now()); + CreateSessionResponse sessionResponse = + new CreateSessionResponse( + session, + JSON.toJson( + ImmutableMap.of( + "value", + ImmutableMap.of( + "sessionId", sessionId, + "capabilities", capabilities))) + .getBytes(UTF_8)); + + isCompleted.set( + queue.complete( + sessionRequest.getRequestId(), + Either.left(new SessionNotCreatedException("not created")))); + latch.countDown(); + }) + .start(); + + HttpResponse httpResponse = queue.addToQueue(sessionRequest); + assertThat(latch.await(1000, MILLISECONDS)).isTrue(); + assertThat(isCompleted.get()).isFalse(); + } + @ParameterizedTest @MethodSource("data") void shouldBeAbleToAddToQueueAndGetValidResponse(Supplier supplier) { @@ -210,7 +305,8 @@ void shouldBeAbleToAddToQueueAndGetValidResponse(Supplier supplier) { @ParameterizedTest @MethodSource("data") - void shouldBeAbleToAddToQueueWithTimeoutAndGetValidResponse(Supplier supplier) { + void shouldBeAbleToAddToQueueWithTimeoutDoNotCreateSessionAfterTimeout( + Supplier supplier) { setup(supplier); SessionRequest sessionRequestWithTimeout = @@ -240,39 +336,25 @@ void shouldBeAbleToAddToQueueWithTimeoutAndGetValidResponse(Supplier s Map stereotypes = new HashMap<>(); stereotypes.put(new ImmutableCapabilities("browserName", "cheese"), 1L); queue.getNextAvailable(stereotypes); - - SessionId sessionId = new SessionId("123"); - Session session = - new Session( - sessionId, - URI.create("https://example.com"), - CAPS, - capabilities, - Instant.now()); - CreateSessionResponse sessionResponse = - new CreateSessionResponse( - session, - JSON.toJson( - ImmutableMap.of( - "value", - ImmutableMap.of( - "sessionId", sessionId, - "capabilities", capabilities))) - .getBytes(UTF_8)); - try { - Thread.sleep(2000); // simulate session creation delay + Thread.sleep(2000); // wait to go past the request session timeout } catch (InterruptedException ignore) { } - queue.complete( - sessionRequestWithTimeout.getRequestId(), Either.right(sessionResponse)); + + // LocalDistributor could not distribute the session, add it back to queue + // it should not be re-added to queue and send back error on session creation + queue.retryAddToQueue(sessionRequestWithTimeout); }) .start(); + LocalDateTime start = LocalDateTime.now(); HttpResponse httpResponse = queue.addToQueue(sessionRequestWithTimeout); + // check we do not wait more than necessary + assertThat(LocalDateTime.now().minusSeconds(10).isBefore(start)).isTrue(); + assertThat(isPresent.get()).isTrue(); - assertEquals(HTTP_OK, httpResponse.getStatus()); + assertEquals(HTTP_INTERNAL_ERROR, httpResponse.getStatus()); } @ParameterizedTest