diff --git a/CHANGELOG.md b/CHANGELOG.md index 37e087fac..cb4409e07 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ Changelog * `OSMEntitySnapshot` now also returns the `lastContributionTimestamp` for each snapshot ([#495]) +### performance improvements + +* Significantly improve performance of queries which use filters and don't use `flatMap` ([#511]) + ### other changes * `CellIterator` is now decoupled from implementation of the "Grid" ([#495]) @@ -14,6 +18,7 @@ Changelog [#495]: https://github.com/GIScience/oshdb/pull/495 [#501]: https://github.com/GIScience/oshdb/pull/501 +[#511]: https://github.com/GIScience/oshdb/pull/511 ## 1.1.2 diff --git a/oshdb-api-ignite/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/MapReducerIgniteAffinityCall.java b/oshdb-api-ignite/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/MapReducerIgniteAffinityCall.java index ac8538707..d9561c158 100644 --- a/oshdb-api-ignite/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/MapReducerIgniteAffinityCall.java +++ b/oshdb-api-ignite/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/MapReducerIgniteAffinityCall.java @@ -292,7 +292,7 @@ cacheName, cellIdRangeToCellIds(), cellIdRanges, cellProcessor, cellIterator @Override protected S mapReduceCellsOSMContribution( - SerializableFunction mapper, + SerializableFunction> mapper, SerializableSupplier identitySupplier, SerializableBiFunction accumulator, SerializableBinaryOperator combiner @@ -331,7 +331,7 @@ protected S flatMapReduceCellsOSMContributionGroupedById( @Override protected S mapReduceCellsOSMEntitySnapshot( - SerializableFunction mapper, + SerializableFunction> mapper, SerializableSupplier identitySupplier, SerializableBiFunction accumulator, SerializableBinaryOperator combiner @@ -371,7 +371,7 @@ protected S flatMapReduceCellsOSMEntitySnapshotGroupedById( @Override protected Stream mapStreamCellsOSMContribution( - SerializableFunction mapper) throws Exception { + SerializableFunction> mapper) throws Exception { return stream(Kernels.getOSMContributionCellStreamer(mapper, this)); } @@ -383,7 +383,7 @@ protected Stream flatMapStreamCellsOSMContributionGroupedById( @Override protected Stream mapStreamCellsOSMEntitySnapshot( - SerializableFunction mapper) throws Exception { + SerializableFunction> mapper) throws Exception { return stream(Kernels.getOSMEntitySnapshotCellStreamer(mapper, this)); } diff --git a/oshdb-api-ignite/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/MapReducerIgniteLocalPeek.java b/oshdb-api-ignite/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/MapReducerIgniteLocalPeek.java index 0b2b55fca..7412b1e89 100644 --- a/oshdb-api-ignite/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/MapReducerIgniteLocalPeek.java +++ b/oshdb-api-ignite/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/MapReducerIgniteLocalPeek.java @@ -78,7 +78,7 @@ protected MapReducer copy() { @Override protected Stream mapStreamCellsOSMContribution( - SerializableFunction mapper + SerializableFunction> mapper ) throws Exception { throw new UnsupportedOperationException("Stream function not yet implemented"); } @@ -92,7 +92,7 @@ protected Stream flatMapStreamCellsOSMContributionGroupedById( @Override protected Stream mapStreamCellsOSMEntitySnapshot( - SerializableFunction mapper + SerializableFunction> mapper ) throws Exception { throw new UnsupportedOperationException("Stream function not yet implemented"); } @@ -115,7 +115,8 @@ public boolean isCancelable() { } @Override - protected S mapReduceCellsOSMContribution(SerializableFunction mapper, + protected S mapReduceCellsOSMContribution( + SerializableFunction> mapper, SerializableSupplier identitySupplier, SerializableBiFunction accumulator, SerializableBinaryOperator combiner) throws Exception { return mapReduceOnIgniteCache((OSHDBIgnite) this.oshdb, identitySupplier, combiner, @@ -140,7 +141,8 @@ protected S flatMapReduceCellsOSMContributionGroupedById( @Override protected S mapReduceCellsOSMEntitySnapshot( - SerializableFunction mapper, SerializableSupplier identitySupplier, + SerializableFunction> mapper, + SerializableSupplier identitySupplier, SerializableBiFunction accumulator, SerializableBinaryOperator combiner) throws Exception { return mapReduceOnIgniteCache((OSHDBIgnite) this.oshdb, identitySupplier, combiner, @@ -290,12 +292,13 @@ S execute(Ignite node, CellProcessor cellProcessor) { private static class MapReduceCellsOSMContributionOnIgniteCacheComputeJob - extends MapReduceCellsOnIgniteCacheComputeJob { + extends MapReduceCellsOnIgniteCacheComputeJob, S, P> { MapReduceCellsOSMContributionOnIgniteCacheComputeJob(TagInterpreter tagInterpreter, List cacheNames, Iterable cellIdRanges, SortedSet tstamps, OSHDBBoundingBox bbox, P poly, OSHEntityFilter preFilter, OSMEntityFilter filter, - SerializableFunction mapper, SerializableSupplier identitySupplier, + SerializableFunction> mapper, + SerializableSupplier identitySupplier, SerializableBiFunction accumulator, SerializableBinaryOperator combiner) { super(tagInterpreter, cacheNames, cellIdRanges, tstamps, bbox, poly, preFilter, filter, mapper, identitySupplier, accumulator, combiner); @@ -339,12 +342,13 @@ public S execute(Ignite node) { private static class MapReduceCellsOSMEntitySnapshotOnIgniteCacheComputeJob - extends MapReduceCellsOnIgniteCacheComputeJob { + extends MapReduceCellsOnIgniteCacheComputeJob, S, P> { MapReduceCellsOSMEntitySnapshotOnIgniteCacheComputeJob(TagInterpreter tagInterpreter, List cacheNames, Iterable cellIdRanges, SortedSet tstamps, OSHDBBoundingBox bbox, P poly, OSHEntityFilter preFilter, OSMEntityFilter filter, - SerializableFunction mapper, SerializableSupplier identitySupplier, + SerializableFunction> mapper, + SerializableSupplier identitySupplier, SerializableBiFunction accumulator, SerializableBinaryOperator combiner) { super(tagInterpreter, cacheNames, cellIdRanges, tstamps, bbox, poly, preFilter, filter, mapper, identitySupplier, accumulator, combiner); @@ -408,7 +412,7 @@ private static S mapReduceOnIgniteC null ); - if (!oshdb.timeoutInMilliseconds().isPresent()) { + if (oshdb.timeoutInMilliseconds().isEmpty()) { return asyncResult.get(); } else { try { diff --git a/oshdb-api-ignite/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/MapReducerIgniteScanQuery.java b/oshdb-api-ignite/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/MapReducerIgniteScanQuery.java index e214ab2e0..e26fb53af 100644 --- a/oshdb-api-ignite/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/MapReducerIgniteScanQuery.java +++ b/oshdb-api-ignite/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/MapReducerIgniteScanQuery.java @@ -9,6 +9,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.SortedSet; import java.util.TreeMap; import java.util.UUID; @@ -110,7 +111,8 @@ public boolean isCancelable() { // === map-reduce operations === @Override - protected S mapReduceCellsOSMContribution(SerializableFunction mapper, + protected S mapReduceCellsOSMContribution( + SerializableFunction> mapper, SerializableSupplier identitySupplier, SerializableBiFunction accumulator, SerializableBinaryOperator combiner) throws Exception { // load tag interpreter helper which is later used for geometry building @@ -147,7 +149,8 @@ protected S flatMapReduceCellsOSMContributionGroupedById( @Override protected S mapReduceCellsOSMEntitySnapshot( - SerializableFunction mapper, SerializableSupplier identitySupplier, + SerializableFunction> mapper, + SerializableSupplier identitySupplier, SerializableBiFunction accumulator, SerializableBinaryOperator combiner) throws Exception { // load tag interpreter helper which is later used for geometry building @@ -185,7 +188,7 @@ protected S flatMapReduceCellsOSMEntitySnapshotGroupedById( @Override protected Stream mapStreamCellsOSMContribution( - SerializableFunction mapper) throws Exception { + SerializableFunction> mapper) throws Exception { // load tag interpreter helper which is later used for geometry building TagInterpreter tagInterpreter = this.getTagInterpreter(); @@ -219,7 +222,7 @@ protected Stream flatMapStreamCellsOSMContributionGroupedById( @Override protected Stream mapStreamCellsOSMEntitySnapshot( - SerializableFunction mapper) throws Exception { + SerializableFunction> mapper) throws Exception { // load tag interpreter helper which is later used for geometry building TagInterpreter tagInterpreter = this.getTagInterpreter(); @@ -367,12 +370,12 @@ S execute(Ignite node, CellProcessor cellProcessor) { private static class MapReduceCellsOSMContributionOnIgniteCacheComputeJob - extends MapReduceCellsOnIgniteCacheComputeJob { + extends MapReduceCellsOnIgniteCacheComputeJob, S, P> { MapReduceCellsOSMContributionOnIgniteCacheComputeJob(TagInterpreter tagInterpreter, String cacheName, Map> cellIdRangesByLevel, SortedSet tstamps, OSHDBBoundingBox bbox, P poly, OSHEntityFilter preFilter, OSMEntityFilter filter, - SerializableFunction mapper, + SerializableFunction> mapper, SerializableSupplier identitySupplier, SerializableBiFunction accumulator, SerializableBinaryOperator combiner) { super(tagInterpreter, cacheName, cellIdRangesByLevel, tstamps, bbox, poly, preFilter, filter, @@ -417,12 +420,12 @@ public S execute(Ignite node) { private static class MapReduceCellsOSMEntitySnapshotOnIgniteCacheComputeJob - extends MapReduceCellsOnIgniteCacheComputeJob { + extends MapReduceCellsOnIgniteCacheComputeJob, S, P> { MapReduceCellsOSMEntitySnapshotOnIgniteCacheComputeJob(TagInterpreter tagInterpreter, String cacheName, Map> cellIdRangesByLevel, SortedSet tstamps, OSHDBBoundingBox bbox, P poly, OSHEntityFilter preFilter, OSMEntityFilter filter, - SerializableFunction mapper, + SerializableFunction> mapper, SerializableSupplier identitySupplier, SerializableBiFunction accumulator, SerializableBinaryOperator combiner) { super(tagInterpreter, cacheName, cellIdRangesByLevel, tstamps, bbox, poly, preFilter, filter, diff --git a/oshdb-api-ignite/src/test/java/org/heigit/ohsome/oshdb/api/tests/MapReduceOSHDBIgniteAffinityCallTest.java b/oshdb-api-ignite/src/test/java/org/heigit/ohsome/oshdb/api/tests/MapReduceOSHDBIgniteAffinityCallTest.java index 37dda5b47..5056f1b9b 100644 --- a/oshdb-api-ignite/src/test/java/org/heigit/ohsome/oshdb/api/tests/MapReduceOSHDBIgniteAffinityCallTest.java +++ b/oshdb-api-ignite/src/test/java/org/heigit/ohsome/oshdb/api/tests/MapReduceOSHDBIgniteAffinityCallTest.java @@ -22,19 +22,4 @@ class MapReduceOSHDBIgniteAffinityCallTest extends MapReduceOSHDBIgniteTest { MapReduceOSHDBIgniteAffinityCallTest() throws Exception { super(oshdb -> oshdb.computeMode(AFFINITY_CALL)); } - - @Test - void testOSMEntitySnapshotViewStreamNullValues() throws Exception { - // simple stream query - Set result = createMapReducerOSMEntitySnapshot() - .timestamps( - new OSHDBTimestamps("2010-01-01", "2015-01-01", OSHDBTimestamps.Interval.YEARLY)) - .filter("id:617308093") - .map(snapshot -> snapshot.getEntity().getUserId()) - .map(x -> (Integer) null) - .stream() - .collect(Collectors.toSet()); - - assertEquals(1, result.size()); - } } diff --git a/oshdb-api/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/FilterFunction.java b/oshdb-api/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/FilterFunction.java new file mode 100644 index 000000000..e9e940ab2 --- /dev/null +++ b/oshdb-api/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/FilterFunction.java @@ -0,0 +1,28 @@ +package org.heigit.ohsome.oshdb.api.mapreducer; + +import java.util.Collections; +import org.heigit.ohsome.oshdb.util.function.SerializablePredicate; + +/** + * A special map function that represents a filter. + * + *

Note that this class is using raw types on purpose because MapReducer's "map functions" + * are designed to input and output arbitrary data types. The necessary type checks are performed + * at at runtime in the respective setters.

+ */ +@SuppressWarnings({"rawtypes", "unchecked"}) // see javadoc above +class FilterFunction extends MapFunction { + private final SerializablePredicate filter; + + FilterFunction(SerializablePredicate filter) { + super((x, ignored) -> filter.test(x) + ? Collections.singletonList(x) + : Collections.emptyList(), + true); + this.filter = filter; + } + + public boolean test(Object root) { + return this.filter.test(root); + } +} diff --git a/oshdb-api/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/MapReducer.java b/oshdb-api/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/MapReducer.java index 49117a887..7b78df096 100644 --- a/oshdb-api/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/MapReducer.java +++ b/oshdb-api/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/MapReducer.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.SortedSet; import java.util.TimeZone; @@ -475,8 +476,9 @@ protected MapReducer flatMap(SerializableBiFunction filter(SerializablePredicate f) { - return this - .flatMap(data -> f.test(data) ? Collections.singletonList(data) : Collections.emptyList()); + MapReducer ret = this.copy(); + ret.mappers.add(new FilterFunction(f)); + return ret; } /** @@ -592,7 +594,8 @@ public MapReducer> groupByEntity() throws UnsupportedOperationException List mapFunctions = new ArrayList<>(ret.mappers); ret.mappers.clear(); ret.grouping = Grouping.BY_ID; - @SuppressWarnings("unchecked") // now in the reduce step the backend will return a list of items + @SuppressWarnings("unchecked") + // now in the reduce step the backend will return a list of items MapReducer> listMapReducer = (MapReducer>) ret; for (MapFunction action : mapFunctions) { if (action.isFlatMapper()) { @@ -754,8 +757,10 @@ MapAggregator aggregateByGeometry(Map geometries) var prevMapper = this.getMapper(); SerializableFunction> prevFlatMapper = - this.mappers.stream().noneMatch(MapFunction::isFlatMapper) - ? root -> (Iterable) List.of(prevMapper.apply(root)) + this.mappers.stream().allMatch(this::canUseFastPath) + ? root -> (Iterable) prevMapper.apply(root) + .map(Collections::singletonList) + .orElse(Collections.emptyList()) : this.getFlatMapper(); MapReducer> mapRed; if (isOSMContributionViewQuery()) { @@ -833,12 +838,12 @@ public S reduce( checkTimeout(); switch (this.grouping) { case NONE: - if (this.mappers.stream().noneMatch(MapFunction::isFlatMapper)) { - final SerializableFunction mapper = this.getMapper(); + if (this.mappers.stream().allMatch(this::canUseFastPath)) { + final SerializableFunction> mapper = this.getMapper(); if (isOSMContributionViewQuery()) { @SuppressWarnings("Convert2MethodRef") // having just `mapper::apply` here is problematic, see https://github.com/GIScience/oshdb/pull/37 - final SerializableFunction contributionMapper = + final SerializableFunction> contributionMapper = data -> mapper.apply(data); return this.mapReduceCellsOSMContribution( contributionMapper, @@ -849,7 +854,7 @@ public S reduce( } else if (isOSMEntitySnapshotViewQuery()) { @SuppressWarnings("Convert2MethodRef") // having just `mapper::apply` here is problematic, see https://github.com/GIScience/oshdb/pull/37 - final SerializableFunction snapshotMapper = + final SerializableFunction> snapshotMapper = data -> mapper.apply(data); return this.mapReduceCellsOSMEntitySnapshot( snapshotMapper, @@ -888,11 +893,11 @@ public S reduce( } case BY_ID: final SerializableFunction> flatMapper; - if (this.mappers.stream().noneMatch(MapFunction::isFlatMapper)) { - final SerializableFunction mapper = this.getMapper(); - flatMapper = data -> Collections.singletonList(mapper.apply(data)); - // todo: check if this is actually necessary, doesn't getFlatMapper() do the "same" in - // this case? should we add this as optimization case to getFlatMapper()?? + if (this.mappers.stream().allMatch(this::canUseFastPath)) { + final SerializableFunction> mapper = this.getMapper(); + flatMapper = data -> mapper.apply(data) + .map(Collections::singletonList) + .orElse(Collections.emptyList()); } else { flatMapper = this.getFlatMapper(); } @@ -1302,10 +1307,11 @@ private TDigest digest(SerializableFunction mapper) thr @Deprecated @SuppressWarnings("ResultOfMethodCallIgnored") public void forEach(SerializableConsumer action) throws Exception { + final Object ignored = new Object(); this.map(data -> { action.accept(data); - return null; - }).reduce(() -> null, (ignored, ignored2) -> null); + return ignored; + }).reduce(() -> ignored, (ignored2, ignored3) -> ignored); } /** @@ -1349,18 +1355,18 @@ private Stream streamInternal() throws Exception { checkTimeout(); switch (this.grouping) { case NONE: - if (this.mappers.stream().noneMatch(MapFunction::isFlatMapper)) { - final SerializableFunction mapper = this.getMapper(); + if (this.mappers.stream().allMatch(this::canUseFastPath)) { + final SerializableFunction> mapper = this.getMapper(); if (isOSMContributionViewQuery()) { @SuppressWarnings("Convert2MethodRef") // having just `mapper::apply` here is problematic, see https://github.com/GIScience/oshdb/pull/37 - final SerializableFunction contributionMapper = + final SerializableFunction> contributionMapper = data -> mapper.apply(data); return this.mapStreamCellsOSMContribution(contributionMapper); } else if (isOSMEntitySnapshotViewQuery()) { @SuppressWarnings("Convert2MethodRef") // having just `mapper::apply` here is problematic, see https://github.com/GIScience/oshdb/pull/37 - final SerializableFunction snapshotMapper = + final SerializableFunction> snapshotMapper = data -> mapper.apply(data); return this.mapStreamCellsOSMEntitySnapshot(snapshotMapper); } else { @@ -1394,11 +1400,11 @@ private Stream streamInternal() throws Exception { } case BY_ID: final SerializableFunction> flatMapper; - if (this.mappers.stream().noneMatch(MapFunction::isFlatMapper)) { - final SerializableFunction mapper = this.getMapper(); - flatMapper = data -> Collections.singletonList(mapper.apply(data)); - // todo: check if this is actually necessary, doesn't getFlatMapper() do the "same" in - // this case? should we add this as optimization case to getFlatMapper()?? + if (this.mappers.stream().allMatch(this::canUseFastPath)) { + final SerializableFunction> mapper = this.getMapper(); + flatMapper = data -> mapper.apply(data) + .map(Collections::singletonList) + .orElse(Collections.emptyList()); } else { flatMapper = this.getFlatMapper(); } @@ -1430,13 +1436,13 @@ private Stream streamInternal() throws Exception { // ----------------------------------------------------------------------------------------------- protected abstract Stream mapStreamCellsOSMContribution( - SerializableFunction mapper) throws Exception; + SerializableFunction> mapper) throws Exception; protected abstract Stream flatMapStreamCellsOSMContributionGroupedById( SerializableFunction, Iterable> mapper) throws Exception; protected abstract Stream mapStreamCellsOSMEntitySnapshot( - SerializableFunction mapper) throws Exception; + SerializableFunction> mapper) throws Exception; protected abstract Stream flatMapStreamCellsOSMEntitySnapshotGroupedById( SerializableFunction, Iterable> mapper) throws Exception; @@ -1485,7 +1491,7 @@ protected abstract Stream flatMapStreamCellsOSMEntitySnapshotGroupedById( * `accumulator` and `combiner` steps) */ protected abstract S mapReduceCellsOSMContribution( - SerializableFunction mapper, + SerializableFunction> mapper, SerializableSupplier identitySupplier, SerializableBiFunction accumulator, SerializableBinaryOperator combiner @@ -1585,7 +1591,7 @@ protected abstract S flatMapReduceCellsOSMContributionGroupedById( * `accumulator` and `combiner` steps) */ protected abstract S mapReduceCellsOSMEntitySnapshot( - SerializableFunction mapper, + SerializableFunction> mapper, SerializableSupplier identitySupplier, SerializableBiFunction accumulator, SerializableBinaryOperator combiner @@ -1714,14 +1720,18 @@ protected

P getPolyFilter() { } // concatenates all applied `map` functions - private SerializableFunction getMapper() { + private SerializableFunction> getMapper() { // todo: maybe we can somehow optimize this?? at least for special cases like // this.mappers.size() == 1 - return (SerializableFunction) data -> { + return (SerializableFunction>) data -> { // working with raw Objects since we don't know the actual intermediate types ¯\_(ツ)_/¯ Object result = data; for (MapFunction mapper : this.mappers) { - if (mapper.isFlatMapper()) { + if (mapper instanceof FilterFunction filter) { + if (!filter.test(result)) { + return Optional.empty(); + } + } else if (mapper.isFlatMapper()) { assert false : "flatMap callback requested in getMapper"; throw new UnsupportedOperationException("cannot flat map this"); } else { @@ -1731,7 +1741,7 @@ private SerializableFunction getMapper() { @SuppressWarnings("unchecked") // after applying all mapper functions, the result type is X X mappedResult = (X) result; - return mappedResult; + return Optional.of(mappedResult); }; } @@ -1928,4 +1938,8 @@ private String currentDate() { formatter.setTimeZone(TimeZone.getTimeZone("UTC")); return formatter.format(new Date()); } + + private boolean canUseFastPath(MapFunction f) { + return f instanceof FilterFunction || !f.isFlatMapper(); + } } diff --git a/oshdb-api/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/Kernels.java b/oshdb-api/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/Kernels.java index c62eab363..b4a2fdcea 100644 --- a/oshdb-api/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/Kernels.java +++ b/oshdb-api/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/Kernels.java @@ -5,6 +5,7 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; import javax.annotation.Nonnull; @@ -43,7 +44,7 @@ public boolean isActive() { @Nonnull static CellProcessor getOSMContributionCellReducer( - SerializableFunction mapper, + SerializableFunction> mapper, SerializableSupplier identitySupplier, SerializableBiFunction accumulator ) { @@ -52,7 +53,7 @@ static CellProcessor getOSMContributionCellReducer( @Nonnull static CellProcessor getOSMContributionCellReducer( - SerializableFunction mapper, + SerializableFunction> mapper, SerializableSupplier identitySupplier, SerializableBiFunction accumulator, CancelableProcessStatus process @@ -64,7 +65,8 @@ static CellProcessor getOSMContributionCellReducer( .takeWhile(process::isActive) .forEach(contribution -> { OSMContribution osmContribution = new OSMContributionImpl(contribution); - accInternal.set(accumulator.apply(accInternal.get(), mapper.apply(osmContribution))); + mapper.apply(osmContribution).ifPresent(mapped -> + accInternal.set(accumulator.apply(accInternal.get(), mapped))); }); return accInternal.get(); }; @@ -117,7 +119,7 @@ static CellProcessor getOSMContributionGroupingCellReducer( @Nonnull static CellProcessor getOSMEntitySnapshotCellReducer( - SerializableFunction mapper, + SerializableFunction> mapper, SerializableSupplier identitySupplier, SerializableBiFunction accumulator ) { @@ -126,7 +128,7 @@ static CellProcessor getOSMEntitySnapshotCellReducer( @Nonnull static CellProcessor getOSMEntitySnapshotCellReducer( - SerializableFunction mapper, + SerializableFunction> mapper, SerializableSupplier identitySupplier, SerializableBiFunction accumulator, CancelableProcessStatus process @@ -139,7 +141,8 @@ static CellProcessor getOSMEntitySnapshotCellReducer( .forEach(data -> { OSMEntitySnapshot snapshot = new OSMEntitySnapshotImpl(data); // immediately fold the result - accInternal.set(accumulator.apply(accInternal.get(), mapper.apply(snapshot))); + mapper.apply(snapshot).ifPresent(mapped -> + accInternal.set(accumulator.apply(accInternal.get(), mapped))); }); return accInternal.get(); }; @@ -194,23 +197,23 @@ static CellProcessor getOSMEntitySnapshotGroupingCellReducer( @Nonnull static CellProcessor> getOSMContributionCellStreamer( - SerializableFunction mapper + SerializableFunction> mapper ) { return getOSMContributionCellStreamer(mapper, NC); } @Nonnull static CellProcessor> getOSMContributionCellStreamer( - SerializableFunction mapper, + SerializableFunction> mapper, CancelableProcessStatus process ) { - return (source, cellIterator) -> { - // iterate over the history of all OSM objects in the current cell - return cellIterator.iterateByContribution(source) - .takeWhile(process::isActive) - .map(OSMContributionImpl::new) - .map(mapper); - }; + // iterate over the history of all OSM objects in the current cell + return (source, cellIterator) -> cellIterator.iterateByContribution(source) + .takeWhile(process::isActive) + .map(OSMContributionImpl::new) + .map(mapper) + .filter(Optional::isPresent) + .map(Optional::get); } @Nonnull @@ -251,23 +254,23 @@ static CellProcessor> getOSMContributionGroupingCellStreamer( @Nonnull static CellProcessor> getOSMEntitySnapshotCellStreamer( - SerializableFunction mapper + SerializableFunction> mapper ) { return getOSMEntitySnapshotCellStreamer(mapper, NC); } @Nonnull static CellProcessor> getOSMEntitySnapshotCellStreamer( - SerializableFunction mapper, + SerializableFunction> mapper, CancelableProcessStatus process ) { - return (source, cellIterator) -> { - // iterate over the history of all OSM objects in the current cell - return cellIterator.iterateByTimestamps(source) - .takeWhile(process::isActive) - .map(OSMEntitySnapshotImpl::new) - .map(mapper); - }; + // iterate over the history of all OSM objects in the current cell + return (source, cellIterator) -> cellIterator.iterateByTimestamps(source) + .takeWhile(process::isActive) + .map(OSMEntitySnapshotImpl::new) + .map(mapper) + .filter(Optional::isPresent) + .map(Optional::get); } @Nonnull diff --git a/oshdb-api/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/MapReducerJdbc.java b/oshdb-api/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/MapReducerJdbc.java index 740e5e4eb..2442eadaa 100644 --- a/oshdb-api/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/MapReducerJdbc.java +++ b/oshdb-api/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/MapReducerJdbc.java @@ -99,7 +99,11 @@ private class GridOSHEntityIterator implements Iterator { private final Connection conn; GridOSHEntity next; - public GridOSHEntityIterator(ResultSet oshCellsRawData, PreparedStatement pstmt, Connection conn) { + public GridOSHEntityIterator( + ResultSet oshCellsRawData, + PreparedStatement pstmt, + Connection conn + ) { this.oshCellsRawData = oshCellsRawData; this.preparedStatement = pstmt; this.conn = conn; diff --git a/oshdb-api/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/MapReducerJdbcMultithread.java b/oshdb-api/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/MapReducerJdbcMultithread.java index f046f8cf6..ac0721ac8 100644 --- a/oshdb-api/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/MapReducerJdbcMultithread.java +++ b/oshdb-api/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/MapReducerJdbcMultithread.java @@ -4,6 +4,7 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.stream.Stream; import org.heigit.ohsome.oshdb.api.db.OSHDBDatabase; import org.heigit.ohsome.oshdb.api.mapreducer.MapReducer; @@ -99,7 +100,7 @@ private Stream stream( @Override protected S mapReduceCellsOSMContribution( - SerializableFunction mapper, + SerializableFunction> mapper, SerializableSupplier identitySupplier, SerializableBiFunction accumulator, SerializableBinaryOperator combiner @@ -137,7 +138,7 @@ protected S flatMapReduceCellsOSMContributionGroupedById( @Override protected S mapReduceCellsOSMEntitySnapshot( - SerializableFunction mapper, + SerializableFunction> mapper, SerializableSupplier identitySupplier, SerializableBiFunction accumulator, SerializableBinaryOperator combiner @@ -177,7 +178,7 @@ protected S flatMapReduceCellsOSMEntitySnapshotGroupedById( @Override protected Stream mapStreamCellsOSMContribution( - SerializableFunction mapper) throws Exception { + SerializableFunction> mapper) throws Exception { return this.stream(Kernels.getOSMContributionCellStreamer(mapper, this)); } @@ -189,7 +190,7 @@ protected Stream flatMapStreamCellsOSMContributionGroupedById( @Override protected Stream mapStreamCellsOSMEntitySnapshot( - SerializableFunction mapper) throws Exception { + SerializableFunction> mapper) throws Exception { return this.stream(Kernels.getOSMEntitySnapshotCellStreamer(mapper, this)); } diff --git a/oshdb-api/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/MapReducerJdbcSinglethread.java b/oshdb-api/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/MapReducerJdbcSinglethread.java index f5d13e1cd..e9ae88ad9 100644 --- a/oshdb-api/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/MapReducerJdbcSinglethread.java +++ b/oshdb-api/src/main/java/org/heigit/ohsome/oshdb/api/mapreducer/backend/MapReducerJdbcSinglethread.java @@ -4,6 +4,7 @@ import java.io.IOException; import java.sql.SQLException; import java.util.List; +import java.util.Optional; import java.util.stream.Stream; import org.heigit.ohsome.oshdb.api.db.OSHDBDatabase; import org.heigit.ohsome.oshdb.api.db.OSHDBJdbc; @@ -105,7 +106,7 @@ private Stream stream( @Override protected S mapReduceCellsOSMContribution( - SerializableFunction mapper, + SerializableFunction> mapper, SerializableSupplier identitySupplier, SerializableBiFunction accumulator, SerializableBinaryOperator combiner @@ -143,7 +144,7 @@ protected S flatMapReduceCellsOSMContributionGroupedById( @Override protected S mapReduceCellsOSMEntitySnapshot( - SerializableFunction mapper, + SerializableFunction> mapper, SerializableSupplier identitySupplier, SerializableBiFunction accumulator, SerializableBinaryOperator combiner @@ -183,7 +184,7 @@ protected S flatMapReduceCellsOSMEntitySnapshotGroupedById( @Override protected Stream mapStreamCellsOSMContribution( - SerializableFunction mapper) throws Exception { + SerializableFunction> mapper) throws Exception { return this.stream(Kernels.getOSMContributionCellStreamer(mapper, this)); } @@ -196,7 +197,7 @@ protected Stream flatMapStreamCellsOSMContributionGroupedById( @Override protected Stream mapStreamCellsOSMEntitySnapshot( - SerializableFunction mapper) throws Exception { + SerializableFunction> mapper) throws Exception { return this.stream(Kernels.getOSMEntitySnapshotCellStreamer(mapper, this)); } diff --git a/oshdb-api/src/test/java/org/heigit/ohsome/oshdb/api/tests/HelpersOSMContributionViewTest.java b/oshdb-api/src/test/java/org/heigit/ohsome/oshdb/api/tests/HelpersOSMContributionViewTest.java index a5b004c67..7026e699c 100644 --- a/oshdb-api/src/test/java/org/heigit/ohsome/oshdb/api/tests/HelpersOSMContributionViewTest.java +++ b/oshdb-api/src/test/java/org/heigit/ohsome/oshdb/api/tests/HelpersOSMContributionViewTest.java @@ -247,13 +247,6 @@ void testUniq() throws Exception { assertEquals(21, result4.get(true).size()); assertEquals(21, result4.get(false).size()); - - // doesn't crash with null pointers - Set result5 = this.createMapReducer() - .timestamps(timestamps2) - .map(x -> null) - .uniq(); - assertEquals(1, result5.size() ); } } diff --git a/oshdb-api/src/test/java/org/heigit/ohsome/oshdb/api/tests/TestAutoAggregation.java b/oshdb-api/src/test/java/org/heigit/ohsome/oshdb/api/tests/TestAutoAggregation.java index ad4fe1f55..bd95645eb 100644 --- a/oshdb-api/src/test/java/org/heigit/ohsome/oshdb/api/tests/TestAutoAggregation.java +++ b/oshdb-api/src/test/java/org/heigit/ohsome/oshdb/api/tests/TestAutoAggregation.java @@ -6,6 +6,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.SortedMap; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -135,7 +136,7 @@ protected MapReducer copy() { @Override protected Stream mapStreamCellsOSMContribution( - SerializableFunction mapper) { + SerializableFunction> mapper) { throw new UnsupportedOperationException(); } @@ -147,7 +148,7 @@ protected Stream flatMapStreamCellsOSMContributionGroupedById( @Override protected Stream mapStreamCellsOSMEntitySnapshot( - SerializableFunction mapper) { + SerializableFunction> mapper) { throw new UnsupportedOperationException(); } @@ -159,7 +160,8 @@ protected Stream flatMapStreamCellsOSMEntitySnapshotGroupedById( @Override protected S mapReduceCellsOSMContribution( - SerializableFunction mapper, SerializableSupplier identitySupplier, + SerializableFunction> mapper, + SerializableSupplier identitySupplier, SerializableBiFunction accumulator, SerializableBinaryOperator combiner) { throw new UnsupportedOperationException(); } @@ -174,10 +176,15 @@ protected S flatMapReduceCellsOSMContributionGroupedById( @Override protected S mapReduceCellsOSMEntitySnapshot( - SerializableFunction mapper, SerializableSupplier identitySupplier, + SerializableFunction> mapper, + SerializableSupplier identitySupplier, SerializableBiFunction accumulator, SerializableBinaryOperator combiner) { - return nodes.stream().map(TestAutoAggregation::snapshot).map(mapper).reduce(identitySupplier.get(), - accumulator, combiner); + return nodes.stream() + .map(TestAutoAggregation::snapshot) + .map(mapper) + .filter(Optional::isPresent) + .map(Optional::get) + .reduce(identitySupplier.get(), accumulator, combiner); } @Override diff --git a/oshdb-util/src/main/java/org/heigit/ohsome/oshdb/util/mappable/OSMContribution.java b/oshdb-util/src/main/java/org/heigit/ohsome/oshdb/util/mappable/OSMContribution.java index f8734aaf3..a27b0681e 100644 --- a/oshdb-util/src/main/java/org/heigit/ohsome/oshdb/util/mappable/OSMContribution.java +++ b/oshdb-util/src/main/java/org/heigit/ohsome/oshdb/util/mappable/OSMContribution.java @@ -91,7 +91,8 @@ public interface OSMContribution extends OSHDBMapReducible, Comparable * * @param contributionType the ContributionType to check * @return true if this contribution does the respective change, false otherwise