Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add support for parallel processor #3280

Merged
merged 11 commits into from
Mar 10, 2020
12 changes: 12 additions & 0 deletions doc/processor.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,15 @@ public class CatchProcessor extends AbstractProcessor<CtCatch> {
}
}
```
## Parallel Processor

Lets assume you want to use multiple cores for your processor. Spoon provides a simple high-level API for this task.
Using the CatchProcessor from before create a `AbstractParallelProcessor`.

```java
Processor<CtCatch> parallelProcessor = new AbstractParallelProcessor<CtCatch>(
Arrays.asList(new CatchProcessor(), new CatchProcessor())) {};
```
Now you have the same processor behavior as before, but 2 parallel running processor.
You can upscale this pretty high, but keep in mind to not use more parallel processors than available cores for maximum speedup.
For more information about parallel processor and the API have a look in the [documentation](http://spoon.gforge.inria.fr/mvnsites/spoon-core/apidocs/spoon/processing/AbstractParallelProcessor.html).
137 changes: 137 additions & 0 deletions src/main/java/spoon/processing/AbstractParallelProcessor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/**
* Copyright (C) 2006-2019 INRIA and contributors
*
* Spoon is available either under the terms of the MIT License (see LICENSE-MIT.txt) of the Cecill-C License (see LICENSE-CECILL-C.txt). You as the user are entitled to choose the terms under which to adopt Spoon.
*/
package spoon.processing;

import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;

import spoon.SpoonException;
import spoon.reflect.declaration.CtElement;

/**
* AbstractParallelProcessor allows using multiple threads for concurrent
* processing with {@link AbstractProcessor}.
*
* <b> This class should only be used if all processors do the same.</b>
* Otherwise the result may vary from the expected result. All processors <b>
* must </b> synchronize shared fields like Collections by themselves. Multiple
* constructors exist for different approaches creating this. You can create
* this processor with either a Iterable of processors or a Consumer.
*
* For creating and managing threads a {@link Executors#newFixedThreadPool()} is
* used. Creating more threads then cores can harm the performance. Using a
* different thread pool could increase the performance, but this class should
* be general usage. If you need better performance you may want to use an own
* class with different parallel approach.
*/
public abstract class AbstractParallelProcessor<E extends CtElement> extends AbstractProcessor<E> {

private ExecutorService service;
private ArrayBlockingQueue<Processor<E>> processorQueue;

/**
* Creates a new AbstractParallelProcessor from given iterable. The iterable is
* fully consumed. Giving an endless iterable of processors will result in
* errors. The processors must follow the guidelines given in the class
* description.
*
* @param processors iterable of processors.
* @throws IllegalArgumentException if size of iterable is less than 1.
*
*/
public AbstractParallelProcessor(Iterable<Processor<E>> processors) {
// added cast because constructors need int
int processorNumber = (int) StreamSupport.stream(processors.spliterator(), false).count();
processorQueue = new ArrayBlockingQueue<>(processorNumber);
processors.forEach(processorQueue::add);
service = Executors.newFixedThreadPool(processorNumber);
}

/**
* Creates a new AbstractParallelProcessor from given iterable. The processors
* must follow the guidelines given in the class description.
*
* @param processors iterable of processors.
* @param numberOfProcessors number consumed from the iterable added to the
* active processors.
* @throws SpoonException if iterable has less values then
* numberOfProcessors.
* @throws IllegalArgumentException if numberOfProcessors is less than 1.
*
*/
public AbstractParallelProcessor(Iterable<Processor<E>> processors, int numberOfProcessors) {
processorQueue = new ArrayBlockingQueue<>(numberOfProcessors);
service = Executors.newFixedThreadPool(numberOfProcessors);
Iterator<Processor<E>> it = processors.iterator();
for (int i = 0; i < numberOfProcessors; i++) {
if (!it.hasNext()) {
throw new SpoonException("not enough elements provided, iterable is already empty");
}
processorQueue.add(it.next());
}
}

/**
* Creates a new AbstractParallelProcessor from given consumer. The processors
* must follow the guidelines given in the class description.
*
* @param processFunction Represents an operation that accepts a single
* element E and returns no result.
* @param numberOfProcessors number of concurrent running processors.
* @throws IllegalArgumentException if numberOfProcessors is less than 1.
*/
public AbstractParallelProcessor(Consumer<E> processFunction, int numberOfProcessors) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

neat API!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks

processorQueue = new ArrayBlockingQueue<>(numberOfProcessors);
for (int i = 0; i < numberOfProcessors; i++) {
processorQueue.add(new AbstractProcessor<E>() {
@Override
public void process(E element) {
processFunction.accept(element);
}
});
}
service = Executors.newFixedThreadPool(numberOfProcessors);
}

@Override
public final void process(E element) {
try {
Processor<E> currentProcessor = processorQueue.take();
service.execute(() -> {
try {
currentProcessor.process(element);
processorQueue.put(currentProcessor);
} catch (InterruptedException e) {
// because rethrow is not possible here.
Thread.currentThread().interrupt();
e.printStackTrace();
processorQueue.add(currentProcessor);
} catch (Exception e) {
// allows throwing exception, but keeping the processor in the queue
processorQueue.add(currentProcessor);
throw e;
}
});
} catch (InterruptedException e) {
// because rethrow is not possible here.
Thread.currentThread().interrupt();
e.printStackTrace();
}
}

/**
* Cleans the threadpool after processing.
*/
@Override
public void processingDone() {
service.shutdown();
super.processingDone();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
/**
* Copyright (C) 2006-2019 INRIA and contributors
*
* Spoon is available either under the terms of the MIT License (see LICENSE-MIT.txt) of the Cecill-C License (see LICENSE-CECILL-C.txt). You as the user are entitled to choose the terms under which to adopt Spoon.
*/
package spoon.test.processing.processors;

import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import spoon.FluentLauncher;
import spoon.SpoonException;
import spoon.processing.AbstractParallelProcessor;
import spoon.processing.AbstractProcessor;
import spoon.processing.Processor;
import spoon.reflect.declaration.CtElement;

public class ParallelProcessorTest {
private static final String INPUT_FILES = "src/test/resources/deprecated/input";
@Rule
public TemporaryFolder folderFactory = new TemporaryFolder();

private AtomicReferenceArray<Integer> createCounter() {
Integer[] counter = new Integer[] { 0, 0, 0, 0 };
AtomicReferenceArray<Integer> atomicCounter = new AtomicReferenceArray<Integer>(counter);
return atomicCounter;
}

private Processor<CtElement> createProcessor(AtomicReferenceArray<Integer> atomicCounter, int digit) {
Processor<CtElement> processor = new AbstractProcessor<CtElement>() {
@Override
public void process(CtElement element) {
atomicCounter.getAndUpdate(digit, i -> i + 1);
}
};
return processor;
}

@Test
public void compareWithSingleThreaded1() throws IOException {
// contract: running with 4 processors parallel must produce the same result as
// single threaded processor.
// for testing this a simple processor counting visited nodes is used.

// create a countingArray for the concurrent processors.
AtomicReferenceArray<Integer> atomicCounter = createCounter();
// create processors
Processor<CtElement> p1 = createProcessor(atomicCounter, 0);
Processor<CtElement> p2 = createProcessor(atomicCounter, 1);
Processor<CtElement> p3 = createProcessor(atomicCounter, 2);
Processor<CtElement> p4 = createProcessor(atomicCounter, 3);

new FluentLauncher().inputResource(INPUT_FILES)
.processor(new AbstractParallelProcessor<CtElement>(Arrays.asList(p1, p2, p3, p4)) {
})
.noClasspath(true)
.outputDirectory(folderFactory.newFolder())
.buildModel();

AtomicInteger singleThreadCounter = new AtomicInteger(0);
new FluentLauncher().inputResource(INPUT_FILES).processor(new AbstractProcessor<CtElement>() {
@Override
public void process(CtElement element) {
singleThreadCounter.incrementAndGet();
}
}).noClasspath(true).outputDirectory(folderFactory.newFolder()).buildModel();

// after processing both |singleThreadCounter| == sum(|atomicCounter|) must be
// true.
// for checking this subtract each array value from the
// singleThreadCounter and check for == 0
for (int j = 0; j < atomicCounter.length(); j++) {
singleThreadCounter.set(singleThreadCounter.get() - atomicCounter.get(j));
}
assertTrue(singleThreadCounter.get() == 0);
}

@Test
public void compareWithSingleThreaded2() throws IOException {
// contract: a parallelProcessor with one thread must produce the same result as
// a normal processor.

AtomicReferenceArray<Integer> atomicCounter = createCounter();
Processor<CtElement> p1 = createProcessor(atomicCounter, 0);

new FluentLauncher().inputResource(INPUT_FILES)
.processor(new AbstractParallelProcessor<CtElement>(Arrays.asList(p1)) {
})
.noClasspath(true)
.outputDirectory(folderFactory.newFolder())
.buildModel();
AtomicInteger singleThreadCounter = new AtomicInteger(0);
new FluentLauncher().inputResource(INPUT_FILES).processor(new AbstractProcessor<CtElement>() {
@Override
public void process(CtElement element) {
singleThreadCounter.incrementAndGet();
}
}).noClasspath(true).outputDirectory(folderFactory.newFolder()).buildModel();
singleThreadCounter.set(singleThreadCounter.get() - atomicCounter.get(0));
assertTrue(singleThreadCounter.get() == 0);
}

@Test
public void consumerConstructorTest() throws IOException {
// contract: creating with consumer constructor must produces correct results.
// See other tests for explanation how the testing works.
AtomicReferenceArray<Integer> atomicCounter = createCounter();
new FluentLauncher().inputResource(INPUT_FILES)
.processor(new AbstractParallelProcessor<CtElement>((e) -> atomicCounter.getAndUpdate(0, i -> i + 1), 4) {
})
.noClasspath(true)
.outputDirectory(folderFactory.newFolder())
.buildModel();
AtomicInteger singleThreadCounter = new AtomicInteger(0);
new FluentLauncher().inputResource(INPUT_FILES).processor(new AbstractProcessor<CtElement>() {
@Override
public void process(CtElement element) {
singleThreadCounter.incrementAndGet();
}
}).noClasspath(true).outputDirectory(folderFactory.newFolder()).buildModel();
for (int j = 0; j < atomicCounter.length(); j++) {
singleThreadCounter.set(singleThreadCounter.get() - atomicCounter.get(j));
}
assertTrue(singleThreadCounter.get() == 0);
}

@Test
public void compareWithSingleThreaded3() throws IOException {
// contract: using an iterable with more elements than used should only use the
// given number. Result must be correct too.
// Here the iterable<Processor> has size 4 and only 3 are used.
AtomicReferenceArray<Integer> atomicCounter = createCounter();
Processor<CtElement> p1 = createProcessor(atomicCounter, 0);
Processor<CtElement> p2 = createProcessor(atomicCounter, 1);
Processor<CtElement> p3 = createProcessor(atomicCounter, 2);
Processor<CtElement> p4 = createProcessor(atomicCounter, 3);

new FluentLauncher().inputResource(INPUT_FILES)
.processor(new AbstractParallelProcessor<CtElement>(Arrays.asList(p1, p2, p3, p4), 3) {
})
.noClasspath(true)
.outputDirectory(folderFactory.newFolder())
.buildModel();
AtomicInteger singleThreadCounter = new AtomicInteger(0);
new FluentLauncher().inputResource(INPUT_FILES).processor(new AbstractProcessor<CtElement>() {
@Override
public void process(CtElement element) {
singleThreadCounter.incrementAndGet();
}
}).noClasspath(true).outputDirectory(folderFactory.newFolder()).buildModel();
for (int j = 0; j < atomicCounter.length(); j++) {
singleThreadCounter.set(singleThreadCounter.get() - atomicCounter.get(j));
}
assertTrue(singleThreadCounter.get() == 0);
// because only 3 are used
assertTrue(atomicCounter.get(3) == 0);
}

@Test
public void testSize() throws IOException {
// contract: a thread pool with size zero must not created.
AtomicReferenceArray<Integer> atomicCounter = createCounter();
Processor<CtElement> p1 = createProcessor(atomicCounter, 0);

assertThrows(IllegalArgumentException.class, () -> new FluentLauncher().inputResource(INPUT_FILES)
.processor(new AbstractParallelProcessor<CtElement>(Arrays.asList(p1), 0) {
})
.noClasspath(true)
.outputDirectory(folderFactory.newFolder())
.buildModel());
}

@Test
public void testSize2() throws IOException {
// contract: negative processor numbers must throw an exception.
AtomicReferenceArray<Integer> atomicCounter = createCounter();
Processor<CtElement> p1 = createProcessor(atomicCounter, 0);
assertThrows(IllegalArgumentException.class, () -> new FluentLauncher().inputResource(INPUT_FILES)
.processor(new AbstractParallelProcessor<CtElement>(Arrays.asList(p1), -5) {
})
.noClasspath(true)
.outputDirectory(folderFactory.newFolder())
.buildModel());
}

@Test
public void testSize3() throws IOException {
// contract: trying to consume more processor than provided must throw an
// exception.
assertThrows(SpoonException.class, () -> new FluentLauncher().inputResource(INPUT_FILES)
.processor(new AbstractParallelProcessor<CtElement>(Collections.emptyList(), 1) {
})
.noClasspath(true)
.outputDirectory(folderFactory.newFolder())
.buildModel());
}

@Test
public void testSize4() throws IOException {
// contract: trying to consume more processor than provided must throw an
// exception.
assertThrows(IllegalArgumentException.class, () -> new FluentLauncher().inputResource(INPUT_FILES)
.processor(new AbstractParallelProcessor<CtElement>(Collections.emptyList()) {
})
.noClasspath(true)
.outputDirectory(folderFactory.newFolder())
.buildModel());
}

@Test
public void testSize5() throws IOException {
// contract: a thread pool with size zero must not created.

assertThrows(IllegalArgumentException.class, () -> new FluentLauncher().inputResource(INPUT_FILES)
.processor(new AbstractParallelProcessor<CtElement>((v) -> v.toString(), 0) {
})
.noClasspath(true)
.outputDirectory(folderFactory.newFolder())
.buildModel());
}
}