Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Add pipeline framework to make parallel processing simpler #1077

Merged
merged 44 commits into from
Mar 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
a6ba7bf
Add a basic pipeline framework.
ajsutton Mar 6, 2019
2a75bd9
Support introducing batches into the pipeline.
ajsutton Mar 6, 2019
1eecb21
Make buffer size configurable.
ajsutton Mar 6, 2019
fa06ace
Add async processing and parallel processing support to pipelines.
ajsutton Mar 7, 2019
7fa7c59
Extract common looping code.
ajsutton Mar 8, 2019
91b53ca
Introduce finalize step to make InputOutputStep more useful.
ajsutton Mar 8, 2019
a3970c1
Move AsyncProcessStage over to SingleStepStage.
ajsutton Mar 8, 2019
c9b23f1
Reduce duplication.
ajsutton Mar 8, 2019
2ed23fc
Add tests for flat map.
ajsutton Mar 8, 2019
2f2ce8a
Fix issue where output pipe from a parallel processing stage was clos…
ajsutton Mar 8, 2019
ea8e0c4
Support aborting pipelines.
ajsutton Mar 8, 2019
55ccbcc
Abort the pipeline if any stage throws an unhandled exception.
ajsutton Mar 8, 2019
6811215
Tidy up access levels.
ajsutton Mar 8, 2019
02e4aa1
Add some javadoc.
ajsutton Mar 8, 2019
ebf0feb
Exceptions cause the pipeline to abort quite violently so we can't as…
ajsutton Mar 8, 2019
be717e0
Add missing javadoc parameters.
ajsutton Mar 8, 2019
4b8c544
Don't use a static executor service.
ajsutton Mar 8, 2019
8181abb
Simplify test & fix spotless.
ajsutton Mar 8, 2019
7b9961f
Ignore PipelineBuilderTest to see if that's what's causing the build …
ajsutton Mar 8, 2019
6a78181
Try to find the one test case that's stalling.
ajsutton Mar 11, 2019
e3f0f84
Unignore some more tests.
ajsutton Mar 11, 2019
1e3a6b0
Unignore one more test.
ajsutton Mar 11, 2019
0a4f8cf
Unignore final test.
ajsutton Mar 11, 2019
e92fce2
Synchronize on overallFuture.
ajsutton Mar 11, 2019
f12e681
Merge branch 'master' of github.com:PegaSysEng/pantheon into pipeline…
ajsutton Mar 11, 2019
3515376
Add more logging.
ajsutton Mar 11, 2019
c9e671e
Remove unused executor.
ajsutton Mar 11, 2019
5f9899a
Don't wait forever for vertx services to shutdown. Log warnings if s…
ajsutton Mar 11, 2019
da999d6
Merge branch 'runner-fix' into pipeline-framework
ajsutton Mar 11, 2019
38b3b66
Revert "Add more logging."
ajsutton Mar 11, 2019
3d8c447
Rename PipelineSource to ProducingStage.
ajsutton Mar 11, 2019
088838d
Merge branch 'master' of github.com:PegaSysEng/pantheon into pipeline…
ajsutton Mar 11, 2019
ca65957
Fix race condition when aborting.
ajsutton Mar 11, 2019
803784e
Spotless.
ajsutton Mar 11, 2019
7c799c0
Add javadoc.
ajsutton Mar 11, 2019
824867b
Merge branch 'master' of github.com:PegaSysEng/pantheon into pipeline…
ajsutton Mar 11, 2019
dc87314
More clearly separate stages (which are runnable) from processors (wh…
ajsutton Mar 11, 2019
714d62c
Add a counter to track metrics for each stage output count.
ajsutton Mar 12, 2019
6b2cfb9
Put the metrics in the pipe not the stage.
ajsutton Mar 12, 2019
4cf7cca
Abort the pipeline when the returned future is cancelled.
ajsutton Mar 12, 2019
f8df301
Rename InputPipe to ReadPipe and OutputPipe to WritePipe to avoid the…
ajsutton Mar 12, 2019
9d0be66
Make Pipeline constructor package private.
ajsutton Mar 12, 2019
30d2893
Merge branch 'master' of github.com:PegaSysEng/pantheon into pipeline…
ajsutton Mar 12, 2019
5b336aa
Check batch size is greater than 0. Pipe's don't have remaining capa…
ajsutton Mar 12, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

public class NoOpMetricsSystem implements MetricsSystem {

private static final Counter NO_OP_COUNTER = new NoOpCounter();
public static final Counter NO_OP_COUNTER = new NoOpCounter();
private static final TimingContext NO_OP_TIMING_CONTEXT = () -> 0;
private static final OperationTimer NO_OP_TIMER = () -> NO_OP_TIMING_CONTEXT;
public static final LabelledMetric<OperationTimer> NO_OP_LABELLED_TIMER = label -> NO_OP_TIMER;
Expand Down
41 changes: 41 additions & 0 deletions services/pipeline/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2018 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

apply plugin: 'java-library'

jar {
baseName 'pantheon-pipeline'
manifest {
attributes(
'Specification-Title': baseName,
'Specification-Version': project.version,
'Implementation-Title': baseName,
'Implementation-Version': calculateVersion()
)
}
}

dependencies {
api project(':util')
implementation project(':metrics')

implementation 'org.apache.logging.log4j:log4j-api'
implementation 'com.google.guava:guava'

runtime 'org.apache.logging.log4j:log4j-core'

testImplementation 'junit:junit'
testImplementation 'org.assertj:assertj-core'
testImplementation 'org.awaitility:awaitility'
testImplementation 'org.mockito:mockito-core'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

class AsyncOperationProcessor<I, O> implements Processor<I, O> {
private static final Logger LOG = LogManager.getLogger();
private final Function<I, CompletableFuture<O>> processor;
private final Collection<CompletableFuture<O>> inProgress;
private final int maxConcurrency;

public AsyncOperationProcessor(
final Function<I, CompletableFuture<O>> processor, final int maxConcurrency) {
this.processor = processor;
this.maxConcurrency = maxConcurrency;
this.inProgress = new ArrayList<>(maxConcurrency);
}

@Override
public void processNextInput(final ReadPipe<I> inputPipe, final WritePipe<O> outputPipe) {
if (inProgress.size() < maxConcurrency) {
final I value = inputPipe.get();
if (value != null) {
final CompletableFuture<O> future = processor.apply(value);
// When the future completes, interrupt so if we're waiting for new input we wake up and
// schedule the output.
final Thread stageThread = Thread.currentThread();
future.whenComplete((result, error) -> stageThread.interrupt());
inProgress.add(future);
}

outputCompletedTasks(0, outputPipe);
} else {
outputNextCompletedTask(outputPipe);
}
}

@Override
public void finalize(final WritePipe<O> outputPipe) {
while (!inProgress.isEmpty()) {
outputNextCompletedTask(outputPipe);
}
}

private void outputNextCompletedTask(final WritePipe<O> outputPipe) {
try {
waitForAnyFutureToComplete();
outputCompletedTasks(1, outputPipe);
} catch (final InterruptedException e) {
LOG.trace("Interrupted while waiting for processing to complete", e);
} catch (final ExecutionException e) {
LOG.error("Processing failed and we don't handle exceptions properly yet", e);
} catch (final TimeoutException e) {
// Ignore and go back around the loop.
}
}

@SuppressWarnings("rawtypes")
private void waitForAnyFutureToComplete()
throws InterruptedException, ExecutionException, TimeoutException {
CompletableFuture.anyOf(inProgress.toArray(new CompletableFuture[0])).get(1, TimeUnit.SECONDS);
}

private void outputCompletedTasks(final int minTasksToOutput, final WritePipe<O> outputPipe) {
int outputTasks = 0;
for (final Iterator<CompletableFuture<O>> i = inProgress.iterator();
i.hasNext() && (outputTasks < minTasksToOutput || outputPipe.hasRemainingCapacity()); ) {
final CompletableFuture<O> process = i.next();
final O result = process.getNow(null);
if (result != null) {
outputPipe.put(result);
i.remove();
outputTasks++;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;

import java.util.List;

class BatchingProcessor<T> implements Processor<T, List<T>> {

private final int maximumBatchSize;

public BatchingProcessor(final int maximumBatchSize) {
this.maximumBatchSize = maximumBatchSize;
}

@Override
public void processNextInput(final ReadPipe<T> inputPipe, final WritePipe<List<T>> outputPipe) {
final List<T> batch = inputPipe.getBatch(maximumBatchSize);
if (!batch.isEmpty()) {
outputPipe.put(batch);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;

import tech.pegasys.pantheon.metrics.Counter;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class CompleterStage<T> implements Runnable {
private final ReadPipe<T> input;
private final Consumer<T> completer;
private final Counter outputCounter;
private final CompletableFuture<?> future = new CompletableFuture<>();

CompleterStage(
final ReadPipe<T> input, final Consumer<T> completer, final Counter outputCounter) {
this.input = input;
this.completer = completer;
this.outputCounter = outputCounter;
}

@Override
public void run() {
while (input.hasMore()) {
final T value = input.get();
if (value != null) {
completer.accept(value);
outputCounter.inc();
}
}
future.complete(null);
}

public CompletableFuture<?> getFuture() {
return future;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;

import java.util.Iterator;
import java.util.function.Function;
import java.util.stream.Stream;

class FlatMapProcessor<I, O> implements Processor<I, O> {

private final Function<I, Stream<O>> mapper;

public FlatMapProcessor(final Function<I, Stream<O>> mapper) {
this.mapper = mapper;
}

@Override
public void processNextInput(final ReadPipe<I> inputPipe, final WritePipe<O> outputPipe) {
final I value = inputPipe.get();
if (value != null) {
final Iterator<O> outputs = mapper.apply(value).iterator();
while (outputs.hasNext()) {
outputPipe.put(outputs.next());
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;

import java.util.Iterator;

class IteratorSourceStage<T> implements Runnable {
private final Iterator<T> source;
private final Pipe<T> pipe;

IteratorSourceStage(final Iterator<T> source, final Pipe<T> pipe) {
this.source = source;
this.pipe = pipe;
}

@Override
public void run() {
while (pipe.isOpen() && source.hasNext()) {
final T value = source.next();
if (value != null) {
pipe.put(value);
}
}
pipe.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;

import java.util.function.Function;

class MapProcessor<I, O> implements Processor<I, O> {

private final Function<I, O> processor;

public MapProcessor(final Function<I, O> processor) {
this.processor = processor;
}

@Override
public void processNextInput(final ReadPipe<I> inputPipe, final WritePipe<O> outputPipe) {
final I value = inputPipe.get();
if (value != null) {
outputPipe.put(processor.apply(value));
}
}
}
Loading