Skip to content

Commit

Permalink
feat: add configurations around endpoint logging (#8249)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra committed Oct 12, 2021
1 parent d98f50a commit 21f4e03
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.api.auth.ApiUser;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpVersion;
Expand All @@ -27,6 +28,7 @@
import io.vertx.ext.web.impl.Utils;
import java.time.Clock;
import java.util.Optional;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -38,6 +40,8 @@ public class LoggingHandler implements Handler<RoutingContext> {
private final Logger logger;
private final Clock clock;
private final LoggingRateLimiter loggingRateLimiter;
private final Optional<Pattern> endpointFilter;
private final boolean enableQueryLogging;

public LoggingHandler(final Server server, final LoggingRateLimiter loggingRateLimiter) {
this(server, loggingRateLimiter, LOG, Clock.systemUTC());
Expand All @@ -53,6 +57,24 @@ public LoggingHandler(final Server server, final LoggingRateLimiter loggingRateL
this.loggingRateLimiter = requireNonNull(loggingRateLimiter);
this.logger = logger;
this.clock = clock;

final String endpointRegex = server.getConfig()
.getString(KsqlRestConfig.KSQL_ENDPOINT_LOGGING_IGNORED_PATHS_REGEX_CONFIG);

Optional<Pattern> endpointFilter;
try {
endpointFilter = endpointRegex.isEmpty()
? Optional.empty()
: Optional.of(Pattern.compile(endpointRegex));
} catch (final Exception e) {
LOG.warn("Could not set up regex for Logging Handler", e);
endpointFilter = Optional.empty();
}
this.endpointFilter = endpointFilter;

this.enableQueryLogging = server
.getConfig()
.getBoolean(KsqlRestConfig.KSQL_ENDPOINT_LOGGING_LOG_QUERIES_CONFIG);
}

@Override
Expand All @@ -66,7 +88,14 @@ public void handle(final RoutingContext routingContext) {
final long contentLength = routingContext.request().response().bytesWritten();
final HttpVersion version = routingContext.request().version();
final HttpMethod method = routingContext.request().method();
final String uri = routingContext.request().uri();
final String uri = enableQueryLogging
? routingContext.request().uri()
: routingContext.request().path();

if (endpointFilter.isPresent() && endpointFilter.get().matcher(uri).matches()) {
return;
}

final long requestBodyLength = routingContext.request().bytesRead();
final String versionFormatted;
switch (version) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,23 @@ public class KsqlRestConfig extends AbstractConfig {
public static final String KSQL_LOCAL_COMMANDS_LOCATION_DOC = "Specify the directory where "
+ "KSQL tracks local commands, e.g. transient queries";

public static final String KSQL_ENDPOINT_LOGGING_LOG_QUERIES_CONFIG
= "ksql.endpoint.logging.log.queries";
private static final boolean KSQL_ENDPOINT_LOGGING_LOG_QUERIES_DEFAULT = false;
private static final String KSQL_ENDPOINT_LOGGING_LOG_QUERIES_DOC
= "Whether or not to log the query portion of the URI when logging endpoints. Note that"
+ " enabling this may log sensitive information.";

public static final String KSQL_ENDPOINT_LOGGING_IGNORED_PATHS_REGEX_CONFIG
= "ksql.endpoint.logging.ignored.paths.regex";
public static final String KSQL_ENDPOINT_LOGGING_IGNORED_PATHS_REGEX_DEFAULT = "";
public static final String KSQL_ENDPOINT_LOGGING_IGNORED_PATHS_REGEX_DOC =
"A regex that allows users to filter out logging from certain endpoints. Without this filter,"
+ " all endpoints are logged. An example usage of this configuration would be to disable"
+ " heartbeat logging (e.g. ksql.endpoint.logging.filter=.*heartbeat.* ) which can"
+ " otherwise be verbose. Note that this works on the entire URI, respecting the "
+ KSQL_ENDPOINT_LOGGING_LOG_QUERIES_CONFIG + " configuration";

private static final ConfigDef CONFIG_DEF;

static {
Expand Down Expand Up @@ -693,6 +710,18 @@ public class KsqlRestConfig extends AbstractConfig {
KSQL_LOCAL_COMMANDS_LOCATION_DEFAULT,
Importance.LOW,
KSQL_LOCAL_COMMANDS_LOCATION_DOC
).define(
KSQL_ENDPOINT_LOGGING_IGNORED_PATHS_REGEX_CONFIG,
Type.STRING,
KSQL_ENDPOINT_LOGGING_IGNORED_PATHS_REGEX_DEFAULT,
Importance.LOW,
KSQL_ENDPOINT_LOGGING_IGNORED_PATHS_REGEX_DOC
).define(
KSQL_ENDPOINT_LOGGING_LOG_QUERIES_CONFIG,
Type.BOOLEAN,
KSQL_ENDPOINT_LOGGING_LOG_QUERIES_DEFAULT,
Importance.LOW,
KSQL_ENDPOINT_LOGGING_LOG_QUERIES_DOC
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
Expand Down Expand Up @@ -40,8 +41,6 @@ public class LoggingHandlerTest {
@Mock
private RoutingContext routingContext;
@Mock
private KsqlRestConfig ksqlRestConfig;
@Mock
private HttpServerRequest request;
@Mock
private HttpServerResponse response;
Expand All @@ -54,6 +53,7 @@ public class LoggingHandlerTest {
@Captor
private ArgumentCaptor<Handler<AsyncResult<Void>>> endCallback;

private KsqlRestConfig config = new KsqlRestConfig(ImmutableMap.of());
private LoggingHandler loggingHandler;


Expand All @@ -67,12 +67,13 @@ public void setUp() {
when(clock.millis()).thenReturn(1699813434333L);
when(response.bytesWritten()).thenReturn(5678L);
when(request.path()).thenReturn("/query");
when(request.uri()).thenReturn("/query");
when(request.uri()).thenReturn("/query?foo=bar");
when(request.getHeader(HTTP_HEADER_USER_AGENT)).thenReturn("bot");
when(socketAddress.host()).thenReturn("123.111.222.333");
when(request.bytesRead()).thenReturn(3456L);
when(request.version()).thenReturn(HttpVersion.HTTP_1_1);
when(request.method()).thenReturn(HttpMethod.POST);
when(server.getConfig()).thenReturn(config);
loggingHandler = new LoggingHandler(server, loggingRateLimiter, logger, clock);
}

Expand All @@ -93,6 +94,28 @@ public void shouldProduceLog() {
+ "\"POST /query HTTP/1.1\" 200 5678 \"-\" \"bot\" 3456"));
}

@Test
public void shouldProduceLogWithQuery() {
// Given:
when(response.getStatusCode()).thenReturn(200);
config = new KsqlRestConfig(
ImmutableMap.of(KsqlRestConfig.KSQL_ENDPOINT_LOGGING_LOG_QUERIES_CONFIG, true)
);
when(server.getConfig()).thenReturn(config);
loggingHandler = new LoggingHandler(server, loggingRateLimiter, logger, clock);

// When:
loggingHandler.handle(routingContext);
verify(routingContext).addEndHandler(endCallback.capture());
endCallback.getValue().handle(null);

// Then:
verify(logger).info(logStringCaptor.capture());
assertThat(logStringCaptor.getValue(),
is("123.111.222.333 - - [Sun, 12 Nov 2023 18:23:54 GMT] "
+ "\"POST /query?foo=bar HTTP/1.1\" 200 5678 \"-\" \"bot\" 3456"));
}

@Test
public void shouldProduceLog_warn() {
// Given:
Expand Down Expand Up @@ -127,6 +150,49 @@ public void shouldSkipLog() {
verify(logger, never()).error(any());
}

@Test
public void shouldSkipLogIfFiltered() {
// Given:
when(response.getStatusCode()).thenReturn(200);
config = new KsqlRestConfig(
ImmutableMap.of(KsqlRestConfig.KSQL_ENDPOINT_LOGGING_IGNORED_PATHS_REGEX_CONFIG, ".*query.*")
);
when(server.getConfig()).thenReturn(config);
loggingHandler = new LoggingHandler(server, loggingRateLimiter, logger, clock);

// When:
loggingHandler.handle(routingContext);
verify(routingContext).addEndHandler(endCallback.capture());
endCallback.getValue().handle(null);

// Then:
verify(logger, never()).info(any());
verify(logger, never()).warn(any());
verify(logger, never()).error(any());
}

@Test
public void shouldProduceLogWithRandomFilter() {
// Given:
when(response.getStatusCode()).thenReturn(200);
config = new KsqlRestConfig(
ImmutableMap.of(KsqlRestConfig.KSQL_ENDPOINT_LOGGING_IGNORED_PATHS_REGEX_CONFIG, ".*random.*")
);
when(server.getConfig()).thenReturn(config);
loggingHandler = new LoggingHandler(server, loggingRateLimiter, logger, clock);

// When:
loggingHandler.handle(routingContext);
verify(routingContext).addEndHandler(endCallback.capture());
endCallback.getValue().handle(null);

// Then:
verify(logger).info(logStringCaptor.capture());
assertThat(logStringCaptor.getValue(),
is("123.111.222.333 - - [Sun, 12 Nov 2023 18:23:54 GMT] "
+ "\"POST /query HTTP/1.1\" 200 5678 \"-\" \"bot\" 3456"));
}

@Test
public void shouldSkipRateLimited() {
// Given:
Expand Down

0 comments on commit 21f4e03

Please sign in to comment.