diff --git a/doc/processor.md b/doc/processor.md index 06f3274881e..7557bdebb0a 100644 --- a/doc/processor.md +++ b/doc/processor.md @@ -53,3 +53,15 @@ public class CatchProcessor extends AbstractProcessor { } } ``` +## Parallel Processor + +Lets assume you want to use multiple cores for your processor. Spoon provides a simple high-level API for this task. +Using the CatchProcessor from before create a `AbstractParallelProcessor`. + +```java + Processor parallelProcessor = new AbstractParallelProcessor( + Arrays.asList(new CatchProcessor(), new CatchProcessor())) {}; +``` +Now you have the same processor behavior as before, but 2 parallel running processor. +You can upscale this pretty high, but keep in mind to not use more parallel processors than available cores for maximum speedup. +For more information about parallel processor and the API have a look in the [documentation](http://spoon.gforge.inria.fr/mvnsites/spoon-core/apidocs/spoon/processing/AbstractParallelProcessor.html). \ No newline at end of file diff --git a/src/main/java/spoon/processing/AbstractParallelProcessor.java b/src/main/java/spoon/processing/AbstractParallelProcessor.java new file mode 100644 index 00000000000..5ade1a77db7 --- /dev/null +++ b/src/main/java/spoon/processing/AbstractParallelProcessor.java @@ -0,0 +1,137 @@ +/** + * Copyright (C) 2006-2019 INRIA and contributors + * + * Spoon is available either under the terms of the MIT License (see LICENSE-MIT.txt) of the Cecill-C License (see LICENSE-CECILL-C.txt). You as the user are entitled to choose the terms under which to adopt Spoon. + */ +package spoon.processing; + +import java.util.Iterator; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Consumer; +import java.util.stream.StreamSupport; + +import spoon.SpoonException; +import spoon.reflect.declaration.CtElement; + +/** + * AbstractParallelProcessor allows using multiple threads for concurrent + * processing with {@link AbstractProcessor}. + * + * This class should only be used if all processors do the same. + * Otherwise the result may vary from the expected result. All processors + * must synchronize shared fields like Collections by themselves. Multiple + * constructors exist for different approaches creating this. You can create + * this processor with either a Iterable of processors or a Consumer. + * + * For creating and managing threads a {@link Executors#newFixedThreadPool()} is + * used. Creating more threads then cores can harm the performance. Using a + * different thread pool could increase the performance, but this class should + * be general usage. If you need better performance you may want to use an own + * class with different parallel approach. + */ +public abstract class AbstractParallelProcessor extends AbstractProcessor { + + private ExecutorService service; + private ArrayBlockingQueue> processorQueue; + + /** + * Creates a new AbstractParallelProcessor from given iterable. The iterable is + * fully consumed. Giving an endless iterable of processors will result in + * errors. The processors must follow the guidelines given in the class + * description. + * + * @param processors iterable of processors. + * @throws IllegalArgumentException if size of iterable is less than 1. + * + */ + public AbstractParallelProcessor(Iterable> processors) { + // added cast because constructors need int + int processorNumber = (int) StreamSupport.stream(processors.spliterator(), false).count(); + processorQueue = new ArrayBlockingQueue<>(processorNumber); + processors.forEach(processorQueue::add); + service = Executors.newFixedThreadPool(processorNumber); + } + + /** + * Creates a new AbstractParallelProcessor from given iterable. The processors + * must follow the guidelines given in the class description. + * + * @param processors iterable of processors. + * @param numberOfProcessors number consumed from the iterable added to the + * active processors. + * @throws SpoonException if iterable has less values then + * numberOfProcessors. + * @throws IllegalArgumentException if numberOfProcessors is less than 1. + * + */ + public AbstractParallelProcessor(Iterable> processors, int numberOfProcessors) { + processorQueue = new ArrayBlockingQueue<>(numberOfProcessors); + service = Executors.newFixedThreadPool(numberOfProcessors); + Iterator> it = processors.iterator(); + for (int i = 0; i < numberOfProcessors; i++) { + if (!it.hasNext()) { + throw new SpoonException("not enough elements provided, iterable is already empty"); + } + processorQueue.add(it.next()); + } + } + + /** + * Creates a new AbstractParallelProcessor from given consumer. The processors + * must follow the guidelines given in the class description. + * + * @param processFunction Represents an operation that accepts a single + * element E and returns no result. + * @param numberOfProcessors number of concurrent running processors. + * @throws IllegalArgumentException if numberOfProcessors is less than 1. + */ + public AbstractParallelProcessor(Consumer processFunction, int numberOfProcessors) { + processorQueue = new ArrayBlockingQueue<>(numberOfProcessors); + for (int i = 0; i < numberOfProcessors; i++) { + processorQueue.add(new AbstractProcessor() { + @Override + public void process(E element) { + processFunction.accept(element); + } + }); + } + service = Executors.newFixedThreadPool(numberOfProcessors); + } + + @Override + public final void process(E element) { + try { + Processor currentProcessor = processorQueue.take(); + service.execute(() -> { + try { + currentProcessor.process(element); + processorQueue.put(currentProcessor); + } catch (InterruptedException e) { + // because rethrow is not possible here. + Thread.currentThread().interrupt(); + e.printStackTrace(); + processorQueue.add(currentProcessor); + } catch (Exception e) { + // allows throwing exception, but keeping the processor in the queue + processorQueue.add(currentProcessor); + throw e; + } + }); + } catch (InterruptedException e) { + // because rethrow is not possible here. + Thread.currentThread().interrupt(); + e.printStackTrace(); + } + } + + /** + * Cleans the threadpool after processing. + */ + @Override + public void processingDone() { + service.shutdown(); + super.processingDone(); + } +} diff --git a/src/test/java/spoon/test/processing/processors/ParallelProcessorTest.java b/src/test/java/spoon/test/processing/processors/ParallelProcessorTest.java new file mode 100644 index 00000000000..0bc7fe851b8 --- /dev/null +++ b/src/test/java/spoon/test/processing/processors/ParallelProcessorTest.java @@ -0,0 +1,231 @@ +/** + * Copyright (C) 2006-2019 INRIA and contributors + * + * Spoon is available either under the terms of the MIT License (see LICENSE-MIT.txt) of the Cecill-C License (see LICENSE-CECILL-C.txt). You as the user are entitled to choose the terms under which to adopt Spoon. + */ +package spoon.test.processing.processors; + +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceArray; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import spoon.FluentLauncher; +import spoon.SpoonException; +import spoon.processing.AbstractParallelProcessor; +import spoon.processing.AbstractProcessor; +import spoon.processing.Processor; +import spoon.reflect.declaration.CtElement; + +public class ParallelProcessorTest { + private static final String INPUT_FILES = "src/test/resources/deprecated/input"; + @Rule + public TemporaryFolder folderFactory = new TemporaryFolder(); + + private AtomicReferenceArray createCounter() { + Integer[] counter = new Integer[] { 0, 0, 0, 0 }; + AtomicReferenceArray atomicCounter = new AtomicReferenceArray(counter); + return atomicCounter; + } + + private Processor createProcessor(AtomicReferenceArray atomicCounter, int digit) { + Processor processor = new AbstractProcessor() { + @Override + public void process(CtElement element) { + atomicCounter.getAndUpdate(digit, i -> i + 1); + } + }; + return processor; + } + + @Test + public void compareWithSingleThreaded1() throws IOException { + // contract: running with 4 processors parallel must produce the same result as + // single threaded processor. + // for testing this a simple processor counting visited nodes is used. + + // create a countingArray for the concurrent processors. + AtomicReferenceArray atomicCounter = createCounter(); + // create processors + Processor p1 = createProcessor(atomicCounter, 0); + Processor p2 = createProcessor(atomicCounter, 1); + Processor p3 = createProcessor(atomicCounter, 2); + Processor p4 = createProcessor(atomicCounter, 3); + + new FluentLauncher().inputResource(INPUT_FILES) + .processor(new AbstractParallelProcessor(Arrays.asList(p1, p2, p3, p4)) { + }) + .noClasspath(true) + .outputDirectory(folderFactory.newFolder()) + .buildModel(); + + AtomicInteger singleThreadCounter = new AtomicInteger(0); + new FluentLauncher().inputResource(INPUT_FILES).processor(new AbstractProcessor() { + @Override + public void process(CtElement element) { + singleThreadCounter.incrementAndGet(); + } + }).noClasspath(true).outputDirectory(folderFactory.newFolder()).buildModel(); + + // after processing both |singleThreadCounter| == sum(|atomicCounter|) must be + // true. + // for checking this subtract each array value from the + // singleThreadCounter and check for == 0 + for (int j = 0; j < atomicCounter.length(); j++) { + singleThreadCounter.set(singleThreadCounter.get() - atomicCounter.get(j)); + } + assertTrue(singleThreadCounter.get() == 0); + } + + @Test + public void compareWithSingleThreaded2() throws IOException { + // contract: a parallelProcessor with one thread must produce the same result as + // a normal processor. + + AtomicReferenceArray atomicCounter = createCounter(); + Processor p1 = createProcessor(atomicCounter, 0); + + new FluentLauncher().inputResource(INPUT_FILES) + .processor(new AbstractParallelProcessor(Arrays.asList(p1)) { + }) + .noClasspath(true) + .outputDirectory(folderFactory.newFolder()) + .buildModel(); + AtomicInteger singleThreadCounter = new AtomicInteger(0); + new FluentLauncher().inputResource(INPUT_FILES).processor(new AbstractProcessor() { + @Override + public void process(CtElement element) { + singleThreadCounter.incrementAndGet(); + } + }).noClasspath(true).outputDirectory(folderFactory.newFolder()).buildModel(); + singleThreadCounter.set(singleThreadCounter.get() - atomicCounter.get(0)); + assertTrue(singleThreadCounter.get() == 0); + } + + @Test + public void consumerConstructorTest() throws IOException { + // contract: creating with consumer constructor must produces correct results. + // See other tests for explanation how the testing works. + AtomicReferenceArray atomicCounter = createCounter(); + new FluentLauncher().inputResource(INPUT_FILES) + .processor(new AbstractParallelProcessor((e) -> atomicCounter.getAndUpdate(0, i -> i + 1), 4) { + }) + .noClasspath(true) + .outputDirectory(folderFactory.newFolder()) + .buildModel(); + AtomicInteger singleThreadCounter = new AtomicInteger(0); + new FluentLauncher().inputResource(INPUT_FILES).processor(new AbstractProcessor() { + @Override + public void process(CtElement element) { + singleThreadCounter.incrementAndGet(); + } + }).noClasspath(true).outputDirectory(folderFactory.newFolder()).buildModel(); + for (int j = 0; j < atomicCounter.length(); j++) { + singleThreadCounter.set(singleThreadCounter.get() - atomicCounter.get(j)); + } + assertTrue(singleThreadCounter.get() == 0); + } + + @Test + public void compareWithSingleThreaded3() throws IOException { + // contract: using an iterable with more elements than used should only use the + // given number. Result must be correct too. + // Here the iterable has size 4 and only 3 are used. + AtomicReferenceArray atomicCounter = createCounter(); + Processor p1 = createProcessor(atomicCounter, 0); + Processor p2 = createProcessor(atomicCounter, 1); + Processor p3 = createProcessor(atomicCounter, 2); + Processor p4 = createProcessor(atomicCounter, 3); + + new FluentLauncher().inputResource(INPUT_FILES) + .processor(new AbstractParallelProcessor(Arrays.asList(p1, p2, p3, p4), 3) { + }) + .noClasspath(true) + .outputDirectory(folderFactory.newFolder()) + .buildModel(); + AtomicInteger singleThreadCounter = new AtomicInteger(0); + new FluentLauncher().inputResource(INPUT_FILES).processor(new AbstractProcessor() { + @Override + public void process(CtElement element) { + singleThreadCounter.incrementAndGet(); + } + }).noClasspath(true).outputDirectory(folderFactory.newFolder()).buildModel(); + for (int j = 0; j < atomicCounter.length(); j++) { + singleThreadCounter.set(singleThreadCounter.get() - atomicCounter.get(j)); + } + assertTrue(singleThreadCounter.get() == 0); + // because only 3 are used + assertTrue(atomicCounter.get(3) == 0); + } + + @Test + public void testSize() throws IOException { + // contract: a thread pool with size zero must not created. + AtomicReferenceArray atomicCounter = createCounter(); + Processor p1 = createProcessor(atomicCounter, 0); + + assertThrows(IllegalArgumentException.class, () -> new FluentLauncher().inputResource(INPUT_FILES) + .processor(new AbstractParallelProcessor(Arrays.asList(p1), 0) { + }) + .noClasspath(true) + .outputDirectory(folderFactory.newFolder()) + .buildModel()); + } + + @Test + public void testSize2() throws IOException { + // contract: negative processor numbers must throw an exception. + AtomicReferenceArray atomicCounter = createCounter(); + Processor p1 = createProcessor(atomicCounter, 0); + assertThrows(IllegalArgumentException.class, () -> new FluentLauncher().inputResource(INPUT_FILES) + .processor(new AbstractParallelProcessor(Arrays.asList(p1), -5) { + }) + .noClasspath(true) + .outputDirectory(folderFactory.newFolder()) + .buildModel()); + } + + @Test + public void testSize3() throws IOException { + // contract: trying to consume more processor than provided must throw an + // exception. + assertThrows(SpoonException.class, () -> new FluentLauncher().inputResource(INPUT_FILES) + .processor(new AbstractParallelProcessor(Collections.emptyList(), 1) { + }) + .noClasspath(true) + .outputDirectory(folderFactory.newFolder()) + .buildModel()); + } + + @Test + public void testSize4() throws IOException { + // contract: trying to consume more processor than provided must throw an + // exception. + assertThrows(IllegalArgumentException.class, () -> new FluentLauncher().inputResource(INPUT_FILES) + .processor(new AbstractParallelProcessor(Collections.emptyList()) { + }) + .noClasspath(true) + .outputDirectory(folderFactory.newFolder()) + .buildModel()); + } + + @Test + public void testSize5() throws IOException { + // contract: a thread pool with size zero must not created. + + assertThrows(IllegalArgumentException.class, () -> new FluentLauncher().inputResource(INPUT_FILES) + .processor(new AbstractParallelProcessor((v) -> v.toString(), 0) { + }) + .noClasspath(true) + .outputDirectory(folderFactory.newFolder()) + .buildModel()); + } +}