From 73a410ba220f3109d04f50c706fb53dff4c7525a Mon Sep 17 00:00:00 2001 From: Peter Nied Date: Sun, 1 Oct 2023 23:34:22 +0000 Subject: [PATCH] Add header verification netty channel handler Based off work of @cwperks in https://github.com/opensearch-project/OpenSearch/pull/10261 Signed-off-by: Peter Nied --- CHANGELOG.md | 1 + .../opensearch/http/netty4/Netty4Http2IT.java | 29 ++++++++++ .../http/netty4/Netty4HeaderVerifier.java | 55 +++++++++++++++++++ .../netty4/Netty4HttpServerTransport.java | 12 +++- 4 files changed, 95 insertions(+), 2 deletions(-) create mode 100644 modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HeaderVerifier.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 772198d5d0544..aa32d6d537eb1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -88,6 +88,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Async blob read support for encrypted containers ([#10131](https://github.com/opensearch-project/OpenSearch/pull/10131)) - Add capability to restrict async durability mode for remote indexes ([#10189](https://github.com/opensearch-project/OpenSearch/pull/10189)) - Add Doc Status Counter for Indexing Engine ([#4562](https://github.com/opensearch-project/OpenSearch/issues/4562)) +- Header verification ([####](https://github.com/opensearch-project/OpenSearch/pull/###)) ### Dependencies - Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575)) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/opensearch/http/netty4/Netty4Http2IT.java b/modules/transport-netty4/src/internalClusterTest/java/org/opensearch/http/netty4/Netty4Http2IT.java index eba2c5ce1e094..fad54fcf4f271 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/opensearch/http/netty4/Netty4Http2IT.java +++ 
b/modules/transport-netty4/src/internalClusterTest/java/org/opensearch/http/netty4/Netty4Http2IT.java @@ -15,18 +15,25 @@ import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; import org.opensearch.test.OpenSearchIntegTestCase.Scope; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Locale; import java.util.stream.IntStream; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http2.HttpConversionUtil; import io.netty.util.ReferenceCounted; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.hasSize; +import static io.netty.handler.codec.http.HttpHeaderNames.HOST; @ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1) public class Netty4Http2IT extends OpenSearchNetty4IntegTestCase { @@ -56,6 +63,28 @@ public void testThatNettyHttpServerSupportsHttp2GetUpgrades() throws Exception { } } + public void testThatNettyHttpServerRequestBlockedWithHeaderVerifier() throws Exception { + HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class); + TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses(); + TransportAddress transportAddress = randomFrom(boundAddresses); + + final FullHttpRequest blockedRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); + blockedRequest.headers().add("blockme", "Not Allowed"); + blockedRequest.headers().add(HOST, "localhost"); + blockedRequest.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), "http"); + + final List responses = new ArrayList<>(); + try 
(Netty4HttpClient nettyHttpClient = Netty4HttpClient.http2()) { + try { + FullHttpResponse blockedResponse = nettyHttpClient.send(transportAddress.address(), blockedRequest); + responses.add(blockedResponse); + assertThat(blockedResponse.status().code(), equalTo(401)); + } finally { + responses.forEach(ReferenceCounted::release); + } + } + } + public void testThatNettyHttpServerSupportsHttp2PostUpgrades() throws Exception { final List> requests = List.of(Tuple.tuple("/_search", "{\"query\":{ \"match_all\":{}}}")); diff --git a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HeaderVerifier.java b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HeaderVerifier.java new file mode 100644 index 0000000000000..8e50076234d1f --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HeaderVerifier.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.http.netty4; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.util.ReferenceCountUtil; + +/** POC for how an external header verifier would be implemented */ +@ChannelHandler.Sharable +public class Netty4HeaderVerifier extends ChannelInboundHandlerAdapter { + + final static Logger log = LogManager.getLogger(Netty4HeaderVerifier.class); + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (!(msg instanceof HttpRequest)) { + ctx.fireChannelRead(msg); + } + + HttpRequest request = (HttpRequest) msg; + if (!isVerified(request)) { + final FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.UNAUTHORIZED); + ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); + ReferenceCountUtil.release(msg); + } else { + // Lets the request pass to the next channel handler + ctx.fireChannelRead(msg); + } + } + + private boolean isVerified(HttpRequest request) { + log.info("Checking if request is authenticated:\n" + request); + + final boolean shouldBlock = request.headers().contains("blockme"); + + return !shouldBlock; + } +} diff --git a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java index 0271472125814..5cd5d95c037d3 100644 --- 
a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java
+++ b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java
@@ -419,8 +419,8 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpMessage msg) throws E
         // If this handler is hit then no upgrade has been attempted and the client is just talking HTTP
         final ChannelPipeline pipeline = ctx.pipeline();
         pipeline.addAfter(ctx.name(), "handler", getRequestHandler());
-        pipeline.replace(this, "decoder_compress", new HttpContentDecompressor());
-
+        // Insert the header verifier where the decompressor used to sit, then re-add the
+        // decompressor after it so verification happens before content decoding.
+        pipeline.replace(this, "header_verifier", transport.createHeaderVerifier());
+        pipeline.addAfter("header_verifier", "decoder_compress", new HttpContentDecompressor());
         pipeline.addAfter("decoder_compress", "aggregator", aggregator);
         if (handlingSettings.isCompression()) {
             pipeline.addAfter(
@@ -446,6 +446,7 @@ protected void configureDefaultHttpPipeline(ChannelPipeline pipeline) {
             );
             decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
             pipeline.addLast("decoder", decoder);
+            pipeline.addLast("header_verifier", transport.createHeaderVerifier());
             pipeline.addLast("decoder_compress", new HttpContentDecompressor());
             pipeline.addLast("encoder", new HttpResponseEncoder());
             final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
@@ -493,6 +494,7 @@ protected void initChannel(Channel childChannel) throws Exception {
                 .addLast(new Http2StreamFrameToHttpObjectCodec(true))
                 .addLast("byte_buf_sizer", byteBufSizer)
                 .addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS))
+                .addLast("header_verifier", transport.createHeaderVerifier())
                 .addLast("decoder_decompress", new HttpContentDecompressor());
             if (handlingSettings.isCompression()) {
@@ -531,4 +533,10 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
             }
         }
     }
+
+    /**
+     * Extension point: supplies the "header_verifier" handler installed in every HTTP pipeline
+     * variant above (plain HTTP, upgrade path, and HTTP/2 child channels). Subclasses may
+     * override to plug in their own verification; the commented-out return below shows the
+     * no-op pass-through alternative.
+     */
+    protected ChannelInboundHandlerAdapter createHeaderVerifier() {
+        return new Netty4HeaderVerifier();
+        // pass-through
+        // return new ChannelInboundHandlerAdapter();
+    }
 }