Skip to content

Commit

Permalink
Trimming down MoreSearch class. (#21078)
Browse files Browse the repository at this point in the history
* Consolidate stream fetching, avoiding n+1.

* Reusing stream loading.
  • Loading branch information
dennisoelkers authored Dec 2, 2024
1 parent ef1ba80 commit 5cdcb0e
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.graylog.events.configuration.EventsConfigurationProvider;
import org.graylog.events.processor.EventDefinition;
import org.graylog.events.processor.EventProcessorException;
import org.graylog.events.search.MoreSearch;
import org.graylog.plugins.views.search.Filter;
import org.graylog.plugins.views.search.ParameterProvider;
import org.graylog.plugins.views.search.Query;
Expand Down Expand Up @@ -56,8 +55,9 @@
import org.graylog.plugins.views.search.searchtypes.pivot.series.Count;
import org.graylog2.notifications.Notification;
import org.graylog2.notifications.NotificationService;
import org.graylog2.plugin.database.Persisted;
import org.graylog2.plugin.indexer.searches.timeranges.TimeRange;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.streams.StreamService;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
Expand Down Expand Up @@ -99,10 +99,10 @@ public class PivotAggregationSearch implements AggregationSearch {
private final QueryEngine queryEngine;
private final EventsConfigurationProvider configurationProvider;
private final EventDefinition eventDefinition;
private final MoreSearch moreSearch;
private final PermittedStreams permittedStreams;
private final NotificationService notificationService;
private final QueryStringDecorators queryStringDecorators;
private final StreamService streamService;
private final boolean isCloud;

@Inject
Expand All @@ -114,10 +114,10 @@ public PivotAggregationSearch(@Assisted AggregationEventProcessorConfig config,
SearchJobService searchJobService,
QueryEngine queryEngine,
EventsConfigurationProvider configProvider,
MoreSearch moreSearch,
PermittedStreams permittedStreams,
NotificationService notificationService,
QueryStringDecorators queryStringDecorators,
StreamService streamService,
@Named("is_cloud") boolean isCloud) {
this.config = config;
this.parameters = parameters;
Expand All @@ -127,10 +127,10 @@ public PivotAggregationSearch(@Assisted AggregationEventProcessorConfig config,
this.searchJobService = searchJobService;
this.queryEngine = queryEngine;
this.configurationProvider = configProvider;
this.moreSearch = moreSearch;
this.permittedStreams = permittedStreams;
this.notificationService = notificationService;
this.queryStringDecorators = queryStringDecorators;
this.streamService = streamService;
this.isCloud = isCloud;
}

Expand Down Expand Up @@ -329,7 +329,7 @@ ImmutableList<AggregationKeyResult> extractValues(PivotResult pivotResult) throw
}

// Safety guard against programming errors
if (row.key().size() == 0 || isNullOrEmpty(row.key().get(0))) {
if (row.key().isEmpty() || isNullOrEmpty(row.key().get(0))) {
throw new EventProcessorException("Invalid row key! Expected at least the date range timestamp value: " + row.key().toString(), true, eventDefinition);
}

Expand Down Expand Up @@ -393,6 +393,10 @@ ImmutableList<AggregationKeyResult> extractValues(PivotResult pivotResult) throw
return results.build();
}

private ImmutableSet<String> loadAllStreams() {
return permittedStreams.loadAllMessageStreams((streamId) -> true);
}

private SearchJob getSearchJob(AggregationEventProcessorParameters parameters, User user,
long searchWithinMs, long executeEveryMs) throws EventProcessorException {
final var username = user.name();
Expand All @@ -403,7 +407,7 @@ private SearchJob getSearchJob(AggregationEventProcessorParameters parameters, U
// This adds all streams if none were provided
// TODO: Once we introduce "EventProcessor owners" this should only load the permitted streams of the
// user who created this EventProcessor.
search = search.addStreamsToQueriesWithoutStreams(() -> permittedStreams.loadAllMessageStreams((streamId) -> true));
search = search.addStreamsToQueriesWithoutStreams(this::loadAllStreams);
final SearchJob searchJob = queryEngine.execute(searchJobService.create(search, username, NO_CANCELLATION), Collections.emptySet(), user.timezone());
try {
Uninterruptibles.getUninterruptibly(
Expand Down Expand Up @@ -549,13 +553,14 @@ private Set<String> getStreams(AggregationEventProcessorParameters parameters) {
// TODO: How to take into consideration StreamPermissions here???
streamIds.addAll(permittedStreams.loadWithCategories(config.streamCategories(), (streamId) -> true));
}
final Set<String> existingStreams = moreSearch.loadStreams(streamIds).stream()
.map(Stream::getId)
final Set<String> existingStreams = streamService.loadByIds(streamIds)
.stream()
.map(Persisted::getId)
.collect(toSet());
final Set<String> nonExistingStreams = streamIds.stream()
.filter(stream -> !existingStreams.contains(stream))
.collect(toSet());
if (nonExistingStreams.size() != 0) {
if (!nonExistingStreams.isEmpty()) {
LOG.warn("Removing non-existing streams <{}> from event definition <{}>/<{}>",
nonExistingStreams,
eventDefinition.id(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.graylog.plugins.views.search.errors.EmptyParameterError;
import org.graylog.plugins.views.search.errors.SearchException;
import org.graylog.plugins.views.search.searchfilters.model.UsedSearchFilter;
import org.graylog2.database.NotFoundException;
import org.graylog2.indexer.ranges.IndexRange;
import org.graylog2.indexer.ranges.IndexRangeService;
import org.graylog2.indexer.results.ResultMessage;
Expand All @@ -37,7 +36,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
Expand Down Expand Up @@ -112,7 +110,7 @@ private Set<String> getAffectedIndices(Set<String> streamIds, TimeRange timeRang
.map(IndexRange::indexName)
.collect(Collectors.toSet());
} else {
final Set<Stream> streams = loadStreams(streamIds);
final Set<Stream> streams = streamService.loadByIds(streamIds);
final IndexRangeContainsOneOfStreams indexRangeContainsOneOfStreams = new IndexRangeContainsOneOfStreams();
return indexRanges.stream()
.filter(ir -> indexRangeContainsOneOfStreams.test(ir, streams))
Expand Down Expand Up @@ -157,21 +155,6 @@ public void scrollQuery(String queryString, Set<String> streams, List<UsedSearch
moreSearchAdapter.scrollEvents(queryString, timeRange, affectedIndices, streams, filters, batchSize, resultCallback::call);
}

public Set<Stream> loadStreams(Set<String> streamIds) {
// TODO: Use method from `StreamService` which loads a collection of ids (when implemented) to prevent n+1.
// Track https://github.com/Graylog2/graylog2-server/issues/4897 for progress.
Set<Stream> streams = new HashSet<>();
for (String streamId : streamIds) {
try {
Stream load = streamService.load(streamId);
streams.add(load);
} catch (NotFoundException e) {
LOG.debug("Failed to load stream <{}>", streamId);
}
}
return streams;
}

/**
* Substitute query string parameters using {@link QueryStringDecorators}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public class PermittedStreams {
private final Supplier<Stream<String>> allStreamsProvider;
private final Function<Collection<String>, Stream<String>> streamCategoryMapper;


public PermittedStreams(Supplier<Stream<String>> allStreamsProvider, Function<Collection<String>, Stream<String>> streamCategoryMapper) {
this.allStreamsProvider = allStreamsProvider;
this.streamCategoryMapper = streamCategoryMapper;
Expand Down
Loading

0 comments on commit 5cdcb0e

Please sign in to comment.