Skip to content

Commit

Permalink
Add acknowledgement support to aggregate processor (opensearch-projec…
Browse files Browse the repository at this point in the history
…t#5139)

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments and added tests

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed checkstyle errors

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed test errors by adding await

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Removed unnecessary API

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed checkstyle error

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
kkondaka authored Dec 20, 2024
1 parent 8bd39d3 commit 708843c
Showing 22 changed files with 874 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -587,7 +587,7 @@ public Builder<T> withTimeReceived(final Instant timeReceived) {
* @return returns the builder
* @since 2.10
*/
protected Builder<T> withEventHandle(final EventHandle eventHandle) {
public Builder<T> withEventHandle(final EventHandle eventHandle) {
this.eventHandle = eventHandle;
return this;
}
Original file line number Diff line number Diff line change
@@ -33,6 +33,15 @@ public interface Processor<InputRecord extends Record<?>, OutputRecord extends R
*/
void prepareForShutdown();

/**
* @since 2.11
* Indicates if the processor holds the events or not
* Holding events indicates that the events are not ready to be released.
*/
default boolean holdsEvents() {
return false;
}

/**
* @since 1.2
* Returns true if the Processor's internal state is safe to be shutdown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.processor;

import org.junit.jupiter.api.Test;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ProcessorTest {

@Test
public void testDefault() {
Processor processor = mock(Processor.class);
when(processor.holdsEvents()).thenCallRealMethod();
assertThat(processor.holdsEvents(), equalTo(false));
}
}

Original file line number Diff line number Diff line change
@@ -347,7 +347,7 @@ private void shutdownExecutorService(final ExecutorService executorService, fina
* @param records records that needs to published to each sink
* @return List of Future, each future for each sink
*/
List<Future<Void>> publishToSinks(final Collection<Record> records) {
public List<Future<Void>> publishToSinks(final Collection<Record> records) {
final int sinksSize = sinks.size();
final List<Future<Void>> sinkFutures = new ArrayList<>(sinksSize);

Original file line number Diff line number Diff line change
@@ -137,7 +137,8 @@ private void doRun() {

try {
records = processor.execute(records);
if (inputEvents != null) {
// acknowledge missing events only if the processor is not holding events
if (!processor.holdsEvents() && inputEvents != null) {
processAcknowledgements(inputEvents, records);
}
} catch (final Exception e) {
Original file line number Diff line number Diff line change
@@ -6,6 +6,8 @@
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.atLeast;
import org.opensearch.dataprepper.core.pipeline.common.FutureHelper;
import org.opensearch.dataprepper.core.pipeline.common.FutureHelperResult;
import org.opensearch.dataprepper.model.CheckpointState;
@@ -132,6 +134,61 @@ void testProcessWorkerHappyPathWithAcknowledgments() {
}
}

@Test
void testProcessWorkerWithProcessorsNotHoldingEvents() {
DefaultEventHandle eventHandle = mock(DefaultEventHandle.class);
Event event = mock(Event.class);
Record record = mock(Record.class);
when(eventHandle.release(true)).thenReturn(true);
lenient().when(event.getEventHandle()).thenReturn(eventHandle);
when(record.getData()).thenReturn(event);
final List<Record> records = List.of(record);
final CheckpointState checkpointState = mock(CheckpointState.class);
final Map.Entry<Collection, CheckpointState> readResult = Map.entry(records, checkpointState);
when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenReturn(readResult);

final Processor processor1 = mock(Processor.class);
when(processor1.holdsEvents()).thenReturn(false);
when(processor1.execute(records)).thenReturn(List.of());
when(processor1.isReadyForShutdown()).thenReturn(true);
processors = List.of(processor1);
when(source.areAcknowledgementsEnabled()).thenReturn(true);

final ProcessWorker processWorker = createObjectUnderTest();

processWorker.run();

verify(eventHandle, atLeast(1)).release(true);
}


@Test
void testProcessWorkerWithProcessorsHoldingEvents() {
EventHandle eventHandle = mock(EventHandle.class);
Event event = mock(Event.class);
Record record = mock(Record.class);
lenient().when(event.getEventHandle()).thenReturn(eventHandle);
when(record.getData()).thenReturn(event);
final List<Record> records = List.of(record);
final CheckpointState checkpointState = mock(CheckpointState.class);
final Map.Entry<Collection, CheckpointState> readResult = Map.entry(records, checkpointState);
when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenReturn(readResult);

final Processor processor1 = mock(Processor.class);
when(processor1.holdsEvents()).thenReturn(true);
when(processor1.execute(records)).thenReturn(List.of());
when(processor1.isReadyForShutdown()).thenReturn(true);

processors = List.of(processor1);
when(source.areAcknowledgementsEnabled()).thenReturn(true);

final ProcessWorker processWorker = createObjectUnderTest();

processWorker.run();

verify(eventHandle, never()).release(true);
}

@Test
void testProcessWorkerWithProcessorThrowingExceptionIsCaughtProperly() {

1 change: 1 addition & 0 deletions data-prepper-plugins/aggregate-processor/build.gradle
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@ dependencies {
implementation project(':data-prepper-expression')
implementation project(':data-prepper-plugins:otel-proto-common')
implementation project(':data-prepper-plugins:otel-metrics-raw-processor')
testImplementation project(':data-prepper-core')
implementation libs.guava.core
implementation libs.commons.lang3
implementation libs.opentelemetry.proto
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.processor.aggregate;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;

import java.util.Collections;

@@ -29,6 +30,14 @@ default AggregateActionResponse handleEvent(final Event event, final AggregateAc
return AggregateActionResponse.fromEvent(event);
}

/**
* indicates if the action holds the events or not
*
*/
default boolean holdsEvents() {
return false;
}

/**
* Concludes a group of Events
*
@@ -38,6 +47,12 @@ default AggregateActionResponse handleEvent(final Event event, final AggregateAc
* @since 1.3
*/
default AggregateActionOutput concludeGroup(final AggregateActionInput aggregateActionInput) {
if (aggregateActionInput != null) {
EventHandle eventHandle = aggregateActionInput.getEventHandle();
if (eventHandle != null) {
eventHandle.release(true);
}
}
return new AggregateActionOutput(Collections.emptyList());
}

Original file line number Diff line number Diff line change
@@ -5,6 +5,8 @@

package org.opensearch.dataprepper.plugins.processor.aggregate;

import org.opensearch.dataprepper.model.event.EventHandle;

import java.util.Map;
import java.util.function.Function;
import java.time.Duration;
@@ -28,6 +30,12 @@ public interface AggregateActionInput {
*/
Map<Object, Object> getIdentificationKeys();

/**
* @return returns eventHandle held by the instance
* @since 2.11
*/
EventHandle getEventHandle();

/**
* Sets custom shouldConclude function
*
Original file line number Diff line number Diff line change
@@ -5,6 +5,11 @@

package org.opensearch.dataprepper.plugins.processor.aggregate;

import org.opensearch.dataprepper.model.event.AggregateEventHandle;
import org.opensearch.dataprepper.model.event.InternalEventHandle;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.Event;

import java.time.Duration;
import java.time.Instant;
import java.util.function.Function;
@@ -19,13 +24,27 @@ class AggregateGroup implements AggregateActionInput {
private final Lock handleEventForGroupLock;
private final Map<Object, Object> identificationKeys;
private Function<Duration, Boolean> customShouldConclude;
private EventHandle eventHandle;

AggregateGroup(final Map<Object, Object> identificationKeys) {
this.groupState = new DefaultGroupState();
this.identificationKeys = identificationKeys;
this.groupStart = Instant.now();
this.concludeGroupLock = new ReentrantLock();
this.handleEventForGroupLock = new ReentrantLock();
this.eventHandle = new AggregateEventHandle(Instant.now());
}

@Override
public EventHandle getEventHandle() {
return eventHandle;
}

public void attachToEventAcknowledgementSet(Event event) {
InternalEventHandle internalEventHandle;
EventHandle handle = event.getEventHandle();
internalEventHandle = (InternalEventHandle)(handle);
internalEventHandle.addEventHandle(eventHandle);
}

public GroupState getGroupState() {
@@ -63,5 +82,6 @@ boolean shouldConcludeGroup(final Duration groupDuration) {
void resetGroup() {
groupStart = Instant.now();
groupState.clear();
this.eventHandle = new AggregateEventHandle(groupStart);
}
}
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@ AggregateGroup getAggregateGroup(final IdentificationKeysHasher.IdentificationKe
return allGroups.computeIfAbsent(identificationKeysMap, (hash) -> new AggregateGroup(identificationKeysMap.getKeyMap()));
}


List<Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> getGroupsToConclude(final boolean forceConclude) {
final List<Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> groupsToConclude = new ArrayList<>();
for (final Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup> groupEntry : allGroups.entrySet()) {
Original file line number Diff line number Diff line change
@@ -93,6 +93,12 @@ private AggregateAction loadAggregateAction(final PluginFactory pluginFactory) {
return pluginFactory.loadPlugin(AggregateAction.class, actionPluginSetting);
}

AggregateGroup getAggregateGroupForEvent(final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap, final Event event) {
AggregateGroup aggregateGroup = aggregateGroupManager.getAggregateGroup(identificationKeysMap);
aggregateGroup.attachToEventAcknowledgementSet(event);
return aggregateGroup;
}

@Override
public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
final List<Record<Event>> recordsOut = new LinkedList<>();
@@ -124,7 +130,7 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
continue;
}
final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap = identificationKeysHasher.createIdentificationKeysMapFromEvent(event);
final AggregateGroup aggregateGroupForEvent = aggregateGroupManager.getAggregateGroup(identificationKeysMap);
final AggregateGroup aggregateGroupForEvent = getAggregateGroupForEvent(identificationKeysMap, event);

final AggregateActionResponse handleEventResponse = aggregateActionSynchronizer.handleEventForGroup(event, identificationKeysMap, aggregateGroupForEvent);

@@ -149,6 +155,11 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
return recordsOut;
}

@Override
public boolean holdsEvents() {
return aggregateAction.holdsEvents();
}

public static long getTimeNanos(final Instant time) {
final long NANO_MULTIPLIER = 1_000 * 1_000 * 1_000;
long currentTimeNanos = time.getEpochSecond() * NANO_MULTIPLIER + time.getNano();
Original file line number Diff line number Diff line change
@@ -84,6 +84,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
final Event event = JacksonEvent.builder()
.withEventType(EVENT_TYPE)
.withData(aggregateActionInput.getGroupState())
.withEventHandle(aggregateActionInput.getEventHandle())
.build();
return new AggregateActionOutput(List.of(event));
}
Original file line number Diff line number Diff line change
@@ -146,6 +146,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
event = JacksonEvent.builder()
.withEventType(EVENT_TYPE)
.withData(groupState)
.withEventHandle(aggregateActionInput.getEventHandle())
.build();
} else {
Integer countValue = (Integer)groupState.get(countKey);
@@ -168,6 +169,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
.withValue((double)countValue)
.withExemplars(List.of(exemplar))
.withAttributes(attr)
.withEventHandle(aggregateActionInput.getEventHandle())
.build(false);
event = (Event)sum;
}
Original file line number Diff line number Diff line change
@@ -225,6 +225,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
event = JacksonEvent.builder()
.withEventType(EVENT_TYPE)
.withData(groupState)
.withEventHandle(aggregateActionInput.getEventHandle())
.build();
} else {
List<Double> explicitBoundsList = new ArrayList<Double>();
@@ -262,6 +263,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
.withExplicitBoundsList(explicitBoundsList)
.withExemplars(exemplarList)
.withAttributes(attr)
.withEventHandle(aggregateActionInput.getEventHandle())
.build(false);
event = (Event)histogram;
}
Original file line number Diff line number Diff line change
@@ -47,4 +47,5 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct
}
return AggregateActionResponse.nullEventResponse();
}

}
Original file line number Diff line number Diff line change
@@ -41,6 +41,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
final Event event = JacksonEvent.builder()
.withEventType(EVENT_TYPE)
.withData(aggregateActionInput.getGroupState())
.withEventHandle(aggregateActionInput.getEventHandle())
.build();

return new AggregateActionOutput(List.of(event));
Original file line number Diff line number Diff line change
@@ -42,4 +42,5 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct
}
return new AggregateActionResponse(event);
}

}
Loading

0 comments on commit 708843c

Please sign in to comment.