Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Add pipeline framework to make parallel processing simpler #1077

Merged
merged 44 commits into from
Mar 12, 2019

Conversation

ajsutton
Copy link
Contributor

@ajsutton ajsutton commented Mar 8, 2019

PR description

Adds a new pipeline module to provide a simple way of chaining processing that needs to happen in parallel. This will wind up being used in the WorldStateDownloader to improve its performance.

World state downloading will come out something like:

final Pipeline pipeline =
          createPipelineFrom(new TaskQueueIterator(newDownloadState), persistenceQueueCapacity)
              .thenProcess(this::checkForLocalData)
              .inBatches(hashCountPerRequest)
              .thenProcessAsync(
                  requestTasks -> sendRequestForData(requestTasks, header, newDownloadState),
                  maxOutstandingRequests)
              .thenProcess(tasks -> persistBatch(tasks, header, newDownloadState))
              .thenFlatMap(List::stream, persistenceQueueCapacity)
              .thenProcessInParallel(task -> enqueueChildren(task, header, newDownloadState), 3)
              .andFinishWith(task -> markAsCompleteOrFailed(header, newDownloadState, task));

ajsutton added 30 commits March 8, 2019 11:02
…ed before all threads finished processing their last entry.
…sume that earlier entries have completed processing by then.
…rvices take an unusually long time to start.
This reverts commit 3515376.
Copy link
Contributor

@mbaxter mbaxter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few minor questions / comments, but LGTM!

package tech.pegasys.pantheon.services.pipeline;

/**
* The end of the pipe that stages write their output to.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* The end of the pipe that stages write their output to.
* The start of the pipe that stages write their output to.

*
* @param <T> the type of output.
*/
public interface OutputPipe<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, a Pipe is composed of an OutputPipe and an InputPipe. Which means OutputPipe and InputPipe are really part of a Pipe not "pipes" themselves. I think the relationship might be clearer if OutputPipe became PipeInput and InputPipe became PipeOutput ... As confusing as that seems :P

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It depends which way you look at it. The place where pipes are actually held is mostly inside a stage where it reads inputs from it's InputPipe and writes outputs to it's OutputPipe. Admittedly it is a bit weird to think of a pipe as being made up of an output which leads to an input but most of the time we're looking at the pipe from the point of view of a stage which the naming works.

When I tried renaming I inevitably wound up with APIs like:
void processNextInput(PipeOutput inputPipe, PipeInput outputPipe)

So I've side-stepped the issue a bit and used ReadPipe and WritePipe. The variable names are unchanged so a stage has an API like:
void processNextInput(ReadPipe inputPipe, WritePipe outputPipe)

private final CompletableFuture<Void> overallFuture = new CompletableFuture<>();
private volatile List<Future<?>> futures;

public Pipeline(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this constructor be package-private?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

*/
public PipelineBuilder<List<T>> inBatches(final String stageName, final int maximumBatchSize) {
return addStage(
new BatchingProcessor<>(maximumBatchSize), bufferSize / maximumBatchSize + 1, stageName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might want to add an argument check on maximumBatchSize: > 0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems reasonable.

}

@Override
public boolean hasRemainingCapacity() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the pipe is not open, should this return 0 ??

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also seems reasonable.

@ajsutton ajsutton merged commit bf1c861 into PegaSysEng:master Mar 12, 2019
@ajsutton ajsutton deleted the pipeline-framework branch March 12, 2019 20:45
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants