Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: text chunks stream to sentence stream examples #1595

Merged
merged 1 commit into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
///usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS io.smallrye.reactive:mutiny:2.6.0
package _03_composition_transformation;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

public class _22_Multi_Chunks_To_Sentence_Stream {

public static void main(String[] args) throws InterruptedException {
System.out.println("⚡️ Chunks of text to sentence stream");

List<String> chunks = List.of(
"Hel",
"lo ",
"world\n",
"Foo",
" B",
"ar ",
"Baz\n");

StringBuilder builder = new StringBuilder();
Multi.createFrom().iterable(chunks)
.onItem().transformToUniAndConcatenate(chunk -> {
builder.append(chunk);
String current = builder.toString();
if (current.endsWith("\n")) {
builder.setLength(0);
return Uni.createFrom().item(current.substring(0, current.length() - 1));
} else {
return Uni.createFrom().nullItem();
}
})
.onItem().transformToUniAndConcatenate(line -> sendText(line))
.subscribe().with(
line -> System.out.println(">>> " + line),
Throwable::printStackTrace,
pool::shutdownNow);

}

static final ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();

static Uni<String> sendText(String text) {
return Uni.createFrom().item(text)
.onItem().delayIt().onExecutor(pool).by(Duration.ofMillis(300))
.onItem().invoke(txt -> System.out.println("[sendText] " + txt));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
///usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS io.smallrye.reactive:mutiny:2.6.0
package _03_composition_transformation;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.AbstractMultiOperator;
import io.smallrye.mutiny.operators.multi.MultiOperatorProcessor;
import io.smallrye.mutiny.subscription.MultiSubscriber;

public class _22_Multi_Chunks_To_Sentence_Stream_Custom_Operator {

public static void main(String[] args) throws InterruptedException {
System.out.println("⚡️ Chunks of text to sentence stream (with a custom operator)");

List<String> chunks = List.of(
"Hel",
"lo ",
"world\n",
"Foo",
" B",
"ar ",
"Baz\n");

Multi.createFrom().iterable(chunks)
.plug(TokenToSentence::new)
.onItem().transformToUniAndConcatenate(line -> sendText(line))
.subscribe().with(
line -> System.out.println(">>> " + line),
Throwable::printStackTrace,
pool::shutdownNow);

}

static class TokenToSentence extends AbstractMultiOperator<String, String> {

public TokenToSentence(Multi<? extends String> upstream) {
super(upstream);
}

@Override
public void subscribe(MultiSubscriber<? super String> downstream) {
upstream.subscribe().withSubscriber(new TokenToSentenceProcessor(downstream));
}

static private class TokenToSentenceProcessor extends MultiOperatorProcessor<String, String> {

private final StringBuilder builder = new StringBuilder();

public TokenToSentenceProcessor(MultiSubscriber<? super String> downstream) {
super(downstream);
}

@Override
public void onItem(String chunk) {
builder.append(chunk);
String current = builder.toString();
if (current.endsWith("\n")) {
builder.setLength(0);
super.onItem(current.substring(0, current.length() - 1));
} else {
upstream.request(1L);
}
}
}
}

static final ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();

static Uni<String> sendText(String text) {
return Uni.createFrom().item(text)
.onItem().delayIt().onExecutor(pool).by(Duration.ofMillis(300))
.onItem().invoke(txt -> System.out.println("[sendText] " + txt));
}
}
Loading