Skip to content

Commit

Permalink
stream: Added missing background mappers to fluent mapper builder.
Browse files Browse the repository at this point in the history
  • Loading branch information
kelemen committed Oct 1, 2023
1 parent b3e5354 commit d087fe9
Show file tree
Hide file tree
Showing 5 changed files with 391 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package org.jtrim2.stream;

import java.util.Objects;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jtrim2.executor.TaskExecutor;
import org.jtrim2.utils.ExceptionHelper;

final class ElementMappers {
public static <T> ElementMapper<T, T> identityMapper() {
Expand Down Expand Up @@ -234,6 +237,114 @@ public static <T, C extends Iterable<? extends T>> SeqGroupMapper<C, T> flatteni
};
}

public static <T, R> SeqGroupMapper<T, R> inBackgroundRetainSequencesSeqGroupMapper(
SeqGroupMapper<T, R> seqGroupMapper,
String executorName,
int queueSize
) {
Objects.requireNonNull(seqGroupMapper, "seqGroupMapper");
Objects.requireNonNull(executorName, "executorName");
ExceptionHelper.checkArgumentInRange(queueSize, 0, Integer.MAX_VALUE, "queueSize");

return (cancelToken, seqGroupProducer, seqGroupConsumer) -> {
SeqGroupProducer<? extends T> backgroundProducer = seqGroupProducer
.toFluent()
.toBackgroundRetainSequences(executorName, queueSize)
.unwrap();
seqGroupMapper.mapAll(cancelToken, backgroundProducer, seqGroupConsumer);
};
}

public static <T, R> SeqGroupMapper<T, R> inBackgroundRetainSequencesSeqGroupMapper(
SeqGroupMapper<T, R> seqGroupMapper,
ThreadFactory threadFactory,
int queueSize
) {
Objects.requireNonNull(seqGroupMapper, "seqGroupMapper");
Objects.requireNonNull(threadFactory, "threadFactory");
ExceptionHelper.checkArgumentInRange(queueSize, 0, Integer.MAX_VALUE, "queueSize");

return (cancelToken, seqGroupProducer, seqGroupConsumer) -> {
SeqGroupProducer<? extends T> backgroundProducer = seqGroupProducer
.toFluent()
.toBackgroundRetainSequences(threadFactory, queueSize)
.unwrap();
seqGroupMapper.mapAll(cancelToken, backgroundProducer, seqGroupConsumer);
};
}

public static <T, R> SeqGroupMapper<T, R> inBackgroundRetainSequencesSeqGroupMapper(
SeqGroupMapper<T, R> seqGroupMapper,
TaskExecutor executor,
int queueSize
) {
Objects.requireNonNull(seqGroupMapper, "seqGroupMapper");
Objects.requireNonNull(executor, "executor");
ExceptionHelper.checkArgumentInRange(queueSize, 0, Integer.MAX_VALUE, "queueSize");

return (cancelToken, seqGroupProducer, seqGroupConsumer) -> {
SeqGroupProducer<? extends T> backgroundProducer = seqGroupProducer
.toFluent()
.toBackgroundRetainSequences(executor, queueSize)
.unwrap();
seqGroupMapper.mapAll(cancelToken, backgroundProducer, seqGroupConsumer);
};
}

public static <T, R> SeqMapper<T, R> inBackgroundSeqMapper(
SeqMapper<T, R> seqMapper,
String executorName,
int queueSize
) {
Objects.requireNonNull(seqMapper, "seqMapper");
Objects.requireNonNull(executorName, "executorName");
ExceptionHelper.checkArgumentInRange(queueSize, 0, Integer.MAX_VALUE, "queueSize");

return (cancelToken, seqProducer, seqConsumer) -> {
SeqProducer<? extends T> backgroundProducer = seqProducer
.toFluent()
.toBackground(executorName, queueSize)
.unwrap();
seqMapper.mapAll(cancelToken, backgroundProducer, seqConsumer);
};
}

