Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Failure metrics for input diagnostics #20933

Merged
merged 5 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.graylog2.Configuration;
import org.graylog2.plugin.Message;
import org.graylog2.shared.messageq.MessageQueueAcknowledger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

import java.util.Collection;
import java.util.List;
import java.util.Set;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,29 @@
*/
package org.graylog.failure;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.StringUtils;
import org.graylog2.indexer.messages.Indexable;
import org.graylog2.indexer.messages.IndexingError;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.Tools;
import org.graylog2.shared.bindings.providers.ObjectMapperProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import java.util.Map;

import static com.codahale.metrics.MetricRegistry.name;
import static org.graylog2.indexer.messages.IndexingError.Type.MappingError;
import static org.graylog2.plugin.Message.FIELD_GL2_SOURCE_INPUT;

/**
* A supplementary service layer, which is aimed to simplify failure
Expand All @@ -47,12 +53,20 @@ public class FailureSubmissionService {
private final FailureSubmissionQueue failureSubmissionQueue;
private final FailureHandlingConfiguration failureHandlingConfiguration;

private final MetricRegistry metricRegistry;
private final ObjectMapper objectMapper;
private final Meter dummyMeter = new Meter();

@Inject
public FailureSubmissionService(
FailureSubmissionQueue failureSubmissionQueue,
FailureHandlingConfiguration failureHandlingConfiguration) {
FailureHandlingConfiguration failureHandlingConfiguration,
MetricRegistry metricRegistry,
ObjectMapperProvider objectMapperProvider) {
this.failureSubmissionQueue = failureSubmissionQueue;
this.failureHandlingConfiguration = failureHandlingConfiguration;
this.metricRegistry = metricRegistry;
this.objectMapper = objectMapperProvider.get();
}

/**
Expand Down Expand Up @@ -91,6 +105,8 @@ private boolean submitProcessingErrorsInternal(Message message, List<Message.Pro
return true;
}

updateProcessingFailureMetric(message);
AntonEbel marked this conversation as resolved.
Show resolved Hide resolved

if (!message.supportsFailureHandling()) {
logger.warn("Submitted a message with processing errors, which doesn't support failure handling!");
return true;
Expand Down Expand Up @@ -144,6 +160,8 @@ private void submitProcessingFailure(Message failedMessage, Message.ProcessingEr
*/
public void submitIndexingErrors(Collection<IndexingError> indexingErrors) {
try {
indexingErrors.forEach(ie -> updateIndexingFailureMetric(ie.message()));

final FailureBatch fb = FailureBatch.indexingFailureBatch(
indexingErrors.stream()
.filter(ie -> {
Expand All @@ -155,7 +173,7 @@ public void submitIndexingErrors(Collection<IndexingError> indexingErrors) {
}
})
.map(this::fromIndexingError)
AntonEbel marked this conversation as resolved.
Show resolved Hide resolved
.collect(Collectors.toList()));
.toList());

if (fb.size() > 0) {
failureSubmissionQueue.submitBlocking(fb);
Expand All @@ -181,4 +199,21 @@ private IndexingFailure fromIndexingError(IndexingError indexingError) {
indexingError.index()
);
}

private void updateProcessingFailureMetric(Message message) {
Object inputId = message.getField(FIELD_GL2_SOURCE_INPUT);
if (inputId != null) {
final String indexingFailureMetricName = name("org.graylog2.inputs", inputId.toString(), "failures.processing");
metricRegistry.meter(indexingFailureMetricName).mark();
}
}

private void updateIndexingFailureMetric(Indexable message) {
final Map<String, Object> searchObject = message.toElasticSearchObject(objectMapper, dummyMeter);
Object inputId = searchObject.get(FIELD_GL2_SOURCE_INPUT);
if (inputId != null) {
final String indexingFailureMetricName = name("org.graylog2.inputs", inputId.toString(), "failures.indexing");
metricRegistry.meter(indexingFailureMetricName).mark();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,38 +16,41 @@
*/
package org.graylog.failure;

import com.codahale.metrics.MetricRegistry;
import org.graylog2.indexer.messages.IndexingError;
import org.graylog2.plugin.Message;
import org.graylog2.shared.bindings.providers.ObjectMapperProvider;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
import static org.graylog2.indexer.messages.IndexingError.Type.MappingError;
import static org.graylog2.indexer.messages.IndexingError.Type.Unknown;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

public class FailureSubmissionServiceTest {
class FailureSubmissionServiceTest {

private final FailureSubmissionQueue failureSubmissionQueue = Mockito.mock(FailureSubmissionQueue.class);
private final FailureHandlingConfiguration failureHandlingConfiguration = Mockito.mock(FailureHandlingConfiguration.class);
private final FailureSubmissionService underTest = new FailureSubmissionService(failureSubmissionQueue, failureHandlingConfiguration);
private final FailureSubmissionQueue failureSubmissionQueue = mock(FailureSubmissionQueue.class);
private final FailureHandlingConfiguration failureHandlingConfiguration = mock(FailureHandlingConfiguration.class);
private final FailureSubmissionService underTest = new FailureSubmissionService(
failureSubmissionQueue, failureHandlingConfiguration, new MetricRegistry(), mock(ObjectMapperProvider.class));

private final ArgumentCaptor<FailureBatch> failureBatchCaptor = ArgumentCaptor.forClass(FailureBatch.class);

@Test
public void submitIndexingErrors_allIndexingErrorsTransformedAndSubmittedToFailureQueue() throws Exception {
void submitIndexingErrors_allIndexingErrorsTransformedAndSubmittedToFailureQueue() throws Exception {
// given
final Message msg1 = Mockito.mock(Message.class);
final Message msg1 = mock(Message.class);
when(msg1.getMessageId()).thenReturn("msg-1");
when(msg1.supportsFailureHandling()).thenReturn(true);
final Message msg2 = Mockito.mock(Message.class);
final Message msg2 = mock(Message.class);
when(msg2.getMessageId()).thenReturn("msg-2");
when(msg2.supportsFailureHandling()).thenReturn(true);

Expand Down Expand Up @@ -91,12 +94,12 @@ public void submitIndexingErrors_allIndexingErrorsTransformedAndSubmittedToFailu
}

@Test
public void submitIndexingErrors_messageNotSupportingFailureHandlingNotSubmittedToQueue() throws Exception {
void submitIndexingErrors_messageNotSupportingFailureHandlingNotSubmittedToQueue() {
// given
final Message msg1 = Mockito.mock(Message.class);
final Message msg1 = mock(Message.class);
when(msg1.getMessageId()).thenReturn("msg-1");
when(msg1.supportsFailureHandling()).thenReturn(false);
final Message msg2 = Mockito.mock(Message.class);
final Message msg2 = mock(Message.class);
when(msg2.getMessageId()).thenReturn("msg-2");
when(msg2.supportsFailureHandling()).thenReturn(false);

Expand All @@ -114,9 +117,9 @@ public void submitIndexingErrors_messageNotSupportingFailureHandlingNotSubmitted


@Test
public void submitProcessingErrors_allProcessingErrorsSubmittedToQueueAndMessageNotFilteredOut_ifSubmissionEnabledAndDuplicatesAreKept() throws Exception {
void submitProcessingErrors_allProcessingErrorsSubmittedToQueueAndMessageNotFilteredOut_ifSubmissionEnabledAndDuplicatesAreKept() throws Exception {
// given
final Message msg = Mockito.mock(Message.class);
final Message msg = mock(Message.class);
when(msg.getMessageId()).thenReturn("msg-x");
when(msg.supportsFailureHandling()).thenReturn(true);

Expand Down Expand Up @@ -171,9 +174,9 @@ public void submitProcessingErrors_allProcessingErrorsSubmittedToQueueAndMessage
}

@Test
public void submitProcessingErrors_nothingSubmittedAndMessageNotFilteredOut_ifSubmissionEnabledAndDuplicatesAreKeptAndMessageDoesntSupportFailureHandling() throws Exception {
void submitProcessingErrors_nothingSubmittedAndMessageNotFilteredOut_ifSubmissionEnabledAndDuplicatesAreKeptAndMessageDoesntSupportFailureHandling() {
// given
final Message msg = Mockito.mock(Message.class);
final Message msg = mock(Message.class);
when(msg.getMessageId()).thenReturn("msg-x");
when(msg.supportsFailureHandling()).thenReturn(false);

Expand All @@ -196,9 +199,9 @@ public void submitProcessingErrors_nothingSubmittedAndMessageNotFilteredOut_ifSu


@Test
public void submitProcessingErrors_nothingSubmittedAndMessageNotFilteredOut_ifSubmissionDisabledAndDuplicatesAreKept() throws Exception {
void submitProcessingErrors_nothingSubmittedAndMessageNotFilteredOut_ifSubmissionDisabledAndDuplicatesAreKept() {
// given
final Message msg = Mockito.mock(Message.class);
final Message msg = mock(Message.class);
when(msg.getMessageId()).thenReturn("msg-x");
when(msg.supportsFailureHandling()).thenReturn(true);

Expand All @@ -221,9 +224,9 @@ public void submitProcessingErrors_nothingSubmittedAndMessageNotFilteredOut_ifSu
}

@Test
public void submitProcessingErrors_nothingSubmittedAndMessageNotFilteredOut_ifSubmissionDisabledAndDuplicatesAreNotKept() throws Exception {
void submitProcessingErrors_nothingSubmittedAndMessageNotFilteredOut_ifSubmissionDisabledAndDuplicatesAreNotKept() {
// given
final Message msg = Mockito.mock(Message.class);
final Message msg = mock(Message.class);
when(msg.getMessageId()).thenReturn("msg-x");
when(msg.supportsFailureHandling()).thenReturn(true);

Expand All @@ -246,9 +249,9 @@ public void submitProcessingErrors_nothingSubmittedAndMessageNotFilteredOut_ifSu
}

@Test
public void submitProcessingErrors_nothingSubmittedAndMessageNotFilteredOut_ifMessageHasNoErrors() throws Exception {
void submitProcessingErrors_nothingSubmittedAndMessageNotFilteredOut_ifMessageHasNoErrors() {
// given
final Message msg = Mockito.mock(Message.class);
final Message msg = mock(Message.class);
when(msg.getMessageId()).thenReturn("msg-x");
when(msg.supportsFailureHandling()).thenReturn(true);
when(msg.processingErrors()).thenReturn(List.of());
Expand All @@ -268,9 +271,9 @@ public void submitProcessingErrors_nothingSubmittedAndMessageNotFilteredOut_ifMe


@Test
public void submitProcessingErrors_processingErrorSubmittedToQueueAndMessageFilteredOut_ifSubmissionEnabledAndDuplicatesAreNotKept() throws Exception {
void submitProcessingErrors_processingErrorSubmittedToQueueAndMessageFilteredOut_ifSubmissionEnabledAndDuplicatesAreNotKept() throws Exception {
// given
final Message msg = Mockito.mock(Message.class);
final Message msg = mock(Message.class);
when(msg.getMessageId()).thenReturn("msg-x");
when(msg.supportsFailureHandling()).thenReturn(true);
when(msg.processingErrors()).thenReturn(List.of(
Expand Down Expand Up @@ -309,9 +312,9 @@ public void submitProcessingErrors_processingErrorSubmittedToQueueAndMessageFilt
}

@Test
public void submitUnknownProcessingError_unknownProcessingErrorSubmittedToQueue() throws Exception {
void submitUnknownProcessingError_unknownProcessingErrorSubmittedToQueue() throws Exception {
// given
final Message msg = Mockito.mock(Message.class);
final Message msg = mock(Message.class);
when(msg.processingErrors()).thenReturn(List.of());
when(msg.supportsFailureHandling()).thenReturn(true);

Expand Down Expand Up @@ -346,9 +349,9 @@ public void submitUnknownProcessingError_unknownProcessingErrorSubmittedToQueue(

@Test
@DisplayName("Ensure Message#getId() is used as a fallback for Message#getMessageId()")
public void submitProcessingErrorWithIdButnoMessageId() throws Exception {
void submitProcessingErrorWithIdButnoMessageId() throws Exception {
// given
final Message msg = Mockito.mock(Message.class);
final Message msg = mock(Message.class);
when(msg.getId()).thenReturn("msg-uuid");
when(msg.processingErrors()).thenReturn(List.of());
when(msg.supportsFailureHandling()).thenReturn(true);
Expand Down
Loading