Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trimming down MoreSearch class. #21078

Merged
merged 2 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.graylog.events.configuration.EventsConfigurationProvider;
import org.graylog.events.processor.EventDefinition;
import org.graylog.events.processor.EventProcessorException;
import org.graylog.events.search.MoreSearch;
import org.graylog.plugins.views.search.Filter;
import org.graylog.plugins.views.search.ParameterProvider;
import org.graylog.plugins.views.search.Query;
Expand Down Expand Up @@ -56,8 +55,9 @@
import org.graylog.plugins.views.search.searchtypes.pivot.series.Count;
import org.graylog2.notifications.Notification;
import org.graylog2.notifications.NotificationService;
import org.graylog2.plugin.database.Persisted;
import org.graylog2.plugin.indexer.searches.timeranges.TimeRange;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.streams.StreamService;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
Expand Down Expand Up @@ -99,10 +99,10 @@ public class PivotAggregationSearch implements AggregationSearch {
private final QueryEngine queryEngine;
private final EventsConfigurationProvider configurationProvider;
private final EventDefinition eventDefinition;
private final MoreSearch moreSearch;
private final PermittedStreams permittedStreams;
private final NotificationService notificationService;
private final QueryStringDecorators queryStringDecorators;
private final StreamService streamService;
private final boolean isCloud;

@Inject
Expand All @@ -114,10 +114,10 @@ public PivotAggregationSearch(@Assisted AggregationEventProcessorConfig config,
SearchJobService searchJobService,
QueryEngine queryEngine,
EventsConfigurationProvider configProvider,
MoreSearch moreSearch,
PermittedStreams permittedStreams,
NotificationService notificationService,
QueryStringDecorators queryStringDecorators,
StreamService streamService,
@Named("is_cloud") boolean isCloud) {
this.config = config;
this.parameters = parameters;
Expand All @@ -127,10 +127,10 @@ public PivotAggregationSearch(@Assisted AggregationEventProcessorConfig config,
this.searchJobService = searchJobService;
this.queryEngine = queryEngine;
this.configurationProvider = configProvider;
this.moreSearch = moreSearch;
this.permittedStreams = permittedStreams;
this.notificationService = notificationService;
this.queryStringDecorators = queryStringDecorators;
this.streamService = streamService;
this.isCloud = isCloud;
}

Expand Down Expand Up @@ -329,7 +329,7 @@ ImmutableList<AggregationKeyResult> extractValues(PivotResult pivotResult) throw
}

// Safety guard against programming errors
if (row.key().size() == 0 || isNullOrEmpty(row.key().get(0))) {
if (row.key().isEmpty() || isNullOrEmpty(row.key().get(0))) {
throw new EventProcessorException("Invalid row key! Expected at least the date range timestamp value: " + row.key().toString(), true, eventDefinition);
}

Expand Down Expand Up @@ -393,6 +393,10 @@ ImmutableList<AggregationKeyResult> extractValues(PivotResult pivotResult) throw
return results.build();
}

private ImmutableSet<String> loadAllStreams() {
return permittedStreams.loadAllMessageStreams((streamId) -> true);
patrickmann marked this conversation as resolved.
Show resolved Hide resolved
}

private SearchJob getSearchJob(AggregationEventProcessorParameters parameters, User user,
long searchWithinMs, long executeEveryMs) throws EventProcessorException {
final var username = user.name();
Expand All @@ -403,7 +407,7 @@ private SearchJob getSearchJob(AggregationEventProcessorParameters parameters, U
// This adds all streams if none were provided
// TODO: Once we introduce "EventProcessor owners" this should only load the permitted streams of the
// user who created this EventProcessor.
search = search.addStreamsToQueriesWithoutStreams(() -> permittedStreams.loadAllMessageStreams((streamId) -> true));
search = search.addStreamsToQueriesWithoutStreams(this::loadAllStreams);
final SearchJob searchJob = queryEngine.execute(searchJobService.create(search, username, NO_CANCELLATION), Collections.emptySet(), user.timezone());
try {
Uninterruptibles.getUninterruptibly(
Expand Down Expand Up @@ -549,13 +553,14 @@ private Set<String> getStreams(AggregationEventProcessorParameters parameters) {
// TODO: How to take into consideration StreamPermissions here???
streamIds.addAll(permittedStreams.loadWithCategories(config.streamCategories(), (streamId) -> true));
}
final Set<String> existingStreams = moreSearch.loadStreams(streamIds).stream()
.map(Stream::getId)
final Set<String> existingStreams = streamService.loadByIds(streamIds)
.stream()
.map(Persisted::getId)
.collect(toSet());
final Set<String> nonExistingStreams = streamIds.stream()
.filter(stream -> !existingStreams.contains(stream))
.collect(toSet());
if (nonExistingStreams.size() != 0) {
if (!nonExistingStreams.isEmpty()) {
LOG.warn("Removing non-existing streams <{}> from event definition <{}>/<{}>",
nonExistingStreams,
eventDefinition.id(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.graylog.plugins.views.search.errors.EmptyParameterError;
import org.graylog.plugins.views.search.errors.SearchException;
import org.graylog.plugins.views.search.searchfilters.model.UsedSearchFilter;
import org.graylog2.database.NotFoundException;
import org.graylog2.indexer.ranges.IndexRange;
import org.graylog2.indexer.ranges.IndexRangeService;
import org.graylog2.indexer.results.ResultMessage;
Expand All @@ -37,7 +36,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
Expand Down Expand Up @@ -112,7 +110,7 @@ private Set<String> getAffectedIndices(Set<String> streamIds, TimeRange timeRang
.map(IndexRange::indexName)
.collect(Collectors.toSet());
} else {
final Set<Stream> streams = loadStreams(streamIds);
final Set<Stream> streams = streamService.loadByIds(streamIds);
final IndexRangeContainsOneOfStreams indexRangeContainsOneOfStreams = new IndexRangeContainsOneOfStreams();
return indexRanges.stream()
.filter(ir -> indexRangeContainsOneOfStreams.test(ir, streams))
Expand Down Expand Up @@ -157,21 +155,6 @@ public void scrollQuery(String queryString, Set<String> streams, List<UsedSearch
moreSearchAdapter.scrollEvents(queryString, timeRange, affectedIndices, streams, filters, batchSize, resultCallback::call);
}

/**
 * Resolves the given stream IDs into their corresponding stream objects.
 * IDs that do not match an existing stream are logged at DEBUG level and
 * silently omitted from the result.
 *
 * TODO: Use method from `StreamService` which loads a collection of ids (when implemented) to prevent n+1.
 * Track https://github.com/Graylog2/graylog2-server/issues/4897 for progress.
 *
 * @param streamIds the stream IDs to look up, one service call per ID
 * @return the set of streams that could be loaded
 */
public Set<Stream> loadStreams(Set<String> streamIds) {
    final Set<Stream> result = new HashSet<>();
    streamIds.forEach(streamId -> {
        try {
            result.add(streamService.load(streamId));
        } catch (NotFoundException ignored) {
            // A missing stream is expected (e.g. it was deleted) — skip it.
            LOG.debug("Failed to load stream <{}>", streamId);
        }
    });
    return result;
}

/**
* Substitute query string parameters using {@link QueryStringDecorators}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public class PermittedStreams {
private final Supplier<Stream<String>> allStreamsProvider;
private final Function<Collection<String>, Stream<String>> streamCategoryMapper;


public PermittedStreams(Supplier<Stream<String>> allStreamsProvider, Function<Collection<String>, Stream<String>> streamCategoryMapper) {
this.allStreamsProvider = allStreamsProvider;
this.streamCategoryMapper = streamCategoryMapper;
Expand Down
Loading
Loading