diff --git a/src/main/java/org/bsc/langgraph4j/async/AsyncIterator.java b/src/main/java/org/bsc/langgraph4j/async/AsyncIterator.java new file mode 100644 index 00000000..a6e56054 --- /dev/null +++ b/src/main/java/org/bsc/langgraph4j/async/AsyncIterator.java @@ -0,0 +1,65 @@ +package org.bsc.langgraph4j.async; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +import static java.util.concurrent.CompletableFuture.completedFuture; + +public interface AsyncIterator extends Iterable { + + record Data(T data, boolean done) {} + + CompletableFuture> next(); + + default CompletableFuture fetchNext( final Consumer consumer ) { + return next().thenApply(data -> { + if (data.done) { + return false; + } + consumer.accept(data.data); + return true; + }) + .thenCompose( hasNext -> { + if (!hasNext) { + return completedFuture(null); + } + return fetchNext(consumer); + }); + } + default CompletableFuture forEachAsync( final Consumer consumer) { + return fetchNext(consumer); + } + + default Iterator iterator() { + return new Iterator<>() { + + private Data currentFetchedData; + @Override + public boolean hasNext() { + if( currentFetchedData != null && currentFetchedData.done ) { + return false; + } + currentFetchedData = AsyncIterator.this.next().join(); + return !currentFetchedData.done; + } + + @Override + public T next() { + if (currentFetchedData == null ) { + if( !hasNext() ) { + throw new NoSuchElementException("no elements into iterator"); + } + } + if( currentFetchedData.done ) { + throw new NoSuchElementException("no more elements into iterator"); + } + return currentFetchedData.data; + } + }; + } + + } + + diff --git a/src/test/java/org/bsc/langgraph4j/AsyncTest.java b/src/test/java/org/bsc/langgraph4j/AsyncTest.java new file mode 100644 index 00000000..c77b3a35 --- /dev/null +++ b/src/test/java/org/bsc/langgraph4j/AsyncTest.java @@ -0,0 +1,36 @@ +package org.bsc.langgraph4j; + +import org.bsc.langgraph4j.async.AsyncIterator; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CompletableFuture; + +import static java.util.concurrent.CompletableFuture.completedFuture; + +public class AsyncTest { + @Test + public void asyncIteratorTest() throws Exception { + + int[] myArray = {1, 2, 3, 4, 5}; + + final var it = new AsyncIterator() { + + private int cursor = 0; + @Override + public CompletableFuture> next() { + + if (cursor == myArray.length) { + return completedFuture(new Data<>(null, true) ); + } + + return completedFuture(new Data<>(myArray[cursor++], false)); + } + }; + + it.forEachAsync(System.out::println).get(); + + for (var i : it) { + System.out.println(i); + } + } +}