Skip to content

Commit

Permalink
Some annotated service methods are run from blocking task execu… (lin…
Browse files Browse the repository at this point in the history
…e#2187)

Motivation:
The annotated services whose return type is neither `HttpResponse` nor `CompeltableFuture`, are run using a `blockingTaskExecutor` by default.
We should fix this to use `EventLoop` by default and let a user choose to use `blockingTaskExecutor` if he/she wants.

Modifications:
- Add `@Blocking` which makes the annotated service run using `blockingTaskExecutor`
- Make all annotated services run from `EventLoop`

Result:
- Fix line#2078
- (Breaking) All annotated services are now run from `EventLoop` by default
  • Loading branch information
heowc authored and eugene70 committed Nov 10, 2019
1 parent 5da8432 commit fc4ebda
Show file tree
Hide file tree
Showing 5 changed files with 270 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,16 @@ public class AnnotatedHttpService implements HttpService {
private final HttpHeaders defaultHttpTrailers;

private final ResponseType responseType;
private final boolean useBlockingTaskExecutor;

AnnotatedHttpService(Object object, Method method,
List<AnnotatedValueResolver> resolvers,
List<ExceptionHandlerFunction> exceptionHandlers,
List<ResponseConverterFunction> responseConverters,
Route route,
ResponseHeaders defaultHttpHeaders,
HttpHeaders defaultHttpTrailers) {
HttpHeaders defaultHttpTrailers,
boolean useBlockingTaskExecutor) {
this.object = requireNonNull(object, "object");
this.method = requireNonNull(method, "method");
this.resolvers = requireNonNull(resolvers, "resolvers");
Expand All @@ -132,6 +134,7 @@ public class AnnotatedHttpService implements HttpService {

this.defaultHttpHeaders = requireNonNull(defaultHttpHeaders, "defaultHttpHeaders");
this.defaultHttpTrailers = requireNonNull(defaultHttpTrailers, "defaultHttpTrailers");
this.useBlockingTaskExecutor = useBlockingTaskExecutor;
final Class<?> returnType = method.getReturnType();
if (HttpResponse.class.isAssignableFrom(returnType)) {
responseType = ResponseType.HTTP_RESPONSE;
Expand Down Expand Up @@ -232,19 +235,45 @@ private CompletionStage<HttpResponse> serve0(ServiceRequestContext ctx, HttpRequ

switch (responseType) {
case HTTP_RESPONSE:
return f.thenApply(
msg -> new ExceptionFilteredHttpResponse(ctx, req, (HttpResponse) invoke(ctx, req, msg),
exceptionHandler));
if (useBlockingTaskExecutor) {
return f.thenApplyAsync(
msg -> new ExceptionFilteredHttpResponse(ctx, req,
(HttpResponse) invoke(ctx, req, msg),
exceptionHandler),
ctx.blockingTaskExecutor());
} else {
return f.thenApply(
msg -> new ExceptionFilteredHttpResponse(ctx, req,
(HttpResponse) invoke(ctx, req, msg),
exceptionHandler));
}

case COMPLETION_STAGE:
return f.thenCompose(msg -> toCompletionStage(invoke(ctx, req, msg)))
.handle((result, cause) -> cause == null ? convertResponse(ctx, req, null, result,
HttpHeaders.of())
: exceptionHandler.handleException(ctx, req,
cause));
if (useBlockingTaskExecutor) {
return f.thenComposeAsync(msg -> toCompletionStage(invoke(ctx, req, msg)),
ctx.blockingTaskExecutor())
.handle((result, cause) ->
cause == null ? convertResponse(ctx, req, null, result,
HttpHeaders.of())
: exceptionHandler.handleException(ctx, req, cause));
} else {
return f.thenCompose(msg -> toCompletionStage(invoke(ctx, req, msg)))
.handle((result, cause) ->
cause == null ? convertResponse(ctx, req, null, result,
HttpHeaders.of())
: exceptionHandler.handleException(ctx, req, cause));
}

default:
return f.thenApplyAsync(msg -> convertResponse(ctx, req, null, invoke(ctx, req, msg),
HttpHeaders.of()),
ctx.blockingTaskExecutor());
if (useBlockingTaskExecutor) {
return f.thenApplyAsync(
msg -> convertResponse(ctx, req, null, invoke(ctx, req, msg),
HttpHeaders.of()),
ctx.blockingTaskExecutor());
} else {
return f.thenApply(msg -> convertResponse(ctx, req, null, invoke(ctx, req, msg),
HttpHeaders.of()));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import com.linecorp.armeria.server.SimpleDecoratingHttpService;
import com.linecorp.armeria.server.annotation.AdditionalHeader;
import com.linecorp.armeria.server.annotation.AdditionalTrailer;
import com.linecorp.armeria.server.annotation.Blocking;
import com.linecorp.armeria.server.annotation.ConsumeType;
import com.linecorp.armeria.server.annotation.Consumes;
import com.linecorp.armeria.server.annotation.Decorator;
Expand Down Expand Up @@ -351,13 +352,15 @@ static List<AnnotatedHttpServiceElement> create(String pathPrefix, Object object
final ResponseHeaders responseHeaders = defaultHeaders.build();
final HttpHeaders responseTrailers = defaultTrailers.build();

final boolean useBlockingTaskExecutor = findFirst(method, Blocking.class).isPresent();

return routes.stream().map(route -> {
final List<AnnotatedValueResolver> resolvers = getAnnotatedValueResolvers(req, route, method,
clazz);
return new AnnotatedHttpServiceElement(
route,
new AnnotatedHttpService(object, method, resolvers, eh, res, route, responseHeaders,
responseTrailers),
responseTrailers, useBlockingTaskExecutor),
decorator(method, clazz, getInitialDecorator(route.methods())));
}).collect(toImmutableList());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2019 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.server.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import com.linecorp.armeria.server.ServerConfig;

/**
* Specifies that the annotated service method must be invoked from the
* {@linkplain ServerConfig#blockingTaskExecutor() blocking task executor}
* instead of an event loop thread.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Blocking {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Copyright 2019 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.internal.annotation;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.TextNode;

import com.linecorp.armeria.client.HttpClient;
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpMethod;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.RequestHeaders;
import com.linecorp.armeria.common.util.ThreadFactories;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.annotation.Blocking;
import com.linecorp.armeria.server.annotation.Get;
import com.linecorp.armeria.server.logging.LoggingService;
import com.linecorp.armeria.testing.junit.server.ServerExtension;

class AnnotatedHttpServiceBlockingTest {

private static final CountingThreadPoolExecutor executor = new CountingThreadPoolExecutor(
0, 1, 1, TimeUnit.SECONDS, new LinkedTransferQueue<>(),
ThreadFactories.newThreadFactory("blocking-test", true));

private static final AtomicInteger blockingCount = new AtomicInteger();

@BeforeEach
void clear() {
blockingCount.set(0);
}

private static class CountingThreadPoolExecutor extends ThreadPoolExecutor {

CountingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}

@Override
protected void beforeExecute(Thread t, Runnable r) {
blockingCount.incrementAndGet();
}
}

@RegisterExtension
static final ServerExtension server = new ServerExtension() {
@Override
protected void configure(ServerBuilder sb) throws Exception {
sb.annotatedService("/myEvenLoop", new MyEventLoopAnnotatedService(),
LoggingService.newDecorator());
sb.annotatedService("/myBlocking", new MyBlockingAnnotatedService(),
LoggingService.newDecorator());
sb.blockingTaskExecutor(executor, false);
}
};

static class MyEventLoopAnnotatedService {

@Get("/httpResponse")
public HttpResponse httpResponse(RequestContext ctx) {
return HttpResponse.of(HttpStatus.OK);
}

@Get("/aggregatedHttpResponse")
public AggregatedHttpResponse aggregatedHttpResponse(RequestContext ctx) {
return AggregatedHttpResponse.of(HttpStatus.OK);
}

@Get("/jsonNode")
public JsonNode jsonNode(RequestContext ctx) {
return TextNode.valueOf("Armeria");
}

@Get("/completionStage")
public CompletionStage<String> completionStage(RequestContext ctx) {
return CompletableFuture.supplyAsync(() -> "Armeria");
}
}

static class MyBlockingAnnotatedService {

@Get("/httpResponse")
@Blocking
public HttpResponse httpResponse(RequestContext ctx) {
return HttpResponse.of(HttpStatus.OK);
}

@Get("/aggregatedHttpResponse")
@Blocking
public AggregatedHttpResponse aggregatedHttpResponse(RequestContext ctx) {
return AggregatedHttpResponse.of(HttpStatus.OK);
}

@Get("/jsonNode")
@Blocking
public JsonNode jsonNode(RequestContext ctx) {
return TextNode.valueOf("Armeria");
}

@Get("/completionStage")
@Blocking
public CompletionStage<String> completionStage(RequestContext ctx) {
return CompletableFuture.supplyAsync(() -> "Armeria");
}
}

@ParameterizedTest
@CsvSource({
"/myEvenLoop/httpResponse, 0",
"/myEvenLoop/aggregatedHttpResponse, 0",
"/myEvenLoop/jsonNode, 0",
"/myEvenLoop/completionStage, 0"
})
void testOnlyEventLoopWithoutBlockingAnnotation(String path, Integer count) throws Exception {
final HttpClient client = HttpClient.of(server.uri("/"));

final RequestHeaders headers = RequestHeaders.of(HttpMethod.GET, path);
final AggregatedHttpResponse res = client.execute(headers).aggregate().join();
assertThat(res.status()).isSameAs(HttpStatus.OK);
assertThat(blockingCount).hasValue(count);
}

@ParameterizedTest
@CsvSource({
"/myBlocking/httpResponse, 1",
"/myBlocking/aggregatedHttpResponse, 1",
"/myBlocking/jsonNode, 1",
"/myBlocking/completionStage, 1"
})
void testOnlyBlockingWithBlockingAnnotation(String path, Integer count) throws Exception {
final HttpClient client = HttpClient.of(server.uri("/"));

final RequestHeaders headers = RequestHeaders.of(HttpMethod.GET, path);
final AggregatedHttpResponse res = client.execute(headers).aggregate().join();
assertThat(res.status()).isSameAs(HttpStatus.OK);
assertThat(blockingCount).hasValue(count);
}
}
Loading

0 comments on commit fc4ebda

Please sign in to comment.