Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mechanism for early http header validation #92220

Merged
merged 54 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
0bc08e3
Move security http server logic to http server
Tim-Brooks Jun 2, 2022
b5eaf0b
Changes
Tim-Brooks Nov 23, 2022
d8b47a5
Merge remote-tracking branch 'upstream/main' into http_single_impl
Tim-Brooks Nov 23, 2022
32a3ae7
Cleanup
Tim-Brooks Nov 23, 2022
726b43d
Fix
Tim-Brooks Nov 23, 2022
67bfdac
Merge remote-tracking branch 'upstream/main' into http_single_impl
Tim-Brooks Dec 1, 2022
7fba280
INit
Tim-Brooks Dec 1, 2022
5b43b00
Changes
Tim-Brooks Dec 2, 2022
4bf2730
Merge remote-tracking branch 'upstream/main' into early_http_header_c…
Tim-Brooks Dec 2, 2022
2b66570
tests
Tim-Brooks Dec 8, 2022
0ed6cf8
Merge remote-tracking branch 'upstream/main' into early_http_header_c…
Tim-Brooks Dec 8, 2022
b9f2792
Changes
Tim-Brooks Dec 14, 2022
a4f17ff
Merge remote-tracking branch 'upstream/main' into early_http_header_c…
Tim-Brooks Dec 14, 2022
0d84d14
Merge remote-tracking branch 'upstream/main' into early_http_header_c…
Tim-Brooks Feb 3, 2023
b7a1c8e
Merge branch 'main' into early_http_header_check
albertzaharovits Feb 28, 2023
3a71195
Never close the channel on validation error
albertzaharovits Feb 28, 2023
7f374f1
Hook-in noop validator for security
albertzaharovits Feb 28, 2023
32b384a
Merge branch 'main' into early_http_header_check
albertzaharovits Mar 1, 2023
a5851ab
Nit
albertzaharovits Mar 1, 2023
e92e3bb
assert fullRequestConsumed || pending.isEmpty();
albertzaharovits Mar 1, 2023
8439acb
Reference count for drop data
albertzaharovits Mar 1, 2023
5f45ce9
Consume dropped data
albertzaharovits Mar 1, 2023
7e7eec7
Meh, random(null, ...)
albertzaharovits Mar 1, 2023
d2ca098
Fix test for content dropped
albertzaharovits Mar 1, 2023
2a62d72
Handle misbehaving HTTP start message
albertzaharovits Mar 2, 2023
eecd4c2
Nit
albertzaharovits Mar 2, 2023
163baec
Forward data
albertzaharovits Mar 2, 2023
9db59ca
requestStart wait for HTTP start
albertzaharovits Mar 6, 2023
2f9045f
Resize pending when dropping too
albertzaharovits Mar 6, 2023
f8e09d1
validationFailure resemble validationSuccess
albertzaharovits Mar 6, 2023
208316a
Fix
albertzaharovits Mar 6, 2023
75fb7b3
Make private methods static for clarity on variables accessed
albertzaharovits Mar 6, 2023
5ab32df
Nit assert
albertzaharovits Mar 6, 2023
4f12486
Validation failure content release
albertzaharovits Mar 6, 2023
6194829
forwardUntilFirstProperRequestStart
albertzaharovits Mar 6, 2023
5047da8
Merge branch 'main' into early_http_header_check
albertzaharovits Mar 6, 2023
0c2b7be
requestStart handles empty pending queue
albertzaharovits Mar 7, 2023
ea89adb
Streamline request dropped
albertzaharovits Mar 7, 2023
b00abaf
remove forwardUntil....
albertzaharovits Mar 7, 2023
daa9be0
Use switch statement
albertzaharovits Mar 7, 2023
e4633b5
STATE -> State and import static State.*
albertzaharovits Mar 7, 2023
b7fd527
Random nits
albertzaharovits Mar 7, 2023
eb45b64
Remove redundant HANDLING_QUEUED_DATA
albertzaharovits Mar 7, 2023
cecc024
pepper some state assertions
albertzaharovits Mar 7, 2023
692c39d
Merge branch 'main' into early_http_header_check
albertzaharovits Mar 7, 2023
fc02a6a
FORWARDING_DATA_UNTIL_NEXT_REQUEST
albertzaharovits Mar 7, 2023
cc5afb1
nitnit
albertzaharovits Mar 7, 2023
b0ebe1d
suppress warning fallthrough
albertzaharovits Mar 7, 2023
9ffb019
Polish Netty4HttpHeaderValidatorTests
albertzaharovits Mar 8, 2023
a0932e3
Netty4HttpHeaderValidatorTests
albertzaharovits Mar 8, 2023
23e695f
Merge branch 'main' into early_http_header_check
albertzaharovits Mar 28, 2023
eeffc7a
Merge branch 'main' into early_http_header_check
albertzaharovits Mar 30, 2023
d6d7b87
Comments to pipeline formation
albertzaharovits Mar 30, 2023
059c45d
Merge branch 'main' into early_http_header_check
albertzaharovits Mar 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.http.netty4;

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCountUtil;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.TriConsumer;

