Skip to content

Commit

Permalink
CellIterator: add "lastContributionTimestamp" to OSMEntitySnapshot & …
Browse files Browse the repository at this point in the history
…decouple from Grid implementation (#495)
  • Loading branch information
tyrasd authored Apr 6, 2023
1 parent 06a5037 commit cbb831d
Show file tree
Hide file tree
Showing 25 changed files with 1,127 additions and 1,046 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
Changelog
=========

## 1.2.0-SNAPSHOT (current master)

### new features

* `OSMEntitySnapshot` now also returns the `lastContributionTimestamp` for each snapshot ([#495])

### other changes

* `CellIterator` is now decoupled from the implementation of the "Grid" ([#495])

[#495]: https://github.com/GIScience/oshdb/pull/495


## 1.1.1

* update ignite dependency to [2.14.0-heigit1] ([#491])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.Optional;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
Expand All @@ -40,6 +39,7 @@
import org.heigit.ohsome.oshdb.util.CellId;
import org.heigit.ohsome.oshdb.util.TableNames;
import org.heigit.ohsome.oshdb.util.celliterator.CellIterator;
import org.heigit.ohsome.oshdb.util.celliterator.OSHEntitySource;
import org.heigit.ohsome.oshdb.util.exceptions.OSHDBTimeoutException;
import org.heigit.ohsome.oshdb.util.function.SerializableBiFunction;
import org.heigit.ohsome.oshdb.util.function.SerializableBinaryOperator;
Expand Down Expand Up @@ -139,8 +139,8 @@ private static <T> T asyncGetHandleTimeouts(IgniteFuture<T> async, Long timeout)
// When a timeout happens remotely, the exception might be buried in (a few) layers of
// "ignite exceptions". This recursively unwinds these and throws the original exception.
Throwable unwrapped = unwrapNestedIgniteException(e);
if (unwrapped instanceof OSHDBTimeoutException) {
throw (OSHDBTimeoutException) unwrapped;
if (unwrapped instanceof OSHDBTimeoutException timeoutException) {
throw timeoutException;
} else {
throw e;
}
Expand All @@ -150,8 +150,8 @@ private static <T> T asyncGetHandleTimeouts(IgniteFuture<T> async, Long timeout)
/** Recursively unwinds nested ignite exceptions. */
private static Throwable unwrapNestedIgniteException(IgniteException e) {
Throwable cause = e.getCause();
if (cause instanceof IgniteException && e != cause) {
return unwrapNestedIgniteException((IgniteException) cause);
if (cause instanceof IgniteException igniteException && e != cause) {
return unwrapNestedIgniteException(igniteException);
}
return cause;
}
Expand Down Expand Up @@ -193,13 +193,13 @@ private <S> S reduce(
.mapToObj(cellLongId -> asyncGetHandleTimeouts(
compute.affinityCallAsync(cacheName, cellLongId, () -> {
@SuppressWarnings("SerializableStoresNonSerializable")
GridOSHEntity oshEntityCell = cache.localPeek(cellLongId);
GridOSHEntity cell = cache.localPeek(cellLongId);
S ret;
if (oshEntityCell == null) {
if (cell == null) {
ret = identitySupplier.get();

} else {
ret = cellProcessor.apply(oshEntityCell, cellIterator);
ret = cellProcessor.apply(OSHEntitySource.fromGridOSHEntity(cell), cellIterator);
}
onClose.run();
return ret;
Expand Down Expand Up @@ -263,19 +263,19 @@ cacheName, cellIdRangeToCellIds(), cellIdRanges, cellProcessor, cellIterator
this.timeout
).stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
.toList();
Collections.shuffle(cellsWithData);
Stream<X> resultForType = cellsWithData.parallelStream()
.filter(ignored -> this.isActive())
.map(cellLongId -> asyncGetHandleTimeouts(
compute.affinityCallAsync(cacheName, cellLongId, () -> {
GridOSHEntity oshEntityCell = cache.localPeek(cellLongId);
GridOSHEntity cell = cache.localPeek(cellLongId);
Collection<X> ret;
if (oshEntityCell == null) {
if (cell == null) {
ret = Collections.<X>emptyList();
} else {
ret = cellProcessor.apply(oshEntityCell, cellIterator)
.collect(Collectors.toList());
ret = cellProcessor.apply(OSHEntitySource.fromGridOSHEntity(cell), cellIterator)
.toList();
}
onClose.run();
return ret;
Expand Down Expand Up @@ -444,10 +444,11 @@ public Collection<Long> call() {
// test if cell exists and contains any relevant data
GridOSHEntity cell = localCache.localPeek(cellLongId);
return cell != null
&& cellProcessor.apply(cell, cellIterator).anyMatch(ignored -> true);
&& cellProcessor.apply(OSHEntitySource.fromGridOSHEntity(cell), cellIterator)
.anyMatch(ignored -> true);
})
.boxed()
.collect(Collectors.toList());
.toList();
}
}

Expand Down Expand Up @@ -488,13 +489,14 @@ public Collection<Long> call() {
MapReducerIgniteScanQuery.cellKeyInRange(key, cellIdRangesByLevel)
).setPartition(part), cacheEntry -> {
Object data = cacheEntry.getValue();
GridOSHEntity oshEntityCell;
if (data instanceof BinaryObject) {
oshEntityCell = ((BinaryObject) data).deserialize();
GridOSHEntity cell;
if (data instanceof BinaryObject binaryData) {
cell = binaryData.deserialize();
} else {
oshEntityCell = (GridOSHEntity) data;
cell = (GridOSHEntity) data;
}
Stream<?> cellStream = cellProcessor.apply(oshEntityCell, this.cellIterator);
Stream<?> cellStream = cellProcessor.apply(
OSHEntitySource.fromGridOSHEntity(cell), this.cellIterator);
if (cellStream.anyMatch(ignored -> true)) {
return Optional.of(cacheEntry.getKey());
} else {
Expand All @@ -510,7 +512,7 @@ public Collection<Long> call() {
}
})
.flatMap(Collection::stream)
.collect(Collectors.toList());
.toList();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.heigit.ohsome.oshdb.util.CellId;
import org.heigit.ohsome.oshdb.util.TableNames;
import org.heigit.ohsome.oshdb.util.celliterator.CellIterator;
import org.heigit.ohsome.oshdb.util.celliterator.OSHEntitySource;
import org.heigit.ohsome.oshdb.util.exceptions.OSHDBTimeoutException;
import org.heigit.ohsome.oshdb.util.function.OSHEntityFilter;
import org.heigit.ohsome.oshdb.util.function.OSMEntityFilter;
Expand Down Expand Up @@ -105,7 +106,7 @@ protected Stream<X> flatMapStreamCellsOSMEntitySnapshotGroupedById(

private List<String> cacheNames(String prefix) {
return this.typeFilter.stream().map(TableNames::forOSMType).filter(Optional::isPresent)
.map(Optional::get).map(tn -> tn.toString(prefix)).collect(Collectors.toList());
.map(Optional::get).map(tn -> tn.toString(prefix)).toList();
}

@Override
Expand Down Expand Up @@ -281,7 +282,8 @@ S execute(Ignite node, CellProcessor<S> cellProcessor) {
// filter out cache misses === empty oshdb cells or not "local" data
.filter(Objects::nonNull)
.filter(ignored -> this.isActive())
.map(cell -> cellProcessor.apply(cell, this.cellIterator))
.map(cell ->
cellProcessor.apply(OSHEntitySource.fromGridOSHEntity(cell), this.cellIterator))
.reduce(identitySupplier.get(), combiner);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
Expand All @@ -39,6 +38,7 @@
import org.heigit.ohsome.oshdb.util.CellId;
import org.heigit.ohsome.oshdb.util.TableNames;
import org.heigit.ohsome.oshdb.util.celliterator.CellIterator;
import org.heigit.ohsome.oshdb.util.celliterator.OSHEntitySource;
import org.heigit.ohsome.oshdb.util.exceptions.OSHDBTimeoutException;
import org.heigit.ohsome.oshdb.util.function.OSHEntityFilter;
import org.heigit.ohsome.oshdb.util.function.OSMEntityFilter;
Expand Down Expand Up @@ -342,13 +342,14 @@ S execute(Ignite node, CellProcessor<S> cellProcessor) {
}
// iterate over the history of all OSM objects in the current cell
Object data = cacheEntry.getValue();
GridOSHEntity oshEntityCell;
if (data instanceof BinaryObject) {
oshEntityCell = ((BinaryObject) data).deserialize();
GridOSHEntity cell;
if (data instanceof BinaryObject binaryData) {
cell = binaryData.deserialize();
} else {
oshEntityCell = (GridOSHEntity) data;
cell = (GridOSHEntity) data;
}
return cellProcessor.apply(oshEntityCell, this.cellIterator);
return cellProcessor.apply(
OSHEntitySource.fromGridOSHEntity(cell), this.cellIterator);
}
)
) {
Expand Down Expand Up @@ -503,7 +504,7 @@ private static <V, R, M, S, P extends Geometry & Polygonal> S mapReduceOnIgniteC
null
);
S ret;
if (!oshdb.timeoutInMilliseconds().isPresent()) {
if (oshdb.timeoutInMilliseconds().isEmpty()) {
ret = result.get();
} else {
try {
Expand Down Expand Up @@ -534,13 +535,14 @@ private static <X> Stream<X> mapStreamOnIgniteCache(
).setPageSize(SCAN_QUERY_PAGE_SIZE), cacheEntry -> {
// iterate over the history of all OSM objects in the current cell
Object data = cacheEntry.getValue();
GridOSHEntity oshEntityCell;
if (data instanceof BinaryObject) {
oshEntityCell = ((BinaryObject) data).deserialize();
GridOSHEntity cell;
if (data instanceof BinaryObject binaryData) {
cell = binaryData.deserialize();
} else {
oshEntityCell = (GridOSHEntity) data;
cell = (GridOSHEntity) data;
}
return cellProcessor.apply(oshEntityCell, cellIterator).collect(Collectors.toList());
return cellProcessor.apply(OSHEntitySource.fromGridOSHEntity(cell), cellIterator)
.toList();
}
);
// todo: ignite scan query doesn't support timeouts -> implement ourselves?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@
import javax.annotation.Nonnull;
import org.heigit.ohsome.oshdb.api.object.OSMContributionImpl;
import org.heigit.ohsome.oshdb.api.object.OSMEntitySnapshotImpl;
import org.heigit.ohsome.oshdb.grid.GridOSHEntity;
import org.heigit.ohsome.oshdb.util.celliterator.CellIterator;
import org.heigit.ohsome.oshdb.util.celliterator.OSHEntitySource;
import org.heigit.ohsome.oshdb.util.function.SerializableBiFunction;
import org.heigit.ohsome.oshdb.util.function.SerializableFunction;
import org.heigit.ohsome.oshdb.util.function.SerializableSupplier;
import org.heigit.ohsome.oshdb.util.mappable.OSMContribution;
import org.heigit.ohsome.oshdb.util.mappable.OSMEntitySnapshot;

class Kernels implements Serializable {
interface CellProcessor<S> extends SerializableBiFunction<GridOSHEntity, CellIterator, S> {}
interface CellProcessor<S> extends SerializableBiFunction<OSHEntitySource, CellIterator, S> {}

interface CancelableProcessStatus {
default <T> boolean isActive(T ignored) {
Expand Down Expand Up @@ -57,10 +57,10 @@ static <R, S> CellProcessor<S> getOSMContributionCellReducer(
SerializableBiFunction<S, R, S> accumulator,
CancelableProcessStatus process
) {
return (oshEntityCell, cellIterator) -> {
return (source, cellIterator) -> {
// iterate over the history of all OSM objects in the current cell
AtomicReference<S> accInternal = new AtomicReference<>(identitySupplier.get());
cellIterator.iterateByContribution(oshEntityCell)
cellIterator.iterateByContribution(source)
.takeWhile(process::isActive)
.forEach(contribution -> {
OSMContribution osmContribution = new OSMContributionImpl(contribution);
Expand All @@ -86,11 +86,11 @@ static <R, S> CellProcessor<S> getOSMContributionGroupingCellReducer(
SerializableBiFunction<S, R, S> accumulator,
CancelableProcessStatus process
) {
return (oshEntityCell, cellIterator) -> {
return (source, cellIterator) -> {
AtomicReference<S> accInternal = new AtomicReference<>(identitySupplier.get());
// iterate over the history of all OSM objects in the current cell
List<OSMContribution> contributions = new ArrayList<>();
cellIterator.iterateByContribution(oshEntityCell)
cellIterator.iterateByContribution(source)
.takeWhile(process::isActive)
.forEach(contribution -> {
OSMContribution thisContribution = new OSMContributionImpl(contribution);
Expand Down Expand Up @@ -131,10 +131,10 @@ static <R, S> CellProcessor<S> getOSMEntitySnapshotCellReducer(
SerializableBiFunction<S, R, S> accumulator,
CancelableProcessStatus process
) {
return (oshEntityCell, cellIterator) -> {
return (source, cellIterator) -> {
// iterate over the history of all OSM objects in the current cell
AtomicReference<S> accInternal = new AtomicReference<>(identitySupplier.get());
cellIterator.iterateByTimestamps(oshEntityCell)
cellIterator.iterateByTimestamps(source)
.takeWhile(process::isActive)
.forEach(data -> {
OSMEntitySnapshot snapshot = new OSMEntitySnapshotImpl(data);
Expand All @@ -161,11 +161,11 @@ static <R, S> CellProcessor<S> getOSMEntitySnapshotGroupingCellReducer(
SerializableBiFunction<S, R, S> accumulator,
CancelableProcessStatus process
) {
return (oshEntityCell, cellIterator) -> {
return (source, cellIterator) -> {
// iterate over the history of all OSM objects in the current cell
AtomicReference<S> accInternal = new AtomicReference<>(identitySupplier.get());
List<OSMEntitySnapshot> osmEntitySnapshots = new ArrayList<>();
cellIterator.iterateByTimestamps(oshEntityCell)
cellIterator.iterateByTimestamps(source)
.takeWhile(process::isActive)
.forEach(data -> {
OSMEntitySnapshot thisSnapshot = new OSMEntitySnapshotImpl(data);
Expand Down Expand Up @@ -204,9 +204,9 @@ static <S> CellProcessor<Stream<S>> getOSMContributionCellStreamer(
SerializableFunction<OSMContribution, S> mapper,
CancelableProcessStatus process
) {
return (oshEntityCell, cellIterator) -> {
return (source, cellIterator) -> {
// iterate over the history of all OSM objects in the current cell
return cellIterator.iterateByContribution(oshEntityCell)
return cellIterator.iterateByContribution(source)
.takeWhile(process::isActive)
.map(OSMContributionImpl::new)
.map(mapper);
Expand All @@ -225,11 +225,11 @@ static <S> CellProcessor<Stream<S>> getOSMContributionGroupingCellStreamer(
SerializableFunction<List<OSMContribution>, Iterable<S>> mapper,
CancelableProcessStatus process
) {
return (oshEntityCell, cellIterator) -> {
return (source, cellIterator) -> {
// iterate over the history of all OSM objects in the current cell
List<OSMContribution> contributions = new ArrayList<>();
List<S> result = new LinkedList<>();
cellIterator.iterateByContribution(oshEntityCell)
cellIterator.iterateByContribution(source)
.takeWhile(process::isActive)
.map(OSMContributionImpl::new)
.forEach(contribution -> {
Expand Down Expand Up @@ -261,9 +261,9 @@ static <S> CellProcessor<Stream<S>> getOSMEntitySnapshotCellStreamer(
SerializableFunction<OSMEntitySnapshot, S> mapper,
CancelableProcessStatus process
) {
return (oshEntityCell, cellIterator) -> {
return (source, cellIterator) -> {
// iterate over the history of all OSM objects in the current cell
return cellIterator.iterateByTimestamps(oshEntityCell)
return cellIterator.iterateByTimestamps(source)
.takeWhile(process::isActive)
.map(OSMEntitySnapshotImpl::new)
.map(mapper);
Expand All @@ -282,11 +282,11 @@ static <S> CellProcessor<Stream<S>> getOSMEntitySnapshotGroupingCellStreamer(
SerializableFunction<List<OSMEntitySnapshot>, Iterable<S>> mapper,
CancelableProcessStatus process
) {
return (oshEntityCell, cellIterator) -> {
return (source, cellIterator) -> {
// iterate over the history of all OSM objects in the current cell
List<OSMEntitySnapshot> snapshots = new ArrayList<>();
List<S> result = new LinkedList<>();
cellIterator.iterateByTimestamps(oshEntityCell)
cellIterator.iterateByTimestamps(source)
.takeWhile(process::isActive)
.map(OSMEntitySnapshotImpl::new)
.forEach(contribution -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.heigit.ohsome.oshdb.api.mapreducer.backend.Kernels.CellProcessor;
import org.heigit.ohsome.oshdb.index.XYGridTree.CellIdRange;
import org.heigit.ohsome.oshdb.util.celliterator.CellIterator;
import org.heigit.ohsome.oshdb.util.celliterator.OSHEntitySource;
import org.heigit.ohsome.oshdb.util.function.SerializableBiFunction;
import org.heigit.ohsome.oshdb.util.function.SerializableBinaryOperator;
import org.heigit.ohsome.oshdb.util.function.SerializableFunction;
Expand Down Expand Up @@ -69,7 +70,7 @@ private <S> S reduce(
.filter(ignored -> this.isActive())
.flatMap(this::getOshCellsStream)
.filter(ignored -> this.isActive())
.map(oshCell -> processor.apply(oshCell, cellIterator))
.map(cell -> processor.apply(OSHEntitySource.fromGridOSHEntity(cell), cellIterator))
.reduce(identitySupplier.get(), combiner);
}

Expand All @@ -91,7 +92,7 @@ private Stream<X> stream(
.filter(ignored -> this.isActive())
.flatMap(this::getOshCellsStream)
.filter(ignored -> this.isActive())
.flatMap(oshCell -> processor.apply(oshCell, cellIterator));
.flatMap(cell -> processor.apply(OSHEntitySource.fromGridOSHEntity(cell), cellIterator));
}

// === map-reduce operations ===
Expand Down
Loading

0 comments on commit cbb831d

Please sign in to comment.