Skip to content

Commit

Permalink
feat: rest protocol support keep-alive timeout header config (#14560)
Browse files Browse the repository at this point in the history
* feat: rest protocol support keep-alive timeout header config

* optimize: rest protocol connection default keepalive

* optimize: rest protocol connection default keepalive
  • Loading branch information
funky-eyes authored Aug 21, 2024
1 parent 05cb55a commit 1c32c00
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.dubbo.metadata.rest.media.MediaType;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.rpc.protocol.rest.RestHeaderEnum;
import org.apache.dubbo.rpc.protocol.rest.constans.RestConstant;

import java.io.IOException;
import java.io.OutputStream;
Expand All @@ -36,10 +37,9 @@
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpHeaders.Names;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;

import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
Expand All @@ -51,11 +51,13 @@ public class NettyHttpResponse implements HttpResponse {
private static final int EMPTY_CONTENT_LENGTH = 0;
private int status = 200;
private OutputStream os;
private Map<String, List<String>> outputHeaders;
private final Map<String, List<String>> outputHeaders;
private final ChannelHandlerContext ctx;
private boolean committed;
private boolean keepAlive;
private HttpMethod method;
private final boolean keepAlive;

private final int idleTimeout;
private final HttpMethod method;
// raw response body
private Object responseBody;
// raw response class
Expand All @@ -69,6 +71,7 @@ public NettyHttpResponse(final ChannelHandlerContext ctx, final boolean keepAliv
outputHeaders = new HashMap<>();
this.method = method;
os = new ChunkOutputStream(this, ctx, url.getParameter(Constants.PAYLOAD_KEY, Constants.DEFAULT_PAYLOAD));
this.idleTimeout = url.getParameter(RestConstant.IDLE_TIMEOUT_PARAM, RestConstant.IDLE_TIMEOUT);
this.ctx = ctx;
this.keepAlive = keepAlive;
}
Expand Down Expand Up @@ -125,7 +128,6 @@ public void reset() {
throw new IllegalStateException("Messages.MESSAGES.alreadyCommitted()");
}
outputHeaders.clear();
outputHeaders.clear();
}

public boolean isKeepAlive() {
Expand All @@ -141,7 +143,7 @@ public DefaultHttpResponse getDefaultHttpResponse() {
public DefaultHttpResponse getEmptyHttpResponse() {
DefaultFullHttpResponse res = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(getStatus()));
if (method == null || !method.equals(HttpMethod.HEAD)) {
res.headers().add(Names.CONTENT_LENGTH, EMPTY_CONTENT_LENGTH);
res.headers().add(HttpHeaderNames.CONTENT_LENGTH, EMPTY_CONTENT_LENGTH);
}
transformResponseHeaders(res);

Expand All @@ -155,7 +157,7 @@ private void transformResponseHeaders(io.netty.handler.codec.http.HttpResponse r
public void prepareChunkStream() {
committed = true;
DefaultHttpResponse response = getDefaultHttpResponse();
HttpHeaders.setTransferEncodingChunked(response);
HttpUtil.setTransferEncodingChunked(response, true);
ctx.write(response);
}

Expand Down Expand Up @@ -185,12 +187,7 @@ public void flushBuffer() throws IOException {
@Override
public void addOutputHeaders(String name, String value) {

List<String> values = outputHeaders.get(name);

if (values == null) {
values = new ArrayList<>();
outputHeaders.put(name, values);
}
List<String> values = outputHeaders.computeIfAbsent(name, k -> new ArrayList<>());

if (values.contains(value)) {
return;
Expand All @@ -200,10 +197,12 @@ public void addOutputHeaders(String name, String value) {
}

@SuppressWarnings({"rawtypes", "unchecked"})
public static void transformHeaders(
NettyHttpResponse nettyResponse, io.netty.handler.codec.http.HttpResponse response) {
public void transformHeaders(NettyHttpResponse nettyResponse, io.netty.handler.codec.http.HttpResponse response) {
if (nettyResponse.isKeepAlive()) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
if (idleTimeout > 0) {
response.headers().set(HttpHeaderNames.KEEP_ALIVE, "timeout=" + idleTimeout);
}
} else {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpUtil;

import static org.apache.dubbo.config.Constants.SERVER_THREAD_POOL_NAME;

Expand All @@ -56,8 +56,7 @@ public RestHttpRequestDecoder(URL url, ServiceDeployer serviceDeployer) {
protected void decode(
ChannelHandlerContext ctx, io.netty.handler.codec.http.FullHttpRequest request, List<Object> out)
throws Exception {
boolean keepAlive = HttpHeaders.isKeepAlive(request);

boolean keepAlive = HttpUtil.isKeepAlive(request);
NettyHttpResponse nettyHttpResponse = new NettyHttpResponse(ctx, keepAlive, url);
NettyRequestFacade requestFacade = new NettyRequestFacade(request, ctx, serviceDeployer);

Expand Down

0 comments on commit 1c32c00

Please sign in to comment.