import java.util.ArrayDeque;

import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.DROPPING_DATA_PERMANENTLY;
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.DROPPING_DATA_UNTIL_NEXT_REQUEST;
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.FORWARDING_DATA_UNTIL_NEXT_REQUEST;
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.QUEUEING_DATA;
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.WAITING_TO_START;

public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {

public static final TriConsumer<HttpRequest, Channel, ActionListener<Void>> NOOP_VALIDATOR = ((
httpRequest,
channel,
listener) -> listener.onResponse(null));

private final TriConsumer<HttpRequest, Channel, ActionListener<Void>> validator;
private ArrayDeque<HttpObject> pending = new ArrayDeque<>(4);
private State state = WAITING_TO_START;

public Netty4HttpHeaderValidator(TriConsumer<HttpRequest, Channel, ActionListener<Void>> validator) {
this.validator = validator;
}

State getState() {
return state;
}

@SuppressWarnings("fallthrough")
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
assert msg instanceof HttpObject;
final HttpObject httpObject = (HttpObject) msg;

switch (state) {
case WAITING_TO_START:
assert pending.isEmpty();
pending.add(ReferenceCountUtil.retain(httpObject));
requestStart(ctx);
assert state == QUEUEING_DATA;
break;
case QUEUEING_DATA:
pending.add(ReferenceCountUtil.retain(httpObject));
break;
case FORWARDING_DATA_UNTIL_NEXT_REQUEST:
assert pending.isEmpty();
if (httpObject instanceof LastHttpContent) {
state = WAITING_TO_START;
}
ctx.fireChannelRead(httpObject);
break;
case DROPPING_DATA_UNTIL_NEXT_REQUEST:
assert pending.isEmpty();
if (httpObject instanceof LastHttpContent) {
state = WAITING_TO_START;
}
// fall-through
case DROPPING_DATA_PERMANENTLY:
assert pending.isEmpty();
ReferenceCountUtil.release(httpObject); // consume without enqueuing
break;
}

setAutoReadForState(ctx, state);
}

private void requestStart(ChannelHandlerContext ctx) {
assert state == WAITING_TO_START;

if (pending.isEmpty()) {
return;
}

final HttpObject httpObject = pending.getFirst();
final HttpRequest httpRequest;
if (httpObject instanceof HttpRequest && httpObject.decoderResult().isSuccess()) {
// a properly decoded HTTP start message is expected to begin validation
// anything else is probably an error that the downstream HTTP message aggregator will have to handle
httpRequest = (HttpRequest) httpObject;
} else {
httpRequest = null;
}

state = QUEUEING_DATA;

if (httpRequest == null) {
// this looks like a malformed request and will forward without validation
ctx.channel().eventLoop().submit(() -> forwardFullRequest(ctx));
} else {
validator.apply(httpRequest, ctx.channel(), new ActionListener<>() {
@Override
public void onResponse(Void unused) {
// Always use "Submit" to prevent reentrancy concerns if we are still on event loop
ctx.channel().eventLoop().submit(() -> forwardFullRequest(ctx));
}

@Override
public void onFailure(Exception e) {
// Always use "Submit" to prevent reentrancy concerns if we are still on event loop
ctx.channel().eventLoop().submit(() -> forwardRequestWithDecoderExceptionAndNoContent(ctx, e));
}
});
}
}

