diff --git a/graylog2-server/src/main/java/org/graylog/events/processor/aggregation/PivotAggregationSearch.java b/graylog2-server/src/main/java/org/graylog/events/processor/aggregation/PivotAggregationSearch.java index 2b20bbfb974f..2d385fdce393 100644 --- a/graylog2-server/src/main/java/org/graylog/events/processor/aggregation/PivotAggregationSearch.java +++ b/graylog2-server/src/main/java/org/graylog/events/processor/aggregation/PivotAggregationSearch.java @@ -27,7 +27,6 @@ import org.graylog.events.configuration.EventsConfigurationProvider; import org.graylog.events.processor.EventDefinition; import org.graylog.events.processor.EventProcessorException; -import org.graylog.events.search.MoreSearch; import org.graylog.plugins.views.search.Filter; import org.graylog.plugins.views.search.ParameterProvider; import org.graylog.plugins.views.search.Query; @@ -56,8 +55,9 @@ import org.graylog.plugins.views.search.searchtypes.pivot.series.Count; import org.graylog2.notifications.Notification; import org.graylog2.notifications.NotificationService; +import org.graylog2.plugin.database.Persisted; import org.graylog2.plugin.indexer.searches.timeranges.TimeRange; -import org.graylog2.plugin.streams.Stream; +import org.graylog2.streams.StreamService; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.slf4j.Logger; @@ -99,10 +99,10 @@ public class PivotAggregationSearch implements AggregationSearch { private final QueryEngine queryEngine; private final EventsConfigurationProvider configurationProvider; private final EventDefinition eventDefinition; - private final MoreSearch moreSearch; private final PermittedStreams permittedStreams; private final NotificationService notificationService; private final QueryStringDecorators queryStringDecorators; + private final StreamService streamService; private final boolean isCloud; @Inject @@ -114,10 +114,10 @@ public PivotAggregationSearch(@Assisted AggregationEventProcessorConfig config, SearchJobService searchJobService, QueryEngine queryEngine, EventsConfigurationProvider configProvider, - MoreSearch moreSearch, PermittedStreams permittedStreams, NotificationService notificationService, QueryStringDecorators queryStringDecorators, + StreamService streamService, @Named("is_cloud") boolean isCloud) { this.config = config; this.parameters = parameters; @@ -127,10 +127,10 @@ public PivotAggregationSearch(@Assisted AggregationEventProcessorConfig config, this.searchJobService = searchJobService; this.queryEngine = queryEngine; this.configurationProvider = configProvider; - this.moreSearch = moreSearch; this.permittedStreams = permittedStreams; this.notificationService = notificationService; this.queryStringDecorators = queryStringDecorators; + this.streamService = streamService; this.isCloud = isCloud; } @@ -329,7 +329,7 @@ ImmutableList extractValues(PivotResult pivotResult) throw } // Safety guard against programming errors - if (row.key().size() == 0 || isNullOrEmpty(row.key().get(0))) { + if (row.key().isEmpty() || isNullOrEmpty(row.key().get(0))) { throw new EventProcessorException("Invalid row key! Expected at least the date range timestamp value: " + row.key().toString(), true, eventDefinition); } @@ -393,6 +393,10 @@ ImmutableList extractValues(PivotResult pivotResult) throw return results.build(); } + private ImmutableSet loadAllStreams() { + return permittedStreams.loadAllMessageStreams((streamId) -> true); + } + private SearchJob getSearchJob(AggregationEventProcessorParameters parameters, User user, long searchWithinMs, long executeEveryMs) throws EventProcessorException { final var username = user.name(); @@ -403,7 +407,7 @@ private SearchJob getSearchJob(AggregationEventProcessorParameters parameters, U // This adds all streams if none were provided // TODO: Once we introduce "EventProcessor owners" this should only load the permitted streams of the // user who created this EventProcessor. - search = search.addStreamsToQueriesWithoutStreams(() -> permittedStreams.loadAllMessageStreams((streamId) -> true)); + search = search.addStreamsToQueriesWithoutStreams(this::loadAllStreams); final SearchJob searchJob = queryEngine.execute(searchJobService.create(search, username, NO_CANCELLATION), Collections.emptySet(), user.timezone()); try { Uninterruptibles.getUninterruptibly( @@ -549,13 +553,14 @@ private Set getStreams(AggregationEventProcessorParameters parameters) { // TODO: How to take into consideration StreamPermissions here??? streamIds.addAll(permittedStreams.loadWithCategories(config.streamCategories(), (streamId) -> true)); } - final Set existingStreams = moreSearch.loadStreams(streamIds).stream() - .map(Stream::getId) + final Set existingStreams = streamService.loadByIds(streamIds) + .stream() + .map(Persisted::getId) .collect(toSet()); final Set nonExistingStreams = streamIds.stream() .filter(stream -> !existingStreams.contains(stream)) .collect(toSet()); - if (nonExistingStreams.size() != 0) { + if (!nonExistingStreams.isEmpty()) { LOG.warn("Removing non-existing streams <{}> from event definition <{}>/<{}>", nonExistingStreams, eventDefinition.id(), diff --git a/graylog2-server/src/main/java/org/graylog/events/search/MoreSearch.java b/graylog2-server/src/main/java/org/graylog/events/search/MoreSearch.java index 2ec81ce044ba..9240422212aa 100644 --- a/graylog2-server/src/main/java/org/graylog/events/search/MoreSearch.java +++ b/graylog2-server/src/main/java/org/graylog/events/search/MoreSearch.java @@ -26,7 +26,6 @@ import org.graylog.plugins.views.search.errors.EmptyParameterError; import org.graylog.plugins.views.search.errors.SearchException; import org.graylog.plugins.views.search.searchfilters.model.UsedSearchFilter; -import org.graylog2.database.NotFoundException; import org.graylog2.indexer.ranges.IndexRange; import org.graylog2.indexer.ranges.IndexRangeService; import org.graylog2.indexer.results.ResultMessage; @@ -37,7 +36,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.SortedSet; @@ -112,7 +110,7 @@ private Set getAffectedIndices(Set streamIds, TimeRange timeRang .map(IndexRange::indexName) .collect(Collectors.toSet()); } else { - final Set streams = loadStreams(streamIds); + final Set streams = streamService.loadByIds(streamIds); final IndexRangeContainsOneOfStreams indexRangeContainsOneOfStreams = new IndexRangeContainsOneOfStreams(); return indexRanges.stream() .filter(ir -> indexRangeContainsOneOfStreams.test(ir, streams)) @@ -157,21 +155,6 @@ public void scrollQuery(String queryString, Set streams, List loadStreams(Set streamIds) { - // TODO: Use method from `StreamService` which loads a collection of ids (when implemented) to prevent n+1. - // Track https://github.com/Graylog2/graylog2-server/issues/4897 for progress. - Set streams = new HashSet<>(); - for (String streamId : streamIds) { - try { - Stream load = streamService.load(streamId); - streams.add(load); - } catch (NotFoundException e) { - LOG.debug("Failed to load stream <{}>", streamId); - } - } - return streams; - } - /** * Substitute query string parameters using {@link QueryStringDecorators}. */ diff --git a/graylog2-server/src/main/java/org/graylog/plugins/views/search/rest/PermittedStreams.java b/graylog2-server/src/main/java/org/graylog/plugins/views/search/rest/PermittedStreams.java index 716552b3107e..1722ad9dbbd9 100644 --- a/graylog2-server/src/main/java/org/graylog/plugins/views/search/rest/PermittedStreams.java +++ b/graylog2-server/src/main/java/org/graylog/plugins/views/search/rest/PermittedStreams.java @@ -32,7 +32,6 @@ public class PermittedStreams { private final Supplier> allStreamsProvider; private final Function, Stream> streamCategoryMapper; - public PermittedStreams(Supplier> allStreamsProvider, Function, Stream> streamCategoryMapper) { this.allStreamsProvider = allStreamsProvider; this.streamCategoryMapper = streamCategoryMapper; diff --git a/graylog2-server/src/test/java/org/graylog/events/processor/aggregation/PivotAggregationSearchTest.java b/graylog2-server/src/test/java/org/graylog/events/processor/aggregation/PivotAggregationSearchTest.java index 3b02526049da..8f94a55d6b36 100644 --- a/graylog2-server/src/test/java/org/graylog/events/processor/aggregation/PivotAggregationSearchTest.java +++ b/graylog2-server/src/test/java/org/graylog/events/processor/aggregation/PivotAggregationSearchTest.java @@ -21,8 +21,8 @@ import org.assertj.core.api.Assertions; import org.graylog.events.EventsConfigurationTestProvider; import org.graylog.events.processor.EventDefinition; -import org.graylog.events.search.MoreSearch; import org.graylog.plugins.views.search.Query; +import org.graylog.plugins.views.search.SearchType; import org.graylog.plugins.views.search.ValueParameter; import org.graylog.plugins.views.search.db.SearchJobService; import org.graylog.plugins.views.search.elasticsearch.QueryStringDecorators; @@ -41,6 +41,7 @@ import org.graylog2.notifications.NotificationService; import org.graylog2.plugin.indexer.searches.timeranges.AbsoluteRange; import org.graylog2.plugin.indexer.searches.timeranges.TimeRange; +import org.graylog2.streams.StreamService; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.joda.time.Duration; @@ -68,9 +69,9 @@ public class PivotAggregationSearchTest { @Mock private EventDefinition eventDefinition; @Mock - private MoreSearch moreSearch; - @Mock private NotificationService notificationService; + @Mock + private StreamService streamService; private final PermittedStreams permittedStreams = new PermittedStreams(Stream::of, (categories) -> Stream.of()); @@ -95,21 +96,7 @@ public void testExtractValuesWithGroupBy() throws Exception { .batchSize(500) .build(); - final PivotAggregationSearch pivotAggregationSearch = new PivotAggregationSearch( - config, - parameters, - new AggregationSearch.User("test", DateTimeZone.UTC), - eventDefinition, - Collections.emptyList(), - searchJobService, - queryEngine, - EventsConfigurationTestProvider.create(), - moreSearch, - permittedStreams, - notificationService, - new QueryStringDecorators(Optional.empty()), - false - ); + final PivotAggregationSearch pivotAggregationSearch = createPivotAggregationSearch(config, parameters); final String toString = timerange.getTo().toString(); final PivotResult pivotResult = PivotResult.builder() @@ -197,21 +184,7 @@ public void testExtractValuesWithoutGroupBy() throws Exception { .batchSize(500) .build(); - final PivotAggregationSearch pivotAggregationSearch = new PivotAggregationSearch( - config, - parameters, - new AggregationSearch.User("test", DateTimeZone.UTC), - eventDefinition, - Collections.emptyList(), - searchJobService, - queryEngine, - EventsConfigurationTestProvider.create(), - moreSearch, - permittedStreams, - notificationService, - new QueryStringDecorators(Optional.empty()), - false - ); + final PivotAggregationSearch pivotAggregationSearch = createPivotAggregationSearch(config, parameters); final PivotResult pivotResult = PivotResult.builder() .id("test") @@ -272,25 +245,15 @@ public void testExtractAdditionalSearchTypes() throws Exception { .batchSize(500) .build(); - final PivotAggregationSearch pivotAggregationSearch = new PivotAggregationSearch( + final PivotAggregationSearch pivotAggregationSearch = createPivotAggregationSearch( config, parameters, - new AggregationSearch.User("test", DateTimeZone.UTC), - eventDefinition, List.of(Pivot.builder() .id("risk-asset-1") .rowGroups(Values.builder().limit(10).field("Field").build()) .rollup(false) .series(Count.builder().build()) - .build()), - searchJobService, - queryEngine, - EventsConfigurationTestProvider.create(), - moreSearch, - permittedStreams, - notificationService, - new QueryStringDecorators(Optional.empty()), - false + .build()) ); final PivotResult pivotResult = PivotResult.builder() @@ -344,21 +307,7 @@ public void testExtractValuesWithNullValues() throws Exception { .batchSize(500) .build(); - final PivotAggregationSearch pivotAggregationSearch = new PivotAggregationSearch( - config, - parameters, - new AggregationSearch.User("test", DateTimeZone.UTC), - eventDefinition, - Collections.emptyList(), - searchJobService, - queryEngine, - EventsConfigurationTestProvider.create(), - moreSearch, - permittedStreams, - notificationService, - new QueryStringDecorators(Optional.empty()), - false - ); + final PivotAggregationSearch pivotAggregationSearch = createPivotAggregationSearch(config, parameters); final PivotResult pivotResult = PivotResult.builder() .id("test") @@ -483,26 +432,18 @@ public void testQueryParameterSubstitution() { .batchSize(500) .build(); - final PivotAggregationSearch pivotAggregationSearch = new PivotAggregationSearch( + final PivotAggregationSearch pivotAggregationSearch = createPivotAggregationSearch( config, parameters, - new AggregationSearch.User("test", DateTimeZone.UTC), - eventDefinition, Collections.emptyList(), - searchJobService, - queryEngine, - EventsConfigurationTestProvider.create(), - moreSearch, new PermittedStreams(() -> Stream.of("00001"), (categories) -> Stream.of()), - notificationService, new QueryStringDecorators(Optional.of((queryString, parameterProvider, query) -> { if (queryString.equals("source:$secret$") && parameterProvider.getParameter("secret").isPresent()) { return PositionTrackingQuery.of("source:example.org"); } else { throw new IllegalArgumentException("Unexpected query decoration request!"); } - })), - false + })) ); final Query query = pivotAggregationSearch.getAggregationQuery(parameters, WINDOW_LENGTH, WINDOW_LENGTH); Assertions.assertThat(query.query().queryString()).isEqualTo("source:example.org"); @@ -530,25 +471,16 @@ public void testAdditionalSearchTypes() { .batchSize(500) .build(); - final PivotAggregationSearch pivotAggregationSearch = new PivotAggregationSearch( + final PivotAggregationSearch pivotAggregationSearch = createPivotAggregationSearch( config, parameters, - new AggregationSearch.User("test", DateTimeZone.UTC), - eventDefinition, List.of(Pivot.builder() .id("risk-asset-1") .rowGroups(Values.builder().limit(10).field("Field").build()) .rollup(false) .series(Count.builder().build()) .build()), - searchJobService, - queryEngine, - EventsConfigurationTestProvider.create(), - moreSearch, - new PermittedStreams(() -> Stream.of("00001"), (categories) -> Stream.of()), - notificationService, - new QueryStringDecorators(Optional.empty()), - false + new PermittedStreams(() -> Stream.of("00001"), (categories) -> Stream.of()) ); final Query query = pivotAggregationSearch.getAggregationQuery(parameters, WINDOW_LENGTH, WINDOW_LENGTH); Assertions.assertThatCollection(query.searchTypes()).contains( @@ -560,4 +492,60 @@ public void testAdditionalSearchTypes() { .build()); } + + private PivotAggregationSearch createPivotAggregationSearch( + AggregationEventProcessorConfig config, + AggregationEventProcessorParameters parameters + ) { + return createPivotAggregationSearch(config, parameters, Collections.emptyList(), permittedStreams); + } + + private PivotAggregationSearch createPivotAggregationSearch( + AggregationEventProcessorConfig config, + AggregationEventProcessorParameters parameters, + PermittedStreams permittedStreams + ) { + return createPivotAggregationSearch(config, parameters, Collections.emptyList(), permittedStreams); + } + + private PivotAggregationSearch createPivotAggregationSearch( + AggregationEventProcessorConfig config, + AggregationEventProcessorParameters parameters, + List additionalSearchTypes + ) { + return createPivotAggregationSearch(config, parameters, additionalSearchTypes, permittedStreams); + } + + private PivotAggregationSearch createPivotAggregationSearch( + AggregationEventProcessorConfig config, + AggregationEventProcessorParameters parameters, + List additionalSearchTypes, + PermittedStreams permittedStreams + ) { + return createPivotAggregationSearch(config, parameters, additionalSearchTypes, permittedStreams, new QueryStringDecorators(Optional.empty())); + } + + private PivotAggregationSearch createPivotAggregationSearch( + AggregationEventProcessorConfig config, + AggregationEventProcessorParameters parameters, + List additionalSearchTypes, + PermittedStreams permittedStreams, + QueryStringDecorators queryStringDecorators + ) { + return new PivotAggregationSearch( + config, + parameters, + new AggregationSearch.User("test", DateTimeZone.UTC), + eventDefinition, + additionalSearchTypes, + searchJobService, + queryEngine, + EventsConfigurationTestProvider.create(), + permittedStreams, + notificationService, + queryStringDecorators, + streamService, + false + ); + } }