Skip to content

Commit

Permalink
feat: add AsyncIterator support
Browse files Browse the repository at this point in the history
experimental feature
  • Loading branch information
bsorrentino committed Mar 24, 2024
1 parent 261b537 commit ddac14d
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 0 deletions.
65 changes: 65 additions & 0 deletions src/main/java/org/bsc/langgraph4j/async/AsyncIterator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package org.bsc.langgraph4j.async;

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

import static java.util.concurrent.CompletableFuture.completedFuture;

public interface AsyncIterator<T> extends Iterable<T> {

record Data<T>(T data, boolean done) {}

CompletableFuture<Data<T>> next();

default CompletableFuture<Void> fetchNext( final Consumer<T> consumer ) {
return next().thenApply(data -> {
if (data.done) {
return false;
}
consumer.accept(data.data);
return true;
})
.thenCompose( hasNext -> {
if (!hasNext) {
return completedFuture(null);
}
return fetchNext(consumer);
});
}
default CompletableFuture<Void> forEachAsync( final Consumer<T> consumer) {
return fetchNext(consumer);
}

default Iterator<T> iterator() {
return new Iterator<>() {

private Data<T> currentFetchedData;
@Override
public boolean hasNext() {
if( currentFetchedData != null && currentFetchedData.done ) {
return false;
}
currentFetchedData = AsyncIterator.this.next().join();
return !currentFetchedData.done;
}

@Override
public T next() {
if (currentFetchedData == null ) {
if( !hasNext() ) {
throw new NoSuchElementException("no elements into iterator");
}
}
if( currentFetchedData.done ) {
throw new NoSuchElementException("no more elements into iterator");
}
return currentFetchedData.data;
}
};
}

}


36 changes: 36 additions & 0 deletions src/test/java/org/bsc/langgraph4j/AsyncTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.bsc.langgraph4j;

import org.bsc.langgraph4j.async.AsyncIterator;
import org.junit.jupiter.api.Test;

import java.util.concurrent.CompletableFuture;

import static java.util.concurrent.CompletableFuture.completedFuture;

public class AsyncTest {
@Test
public void asyncIteratorTest() throws Exception {

int[] myArray = {1, 2, 3, 4, 5};

final var it = new AsyncIterator<Integer>() {

private int cursor = 0;
@Override
public CompletableFuture<Data<Integer>> next() {

if (cursor == myArray.length) {
return completedFuture(new Data<>(null, true) );
}

return completedFuture(new Data<>(myArray[cursor++], false));
}
};

it.forEachAsync(System.out::println).get();

for (var i : it) {
System.out.println(i);
}
}
}

0 comments on commit ddac14d

Please sign in to comment.