private void forwardFullRequest(ChannelHandlerContext ctx) {
assert ctx.channel().eventLoop().inEventLoop();
assert ctx.channel().config().isAutoRead() == false;
assert state == QUEUEING_DATA;

boolean fullRequestForwarded = forwardData(ctx, pending);

assert fullRequestForwarded || pending.isEmpty();
if (fullRequestForwarded) {
state = WAITING_TO_START;
requestStart(ctx);
} else {
state = FORWARDING_DATA_UNTIL_NEXT_REQUEST;
}

assert state == WAITING_TO_START || state == QUEUEING_DATA || state == FORWARDING_DATA_UNTIL_NEXT_REQUEST;
setAutoReadForState(ctx, state);
}

private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContext ctx, Exception e) {
assert ctx.channel().eventLoop().inEventLoop();
assert ctx.channel().config().isAutoRead() == false;
assert state == QUEUEING_DATA;

HttpObject messageToForward = pending.getFirst();
boolean fullRequestDropped = dropData(pending);
if (messageToForward instanceof HttpContent toReplace) {
// if the request to forward contained data (which got dropped), replace with empty data
messageToForward = toReplace.replace(Unpooled.EMPTY_BUFFER);
}
messageToForward.setDecoderResult(DecoderResult.failure(e));
ctx.fireChannelRead(messageToForward);

assert fullRequestDropped || pending.isEmpty();
if (fullRequestDropped) {
state = WAITING_TO_START;
requestStart(ctx);
} else {
state = DROPPING_DATA_UNTIL_NEXT_REQUEST;
}

assert state == WAITING_TO_START || state == QUEUEING_DATA || state == DROPPING_DATA_UNTIL_NEXT_REQUEST;
setAutoReadForState(ctx, state);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
state = DROPPING_DATA_PERMANENTLY;
while (true) {
if (dropData(pending) == false) {
break;
}
}
super.channelInactive(ctx);
}

private static boolean forwardData(ChannelHandlerContext ctx, ArrayDeque<HttpObject> pending) {
final int pendingMessages = pending.size();
try {
HttpObject toForward;
while ((toForward = pending.poll()) != null) {
ctx.fireChannelRead(toForward);
ReferenceCountUtil.release(toForward); // reference cnt incremented when enqueued
if (toForward instanceof LastHttpContent) {
return true;
}
}
return false;
} finally {
maybeResizePendingDown(pendingMessages, pending);
}
}

private static boolean dropData(ArrayDeque<HttpObject> pending) {
final int pendingMessages = pending.size();
try {
HttpObject toDrop;
while ((toDrop = pending.poll()) != null) {
ReferenceCountUtil.release(toDrop, 2); // 1 for enqueuing, 1 for consuming
if (toDrop instanceof LastHttpContent) {
return true;
}
}
return false;
} finally {
maybeResizePendingDown(pendingMessages, pending);
}
}

private static void maybeResizePendingDown(int largeSize, ArrayDeque<HttpObject> pending) {
if (pending.size() <= 4 && largeSize > 32) {
// Prevent the ArrayDeque from becoming forever large due to a single large message.
ArrayDeque<HttpObject> old = pending;
pending = new ArrayDeque<>(4);
pending.addAll(old);
}
}

private static void setAutoReadForState(ChannelHandlerContext ctx, State state) {
ctx.channel().config().setAutoRead((state == QUEUEING_DATA || state == DROPPING_DATA_PERMANENTLY) == false);
}

