Skip to content

Commit

Permalink
remove clock infavor of a LongSupplier to System::nanoTime
Browse files Browse the repository at this point in the history
  • Loading branch information
jakelandis committed Oct 4, 2018
1 parent 4588782 commit 2714338
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.metrics.Metric;

import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

/**
Expand All @@ -45,25 +44,26 @@ public class CompoundProcessor implements Processor {
private final List<Processor> processors;
private final List<Processor> onFailureProcessors;
private final List<Tuple<Processor, IngestMetric>> processorsWithMetrics;
private final Clock clock;
private final LongSupplier relativeTimeProvider;

CompoundProcessor(Clock clock, Processor... processor) {
this(false, Arrays.asList(processor), Collections.emptyList(), clock);
CompoundProcessor(LongSupplier relativeTimeProvider, Processor... processor) {
this(false, Arrays.asList(processor), Collections.emptyList(), relativeTimeProvider);
}

public CompoundProcessor(Processor... processor) {
this(false, Arrays.asList(processor), Collections.emptyList());
}

public CompoundProcessor(boolean ignoreFailure, List<Processor> processors, List<Processor> onFailureProcessors) {
this(ignoreFailure, processors, onFailureProcessors, Clock.systemUTC());
this(ignoreFailure, processors, onFailureProcessors, System::nanoTime);
}
CompoundProcessor(boolean ignoreFailure, List<Processor> processors, List<Processor> onFailureProcessors, Clock clock) {
CompoundProcessor(boolean ignoreFailure, List<Processor> processors, List<Processor> onFailureProcessors,
LongSupplier relativeTimeProvider) {
super();
this.ignoreFailure = ignoreFailure;
this.processors = processors;
this.onFailureProcessors = onFailureProcessors;
this.clock = clock;
this.relativeTimeProvider = relativeTimeProvider;
this.processorsWithMetrics = new ArrayList<>(processors.size());
processors.forEach(p -> processorsWithMetrics.add(new Tuple<>(p, new IngestMetric())));
}
Expand Down Expand Up @@ -117,7 +117,7 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
for (Tuple<Processor, IngestMetric> processorWithMetric : processorsWithMetrics) {
Processor processor = processorWithMetric.v1();
IngestMetric metric = processorWithMetric.v2();
long startTimeInMillis = clock.millis();
long startTimeInNanos = relativeTimeProvider.getAsLong();
try {
metric.preIngest();
if (processor.execute(ingestDocument) == null) {
Expand All @@ -138,7 +138,7 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
break;
}
} finally {
long ingestTimeInMillis = clock.millis() - startTimeInMillis;
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
metric.postIngest(ingestTimeInMillis);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

import java.time.Clock;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
Expand All @@ -56,29 +55,29 @@ public void testEmpty() throws Exception {
}

public void testSingleProcessor() throws Exception {
Clock clock = mock(Clock.class);
when(clock.millis()).thenReturn(0L, 1L);
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1));
TestProcessor processor = new TestProcessor(ingestDocument ->{
assertStats(0, ingestDocument.getFieldValue("compoundProcessor", CompoundProcessor.class), 1, 0, 0, 0);
});
CompoundProcessor compoundProcessor = new CompoundProcessor(clock, processor);
CompoundProcessor compoundProcessor = new CompoundProcessor(relativeTimeProvider, processor);
ingestDocument.setFieldValue("compoundProcessor", compoundProcessor); //ugly hack to assert current count = 1
assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor));
assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor));
assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true));
compoundProcessor.execute(ingestDocument);
verify(clock, times(2)).millis();
verify(relativeTimeProvider, times(2)).getAsLong();
assertThat(processor.getInvokedCounter(), equalTo(1));
assertStats(compoundProcessor, 1, 0, 1);

}

public void testSingleProcessorWithException() throws Exception {
TestProcessor processor = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");});
Clock clock = mock(Clock.class);
when(clock.millis()).thenReturn(0L);
CompoundProcessor compoundProcessor = new CompoundProcessor(clock, processor);
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor compoundProcessor = new CompoundProcessor(relativeTimeProvider, processor);
assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor));
assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true));
Expand All @@ -96,10 +95,10 @@ public void testSingleProcessorWithException() throws Exception {
public void testIgnoreFailure() throws Exception {
TestProcessor processor1 = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");});
TestProcessor processor2 = new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue("field", "value");});
Clock clock = mock(Clock.class);
when(clock.millis()).thenReturn(0L);
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor compoundProcessor =
new CompoundProcessor(true, Arrays.asList(processor1, processor2), Collections.emptyList(), clock);
new CompoundProcessor(true, Arrays.asList(processor1, processor2), Collections.emptyList(), relativeTimeProvider);
compoundProcessor.execute(ingestDocument);
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertStats(0, compoundProcessor, 0, 1, 1, 0);
Expand All @@ -118,12 +117,12 @@ public void testSingleProcessorWithOnFailureProcessor() throws Exception {
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id"));
});

Clock clock = mock(Clock.class);
when(clock.millis()).thenReturn(0L, 1L);
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1));
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1),
Collections.singletonList(processor2), clock);
Collections.singletonList(processor2), relativeTimeProvider);
compoundProcessor.execute(ingestDocument);
verify(clock, times(2)).millis();
verify(relativeTimeProvider, times(2)).getAsLong();

assertThat(processor1.getInvokedCounter(), equalTo(1));
assertStats(compoundProcessor, 1, 1, 1);
Expand All @@ -147,12 +146,12 @@ public void testSingleProcessorWithNestedFailures() throws Exception {
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("second"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id2"));
});
Clock clock = mock(Clock.class);
when(clock.millis()).thenReturn(0L);
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor compoundOnFailProcessor = new CompoundProcessor(false, Collections.singletonList(processorToFail),
Collections.singletonList(lastProcessor), clock);
Collections.singletonList(lastProcessor), relativeTimeProvider);
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor),
Collections.singletonList(compoundOnFailProcessor), clock);
Collections.singletonList(compoundOnFailProcessor), relativeTimeProvider);
compoundProcessor.execute(ingestDocument);

assertThat(processorToFail.getInvokedCounter(), equalTo(1));
Expand All @@ -169,13 +168,13 @@ public void testCompoundProcessorExceptionFailWithoutOnFailure() throws Exceptio
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("first"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id1"));
});
Clock clock = mock(Clock.class);
when(clock.millis()).thenReturn(0L);
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);

CompoundProcessor failCompoundProcessor = new CompoundProcessor(clock, firstProcessor);
CompoundProcessor failCompoundProcessor = new CompoundProcessor(relativeTimeProvider, firstProcessor);

CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor),
Collections.singletonList(secondProcessor), clock);
Collections.singletonList(secondProcessor), relativeTimeProvider);
compoundProcessor.execute(ingestDocument);

assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
Expand All @@ -195,13 +194,13 @@ public void testCompoundProcessorExceptionFail() throws Exception {
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail"));
});

Clock clock = mock(Clock.class);
when(clock.millis()).thenReturn(0L);
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, Collections.singletonList(firstProcessor),
Collections.singletonList(failProcessor), clock);
Collections.singletonList(failProcessor), relativeTimeProvider);

CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor),
Collections.singletonList(secondProcessor), clock);
Collections.singletonList(secondProcessor), relativeTimeProvider);
compoundProcessor.execute(ingestDocument);

assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
Expand All @@ -221,13 +220,13 @@ public void testCompoundProcessorExceptionFailInOnFailure() throws Exception {
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail"));
});

Clock clock = mock(Clock.class);
when(clock.millis()).thenReturn(0L);
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, Collections.singletonList(firstProcessor),
Collections.singletonList(new CompoundProcessor(clock, failProcessor)));
Collections.singletonList(new CompoundProcessor(relativeTimeProvider, failProcessor)));

CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor),
Collections.singletonList(secondProcessor), clock);
Collections.singletonList(secondProcessor), relativeTimeProvider);
compoundProcessor.execute(ingestDocument);

assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
Expand All @@ -239,10 +238,10 @@ public void testBreakOnFailure() throws Exception {
TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error1");});
TestProcessor secondProcessor = new TestProcessor("id2", "second", ingestDocument -> {throw new RuntimeException("error2");});
TestProcessor onFailureProcessor = new TestProcessor("id2", "on_failure", ingestDocument -> {});
Clock clock = mock(Clock.class);
when(clock.millis()).thenReturn(0L);
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor pipeline = new CompoundProcessor(false, Arrays.asList(firstProcessor, secondProcessor),
Collections.singletonList(onFailureProcessor), clock);
Collections.singletonList(onFailureProcessor), relativeTimeProvider);
pipeline.execute(ingestDocument);
assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
assertThat(secondProcessor.getInvokedCounter(), equalTo(0));
Expand Down

0 comments on commit 2714338

Please sign in to comment.