public static <T, R> SeqMapper<T, R> inBackgroundSeqMapper(
SeqMapper<T, R> seqMapper,
ThreadFactory threadFactory,
int queueSize
) {
Objects.requireNonNull(seqMapper, "seqMapper");
Objects.requireNonNull(threadFactory, "threadFactory");
ExceptionHelper.checkArgumentInRange(queueSize, 0, Integer.MAX_VALUE, "queueSize");

return (cancelToken, seqProducer, seqConsumer) -> {
SeqProducer<? extends T> backgroundProducer = seqProducer
.toFluent()
.toBackground(threadFactory, queueSize)
.unwrap();
seqMapper.mapAll(cancelToken, backgroundProducer, seqConsumer);
};
}

public static <T, R> SeqMapper<T, R> inBackgroundSeqMapper(
SeqMapper<T, R> seqMapper,
TaskExecutor executor,
int queueSize
) {
Objects.requireNonNull(seqMapper, "seqMapper");
Objects.requireNonNull(executor, "executor");
ExceptionHelper.checkArgumentInRange(queueSize, 0, Integer.MAX_VALUE, "queueSize");

return (cancelToken, seqProducer, seqConsumer) -> {
SeqProducer<? extends T> backgroundProducer = seqProducer
.toFluent()
.toBackground(executor, queueSize)
.unwrap();
seqMapper.mapAll(cancelToken, backgroundProducer, seqConsumer);
};
}