enum State {
WAITING_TO_START,
QUEUEING_DATA,
FORWARDING_DATA_UNTIL_NEXT_REQUEST,
DROPPING_DATA_UNTIL_NEXT_REQUEST,
DROPPING_DATA_PERMANENTLY
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseEncoder;
Expand All @@ -35,6 +36,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.TriConsumer;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -143,6 +146,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
private final RecvByteBufAllocator recvByteBufAllocator;
private final TLSConfig tlsConfig;
private final AcceptChannelHandler.AcceptPredicate acceptChannelPredicate;
private final TriConsumer<HttpRequest, Channel, ActionListener<Void>> headerValidator;
private final int readTimeoutMillis;

private final int maxCompositeBufferComponents;
Expand All @@ -160,8 +164,8 @@ public Netty4HttpServerTransport(
SharedGroupFactory sharedGroupFactory,
Tracer tracer,
TLSConfig tlsConfig,
@Nullable AcceptChannelHandler.AcceptPredicate acceptChannelPredicate

@Nullable AcceptChannelHandler.AcceptPredicate acceptChannelPredicate,
@Nullable TriConsumer<HttpRequest, Channel, ActionListener<Void>> headerValidator
) {
super(
settings,
Expand All @@ -178,6 +182,7 @@ public Netty4HttpServerTransport(
this.sharedGroupFactory = sharedGroupFactory;
this.tlsConfig = tlsConfig;
this.acceptChannelPredicate = acceptChannelPredicate;
this.headerValidator = headerValidator;

this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings);

Expand Down Expand Up @@ -323,7 +328,7 @@ public void onException(HttpChannel channel, Exception cause) {
}

public ChannelHandler configureServerChannelHandler() {
return new HttpChannelHandler(this, handlingSettings, tlsConfig, acceptChannelPredicate);
return new HttpChannelHandler(this, handlingSettings, tlsConfig, acceptChannelPredicate, headerValidator);
}

static final AttributeKey<Netty4HttpChannel> HTTP_CHANNEL_KEY = AttributeKey.newInstance("es-http-channel");
Expand All @@ -335,17 +340,20 @@ protected static class HttpChannelHandler extends ChannelInitializer<Channel> {
private final HttpHandlingSettings handlingSettings;
private final TLSConfig tlsConfig;
private final BiPredicate<String, InetSocketAddress> acceptChannelPredicate;
private final TriConsumer<HttpRequest, Channel, ActionListener<Void>> headerValidator;

protected HttpChannelHandler(
final Netty4HttpServerTransport transport,
final HttpHandlingSettings handlingSettings,
final TLSConfig tlsConfig,
@Nullable final BiPredicate<String, InetSocketAddress> acceptChannelPredicate
@Nullable final BiPredicate<String, InetSocketAddress> acceptChannelPredicate,
@Nullable final TriConsumer<HttpRequest, Channel, ActionListener<Void>> headerValidator
) {
this.transport = transport;
this.handlingSettings = handlingSettings;
this.tlsConfig = tlsConfig;
this.acceptChannelPredicate = acceptChannelPredicate;
this.headerValidator = headerValidator;
}

@Override
Expand Down Expand Up @@ -374,11 +382,17 @@ protected void initChannel(Channel ch) throws Exception {
handlingSettings.maxChunkSize()
);
decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
ch.pipeline().addLast("decoder", decoder); // parses the HTTP bytes request into HTTP message pieces
if (headerValidator != null) {
// runs a validation function on the first HTTP message piece which contains all the headers
// if validation passes, the pieces of that particular request are forwarded, otherwise they are discarded
ch.pipeline().addLast("header_validator", new Netty4HttpHeaderValidator(headerValidator));
}
// combines the HTTP message pieces into a single full HTTP request (with headers and body)
final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.maxContentLength());
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
ch.pipeline()
.addLast("decoder", decoder)
.addLast("decoder_compress", new HttpContentDecompressor())
.addLast("decoder_compress", new HttpContentDecompressor()) // this handles request body decompression
.addLast("encoder", new HttpResponseEncoder() {
@Override
protected boolean isContentAlwaysEmpty(HttpResponse msg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public Map<String, Supplier<HttpServerTransport>> getHttpTransports(
getSharedGroupFactory(settings),
tracer,
TLSConfig.noTLS(),
null,
null
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext,
new SharedGroupFactory(Settings.EMPTY),
Tracer.NOOP,
TLSConfig.noTLS(),
null
null,
randomFrom(Netty4HttpHeaderValidator.NOOP_VALIDATOR, null)
)
) {
httpServerTransport.start();
Expand Down
Loading