-
Notifications
You must be signed in to change notification settings - Fork 130
Conversation
ParallelImportChainSegmentTask, an explicitly parallel re-implementation of PipelinedImportChainSegmentTask. Data is passed between stages via BlockingQueues. Pipeline stages are implemented in AbstractPipelinePeerTask and the parent task assembles and initiates the pipeline execution.
* Don't use deterministic scheduler, this depends on explicit parallelism * Turn off some problematic tests
propigate when an exception happens. * make ParallelImport... a regular AbstractEthTask * two more tests work.
…oader may stall for 10 seconds looking for an alternative target.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few suggestions made but generally LGTM.
private BlockingQueue<O> outboundQueue; | ||
private List<O> results; | ||
|
||
private boolean lameDuckMode = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we name this something a little more descriptive? Maybe just stopWhenInboundQueueEmpty
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lame duck is google server speak, keep processing but accept no new connections and then stop when you're done. Like a Lame Duck session in the US congress. But this is Java, shutdown
is better for the method and shuttingDown
for the var.
return outboundQueue; | ||
} | ||
|
||
public boolean isLameDuckMode() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems to be unused so can be removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
return lameDuckMode; | ||
} | ||
|
||
public void setLameDuckMode(final boolean lameDuckMode) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Naming-wise this seems quite similar to Executor.shutdown
and may read better with that kind of action name rather than a setter style name - especially since we never need to setLameDuckMode(false)
} | ||
final int segmentLength = | ||
(int) (nextCheckpointHeader.getNumber() - previousCheckpointHeader.get().getNumber()) - 1; | ||
// body |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this comment here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not entirely sure. Gone.
result.get().completeExceptionally(e); | ||
return Optional.empty(); | ||
} | ||
headers.add(nextCheckpointHeader); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't DownloadHeaderSequenceTask.endingAtHeader
include nextCheckpointHeader
as the last thing it retrieves? So would this wind up adding it twice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
} | ||
this.checkpointHeaders = | ||
new ArrayBlockingQueue<>(checkpointHeaders.size(), false, checkpointHeaders); | ||
// this.chunksInTotal = checkpointHeaders.size() - 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
}); | ||
|
||
} else { | ||
LOG.warn("Import task requested with no checkpoint headers."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We typically switch this around to use an early return (if (...) { LOG.warn(...); return;}
) rather than nesting the whole method inside the if.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this started out life as r and e responses, it got consumed by the cancelOnException lambda. I'll reform it so it's more clear.
|
||
@Override | ||
protected Optional<List<B>> processStep( | ||
final List<B> blocks, final Optional<List<B>> previousHeaders, final EthPeer peer) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
previousBlocks not previousHeaders?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
() -> { | ||
LockSupport.parkNanos(1000000); | ||
return !result.isDone(); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a serious aversion to hard coded waits in tests so would be good to bring in ajsutton@0c7a625 if possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
Use a blocking queue instead of fixed wait period.
PR description
ParallelImportChainSegmentTask, an explicitly parallel re-implementation
of PipelinedImportChainSegmentTask. Data is passed between stages
via BlockingQueues.
Pipeline stages are implemented in AbstractPipelinePeerTask and the
parent task assembles and initiates the pipeline execution.
Fixed Issue(s)
NC-2236