Skip to content

Commit

Permalink
[java] Consume in batches from the new session queue (#10987)
Browse files Browse the repository at this point in the history
* [java] Consume in batches from the new session queue

* [java] Add test to check batch consumption

* [java] Remove public modifier from the test

* [java] Return value immediately in RemoteNewSessionQueue

Co-authored-by: Diego Molina <[email protected]>
  • Loading branch information
pujagani and diemol authored Oct 17, 2022
1 parent e685cf8 commit 02b23e0
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -753,7 +754,7 @@ public void run() {
// starting the session, we just put the request back on the queue.
// This does mean, however, that under high contention, we might end
// up starving a session request.
Set<Capabilities> stereotypes =
Map<Capabilities, Long> stereotypes =
getAvailableNodes()
.stream()
.filter(NodeStatus::hasCapacity)
Expand All @@ -763,15 +764,15 @@ public void run() {
.getSlots()
.stream()
.map(Slot::getStereotype)
.collect(Collectors.toSet()))
.collect(Collectors.toList()))
.flatMap(Collection::stream)
.collect(Collectors.toSet());
.collect(Collectors.groupingBy(ImmutableCapabilities::new, Collectors.counting()));

if (!stereotypes.isEmpty()) {
Optional<SessionRequest> maybeRequest = sessionQueue.getNextAvailable(stereotypes);
maybeRequest.ifPresent(
List<SessionRequest> matchingRequests = sessionQueue.getNextAvailable(stereotypes);
matchingRequests.forEach(
req -> sessionCreatorExecutor.execute(() -> handleNewSessionRequest(req)));
loop = maybeRequest.isPresent();
loop = !matchingRequests.isEmpty();
} else {
loop = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.openqa.selenium.Capabilities;
import org.openqa.selenium.grid.data.SessionRequest;
import org.openqa.selenium.internal.Require;
import org.openqa.selenium.json.Json;
import org.openqa.selenium.json.TypeToken;
import org.openqa.selenium.remote.http.Contents;
import org.openqa.selenium.remote.http.HttpHandler;
Expand All @@ -30,16 +31,18 @@

import java.io.UncheckedIOException;
import java.lang.reflect.Type;
import java.util.Optional;
import java.util.Set;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static java.util.Collections.singletonMap;
import static org.openqa.selenium.remote.tracing.HttpTracing.newSpanAsChildOf;
import static org.openqa.selenium.remote.tracing.Tags.HTTP_REQUEST;
import static org.openqa.selenium.remote.tracing.Tags.HTTP_RESPONSE;

class GetNextMatchingRequest implements HttpHandler {
private static final Type SET_OF_CAPABILITIES = new TypeToken<Set<Capabilities>>() {}.getType();
private static final Type MAP_OF_CAPABILITIES = new TypeToken<Map<String, Long>>() {}.getType();
private static final Json JSON = new Json();

private final Tracer tracer;
private final NewSessionQueue queue;
Expand All @@ -53,11 +56,18 @@ public GetNextMatchingRequest(Tracer tracer, NewSessionQueue queue) {
public HttpResponse execute(HttpRequest req) throws UncheckedIOException {
try (Span span = newSpanAsChildOf(tracer, req, "sessionqueue.getrequest")) {
HTTP_REQUEST.accept(span, req);
Set<Capabilities> stereotypes = Contents.fromJson(req, SET_OF_CAPABILITIES);
Map<String, Long> stereotypesJson = Contents.fromJson(req, MAP_OF_CAPABILITIES);

Optional<SessionRequest> maybeRequest = queue.getNextAvailable(stereotypes);
Map<Capabilities, Long> stereotypes = new HashMap<>();

HttpResponse response = new HttpResponse().setContent(Contents.asJson(singletonMap("value", maybeRequest.orElse(null))));
stereotypesJson.forEach((k,v) -> {
Capabilities caps = JSON.toType(k, Capabilities.class);
stereotypes.put(caps, v);
});

List<SessionRequest> sessionRequestList = queue.getNextAvailable(stereotypes);

HttpResponse response = new HttpResponse().setContent(Contents.asJson(singletonMap("value", sessionRequestList)));

HTTP_RESPONSE.accept(span, response);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private RequestId requestIdFrom(Map<String, String> params) {

public abstract Optional<SessionRequest> remove(RequestId reqId);

public abstract Optional<SessionRequest> getNextAvailable(Set<Capabilities> stereotypes);
public abstract List<SessionRequest> getNextAvailable(Map<Capabilities, Long> stereotypes);

public abstract void complete(RequestId reqId, Either<SessionNotCreatedException, CreateSessionResponse> result);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableSet;

import org.openqa.selenium.Capabilities;
import org.openqa.selenium.ImmutableCapabilities;
import org.openqa.selenium.SessionNotCreatedException;
import org.openqa.selenium.concurrent.GuardedRunnable;
import org.openqa.selenium.grid.config.Config;
Expand Down Expand Up @@ -295,23 +296,33 @@ public Optional<SessionRequest> remove(RequestId reqId) {
}

@Override
public Optional<SessionRequest> getNextAvailable(Set<Capabilities> stereotypes) {
public List<SessionRequest> getNextAvailable(Map<Capabilities, Long> stereotypes) {
Require.nonNull("Stereotypes", stereotypes);

Predicate<Capabilities> matchesStereotype =
caps -> stereotypes.stream().anyMatch(stereotype -> slotMatcher.matches(stereotype, caps));
caps -> stereotypes.entrySet()
.stream()
.filter(entry -> entry.getValue() > 0)
.anyMatch(entry -> {
boolean matches = slotMatcher.matches(entry.getKey(), caps);
if (matches) {
Long value = entry.getValue();
entry.setValue(value - 1);
}
return matches;
});

Lock writeLock = lock.writeLock();
writeLock.lock();
try {
Optional<SessionRequest> maybeRequest =
queue.stream()
.filter(req -> req.getDesiredCapabilities().stream().anyMatch(matchesStereotype))
.findFirst();
List<SessionRequest> availableRequests = queue.stream()
.filter(req -> req.getDesiredCapabilities().stream().anyMatch(matchesStereotype))
.limit(10) // TODO: Batch size should be configurable via a flag
.collect(Collectors.toList());

maybeRequest.ifPresent(req -> this.remove(req.getRequestId()));
availableRequests.forEach(req -> this.remove(req.getRequestId()));

return maybeRequest;
return availableRequests;
} finally {
writeLock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@
import java.lang.reflect.Type;
import java.net.MalformedURLException;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static org.openqa.selenium.remote.http.HttpMethod.DELETE;
import static org.openqa.selenium.remote.http.HttpMethod.GET;
Expand All @@ -59,6 +60,7 @@
public class RemoteNewSessionQueue extends NewSessionQueue {

private static final Type QUEUE_CONTENTS_TYPE = new TypeToken<List<SessionRequestCapability>>() {}.getType();
private static final Type SESSION_REQUEST_TYPE = new TypeToken<List<SessionRequest>>() {}.getType();
private static final Json JSON = new Json();
private final HttpClient client;
private final Filter addSecret;
Expand Down Expand Up @@ -128,17 +130,19 @@ public Optional<SessionRequest> remove(RequestId reqId) {
}

@Override
public Optional<SessionRequest> getNextAvailable(Set<Capabilities> stereotypes) {
public List<SessionRequest> getNextAvailable(Map<Capabilities, Long> stereotypes) {
Require.nonNull("Stereotypes", stereotypes);

Map<String, Long> stereotypeJson = new HashMap<>();
stereotypes.forEach((k,v) -> stereotypeJson.put(JSON.toJson(k), v));

HttpRequest upstream = new HttpRequest(POST, "/se/grid/newsessionqueue/session/next")
.setContent(Contents.asJson(stereotypes));
.setContent(Contents.asJson(stereotypeJson));

HttpTracing.inject(tracer, tracer.getCurrentContext(), upstream);
HttpResponse response = client.with(addSecret).execute(upstream);

SessionRequest value = Values.get(response, SessionRequest.class);

return Optional.ofNullable(value);
return Values.get(response, SESSION_REQUEST_TYPE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.net.URISyntaxException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -520,10 +521,57 @@ void shouldBeAbleToReturnTheNextAvailableEntryThatMatchesAStereotype(Supplier<Te
Map.of(),
Map.of()));

Optional<SessionRequest> returned = queue.getNextAvailable(
Set.of(new ImmutableCapabilities("browserName", "cheese")));
Map<Capabilities, Long> stereotypes = new HashMap<>();
stereotypes.put(new ImmutableCapabilities("browserName", "cheese"), 1L);

assertThat(returned).isEqualTo(Optional.of(expected));
List<SessionRequest> returned = queue.getNextAvailable(stereotypes);

assertThat(returned.get(0)).isEqualTo(expected);
}

@ParameterizedTest
@MethodSource("data")
void shouldBeAbleToReturnTheNextAvailableBatchThatMatchesStereotypes(Supplier<TestData> supplier) {
setup(supplier);

SessionRequest firstSessionRequest = new SessionRequest(
new RequestId(UUID.randomUUID()),
Instant.now(),
Set.of(W3C),
Set.of(new ImmutableCapabilities("browserName", "cheese", "se:kind", "smoked")),
Map.of(),
Map.of());

SessionRequest secondSessionRequest = new SessionRequest(
new RequestId(UUID.randomUUID()),
Instant.now(),
Set.of(W3C),
Set.of(new ImmutableCapabilities("browserName", "peas", "se:kind", "smoked")),
Map.of(),
Map.of());

SessionRequest thirdSessionRequest = new SessionRequest(
new RequestId(UUID.randomUUID()),
Instant.now(),
Set.of(W3C),
Set.of(new ImmutableCapabilities("browserName", "peas", "se:kind", "smoked")),
Map.of(),
Map.of());

localQueue.injectIntoQueue(firstSessionRequest);
localQueue.injectIntoQueue(secondSessionRequest);
localQueue.injectIntoQueue(thirdSessionRequest);

Map<Capabilities, Long> stereotypes = new HashMap<>();
stereotypes.put(new ImmutableCapabilities("browserName", "cheese"), 2L);
stereotypes.put(new ImmutableCapabilities("browserName", "peas"), 2L);

List<SessionRequest> returned = queue.getNextAvailable(stereotypes);

assertThat(returned.size()).isEqualTo(3);
assertTrue(returned.contains(firstSessionRequest));
assertTrue(returned.contains(secondSessionRequest));
assertTrue(returned.contains(thirdSessionRequest));
}

@ParameterizedTest
Expand Down Expand Up @@ -551,10 +599,12 @@ void shouldNotReturnANextAvailableEntryThatDoesNotMatchTheStereotypes(Supplier<T
Map.of());
localQueue.injectIntoQueue(expected);

Optional<SessionRequest> returned = queue.getNextAvailable(
Set.of(new ImmutableCapabilities("browserName", "cheese")));
Map<Capabilities, Long> stereotypes = new HashMap<>();
stereotypes.put(new ImmutableCapabilities("browserName", "cheese"), 1L);

List<SessionRequest> returned = queue.getNextAvailable(stereotypes);

assertThat(returned).isEqualTo(Optional.of(expected));
assertThat(returned.get(0)).isEqualTo(expected);
}

static class TestData {
Expand Down

0 comments on commit 02b23e0

Please sign in to comment.