Skip to content

Commit

Permalink
clarity is cing
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens committed Jan 30, 2021
1 parent 37ad7b8 commit f181958
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 100 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.commons.util;

import com.google.common.collect.AbstractIterator;
import io.airbyte.commons.concurrency.VoidCallableNoException;
import java.util.Iterator;

/**
* A {@link ResourceIterator} that will execute its close function when {@link Iterator#hasNext}
* returns false for the first time.
*
* @param <T> type
*/
class AutoCloseIterator<T> extends AbstractIterator<T> implements ResourceIterator<T> {

private final ResourceIterator<T> internalIterator;
private final VoidCallableNoException onClose;

private boolean hasClosed;

public AutoCloseIterator(ResourceIterator<T> iterator) {
this.internalIterator = iterator;
this.onClose = VoidCallableNoException.fromVoidCallable(iterator::close);
}

@Override
protected T computeNext() {
if (internalIterator.hasNext()) {
return internalIterator.next();
} else {
if (!hasClosed) {
hasClosed = true;
onClose.call();
}
return endOfData();
}
}

@Override
public void close() throws Exception {
internalIterator.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,23 @@
import io.airbyte.commons.concurrency.VoidCallable;
import java.util.Iterator;

/**
* The canonical {@link ResourceIterator}. The default behavior guarantees that the provided close
* functional will be called no more than one time.
*
* @param <T> type
*/
class DefaultResourceIterator<T> extends AbstractIterator<T> implements ResourceIterator<T> {

private final Iterator<T> iterator;
private final VoidCallable closeable;
private final VoidCallable onClose;

public DefaultResourceIterator(Iterator<T> iterator, VoidCallable closeable) {
private boolean hasClosed;

public DefaultResourceIterator(Iterator<T> iterator, VoidCallable onClose) {
this.iterator = iterator;
this.closeable = closeable;
this.onClose = onClose;
this.hasClosed = false;
}

@Override
Expand All @@ -50,7 +58,10 @@ protected T computeNext() {

@Override
public void close() throws Exception {
closeable.call();
if (!hasClosed) {
hasClosed = true;
onClose.call();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.commons.util;

import com.google.common.collect.AbstractIterator;
import java.util.Iterator;
import java.util.function.Supplier;

/**
* A {@link ResourceIterator} that calls the provided supplier the first time
* {@link Iterator#hasNext} is called.
*
* @param <T> type
*/
class LazyResourceIterator<T> extends AbstractIterator<T> implements ResourceIterator<T> {

private final Supplier<ResourceIterator<T>> iteratorSupplier;

private boolean hasSupplied;
private ResourceIterator<T> internalIterator;

public LazyResourceIterator(Supplier<ResourceIterator<T>> iteratorSupplier) {
this.iteratorSupplier = iteratorSupplier;
this.hasSupplied = false;
}

@Override
protected T computeNext() {
if (!hasSupplied) {
internalIterator = iteratorSupplier.get();
hasSupplied = true;
}

if (internalIterator.hasNext()) {
return internalIterator.next();
} else {
return endOfData();
}
}

@Override
public void close() throws Exception {
internalIterator.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
import java.util.Iterator;

/**
* If you operate on this iterator, you better close it.
* If you operate on this iterator, you better close it. {@link ResourceIterator#close} must be
* idempotent. The contract on this interface is that it may be called MANY times.
*
* @param <T> type
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@

package io.airbyte.commons.util;

import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.concurrency.VoidCallableNoException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand All @@ -55,6 +53,74 @@ public static <T> ResourceIterator<T> resourceIterator(Iterator<T> iterator) {
return new DefaultResourceIterator<>(iterator, VoidCallable.NOOP);
}

/**
* Coerces a vanilla {@link Iterator} into a {@link ResourceIterator}. The provided {@param onClose}
* function will be called at most one time.
*
* @param iterator resource iterator to add another close to
* @param onClose the function that will be called on close
* @param <T> type
* @return new resource iterator with the close function appended
*/
public static <T> ResourceIterator<T> resourceIterator(Iterator<T> iterator, VoidCallableNoException onClose) {
return new DefaultResourceIterator<>(iterator, onClose::call);
}

/**
* Wraps a {@link Stream} in a {@link ResourceIterator}. When {@link ResourceIterator#close()} is
* called, {@link Stream#close()} will be called.
*
* @param stream stream to wrap
* @param <T> type
* @return resource iterator
*/
public static <T> ResourceIterator<T> resourceIterator(Stream<T> stream) {
return new DefaultResourceIterator<>(stream.iterator(), stream::close);
}

/**
* Returns a {@link ResourceIterator} that will call the provided supplier ONE time when
* {@link ResourceIterator#hasNext()} is called the first time. The supplier returns a stream that
* will be exposed as an iterator. When {@link ResourceIterator#close()} is called,
* {@link Stream#close()} will be called.
*
* @param iteratorSupplier supplier that provides a the resouce iterator that will be invoked lazily
* @param <T> type
* @return resource iterator
*/
public static <T> ResourceIterator<T> lazyResourceIterator(Supplier<ResourceIterator<T>> iteratorSupplier) {
return new LazyResourceIterator<>(iteratorSupplier);
}

/**
* Returns a {@link ResourceIterator} that will call {@link ResourceIterator#close()} as soon as the
* iterator returns hasNext() = false the first time. When {@link ResourceIterator#close()} is
* called, {@link Stream#close()} will be called.
*
* @param resourceIterator
* @param <T>
* @return
*/
public static <T> ResourceIterator<T> autoClosingResourceIterator(ResourceIterator<T> resourceIterator) {
return new AutoCloseIterator<>(resourceIterator);
}

/**
* Returns a {@link ResourceIterator} that will call the provided supplier ONE time when
* {@link ResourceIterator#hasNext()} is called the first time. The supplier returns a stream that
* will be exposes as an iterator. This stream will be closed
*
* @param streamSupplier supplies the stream this supplier will be called one time.
* @param <T> type
* @return resource iterator
*/
public static <T> ResourceIterator<T> lazyAutoClosingResourceIterator(Supplier<Stream<T>> streamSupplier) {
return autoClosingResourceIterator(lazyResourceIterator(() -> {
final Stream<T> stream = streamSupplier.get();
return resourceIterator(stream);
}));
}

/**
* Append a function to be called on {@link ResourceIterator#close}.
*
Expand Down Expand Up @@ -99,110 +165,23 @@ public static <T> ResourceIterator<T> transform(Function<ResourceIterator<T>, It
return new DefaultResourceIterator<>(iteratorCreator.apply(resourceIterator), resourceIterator::close);
}

@SuppressWarnings("unchecked")
public static <T> ResourceIterator<T> concat(ResourceIterator<T>... iterators) {
final AutoCloseIterator<T>[] autoCloseIterators = Arrays
.stream(iterators)
.map(iterator -> new AutoCloseIterator<>(iterator, VoidCallableNoException.fromVoidCallable(iterator::close)))
.toArray(AutoCloseIterator[]::new);

return new DefaultResourceIterator<>(Iterators.concat(autoCloseIterators), () -> {
for (ResourceIterator<T> iterator : iterators) {
iterator.close();
}
});
}

public static <T> ResourceIterator<T> concat(ResourceIterator<T> iterator1, ResourceIterator<T> iterator2) {
return concat(ImmutableList.of(iterator1, iterator2));
}

/**
* Concatenates {@link ResourceIterator}. Whenever one of the input iterators is completely consumed
* the {@link ResourceIterator#close()} will be called immediately. Calling
* {@link ResourceIterator#close()} on the output iterator will call
* {@link ResourceIterator#close()} on every input iterator. This means
* {@link ResourceIterator#close()} can be called multiple times on any of the input iterators.
* Concatenates {@link ResourceIterator}.
*
* @param iterators iterators to concatenate.
* @param <T> type
* @return concatenated iterator
*/
@SuppressWarnings("unchecked")
public static <T> ResourceIterator<T> concat(List<ResourceIterator<T>> iterators) {
final AutoCloseIterator<T>[] autoCloseIterators = iterators
.stream()
.map(iterator -> new AutoCloseIterator<>(iterator, VoidCallableNoException.fromVoidCallable(iterator::close)))
.toArray(AutoCloseIterator[]::new);

return new DefaultResourceIterator<>(Iterators.concat(autoCloseIterators), () -> {
return new DefaultResourceIterator<>(Iterators.concat(iterators.iterator()), () -> {
for (ResourceIterator<T> iterator : iterators) {
iterator.close();
}
});
}

public static <T> ResourceIterator<T> toResourceIterator(Supplier<Stream<T>> streamSupplier) {
return new LazyResourceIterator<>(() -> {
final Stream<T> stream = streamSupplier.get();
// todo problem that we will close this stream multiple times?
return new DefaultResourceIterator<>(stream.iterator(), stream::close);
});
}

private static class LazyResourceIterator<T> extends AbstractIterator<T> implements ResourceIterator<T> {

private final Supplier<ResourceIterator<T>> iteratorSupplier;

private boolean hasSupplied;
private ResourceIterator<T> internalIterator;

public LazyResourceIterator(Supplier<ResourceIterator<T>> iteratorSupplier) {
this.iteratorSupplier = iteratorSupplier;
this.hasSupplied = false;
}

@Override
protected T computeNext() {
if (!hasSupplied) {
internalIterator = iteratorSupplier.get();
hasSupplied = true;
}

if (internalIterator.hasNext()) {
return internalIterator.next();
} else {
return endOfData();
}
}

@Override
public void close() throws Exception {
internalIterator.close();
}

}

private static class AutoCloseIterator<T> extends AbstractIterator<T> implements Iterator<T> {

private final Iterator<T> internalIterator;
private final VoidCallableNoException onClose;

public AutoCloseIterator(Iterator<T> iterator, VoidCallableNoException onClose) {
this.internalIterator = iterator;
this.onClose = onClose;
}

@Override
protected T computeNext() {
if (internalIterator.hasNext()) {
return internalIterator.next();
} else {
onClose.call();
return endOfData();
}
}

public static <T> ResourceIterator<T> concat(ResourceIterator<T> iterator1, ResourceIterator<T> iterator2) {
return concat(ImmutableList.of(iterator1, iterator2));
}

}
Loading

0 comments on commit f181958

Please sign in to comment.