diff --git a/graylog2-server/src/main/java/org/graylog2/inputs/InputDiagnosticService.java b/graylog2-server/src/main/java/org/graylog2/inputs/InputDiagnosticService.java new file mode 100644 index 000000000000..09e99fe9351b --- /dev/null +++ b/graylog2-server/src/main/java/org/graylog2/inputs/InputDiagnosticService.java @@ -0,0 +1,146 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ +package org.graylog2.inputs; + +import com.google.common.collect.ImmutableSet; +import jakarta.inject.Inject; +import jakarta.ws.rs.InternalServerErrorException; +import org.apache.commons.lang3.StringUtils; +import org.graylog.plugins.views.search.Query; +import org.graylog.plugins.views.search.QueryResult; +import org.graylog.plugins.views.search.Search; +import org.graylog.plugins.views.search.SearchJob; +import org.graylog.plugins.views.search.SearchType; +import org.graylog.plugins.views.search.elasticsearch.ElasticsearchQueryString; +import org.graylog.plugins.views.search.engine.SearchExecutor; +import org.graylog.plugins.views.search.errors.SearchError; +import org.graylog.plugins.views.search.permissions.SearchUser; +import org.graylog.plugins.views.search.rest.ExecutionState; +import org.graylog.plugins.views.search.rest.SearchJobDTO; +import org.graylog.plugins.views.search.searchtypes.pivot.Pivot; +import org.graylog.plugins.views.search.searchtypes.pivot.PivotResult; +import org.graylog.plugins.views.search.searchtypes.pivot.buckets.Values; +import org.graylog.plugins.views.search.searchtypes.pivot.series.Count; +import org.graylog2.database.NotFoundException; +import org.graylog2.plugin.indexer.searches.timeranges.RelativeRange; +import org.graylog2.rest.models.system.inputs.responses.InputDiagnostics; +import org.graylog2.streams.StreamService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.AbstractMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.graylog2.plugin.Message.FIELD_GL2_SOURCE_INPUT; +import static org.graylog2.rest.models.system.inputs.responses.InputDiagnostics.EMPTY_DIAGNOSTICS; +import static org.graylog2.shared.utilities.StringUtils.f; + +public class InputDiagnosticService { + private static final Logger LOG = LoggerFactory.getLogger(InputDiagnosticService.class); + + private static final String QUERY_ID = "input_diagnostics_streams_query"; + private static final String PIVOT_ID = "input_diagnostics_streams_pivot"; + + private final SearchExecutor searchExecutor; + private final StreamService streamService; + + @Inject + public InputDiagnosticService(SearchExecutor searchExecutor, + StreamService streamService) { + this.searchExecutor = searchExecutor; + this.streamService = streamService; + } + + public InputDiagnostics getInputDiagnostics( + Input input, SearchUser searchUser) { + final Search search = buildSearch(input); + final SearchJob searchJob = searchExecutor.executeSync(search, searchUser, ExecutionState.empty()); + final SearchJobDTO searchJobDTO = SearchJobDTO.fromSearchJob(searchJob); + final QueryResult queryResult = searchJobDTO.results().get(QUERY_ID); + + final Set errors = queryResult.errors(); + if (errors != null && !errors.isEmpty()) { + String errorMsg = f("An error occurred while executing aggregation: %s", + errors.stream().map(SearchError::description).collect(Collectors.joining(", "))); + LOG.error(errorMsg); + throw new InternalServerErrorException(errorMsg); + } + + final SearchType.Result aggregationResult = queryResult.searchTypes().get(PIVOT_ID); + if (aggregationResult instanceof PivotResult pivotResult && pivotResult.total() > 0) { + final List> resultList = pivotResult.rows().stream() + .filter(row -> row.source().equals("leaf")) + .map(InputDiagnosticService::extractValues) + .toList(); + Map resultMap = new HashMap<>(); + resultList.forEach( + entry -> { + try { + final org.graylog2.plugin.streams.Stream stream = streamService.load(entry.getKey()); + resultMap.put(stream.getTitle(), entry.getValue()); + } catch (NotFoundException e) { + LOG.warn("Unable to load stream {}", entry.getKey(), e); + } + } + ); + return new InputDiagnostics(resultMap); + } + + return EMPTY_DIAGNOSTICS; + } + + private Search buildSearch(Input input) { + final SearchType searchType = Pivot.builder() + .id(PIVOT_ID) + .rollup(true) + .rowGroups(Values.builder().fields(List.of("streams")).build()) + .series(Count.builder().build()) + .build(); + return Search.builder() + .queries(ImmutableSet.of( + Query.builder() + .id(QUERY_ID) + .query(ElasticsearchQueryString.of(FIELD_GL2_SOURCE_INPUT + ":" + input.getId())) + .searchTypes(Collections.singleton(searchType)) + .timerange(RelativeRange.create(900)) + .build() + )) + .build(); + } + + private static AbstractMap.SimpleEntry extractValues(PivotResult.Row r) { + if (r.values().size() != 1) { + String errorMsg = f("Expected 1 value in aggregation result, but received [%d].", r.values().size()); + LOG.warn(errorMsg); + throw new InternalServerErrorException(errorMsg); + } + final String streamId = r.key().get(0); + if (StringUtils.isEmpty(streamId)) { + String errorMsg = "Unable to retrieve stream ID from query result"; + LOG.warn(errorMsg); + throw new InternalServerErrorException(errorMsg); + } + + final Long count = (Long) r.values().get(0).value(); + return new AbstractMap.SimpleEntry<>(streamId, count); + } +} diff --git a/graylog2-server/src/main/java/org/graylog2/rest/models/system/inputs/responses/InputDiagnostics.java b/graylog2-server/src/main/java/org/graylog2/rest/models/system/inputs/responses/InputDiagnostics.java new file mode 100644 index 000000000000..e1944a6da96e --- /dev/null +++ b/graylog2-server/src/main/java/org/graylog2/rest/models/system/inputs/responses/InputDiagnostics.java @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ +package org.graylog2.rest.models.system.inputs.responses; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Collections; +import java.util.Map; + +public record InputDiagnostics( + @JsonProperty("stream_message_count") Map streamMessageCount) { + public static InputDiagnostics EMPTY_DIAGNOSTICS = new InputDiagnostics(Collections.emptyMap()); +} + diff --git a/graylog2-server/src/main/java/org/graylog2/rest/resources/system/inputs/InputsResource.java b/graylog2-server/src/main/java/org/graylog2/rest/resources/system/inputs/InputsResource.java index aa0f2c95e759..442b4ac7eddd 100644 --- a/graylog2-server/src/main/java/org/graylog2/rest/resources/system/inputs/InputsResource.java +++ b/graylog2-server/src/main/java/org/graylog2/rest/resources/system/inputs/InputsResource.java @@ -22,12 +22,30 @@ import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; +import jakarta.inject.Inject; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; +import jakarta.ws.rs.BadRequestException; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.DELETE; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.NotFoundException; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.PUT; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; import org.apache.shiro.authz.annotation.RequiresAuthentication; import org.apache.shiro.authz.annotation.RequiresPermissions; +import org.graylog.plugins.views.search.permissions.SearchUser; import org.graylog2.Configuration; import org.graylog2.audit.AuditEventTypes; import org.graylog2.audit.jersey.AuditEvent; import org.graylog2.inputs.Input; +import org.graylog2.inputs.InputDiagnosticService; import org.graylog2.inputs.InputService; import org.graylog2.inputs.encryption.EncryptedInputConfigs; import org.graylog2.plugin.configuration.ConfigurationException; @@ -35,6 +53,7 @@ import org.graylog2.plugin.inputs.MessageInput; import org.graylog2.rest.models.system.inputs.requests.InputCreateRequest; import org.graylog2.rest.models.system.inputs.responses.InputCreated; +import org.graylog2.rest.models.system.inputs.responses.InputDiagnostics; import org.graylog2.rest.models.system.inputs.responses.InputSummary; import org.graylog2.rest.models.system.inputs.responses.InputsList; import org.graylog2.shared.inputs.MessageInputFactory; @@ -43,24 +62,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import jakarta.inject.Inject; - -import jakarta.validation.Valid; -import jakarta.validation.constraints.NotNull; - -import jakarta.ws.rs.BadRequestException; -import jakarta.ws.rs.Consumes; -import jakarta.ws.rs.DELETE; -import jakarta.ws.rs.GET; -import jakarta.ws.rs.NotFoundException; -import jakarta.ws.rs.POST; -import jakarta.ws.rs.PUT; -import jakarta.ws.rs.Path; -import jakarta.ws.rs.PathParam; -import jakarta.ws.rs.Produces; -import jakarta.ws.rs.core.MediaType; -import jakarta.ws.rs.core.Response; - import java.net.URI; import java.util.HashMap; import java.util.Locale; @@ -81,13 +82,15 @@ public class InputsResource extends AbstractInputsResource { private static final Logger LOG = LoggerFactory.getLogger(InputsResource.class); private final InputService inputService; + private final InputDiagnosticService inputDiagnosticService; private final MessageInputFactory messageInputFactory; private final Configuration config; @Inject - public InputsResource(InputService inputService, MessageInputFactory messageInputFactory, Configuration config) { + public InputsResource(InputService inputService, InputDiagnosticService inputDiagnosticService, MessageInputFactory messageInputFactory, Configuration config) { super(messageInputFactory.getAvailableInputs()); this.inputService = inputService; + this.inputDiagnosticService = inputDiagnosticService; this.messageInputFactory = messageInputFactory; this.config = config; } @@ -108,6 +111,20 @@ public InputSummary get(@ApiParam(name = "inputId", required = true) return getInputSummary(input); } + @GET + @Timed + @ApiOperation(value = "Get diagnostic information of a single input") + @Path("/diagnostics/{inputId}") + @ApiResponses(value = { + @ApiResponse(code = 404, message = "No such input.") + }) + public InputDiagnostics diagnostics(@ApiParam(name = "inputId", required = true) + @PathParam("inputId") String inputId, + @Context SearchUser searchUser) throws org.graylog2.database.NotFoundException { + checkPermission(RestPermissions.INPUTS_READ, inputId); + return inputDiagnosticService.getInputDiagnostics(inputService.find(inputId), searchUser); + } + @GET @Timed @ApiOperation(value = "Get all inputs") diff --git a/graylog2-server/src/test/java/org/graylog2/rest/resources/system/inputs/InputsResourceMaskingPasswordsTest.java b/graylog2-server/src/test/java/org/graylog2/rest/resources/system/inputs/InputsResourceMaskingPasswordsTest.java index fb9a25faa9e0..e6617c1d77db 100644 --- a/graylog2-server/src/test/java/org/graylog2/rest/resources/system/inputs/InputsResourceMaskingPasswordsTest.java +++ b/graylog2-server/src/test/java/org/graylog2/rest/resources/system/inputs/InputsResourceMaskingPasswordsTest.java @@ -22,6 +22,7 @@ import org.graylog2.Configuration; import org.graylog2.database.NotFoundException; import org.graylog2.inputs.Input; +import org.graylog2.inputs.InputDiagnosticService; import org.graylog2.inputs.InputService; import org.graylog2.plugin.configuration.ConfigurationRequest; import org.graylog2.plugin.configuration.fields.ConfigurationField; @@ -67,7 +68,7 @@ public class InputsResourceMaskingPasswordsTest { class InputsTestResource extends InputsResource { public InputsTestResource(InputService inputService, MessageInputFactory messageInputFactory) { - super(inputService, messageInputFactory, new Configuration()); + super(inputService, mock(InputDiagnosticService.class), messageInputFactory, new Configuration()); } @Override diff --git a/graylog2-server/src/test/java/org/graylog2/rest/resources/system/inputs/InputsResourceTest.java b/graylog2-server/src/test/java/org/graylog2/rest/resources/system/inputs/InputsResourceTest.java index b2f40b123f53..6a5ae2876f5e 100644 --- a/graylog2-server/src/test/java/org/graylog2/rest/resources/system/inputs/InputsResourceTest.java +++ b/graylog2-server/src/test/java/org/graylog2/rest/resources/system/inputs/InputsResourceTest.java @@ -20,6 +20,7 @@ import jakarta.ws.rs.BadRequestException; import org.graylog2.Configuration; import org.graylog2.configuration.HttpConfiguration; +import org.graylog2.inputs.InputDiagnosticService; import org.graylog2.inputs.InputService; import org.graylog2.plugin.database.users.User; import org.graylog2.plugin.inputs.MessageInput; @@ -116,7 +117,7 @@ static class InputsTestResource extends InputsResource { public InputsTestResource(InputService inputService, MessageInputFactory messageInputFactory, Configuration config) { - super(inputService, messageInputFactory, config); + super(inputService, mock(InputDiagnosticService.class), messageInputFactory, config); configuration = mock(HttpConfiguration.class); this.user = mock(User.class); lenient().when(user.getName()).thenReturn("foo");