private ElementMappers() {
throw new AssertionError();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,68 @@ public FluentSeqGroupMapper<T, R> inBackground(
return new ParallelSeqGroupMapper<>(executorRefProvider, consumerThreadCount, queueSize, wrapped).toFluent();
}

/**
* Returns a mapper mapping each sequences on a background thread. Each sequence will be processed on
* a separate thread the same way as done by the
* {@link FluentSeqGroupProducer#toBackgroundRetainSequences(String, int)} method.
*
* @param executorName the name given to the executor on which the mapper must run on. This name will
* appear in the name of the executing threads. This argument cannot be {@code null}.
* @param queueSize the number of extra elements to store aside from what the consumer threads
* are processing. That is, the threads are effectively act as part of the queue. So, the total
* outstanding elements are {@code consumerThreadCount + queueSize}. This argument must be
* greater than or equal to zero. Setting this argument to zero is often appropriate, but can be
* set to a higher value to reduce the downtime due to variance in producing and processing times.
* @return a mapper mapping each sequences on a background thread. The method never returns {@code null}.
*/
public FluentSeqGroupMapper<T, R> inBackgroundRetainSequences(
String executorName,
int queueSize
) {
return ElementMappers.inBackgroundRetainSequencesSeqGroupMapper(wrapped, executorName, queueSize).toFluent();
}

/**
* Returns a mapper mapping each sequences on a background thread. Each sequence will be processed on
* a separate thread the same way as done by the
* {@link FluentSeqGroupProducer#toBackgroundRetainSequences(ThreadFactory, int)} method.
*
* @param threadFactory the thread factory creating the threads on which the mapper must run on.
* This argument cannot be {@code null}.
* @param queueSize the number of extra elements to store aside from what the consumer threads
* are processing. That is, the threads are effectively act as part of the queue. So, the total
* outstanding elements are {@code consumerThreadCount + queueSize}. This argument must be
* greater than or equal to zero. Setting this argument to zero is often appropriate, but can be
* set to a higher value to reduce the downtime due to variance in producing and processing times.
* @return a mapper mapping each sequences on a background thread. The method never returns {@code null}.
*/
public FluentSeqGroupMapper<T, R> inBackgroundRetainSequences(
ThreadFactory threadFactory,
int queueSize
) {
return ElementMappers.inBackgroundRetainSequencesSeqGroupMapper(wrapped, threadFactory, queueSize).toFluent();
}

/**
* Returns a mapper mapping each sequences on a background thread. Each sequence will be processed on
* a separate thread the same way as done by the
* {@link FluentSeqGroupProducer#toBackgroundRetainSequences(TaskExecutor, int)} method.
*
* @param executor the executor on which the mapper must run on. This argument cannot be {@code null}.
* @param queueSize the number of extra elements to store aside from what the consumer threads
* are processing. That is, the threads are effectively act as part of the queue. So, the total
* outstanding elements are {@code consumerThreadCount + queueSize}. This argument must be
* greater than or equal to zero. Setting this argument to zero is often appropriate, but can be
* set to a higher value to reduce the downtime due to variance in producing and processing times.
* @return a mapper mapping each sequences on a background thread. The method never returns {@code null}.
*/
public FluentSeqGroupMapper<T, R> inBackgroundRetainSequences(
TaskExecutor executor,
int queueSize
) {
return ElementMappers.inBackgroundRetainSequencesSeqGroupMapper(wrapped, executor, queueSize).toFluent();
}

/**
* Returns a {@code FluentSeqGroupConsumer} doing nothing with the output of this mapper, but the mapping
* action taking place in the returned consumer.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package org.jtrim2.stream;

import java.util.Objects;
import java.util.concurrent.ThreadFactory;
import java.util.function.Function;
import org.jtrim2.executor.TaskExecutor;

/**
* Defines a convenient fluent style builder for mappers mapping a single
Expand Down Expand Up @@ -113,6 +115,65 @@ public <R2> FluentSeqMapper<T, R2> mapContextFree(ElementMapper<? super R, ? ext
return map(ElementMappers.contextFreeSeqMapper(mapper));
}

/**
* Returns a mapper mapping the elements on a background thread. That is, this method does the same thing
* as the {@link FluentSeqProducer#toBackground(String, int)} method.
*
* @param executorName the name given to the executor running the mapper tasks. This name will
* appear in the name of the executing thread. This argument cannot be {@code null}.
* @param queueSize the number of extra elements to store aside from what the consumer thread
* is processing. That is, the consumer thread effectively acts as part of the queue. So, the total
* outstanding elements are {@code queueSize + 1}. This argument must be greater than or equal to zero.
* Setting this argument to zero is often appropriate, but can be set to a higher value to reduce the
* downtime due to variance in producing and processing times.
* @return a mapper mapping the elements on a background thread. This method never returns {@code null}.
*/
public FluentSeqMapper<T, R> inBackground(
String executorName,
int queueSize
) {
return ElementMappers.inBackgroundSeqMapper(wrapped, executorName, queueSize).toFluent();
}

/**
* Returns a mapper mapping the elements on a background thread. That is, this method does the same thing
* as the {@link FluentSeqProducer#toBackground(ThreadFactory, int)} method.
*
* @param threadFactory the thread factory creating the threads running the mapper tasks.
* This argument cannot be {@code null}.
* @param queueSize the number of extra elements to store aside from what the consumer thread
* is processing. That is, the consumer thread effectively acts as part of the queue. So, the total
* outstanding elements are {@code queueSize + 1}. This argument must be greater than or equal to zero.
* Setting this argument to zero is often appropriate, but can be set to a higher value to reduce the
* downtime due to variance in producing and processing times.
* @return a mapper mapping the elements on a background thread. This method never returns {@code null}.
*/
public FluentSeqMapper<T, R> inBackground(
ThreadFactory threadFactory,
int queueSize
) {
return ElementMappers.inBackgroundSeqMapper(wrapped, threadFactory, queueSize).toFluent();
}

/**
* Returns a mapper mapping the elements on a background thread. That is, this method does the same thing
* as the {@link FluentSeqProducer#toBackground(TaskExecutor, int)} method.
*
* @param executor the executor running the mapper tasks. This argument cannot be {@code null}.
* @param queueSize the number of extra elements to store aside from what the consumer thread
* is processing. That is, the consumer thread effectively acts as part of the queue. So, the total
* outstanding elements are {@code queueSize + 1}. This argument must be greater than or equal to zero.
* Setting this argument to zero is often appropriate, but can be set to a higher value to reduce the
* downtime due to variance in producing and processing times.
* @return a mapper mapping the elements on a background thread. This method never returns {@code null}.
*/
public FluentSeqMapper<T, R> inBackground(
TaskExecutor executor,
int queueSize
) {
return ElementMappers.inBackgroundSeqMapper(wrapped, executor, queueSize).toFluent();
}

/**
* Returns a mapper mapping (potentially) multiple sequences using this mapper.
* That is, each sequence will be mapped by the same mapper.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,9 @@ public void testMapContextFree() throws Exception {

private void testInBackground(
Function<FluentSeqGroupMapper<String, String>, FluentSeqGroupMapper<String, String>> inBackground,
Consumer<? super String> peekAction) throws Exception {

Consumer<? super String> peekAction,
List<List<String>> expected
) throws Exception {
SeqGroupMapper<String, String> src = SeqGroupMapper.oneToOneMapper((String e) -> e + "y");

AtomicReference<RuntimeException> testErrorRef = new AtomicReference<>();
Expand All @@ -230,16 +231,24 @@ private void testInBackground(
}))
.unwrap();

List<String> expected = Arrays.asList("ay", "by", "cy", "dy", "ey", "fy", "gy", "hy", "iy", "jy", "ky", "ly");
assertEquals(
Arrays.asList(expected),
expected,
collect(testSrc(), mapper)
);

assertEquals(12, peekCount.get());
verifyNoException(testErrorRef);
}

private void testInBackground(
Function<FluentSeqGroupMapper<String, String>, FluentSeqGroupMapper<String, String>> inBackground,
Consumer<? super String> peekAction
) throws Exception {
testInBackground(inBackground, peekAction, Arrays.asList(
Arrays.asList("ay", "by", "cy", "dy", "ey", "fy", "gy", "hy", "iy", "jy", "ky", "ly")
));
}

@Test(timeout = 10000)
public void testInBackgroundOwned() throws Exception {
String executorName = "Test-Executor-testInBackgroundOwned";
Expand Down Expand Up @@ -288,6 +297,66 @@ public void testInBackgroundExternal() throws Exception {
}
}

private void testInBackgroundRetainSequences(
Function<FluentSeqGroupMapper<String, String>, FluentSeqGroupMapper<String, String>> inBackground,
Consumer<? super String> peekAction
) throws Exception {
testInBackground(inBackground, peekAction, Arrays.asList(
Arrays.asList("ay", "by", "cy", "dy", "ey", "fy"),
Arrays.asList("gy", "hy", "iy", "jy", "ky", "ly"),
Arrays.asList()
));
}

@Test(timeout = 10000)
public void testInBackgroundRetainSequencesOwned() throws Exception {
String executorName = "Test-Executor-testInBackgroundRetainSequencesOwned";
testInBackgroundRetainSequences(
mapper -> mapper.inBackgroundRetainSequences(executorName, 0),
element -> {
String threadName = Thread.currentThread().getName();
if (!threadName.contains(executorName)) {
throw new IllegalStateException("Expected to run in background, but running in " + threadName);
}
}
);
}

@Test(timeout = 10000)
public void testInBackgroundRetainSequencesThreadFactory() throws Exception {
var threadFactory = new TestThreadFactory("Test-Executor-testInBackgroundThreadFactory");
testInBackgroundRetainSequences(
mapper -> mapper.inBackgroundRetainSequences(threadFactory, 0),
element -> {
if (!threadFactory.isExecutingInThis()) {
String threadName = Thread.currentThread().getName();
throw new IllegalStateException("Expected to run in background, but running in " + threadName);
}
}
);
}

@Test(timeout = 10000)
public void testInBackgroundRetainSequencesExternal() throws Exception {
SingleThreadedExecutor executor
= new SingleThreadedExecutor("Test-Executor-testInBackgroundRetainSequencesExternal");
try {
testInBackgroundRetainSequences(
mapper -> mapper.inBackgroundRetainSequences(executor, 0),
element -> {
if (!executor.isExecutingInThis()) {
String threadName = Thread.currentThread().getName();
throw new IllegalStateException("Expected to run in background, but running in "
+ threadName);
}
}
);
} finally {
executor.shutdownAndCancel();
executor.awaitTermination(Cancellation.UNCANCELABLE_TOKEN);
}
}

private static SeqGroupMapper<String, String> collectingMapper(List<List<String>> result) {
return (cancelToken, seqGroupProducer, seqGroupConsumer) -> {
seqGroupConsumer.consumeAll(cancelToken, (cancelToken2, seqConsumer) -> {
Expand Down
Loading

0 comments on commit d087fe9

Please sign in to comment.