Skip to content

Commit

Permalink
Merge pull request #511 from GIScience/perf-non-flatmapped-filters
Browse files Browse the repository at this point in the history
allow queries with filters to use fast-lane reducers
  • Loading branch information
tyrasd authored Aug 31, 2023
2 parents 22aef0e + e0aa315 commit bf9e5e5
Show file tree
Hide file tree
Showing 14 changed files with 165 additions and 116 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,18 @@ Changelog

* `OSMEntitySnapshot` now also returns the `lastContributionTimestamp` for each snapshot ([#495])

### performance improvements

* Significantly improve performance of queries which use filters and don't use `flatMap` ([#511])

### other changes

* `CellIterator` is now decoupled from implementation of the "Grid" ([#495])
* Enhance geometry builder with methods to generate geometries from entities when their members are already pre-resolved for a specific timestamp ([#501])

[#495]: https://github.com/GIScience/oshdb/pull/495
[#501]: https://github.com/GIScience/oshdb/pull/501
[#511]: https://github.com/GIScience/oshdb/pull/511


## 1.1.2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ cacheName, cellIdRangeToCellIds(), cellIdRanges, cellProcessor, cellIterator

@Override
protected <R, S> S mapReduceCellsOSMContribution(
SerializableFunction<OSMContribution, R> mapper,
SerializableFunction<OSMContribution, Optional<R>> mapper,
SerializableSupplier<S> identitySupplier,
SerializableBiFunction<S, R, S> accumulator,
SerializableBinaryOperator<S> combiner
Expand Down Expand Up @@ -331,7 +331,7 @@ protected <R, S> S flatMapReduceCellsOSMContributionGroupedById(

@Override
protected <R, S> S mapReduceCellsOSMEntitySnapshot(
SerializableFunction<OSMEntitySnapshot, R> mapper,
SerializableFunction<OSMEntitySnapshot, Optional<R>> mapper,
SerializableSupplier<S> identitySupplier,
SerializableBiFunction<S, R, S> accumulator,
SerializableBinaryOperator<S> combiner
Expand Down Expand Up @@ -371,7 +371,7 @@ protected <R, S> S flatMapReduceCellsOSMEntitySnapshotGroupedById(

@Override
protected Stream<X> mapStreamCellsOSMContribution(
SerializableFunction<OSMContribution, X> mapper) throws Exception {
SerializableFunction<OSMContribution, Optional<X>> mapper) throws Exception {
return stream(Kernels.getOSMContributionCellStreamer(mapper, this));
}

Expand All @@ -383,7 +383,7 @@ protected Stream<X> flatMapStreamCellsOSMContributionGroupedById(

@Override
protected Stream<X> mapStreamCellsOSMEntitySnapshot(
SerializableFunction<OSMEntitySnapshot, X> mapper) throws Exception {
SerializableFunction<OSMEntitySnapshot, Optional<X>> mapper) throws Exception {
return stream(Kernels.getOSMEntitySnapshotCellStreamer(mapper, this));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ protected MapReducer<X> copy() {

@Override
protected Stream<X> mapStreamCellsOSMContribution(
SerializableFunction<OSMContribution, X> mapper
SerializableFunction<OSMContribution, Optional<X>> mapper
) throws Exception {
throw new UnsupportedOperationException("Stream function not yet implemented");
}
Expand All @@ -92,7 +92,7 @@ protected Stream<X> flatMapStreamCellsOSMContributionGroupedById(

@Override
protected Stream<X> mapStreamCellsOSMEntitySnapshot(
SerializableFunction<OSMEntitySnapshot, X> mapper
SerializableFunction<OSMEntitySnapshot, Optional<X>> mapper
) throws Exception {
throw new UnsupportedOperationException("Stream function not yet implemented");
}
Expand All @@ -115,7 +115,8 @@ public boolean isCancelable() {
}

@Override
protected <R, S> S mapReduceCellsOSMContribution(SerializableFunction<OSMContribution, R> mapper,
protected <R, S> S mapReduceCellsOSMContribution(
SerializableFunction<OSMContribution, Optional<R>> mapper,
SerializableSupplier<S> identitySupplier, SerializableBiFunction<S, R, S> accumulator,
SerializableBinaryOperator<S> combiner) throws Exception {
return mapReduceOnIgniteCache((OSHDBIgnite) this.oshdb, identitySupplier, combiner,
Expand All @@ -140,7 +141,8 @@ protected <R, S> S flatMapReduceCellsOSMContributionGroupedById(

@Override
protected <R, S> S mapReduceCellsOSMEntitySnapshot(
SerializableFunction<OSMEntitySnapshot, R> mapper, SerializableSupplier<S> identitySupplier,
SerializableFunction<OSMEntitySnapshot, Optional<R>> mapper,
SerializableSupplier<S> identitySupplier,
SerializableBiFunction<S, R, S> accumulator, SerializableBinaryOperator<S> combiner)
throws Exception {
return mapReduceOnIgniteCache((OSHDBIgnite) this.oshdb, identitySupplier, combiner,
Expand Down Expand Up @@ -290,12 +292,13 @@ S execute(Ignite node, CellProcessor<S> cellProcessor) {

private static class MapReduceCellsOSMContributionOnIgniteCacheComputeJob
<R, S, P extends Geometry & Polygonal>
extends MapReduceCellsOnIgniteCacheComputeJob<OSMContribution, R, R, S, P> {
extends MapReduceCellsOnIgniteCacheComputeJob<OSMContribution, R, Optional<R>, S, P> {
MapReduceCellsOSMContributionOnIgniteCacheComputeJob(TagInterpreter tagInterpreter,
List<String> cacheNames, Iterable<CellIdRange> cellIdRanges,
SortedSet<OSHDBTimestamp> tstamps, OSHDBBoundingBox bbox, P poly,
OSHEntityFilter preFilter, OSMEntityFilter filter,
SerializableFunction<OSMContribution, R> mapper, SerializableSupplier<S> identitySupplier,
SerializableFunction<OSMContribution, Optional<R>> mapper,
SerializableSupplier<S> identitySupplier,
SerializableBiFunction<S, R, S> accumulator, SerializableBinaryOperator<S> combiner) {
super(tagInterpreter, cacheNames, cellIdRanges, tstamps, bbox, poly, preFilter, filter,
mapper, identitySupplier, accumulator, combiner);
Expand Down Expand Up @@ -339,12 +342,13 @@ public S execute(Ignite node) {

private static class MapReduceCellsOSMEntitySnapshotOnIgniteCacheComputeJob
<R, S, P extends Geometry & Polygonal>
extends MapReduceCellsOnIgniteCacheComputeJob<OSMEntitySnapshot, R, R, S, P> {
extends MapReduceCellsOnIgniteCacheComputeJob<OSMEntitySnapshot, R, Optional<R>, S, P> {
MapReduceCellsOSMEntitySnapshotOnIgniteCacheComputeJob(TagInterpreter tagInterpreter,
List<String> cacheNames, Iterable<CellIdRange> cellIdRanges,
SortedSet<OSHDBTimestamp> tstamps, OSHDBBoundingBox bbox, P poly,
OSHEntityFilter preFilter, OSMEntityFilter filter,
SerializableFunction<OSMEntitySnapshot, R> mapper, SerializableSupplier<S> identitySupplier,
SerializableFunction<OSMEntitySnapshot, Optional<R>> mapper,
SerializableSupplier<S> identitySupplier,
SerializableBiFunction<S, R, S> accumulator, SerializableBinaryOperator<S> combiner) {
super(tagInterpreter, cacheNames, cellIdRanges, tstamps, bbox, poly, preFilter, filter,
mapper, identitySupplier, accumulator, combiner);
Expand Down Expand Up @@ -408,7 +412,7 @@ private static <V, R, M, S, P extends Geometry & Polygonal> S mapReduceOnIgniteC
null
);

if (!oshdb.timeoutInMilliseconds().isPresent()) {
if (oshdb.timeoutInMilliseconds().isEmpty()) {
return asyncResult.get();
} else {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.UUID;
Expand Down Expand Up @@ -110,7 +111,8 @@ public boolean isCancelable() {
// === map-reduce operations ===

@Override
protected <R, S> S mapReduceCellsOSMContribution(SerializableFunction<OSMContribution, R> mapper,
protected <R, S> S mapReduceCellsOSMContribution(
SerializableFunction<OSMContribution, Optional<R>> mapper,
SerializableSupplier<S> identitySupplier, SerializableBiFunction<S, R, S> accumulator,
SerializableBinaryOperator<S> combiner) throws Exception {
// load tag interpreter helper which is later used for geometry building
Expand Down Expand Up @@ -147,7 +149,8 @@ protected <R, S> S flatMapReduceCellsOSMContributionGroupedById(

@Override
protected <R, S> S mapReduceCellsOSMEntitySnapshot(
SerializableFunction<OSMEntitySnapshot, R> mapper, SerializableSupplier<S> identitySupplier,
SerializableFunction<OSMEntitySnapshot, Optional<R>> mapper,
SerializableSupplier<S> identitySupplier,
SerializableBiFunction<S, R, S> accumulator, SerializableBinaryOperator<S> combiner)
throws Exception {
// load tag interpreter helper which is later used for geometry building
Expand Down Expand Up @@ -185,7 +188,7 @@ protected <R, S> S flatMapReduceCellsOSMEntitySnapshotGroupedById(

@Override
protected Stream<X> mapStreamCellsOSMContribution(
SerializableFunction<OSMContribution, X> mapper) throws Exception {
SerializableFunction<OSMContribution, Optional<X>> mapper) throws Exception {
// load tag interpreter helper which is later used for geometry building
TagInterpreter tagInterpreter = this.getTagInterpreter();

Expand Down Expand Up @@ -219,7 +222,7 @@ protected Stream<X> flatMapStreamCellsOSMContributionGroupedById(

@Override
protected Stream<X> mapStreamCellsOSMEntitySnapshot(
SerializableFunction<OSMEntitySnapshot, X> mapper) throws Exception {
SerializableFunction<OSMEntitySnapshot, Optional<X>> mapper) throws Exception {
// load tag interpreter helper which is later used for geometry building
TagInterpreter tagInterpreter = this.getTagInterpreter();

Expand Down Expand Up @@ -367,12 +370,12 @@ S execute(Ignite node, CellProcessor<S> cellProcessor) {

private static class MapReduceCellsOSMContributionOnIgniteCacheComputeJob
<R, S, P extends Geometry & Polygonal>
extends MapReduceCellsOnIgniteCacheComputeJob<OSMContribution, R, R, S, P> {
extends MapReduceCellsOnIgniteCacheComputeJob<OSMContribution, R, Optional<R>, S, P> {
MapReduceCellsOSMContributionOnIgniteCacheComputeJob(TagInterpreter tagInterpreter,
String cacheName, Map<Integer, TreeMap<Long, CellIdRange>> cellIdRangesByLevel,
SortedSet<OSHDBTimestamp> tstamps, OSHDBBoundingBox bbox, P poly,
OSHEntityFilter preFilter, OSMEntityFilter filter,
SerializableFunction<OSMContribution, R> mapper,
SerializableFunction<OSMContribution, Optional<R>> mapper,
SerializableSupplier<S> identitySupplier, SerializableBiFunction<S, R, S> accumulator,
SerializableBinaryOperator<S> combiner) {
super(tagInterpreter, cacheName, cellIdRangesByLevel, tstamps, bbox, poly, preFilter, filter,
Expand Down Expand Up @@ -417,12 +420,12 @@ public S execute(Ignite node) {

private static class MapReduceCellsOSMEntitySnapshotOnIgniteCacheComputeJob
<R, S, P extends Geometry & Polygonal>
extends MapReduceCellsOnIgniteCacheComputeJob<OSMEntitySnapshot, R, R, S, P> {
extends MapReduceCellsOnIgniteCacheComputeJob<OSMEntitySnapshot, R, Optional<R>, S, P> {
MapReduceCellsOSMEntitySnapshotOnIgniteCacheComputeJob(TagInterpreter tagInterpreter,
String cacheName, Map<Integer, TreeMap<Long, CellIdRange>> cellIdRangesByLevel,
SortedSet<OSHDBTimestamp> tstamps, OSHDBBoundingBox bbox, P poly,
OSHEntityFilter preFilter, OSMEntityFilter filter,
SerializableFunction<OSMEntitySnapshot, R> mapper,
SerializableFunction<OSMEntitySnapshot, Optional<R>> mapper,
SerializableSupplier<S> identitySupplier, SerializableBiFunction<S, R, S> accumulator,
SerializableBinaryOperator<S> combiner) {
super(tagInterpreter, cacheName, cellIdRangesByLevel, tstamps, bbox, poly, preFilter, filter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,4 @@ class MapReduceOSHDBIgniteAffinityCallTest extends MapReduceOSHDBIgniteTest {
/**
 * Creates the test class, handing the superclass a callback that sets the compute mode of the
 * given oshdb instance to {@code AFFINITY_CALL}.
 *
 * @throws Exception declared because the superclass constructor call may throw
 */
MapReduceOSHDBIgniteAffinityCallTest() throws Exception {
super(oshdb -> oshdb.computeMode(AFFINITY_CALL));
}

/**
 * Checks that a stream query whose last map step turns every value into {@code null} can still
 * be collected, and that the resulting set holds exactly one element.
 */
@Test
void testOSMEntitySnapshotViewStreamNullValues() throws Exception {
// stream query over yearly timestamps, restricted by an id filter
Set<Integer> result = createMapReducerOSMEntitySnapshot()
.timestamps(
new OSHDBTimestamps("2010-01-01", "2015-01-01", OSHDBTimestamps.Interval.YEARLY))
.filter("id:617308093")
.map(snapshot -> snapshot.getEntity().getUserId())
// replace every mapped user id with null
.map(x -> (Integer) null)
.stream()
.collect(Collectors.toSet());

// every streamed value is null, so the set is expected to contain a single element
assertEquals(1, result.size());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.heigit.ohsome.oshdb.api.mapreducer;

import java.util.Collections;
import org.heigit.ohsome.oshdb.util.function.SerializablePredicate;

/**
 * A map function that wraps a predicate and thereby represents a filter step.
 *
 * <p>Raw types are used here on purpose: the "map functions" of a MapReducer are designed to
 * accept and produce arbitrary data types. The necessary type checks are performed at runtime
 * in the respective setters.</p>
 */
@SuppressWarnings({"rawtypes", "unchecked"}) // see javadoc above
class FilterFunction extends MapFunction {
  private final SerializablePredicate filter;

  /**
   * Wraps the given predicate in a map function that yields a one-element list holding the
   * input when the predicate accepts it, and an empty list otherwise.
   *
   * @param filter the predicate deciding which values are kept
   */
  FilterFunction(SerializablePredicate filter) {
    // NOTE(review): the trailing `true` presumably flags this as a flat-mapping function;
    // confirm against the MapFunction constructor.
    super(
        (element, ignored) -> {
          if (filter.test(element)) {
            return Collections.singletonList(element);
          }
          return Collections.emptyList();
        },
        true);
    this.filter = filter;
  }

  /**
   * Applies the wrapped predicate directly to a value.
   *
   * @param root the value to check
   * @return the result of the wrapped predicate for {@code root}
   */
  public boolean test(Object root) {
    return filter.test(root);
  }
}
Loading

0 comments on commit bf9e5e5

Please sign in to comment.