diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml index 6a9ebb3d94..eee38398a4 100644 --- a/parquet-column/pom.xml +++ b/parquet-column/pom.xml @@ -76,6 +76,12 @@ ${slf4j.version} + + org.locationtech.jts + jts-core + ${jts.version} + + com.carrotsearch junit-benchmarks diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java index 87d39bf16e..ce64e60c9d 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java @@ -18,7 +18,9 @@ */ package org.apache.parquet.column.statistics; +import org.apache.parquet.column.statistics.geometry.GeospatialStatistics; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Types; @@ -30,6 +32,7 @@ public class BinaryStatistics extends Statistics { private Binary max; private Binary min; + private GeospatialStatistics geospatialStatistics = null; /** * @deprecated will be removed in 2.0.0. 
Use {@link Statistics#createStats(org.apache.parquet.schema.Type)} instead @@ -41,6 +44,13 @@ public BinaryStatistics() { BinaryStatistics(PrimitiveType type) { super(type); + LogicalTypeAnnotation logicalType = type.getLogicalTypeAnnotation(); + if (logicalType instanceof LogicalTypeAnnotation.GeometryLogicalTypeAnnotation) { + LogicalTypeAnnotation.GeometryLogicalTypeAnnotation geometryLogicalType = + (LogicalTypeAnnotation.GeometryLogicalTypeAnnotation) logicalType; + geospatialStatistics = + new GeospatialStatistics(geometryLogicalType.getCrs(), geometryLogicalType.getMetadata()); + } } private BinaryStatistics(BinaryStatistics other) { @@ -49,6 +59,9 @@ private BinaryStatistics(BinaryStatistics other) { initializeStats(other.min, other.max); } setNumNulls(other.getNumNulls()); + if (other.geospatialStatistics != null) { + geospatialStatistics = other.geospatialStatistics.copy(); + } } @Override @@ -62,6 +75,9 @@ public void updateStats(Binary value) { } else if (comparator().compare(max, value) < 0) { max = value.copy(); } + if (geospatialStatistics != null) { + geospatialStatistics.update(value); + } } @Override @@ -72,6 +88,15 @@ public void mergeStatisticsMinMax(Statistics stats) { } else { updateStats(binaryStats.getMin(), binaryStats.getMax()); } + if (geospatialStatistics != null) { + if (binaryStats.geospatialStatistics != null) { + geospatialStatistics.merge(binaryStats.geospatialStatistics); + } else { + // The other side has no geospatial statistics (e.g. cleared through setGeometryStatistics), + // so the merged result cannot describe its values: drop ours instead of failing the merge. + geospatialStatistics = null; + } + } } /** @@ -190,4 +215,12 @@ public void setMinMax(Binary min, Binary max) { public BinaryStatistics copy() { return new BinaryStatistics(this); } + + public void setGeometryStatistics(GeospatialStatistics geospatialStatistics) { + this.geospatialStatistics = geospatialStatistics; + } + + public GeospatialStatistics getGeospatialStatistics() { + return geospatialStatistics; + } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java index 206ddadadc..b1b2b73a32 100644 --- 
a/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java @@ -20,6 +20,7 @@ import java.util.Arrays; import org.apache.parquet.column.UnknownColumnTypeException; +import org.apache.parquet.column.statistics.geometry.GeospatialStatistics; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.Float16; import org.apache.parquet.schema.LogicalTypeAnnotation; @@ -64,6 +65,10 @@ public Builder withNumNulls(long numNulls) { return this; } + public Builder withGeometryStatistics(GeospatialStatistics geospatialStatistics) { + throw new UnsupportedOperationException("Please use the GeometryBuilder"); + } + public Statistics build() { Statistics stats = createStats(type); if (min != null && max != null) { @@ -178,6 +183,30 @@ public Statistics build() { } } + // Builder for GEOMETRY type to handle GeometryStatistics + private static class GeometryBuilder extends Builder { + + private GeospatialStatistics geospatialStatistics; + + public GeometryBuilder(PrimitiveType type) { + super(type); + assert type.getPrimitiveTypeName() == PrimitiveTypeName.BINARY; + } + + @Override + public Builder withGeometryStatistics(GeospatialStatistics geospatialStatistics) { + this.geospatialStatistics = geospatialStatistics; + return this; + } + + @Override + public Statistics build() { + BinaryStatistics stats = (BinaryStatistics) super.build(); + stats.setGeometryStatistics(geospatialStatistics); + return stats; + } + } + private final PrimitiveType type; private final PrimitiveComparator comparator; private boolean hasNonNullValue; @@ -278,6 +307,11 @@ public static Builder getBuilderForReading(PrimitiveType type) { if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.Float16LogicalTypeAnnotation) { return new Float16Builder(type); } + return new Builder(type); + case BINARY: + if (type.getLogicalTypeAnnotation() instanceof 
LogicalTypeAnnotation.GeometryLogicalTypeAnnotation) { + return new GeometryBuilder(type); + } default: return new Builder(type); } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/BoundingBox.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/BoundingBox.java new file mode 100644 index 0000000000..bd9fa43740 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/BoundingBox.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.column.statistics.geometry; + +import org.apache.parquet.Preconditions; +import org.locationtech.jts.geom.Coordinate; +import org.locationtech.jts.geom.Envelope; +import org.locationtech.jts.geom.Geometry; + +public class BoundingBox { + + private double xMin = Double.POSITIVE_INFINITY; + private double xMax = Double.NEGATIVE_INFINITY; + private double yMin = Double.POSITIVE_INFINITY; + private double yMax = Double.NEGATIVE_INFINITY; + private double zMin = Double.POSITIVE_INFINITY; + private double zMax = Double.NEGATIVE_INFINITY; + private double mMin = Double.POSITIVE_INFINITY; + private double mMax = Double.NEGATIVE_INFINITY; + + public BoundingBox( + double xMin, double xMax, double yMin, double yMax, double zMin, double zMax, double mMin, double mMax) { + this.xMin = xMin; + this.xMax = xMax; + this.yMin = yMin; + this.yMax = yMax; + this.zMin = zMin; + this.zMax = zMax; + this.mMin = mMin; + this.mMax = mMax; + } + + public BoundingBox() {} + + public double getXMin() { + return xMin; + } + + public double getXMax() { + return xMax; + } + + public double getYMin() { + return yMin; + } + + public double getYMax() { + return yMax; + } + + public double getZMin() { + return zMin; + } + + public double getZMax() { + return zMax; + } + + public double getMMin() { + return mMin; + } + + public double getMMax() { + return mMax; + } + + // Method to update the bounding box with the coordinates of a Geometry object + // geometry can be changed by this method + void update(Geometry geometry, String crs) { + // A null or empty geometry contributes no coordinates. An empty geometry reports a null + // JTS envelope whose placeholder values (min 0, max -1) must not be folded into the bounds. + if (geometry == null || geometry.isEmpty()) { + return; + } + GeometryUtils.normalizeLongitude(geometry); + Envelope envelope = geometry.getEnvelopeInternal(); + double minX = envelope.getMinX(); + double minY = envelope.getMinY(); + double maxX = envelope.getMaxX(); + double maxY = envelope.getMaxY(); + + // Initialize Z and M values + double minZ = Double.POSITIVE_INFINITY; + double maxZ = Double.NEGATIVE_INFINITY; + double minM = Double.POSITIVE_INFINITY; + double maxM = 
Double.NEGATIVE_INFINITY; + + Coordinate[] coordinates = geometry.getCoordinates(); + for (Coordinate coord : coordinates) { + if (!Double.isNaN(coord.getZ())) { + minZ = Math.min(minZ, coord.getZ()); + maxZ = Math.max(maxZ, coord.getZ()); + } + if (!Double.isNaN(coord.getM())) { + minM = Math.min(minM, coord.getM()); + maxM = Math.max(maxM, coord.getM()); + } + } + + if (xMin == Double.POSITIVE_INFINITY || xMax == Double.NEGATIVE_INFINITY) { + xMin = minX; + xMax = maxX; + } else { + // Handle the wraparound case for X values + if (!isCrossingAntiMeridian(xMax, xMin)) { // current bounding box is NOT wrapped around + if (!isCrossingAntiMeridian(maxX, minX)) { // new bounding box is NOT wrapped around + xMin = Math.min(xMin, minX); + xMax = Math.max(xMax, maxX); + } else { // new bounding box is wrapped around + xMin = Math.max(xMin, maxX); + xMax = Math.min(xMax, minX); + } + } else { // current bounding box is wrapped around + if (!isCrossingAntiMeridian(maxX, minX)) { // new bounding box is NOT wrapped around + xMin = Math.max(xMin, minX); + xMax = Math.min(xMax, maxX); + } else { // new bounding box is wrapped around + xMin = Math.max(xMin, maxX); + xMax = Math.min(xMax, minX); + } + } + } + + yMin = Math.min(yMin, minY); + yMax = Math.max(yMax, maxY); + zMin = Math.min(zMin, minZ); + zMax = Math.max(zMax, maxZ); + mMin = Math.min(mMin, minM); + mMax = Math.max(mMax, maxM); + } + + void merge(BoundingBox other) { + Preconditions.checkArgument(other != null, "Cannot merge with null bounding box"); + double minX = other.xMin; + double maxX = other.xMax; + + if (xMin == Double.POSITIVE_INFINITY || xMax == Double.NEGATIVE_INFINITY) { + xMin = minX; + xMax = maxX; + } else { + // Handle the wraparound case for X values + if (!isCrossingAntiMeridian(xMax, xMin)) { // current bounding box is NOT wrapped around + if (!isCrossingAntiMeridian(maxX, minX)) { // new bounding box is NOT wrapped around + xMin = Math.min(xMin, minX); + xMax = Math.max(xMax, maxX); + } else { 
// new bounding box is wrapped around + xMin = Math.max(xMin, maxX); + xMax = Math.min(xMax, minX); + } + } else { // current bounding box is wrapped around + if (!isCrossingAntiMeridian(maxX, minX)) { // new bounding box is NOT wrapped around + xMin = Math.max(xMin, minX); + xMax = Math.min(xMax, maxX); + } else { // new bounding box is wrapped around + xMin = Math.max(xMin, minX); + xMax = Math.min(xMax, maxX); + } + } + } + + yMin = Math.min(yMin, other.yMin); + yMax = Math.max(yMax, other.yMax); + zMin = Math.min(zMin, other.zMin); + zMax = Math.max(zMax, other.zMax); + mMin = Math.min(mMin, other.mMin); + mMax = Math.max(mMax, other.mMax); + } + + public void reset() { + xMin = Double.POSITIVE_INFINITY; + xMax = Double.NEGATIVE_INFINITY; + yMin = Double.POSITIVE_INFINITY; + yMax = Double.NEGATIVE_INFINITY; + zMin = Double.POSITIVE_INFINITY; + zMax = Double.NEGATIVE_INFINITY; + mMin = Double.POSITIVE_INFINITY; + mMax = Double.NEGATIVE_INFINITY; + } + + public void abort() { + xMin = Double.NaN; + xMax = Double.NaN; + yMin = Double.NaN; + yMax = Double.NaN; + zMin = Double.NaN; + zMax = Double.NaN; + mMin = Double.NaN; + mMax = Double.NaN; + } + + private boolean isCrossingAntiMeridian(double x1, double x2) { + return Math.abs(x1 - x2) > 180; + } + + public BoundingBox copy() { + return new BoundingBox(xMin, xMax, yMin, yMax, zMin, zMax, mMin, mMax); + } + + @Override + public String toString() { + return "BoundingBox{" + "xMin=" + + xMin + ", xMax=" + + xMax + ", yMin=" + + yMin + ", yMax=" + + yMax + ", zMin=" + + zMin + ", zMax=" + + zMax + ", mMin=" + + mMin + ", mMax=" + + mMax + '}'; + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/DummyBoundingBox.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/DummyBoundingBox.java new file mode 100644 index 0000000000..0a876cd3f9 --- /dev/null +++ 
b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/DummyBoundingBox.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.statistics.geometry; + +import org.locationtech.jts.geom.Geometry; + +// Immutable dummy BoundingBox class +class DummyBoundingBox extends BoundingBox { + + @Override + public void update(Geometry geometry, String crs) { + // No-op + } + + @Override + public void merge(BoundingBox other) { + // No-op + } + + @Override + public void reset() { + // No-op + } + + @Override + public void abort() { + // No-op + } + + @Override + public BoundingBox copy() { + return this; // Return the same instance + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/GeometryTypes.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/GeometryTypes.java new file mode 100644 index 0000000000..4c85382ba0 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/GeometryTypes.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.statistics.geometry; + +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.parquet.Preconditions; +import org.locationtech.jts.geom.Coordinate; +import org.locationtech.jts.geom.Geometry; + +public class GeometryTypes { + + private static final int UNKNOWN_TYPE_ID = -1; + private Set<Integer> types = new HashSet<>(); + private boolean valid = true; + + public GeometryTypes(Set<Integer> types) { + this.types = types; + } + + public GeometryTypes() {} + + public Set<Integer> getTypes() { + return types; + } + + void update(Geometry geometry) { + if (!valid) { + return; + } + int code = getGeometryTypeCode(geometry); + if (code != UNKNOWN_TYPE_ID) { + types.add(code); + } else { + valid = false; + types.clear(); + } + } + + public void merge(GeometryTypes other) { + Preconditions.checkArgument(other != null, "Cannot merge with null GeometryTypes"); + if (!valid) { + return; + } + if (!other.valid) { + valid = false; + types.clear(); + return; + } + types.addAll(other.types); + } + + public void reset() { + types.clear(); + valid = true; + } + + public void abort() { + valid = false; + types.clear(); + } + + public GeometryTypes copy() { + // Carry the validity flag over; otherwise a copy of an aborted instance would look + // like a valid, empty type set and would start collecting type codes again. + GeometryTypes copied = new GeometryTypes(new HashSet<>(types)); + copied.valid = valid; + return copied; + } + + @Override + public 
String toString() { + return "GeometryTypes{" + "types=" + + types.stream().map(this::typeIdToString).collect(Collectors.toSet()) + '}'; + } + + private int getGeometryTypeId(Geometry geometry) { + switch (geometry.getGeometryType()) { + case Geometry.TYPENAME_POINT: + return 1; + case Geometry.TYPENAME_LINESTRING: + return 2; + case Geometry.TYPENAME_POLYGON: + return 3; + case Geometry.TYPENAME_MULTIPOINT: + return 4; + case Geometry.TYPENAME_MULTILINESTRING: + return 5; + case Geometry.TYPENAME_MULTIPOLYGON: + return 6; + case Geometry.TYPENAME_GEOMETRYCOLLECTION: + return 7; + default: + return UNKNOWN_TYPE_ID; + } + } + + /** + * This is from the following spec proposed: + *

+ * The geometry types of all geometries, or an empty array if they are not + * known. This is borrowed from `geometry_types` column metadata of GeoParquet [2] + * except that values in the list are WKB (ISO variant) integer codes [1]. Table + * below shows the most common geometry types and their codes: + *

+ * | Type | XY | XYZ | XYM | XYZM | + * | :----------------- | :--- | :--- | :--- | :--: | + * | Point | 0001 | 1001 | 2001 | 3001 | + * | LineString | 0002 | 1002 | 2002 | 3002 | + * | Polygon | 0003 | 1003 | 2003 | 3003 | + * | MultiPoint | 0004 | 1004 | 2004 | 3004 | + * | MultiLineString | 0005 | 1005 | 2005 | 3005 | + * | MultiPolygon | 0006 | 1006 | 2006 | 3006 | + * | GeometryCollection | 0007 | 1007 | 2007 | 3007 | + *

+ * In addition, the following rules are used: + * - A list of multiple values indicates that multiple geometry types are + * present (e.g. `[0003, 0006]`). + * - An empty array explicitly signals that the geometry types are not known. + * - The geometry types in the list must be unique (e.g. `[0001, 0001]` + * is not valid). + *

+ * Please refer to links below for more detail: + * [1] https://en.wikipedia.org/wiki/Well-known_text_representation_of_geometry#Well-known_binary + * [2] https://github.com/opengeospatial/geoparquet/blob/v1.0.0/format-specs/geoparquet.md?plain=1#L91 + */ + private int getGeometryTypeCode(Geometry geometry) { + int typeId = getGeometryTypeId(geometry); + if (typeId == UNKNOWN_TYPE_ID) { + return UNKNOWN_TYPE_ID; + } + Coordinate[] coordinates = geometry.getCoordinates(); + boolean hasZ = false; + boolean hasM = false; + for (Coordinate coordinate : coordinates) { + if (!Double.isNaN(coordinate.getZ())) { + hasZ = true; + } + if (!Double.isNaN(coordinate.getM())) { + hasM = true; + } + if (hasZ && hasM) { + break; + } + } + if (hasZ) { + typeId += 1000; + } + if (hasM) { + typeId += 2000; + } + return typeId; + } + + private String typeIdToString(int typeId) { + String typeString; + switch (typeId % 1000) { + case 1: + typeString = Geometry.TYPENAME_POINT; + break; + case 2: + typeString = Geometry.TYPENAME_LINESTRING; + break; + case 3: + typeString = Geometry.TYPENAME_POLYGON; + break; + case 4: + typeString = Geometry.TYPENAME_MULTIPOINT; + break; + case 5: + typeString = Geometry.TYPENAME_MULTILINESTRING; + break; + case 6: + typeString = Geometry.TYPENAME_MULTIPOLYGON; + break; + case 7: + typeString = Geometry.TYPENAME_GEOMETRYCOLLECTION; + break; + default: + return "Unknown"; + } + if (typeId >= 3000) { + typeString += " (XYZM)"; + } else if (typeId >= 2000) { + typeString += " (XYM)"; + } else if (typeId >= 1000) { + typeString += " (XYZ)"; + } else { + typeString += " (XY)"; + } + return typeString; + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/GeometryUtils.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/GeometryUtils.java new file mode 100644 index 0000000000..f91eafe49b --- /dev/null +++ 
b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/GeometryUtils.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.column.statistics.geometry; + +import org.locationtech.jts.geom.CoordinateSequence; +import org.locationtech.jts.geom.CoordinateSequenceFilter; +import org.locationtech.jts.geom.Geometry; + +class GeometryUtils { + + public static void normalizeLongitude(Geometry geometry) { + if (geometry == null || geometry.isEmpty()) { + return; + } + + geometry.apply(new CoordinateSequenceFilter() { + @Override + public void filter(CoordinateSequence seq, int i) { + double x = seq.getX(i); + // Wrap X values outside [-180, 180] back into that range. A remainder is used instead of + // repeated +/-360 steps, which never terminate for infinite or very large values. + // Non-finite values (NaN, +/-Infinity) are written back unchanged. + if (Double.isFinite(x) && (x > 180 || x < -180)) { + double wrapped = Math.IEEEremainder(x, 360.0); + // Keep the input's sign on the antimeridian (540 -> 180, -540 -> -180) + if (Math.abs(wrapped) == 180) { + wrapped = Math.copySign(180.0, x); + } + x = wrapped; + } + seq.setOrdinate(i, CoordinateSequence.X, x); + } + + @Override + public boolean isDone() { + return false; // Continue processing until all coordinates are processed + } + + @Override + public boolean isGeometryChanged() { + return true; // The geometry is changed as we are modifying the coordinates + } + }); + + geometry.geometryChanged(); // Notify the geometry that its coordinates have been changed + } +} diff --git 
a/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/GeospatialStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/GeospatialStatistics.java new file mode 100644 index 0000000000..57b7de7fc9 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/GeospatialStatistics.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.column.statistics.geometry; + +import java.nio.ByteBuffer; +import org.apache.parquet.Preconditions; +import org.apache.parquet.io.api.Binary; +import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.io.ParseException; +import org.locationtech.jts.io.WKBReader; + +public class GeospatialStatistics { + + private static final BoundingBox DUMMY_BOUNDING_BOX = new DummyBoundingBox(); + + // Metadata that may impact the statistics calculation + private final String crs; + private final ByteBuffer metadata; + + private final BoundingBox boundingBox; + private final GeometryTypes geometryTypes; + private final WKBReader reader = new WKBReader(); + + public GeospatialStatistics(String crs, ByteBuffer metadata, BoundingBox boundingBox, GeometryTypes geometryTypes) { + this.crs = crs; + this.metadata = metadata; + this.boundingBox = supportsBoundingBox() ? boundingBox : DUMMY_BOUNDING_BOX; + this.geometryTypes = geometryTypes; + } + + public GeospatialStatistics(String crs, ByteBuffer metadata) { + this(crs, metadata, new BoundingBox(), new GeometryTypes()); + } + + public BoundingBox getBoundingBox() { + return boundingBox; + } + + public GeometryTypes getGeometryTypes() { + return geometryTypes; + } + + public void update(Binary value) { + if (value == null) { + return; + } + try { + Geometry geom = reader.read(value.getBytes()); + update(geom); + } catch (ParseException e) { + abort(); + } + } + + private void update(Geometry geom) { + if (supportsBoundingBox()) { + boundingBox.update(geom, crs); + } + geometryTypes.update(geom); + } + + /** + * A bounding box is a rectangular region defined by two points, the lower left + * and upper right corners. It is used to represent the minimum and maximum + * coordinates of a geometry. Only planar geometries can have a bounding box. 
+ */ + private boolean supportsBoundingBox() { + // Only planar geometries can have a bounding box + // based on the current specification + return true; + } + + public void merge(GeospatialStatistics other) { + Preconditions.checkArgument(other != null, "Cannot merge with null GeospatialStatistics"); + + if (boundingBox != null && other.boundingBox != null) { + boundingBox.merge(other.boundingBox); + } + + if (geometryTypes != null && other.geometryTypes != null) { + geometryTypes.merge(other.geometryTypes); + } + } + + public void reset() { + boundingBox.reset(); + geometryTypes.reset(); + } + + public void abort() { + boundingBox.abort(); + geometryTypes.abort(); + } + + // Copy the statistics + public GeospatialStatistics copy() { + return new GeospatialStatistics( + crs, + metadata, + boundingBox != null ? boundingBox.copy() : null, + geometryTypes != null ? geometryTypes.copy() : null); + } + + @Override + public String toString() { + return "GeospatialStatistics{" + "boundingBox=" + boundingBox + ", geometryTypes=" + geometryTypes + '}'; + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java index 86099717df..5a7931a1c6 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.PrimitiveIterator; +import org.apache.parquet.column.statistics.geometry.GeospatialStatistics; import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor; import org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter; @@ -71,4 +72,12 @@ default List getRepetitionLevelHistogram() { default List getDefinitionLevelHistogram() { throw new UnsupportedOperationException("Definition level histogram 
is not implemented"); } + + /** + * @return the unmodifiable list of the geometry statistics for each page; + * used for converting to the related thrift object + */ + default List getGeometryStatistics() { + throw new UnsupportedOperationException("Geospatial statistics is not implemented"); + } } diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java index e78b2ceae1..e67fddeeae 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java @@ -38,8 +38,10 @@ import java.util.Set; import java.util.function.IntPredicate; import org.apache.parquet.column.MinMax; +import org.apache.parquet.column.statistics.BinaryStatistics; import org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.column.statistics.geometry.GeospatialStatistics; import org.apache.parquet.filter2.predicate.Operators.And; import org.apache.parquet.filter2.predicate.Operators.Contains; import org.apache.parquet.filter2.predicate.Operators.Eq; @@ -56,6 +58,7 @@ import org.apache.parquet.filter2.predicate.Operators.UserDefined; import org.apache.parquet.filter2.predicate.UserDefinedPredicate; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveComparator; import org.apache.parquet.schema.PrimitiveStringifier; import org.apache.parquet.schema.PrimitiveType; @@ -105,6 +108,8 @@ int translate(int arrayIndex) { private long[] repLevelHistogram; // might be null private long[] defLevelHistogram; + // might be null + private GeospatialStatistics[] geospatialStatistics; static String truncate(String str) { if (str.length() <= 
MAX_VALUE_LENGTH_FOR_TOSTRING) { @@ -200,6 +205,17 @@ private String formatHistogram(long[] histogram, int pageIndex) { return TOSTRING_MISSING_VALUE_MARKER; } + @Override + public List<GeospatialStatistics> getGeometryStatistics() { + List<GeospatialStatistics> geomStats = new ArrayList<>(); + if (geospatialStatistics != null) { + for (GeospatialStatistics stats : geospatialStatistics) { + geomStats.add(stats.copy()); + } + } + return geomStats; + } + @Override public String toString() { try (Formatter formatter = new Formatter()) { @@ -521,6 +537,7 @@ public long getMinMaxSize() { private int nextPageIndex; private LongList repLevelHistogram = new LongArrayList(); private LongList defLevelHistogram = new LongArrayList(); + private List<GeospatialStatistics> geospatialStatistics = new ArrayList<>(); /** * @return a no-op builder that does not collect statistics objects and therefore returns {@code null} at @@ -611,10 +628,52 @@ public static ColumnIndex build( List maxValues, List repLevelHistogram, List defLevelHistogram) { + return build(type, boundaryOrder, nullPages, nullCounts, minValues, maxValues, repLevelHistogram, defLevelHistogram, null); + } + + /** + * @param type + * the primitive type + * @param boundaryOrder + * the boundary order of the min/max values + * @param nullPages + * the null pages (one boolean value for each page that signifies whether the page consists of nulls + * entirely) + * @param nullCounts + * the number of null values for each page + * @param minValues + * the min values for each page + * @param maxValues + * the max values for each page + * @param repLevelHistogram + * the repetition level histogram for all levels of each page + * @param defLevelHistogram + * the definition level histogram for all levels of each page + * @param geospatialStatistics + * the geometry statistics for each page (apply to GEOMETRY logical type only) + * @return the newly created {@link ColumnIndex} object based on the specified arguments + */ + public static ColumnIndex build( + PrimitiveType type, + BoundaryOrder boundaryOrder, + List 
nullPages, + List nullCounts, + List minValues, + List maxValues, + List repLevelHistogram, + List defLevelHistogram, + List geospatialStatistics) { ColumnIndexBuilder builder = createNewBuilder(type, Integer.MAX_VALUE); - builder.fill(nullPages, nullCounts, minValues, maxValues, repLevelHistogram, defLevelHistogram); + builder.fill( + nullPages, + nullCounts, + minValues, + maxValues, + repLevelHistogram, + defLevelHistogram, + geospatialStatistics); ColumnIndexBase columnIndex = builder.build(type); columnIndex.boundaryOrder = requireNonNull(boundaryOrder); return columnIndex; @@ -662,6 +721,16 @@ public void add(Statistics stats, SizeStatistics sizeStats) { defLevelHistogram = null; } + if (type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.GeometryLogicalTypeAnnotation) { + assert stats instanceof BinaryStatistics; + BinaryStatistics binaryStats = (BinaryStatistics) stats; + if (geospatialStatistics != null && binaryStats.getGeospatialStatistics() != null) { + geospatialStatistics.add(binaryStats.getGeospatialStatistics()); + } else { + geospatialStatistics = null; + } + } + ++nextPageIndex; } @@ -675,7 +744,8 @@ private void fill( List minValues, List maxValues, List repLevelHistogram, - List defLevelHistogram) { + List defLevelHistogram, + List geospatialStatistics) { clear(); int pageCount = nullPages.size(); if ((nullCounts != null && nullCounts.size() != pageCount) @@ -722,6 +792,9 @@ private void fill( if (defLevelHistogram != null) { this.defLevelHistogram.addAll(defLevelHistogram); } + if (geospatialStatistics != null) { + this.geospatialStatistics.addAll(geospatialStatistics); + } } /** @@ -758,6 +831,10 @@ private ColumnIndexBase build(PrimitiveType type) { if (defLevelHistogram != null && !defLevelHistogram.isEmpty()) { columnIndex.defLevelHistogram = defLevelHistogram.toLongArray(); } + if (geospatialStatistics != null && !geospatialStatistics.isEmpty()) { + columnIndex.geospatialStatistics = new 
GeospatialStatistics[geospatialStatistics.size()]; + geospatialStatistics.toArray(columnIndex.geospatialStatistics); + } return columnIndex; } @@ -804,6 +881,7 @@ private void clear() { pageIndexes.clear(); repLevelHistogram.clear(); defLevelHistogram.clear(); + geospatialStatistics.clear(); } abstract void clearMinMax(); diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java b/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java index 05629dd388..25ba540cc4 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java @@ -33,6 +33,7 @@ import static org.apache.parquet.schema.PrimitiveStringifier.TIME_STRINGIFIER; import static org.apache.parquet.schema.PrimitiveStringifier.TIME_UTC_STRINGIFIER; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -43,6 +44,10 @@ import org.apache.parquet.Preconditions; public abstract class LogicalTypeAnnotation { + + // TODO: Move this to an external configuration + public static final String DEFAULT_GEOMETRY_CRS = "OGC:CRS84"; + enum LogicalTypeToken { MAP { @Override @@ -146,6 +151,33 @@ protected LogicalTypeAnnotation fromString(List params) { protected LogicalTypeAnnotation fromString(List params) { return float16Type(); } + }, + GEOMETRY { + @Override + protected LogicalTypeAnnotation fromString(List params) { + if (params.size() < 1) { + throw new RuntimeException( + "Expecting at least 1 parameter for geometry logical type, got " + params.size()); + } + String crs = params.size() > 0 ? params.get(0) : null; + ByteBuffer metadata = + params.size() > 2 ? 
ByteBuffer.wrap(params.get(2).getBytes()) : null; + return geometryType(crs, metadata); + } + }, + GEOGRAPHY { + @Override + protected LogicalTypeAnnotation fromString(List params) { + if (params.size() < 1) { + throw new RuntimeException( + "Expecting at least 1 parameter for geography logical type, got " + params.size()); + } + String crs = params.size() > 0 ? params.get(0) : null; + String edgeAlgorithm = params.size() > 1 ? params.get(1) : null; + ByteBuffer metadata = + params.size() > 2 ? ByteBuffer.wrap(params.get(2).getBytes()) : null; + return geographyType(crs, edgeAlgorithm, metadata); + } }; protected abstract LogicalTypeAnnotation fromString(List params); @@ -316,6 +348,22 @@ public static Float16LogicalTypeAnnotation float16Type() { return Float16LogicalTypeAnnotation.INSTANCE; } + public static GeometryLogicalTypeAnnotation geometryType(String crs, ByteBuffer metadata) { + return new GeometryLogicalTypeAnnotation(crs, metadata); + } + + public static GeometryLogicalTypeAnnotation geometryType(ByteBuffer metadata) { + return new GeometryLogicalTypeAnnotation(DEFAULT_GEOMETRY_CRS, metadata); + } + + public static GeometryLogicalTypeAnnotation geometryType() { + return new GeometryLogicalTypeAnnotation(DEFAULT_GEOMETRY_CRS, null); + } + + public static GeographyLogicalTypeAnnotation geographyType(String crs, String edgeAlgorithm, ByteBuffer metadata) { + return new GeographyLogicalTypeAnnotation(crs, edgeAlgorithm, metadata); + } + public static class StringLogicalTypeAnnotation extends LogicalTypeAnnotation { private static final StringLogicalTypeAnnotation INSTANCE = new StringLogicalTypeAnnotation(); @@ -1091,6 +1139,156 @@ public int hashCode() { } } + public static class GeometryLogicalTypeAnnotation extends LogicalTypeAnnotation { + private final String crs; + private final ByteBuffer metadata; + + private GeometryLogicalTypeAnnotation(String crs, ByteBuffer metadata) { + this.crs = crs; + this.metadata = metadata; + } + + @Override + @Deprecated 
+ public OriginalType toOriginalType() { + return null; + } + + @Override + public Optional accept(LogicalTypeAnnotationVisitor logicalTypeAnnotationVisitor) { + return logicalTypeAnnotationVisitor.visit(this); + } + + @Override + LogicalTypeToken getType() { + return LogicalTypeToken.GEOMETRY; + } + + @Override + protected String typeParametersAsString() { + StringBuilder sb = new StringBuilder(); + sb.append("("); + sb.append(","); + if (crs != null && !crs.isEmpty()) { + sb.append(","); + sb.append(crs); + } + if (metadata != null) { + sb.append(","); + sb.append(metadata); + } + sb.append(")"); + return sb.toString(); + } + + public String getCrs() { + return crs; + } + + public ByteBuffer getMetadata() { + return metadata; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof GeometryLogicalTypeAnnotation)) { + return false; + } + GeometryLogicalTypeAnnotation other = (GeometryLogicalTypeAnnotation) obj; + return Objects.equals(crs, other.crs); + } + + @Override + public int hashCode() { + return Objects.hash(crs); + } + + @Override + PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) { + return PrimitiveStringifier.WKB_STRINGIFIER; + } + } + + public static class GeographyLogicalTypeAnnotation extends LogicalTypeAnnotation { + private final String crs; + private final String edgeAlgorithm; + private final ByteBuffer metadata; + + private GeographyLogicalTypeAnnotation(String crs, String edgeAlgorithm, ByteBuffer metadata) { + this.crs = crs; + this.edgeAlgorithm = edgeAlgorithm; + this.metadata = metadata; + } + + @Override + @Deprecated + public OriginalType toOriginalType() { + return null; + } + + @Override + public Optional accept(LogicalTypeAnnotationVisitor logicalTypeAnnotationVisitor) { + return logicalTypeAnnotationVisitor.visit(this); + } + + @Override + LogicalTypeToken getType() { + return LogicalTypeToken.GEOGRAPHY; + } + + @Override + protected String typeParametersAsString() { + StringBuilder sb = new 
StringBuilder(); + sb.append("("); + sb.append(","); + if (crs != null && !crs.isEmpty()) { + sb.append(","); + sb.append(crs); + } + if (edgeAlgorithm != null) { + sb.append(","); + sb.append(edgeAlgorithm); + } + if (metadata != null) { + sb.append(","); + sb.append(metadata); + } + sb.append(")"); + return sb.toString(); + } + + public String getCrs() { + return crs; + } + + public String getEdgeAlgorithm() { + return edgeAlgorithm; + } + + public ByteBuffer getMetadata() { + return metadata; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof GeographyLogicalTypeAnnotation)) { + return false; + } + GeographyLogicalTypeAnnotation other = (GeographyLogicalTypeAnnotation) obj; + return Objects.equals(crs, other.crs) && Objects.equals(edgeAlgorithm, other.edgeAlgorithm); + } + + @Override + public int hashCode() { + return Objects.hash(crs, edgeAlgorithm); + } + + @Override + PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) { + return PrimitiveStringifier.WKB_STRINGIFIER; + } + } + + /** * Implement this interface to visit a logical type annotation in the schema. * The default implementation for each logical type specific visitor method is empty. 
@@ -1162,5 +1360,13 @@ default Optional visit(MapKeyValueTypeAnnotation mapKeyValueLogicalType) { default Optional visit(Float16LogicalTypeAnnotation float16LogicalType) { return empty(); } + + default Optional visit(GeometryLogicalTypeAnnotation geometryLogicalType) { + return empty(); + } + + default Optional visit(GeographyLogicalTypeAnnotation geographyLogicalType) { + return empty(); + } } } diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveStringifier.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveStringifier.java index c46e94367f..bb5c8a9474 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveStringifier.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveStringifier.java @@ -35,6 +35,9 @@ import java.util.concurrent.TimeUnit; import javax.naming.OperationNotSupportedException; import org.apache.parquet.io.api.Binary; +import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.io.ParseException; +import org.locationtech.jts.io.WKBReader; /** * Class that provides string representations for the primitive values. 
These string values are to be used for @@ -449,4 +452,20 @@ String stringifyNotNull(Binary value) { return Float16.toFloatString(value); } }; + + static final PrimitiveStringifier WKB_STRINGIFIER = new BinaryStringifierBase("WKB_STRINGIFIER") { + + @Override + String stringifyNotNull(Binary value) { + + Geometry geometry; + try { + WKBReader reader = new WKBReader(); + geometry = reader.read(value.getBytesUnsafe()); + return geometry.toText(); + } catch (ParseException e) { + return BINARY_INVALID; + } + } + }; } diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java index e74d7cde02..f08e20333d 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java @@ -271,6 +271,14 @@ public Optional visit( LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) { return of(PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR); } + + @Override + public Optional visit( + LogicalTypeAnnotation.GeometryLogicalTypeAnnotation geometryLogicalType) { + // ColumnOrder is undefined for GEOMETRY logical type. Use the default comparator for + // now. 
+ return of(PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR); + } }) .orElseThrow(() -> new ShouldNeverHappenException( "No comparator logic implemented for BINARY logical type: " + logicalType)); diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java index 5bc2f89f47..b69e3f7f6f 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java @@ -571,6 +571,18 @@ public Optional visit( return checkBinaryPrimitiveType(enumLogicalType); } + @Override + public Optional visit( + LogicalTypeAnnotation.GeometryLogicalTypeAnnotation geometryLogicalType) { + return checkBinaryPrimitiveType(geometryLogicalType); + } + + @Override + public Optional visit( + LogicalTypeAnnotation.GeographyLogicalTypeAnnotation geographyLogicalType) { + return checkBinaryPrimitiveType(geographyLogicalType); + } + private Optional checkFixedPrimitiveType( int l, LogicalTypeAnnotation logicalTypeAnnotation) { Preconditions.checkState( diff --git a/parquet-column/src/test/java/org/apache/parquet/column/statistics/geometry/BoundingBoxTest.java b/parquet-column/src/test/java/org/apache/parquet/column/statistics/geometry/BoundingBoxTest.java new file mode 100644 index 0000000000..f0f01e5c86 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/statistics/geometry/BoundingBoxTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.statistics.geometry; + +import org.junit.Assert; +import org.junit.Test; +import org.locationtech.jts.geom.Coordinate; +import org.locationtech.jts.geom.GeometryFactory; +import org.locationtech.jts.geom.Point; +import org.locationtech.jts.geom.Polygon; + +public class BoundingBoxTest { + + @Test + public void testUpdate() { + GeometryFactory geometryFactory = new GeometryFactory(); + BoundingBox boundingBox = new BoundingBox(); + + // Create a 2D point + Point point2D = geometryFactory.createPoint(new Coordinate(10, 20)); + boundingBox.update(point2D, "EPSG:4326"); + Assert.assertEquals(10.0, boundingBox.getXMin(), 0.0); + Assert.assertEquals(10.0, boundingBox.getXMax(), 0.0); + Assert.assertEquals(20.0, boundingBox.getYMin(), 0.0); + Assert.assertEquals(20.0, boundingBox.getYMax(), 0.0); + } + + @Test + public void testWraparound() { + GeometryFactory geometryFactory = new GeometryFactory(); + BoundingBox boundingBox = new BoundingBox(); + + // Create a polygon near the antimeridian line + Coordinate[] coords1 = new Coordinate[] { + new Coordinate(170, 10), new Coordinate(175, 15), new Coordinate(170, 15), new Coordinate(170, 10) + }; + Polygon polygon1 = geometryFactory.createPolygon(coords1); + boundingBox.update(polygon1, "EPSG:4326"); + // Check if the wraparound is handled correctly + Assert.assertEquals(170.0, boundingBox.getXMin(), 0.0); + Assert.assertEquals(175.0, boundingBox.getXMax(), 0.0); + Assert.assertEquals(10.0, boundingBox.getYMin(), 0.0); + Assert.assertEquals(15.0, boundingBox.getYMax(), 
0.0); + + // Create an additional polygon crossing the antimeridian line + Coordinate[] coords2 = new Coordinate[] { + new Coordinate(175, -10), new Coordinate(-175, -5), new Coordinate(175, -5), new Coordinate(175, -10) + }; + Polygon polygon2 = geometryFactory.createPolygon(coords2); + + boundingBox.update(polygon2, "EPSG:4326"); + // Check if the wraparound is handled correctly + Assert.assertEquals(175.0, boundingBox.getXMin(), 0.0); + Assert.assertEquals(-175.0, boundingBox.getXMax(), 0.0); + Assert.assertEquals(-10.0, boundingBox.getYMin(), 0.0); + Assert.assertEquals(15.0, boundingBox.getYMax(), 0.0); + + // Create another polygon on the other side of the antimeridian line + Coordinate[] coords3 = new Coordinate[] { + new Coordinate(-170, 20), new Coordinate(-165, 25), new Coordinate(-170, 25), new Coordinate(-170, 20) + }; + // longitude range: [-170, -165] + Polygon polygon3 = geometryFactory.createPolygon(coords3); + boundingBox.update(polygon3, "EPSG:4326"); + + // Check if the wraparound is handled correctly + Assert.assertEquals(175.0, boundingBox.getXMin(), 0.0); + Assert.assertEquals(-175.0, boundingBox.getXMax(), 0.0); + Assert.assertEquals(-10.0, boundingBox.getYMin(), 0.0); + Assert.assertEquals(25.0, boundingBox.getYMax(), 0.0); + } +} diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml index d4aa4b42a7..48e1d639e9 100644 --- a/parquet-hadoop/pom.xml +++ b/parquet-hadoop/pom.xml @@ -135,6 +135,12 @@ jar compile + + org.locationtech.jts + jts-core + ${jts.version} + test + io.airlift aircompressor diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index e72f2c33a2..3efc934585 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ 
-50,6 +50,7 @@ import org.apache.parquet.column.EncodingStats; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.statistics.BinaryStatistics; +import org.apache.parquet.column.statistics.geometry.GeometryTypes; import org.apache.parquet.column.values.bloomfilter.BloomFilter; import org.apache.parquet.crypto.AesCipher; import org.apache.parquet.crypto.AesGcmEncryptor; @@ -65,6 +66,7 @@ import org.apache.parquet.format.BloomFilterHash; import org.apache.parquet.format.BloomFilterHeader; import org.apache.parquet.format.BoundaryOrder; +import org.apache.parquet.format.BoundingBox; import org.apache.parquet.format.BsonType; import org.apache.parquet.format.ColumnChunk; import org.apache.parquet.format.ColumnCryptoMetaData; @@ -78,12 +80,16 @@ import org.apache.parquet.format.DateType; import org.apache.parquet.format.DecimalType; import org.apache.parquet.format.DictionaryPageHeader; +import org.apache.parquet.format.EdgeInterpolationAlgorithm; import org.apache.parquet.format.Encoding; import org.apache.parquet.format.EncryptionWithColumnKey; import org.apache.parquet.format.EnumType; import org.apache.parquet.format.FieldRepetitionType; import org.apache.parquet.format.FileMetaData; import org.apache.parquet.format.Float16Type; +import org.apache.parquet.format.GeographyType; +import org.apache.parquet.format.GeometryType; +import org.apache.parquet.format.GeospatialStatistics; import org.apache.parquet.format.IntType; import org.apache.parquet.format.JsonType; import org.apache.parquet.format.KeyValue; @@ -113,12 +119,8 @@ import org.apache.parquet.format.UUIDType; import org.apache.parquet.format.Uncompressed; import org.apache.parquet.format.XxHash; -import org.apache.parquet.hadoop.metadata.BlockMetaData; -import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; -import org.apache.parquet.hadoop.metadata.ColumnPath; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import 
org.apache.parquet.hadoop.metadata.*; import org.apache.parquet.hadoop.metadata.FileMetaData.EncryptionType; -import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.internal.column.columnindex.BinaryTruncator; import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder; import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder; @@ -519,6 +521,30 @@ public Optional visit(LogicalTypeAnnotation.Float16LogicalTypeAnnot public Optional visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation intervalLogicalType) { return of(LogicalType.UNKNOWN(new NullType())); } + + @Override + public Optional visit(LogicalTypeAnnotation.GeometryLogicalTypeAnnotation geometryLogicalType) { + GeometryType geometryType = new GeometryType(); + if (geometryLogicalType.getCrs() != null) { + geometryType.setCrs(geometryLogicalType.getCrs()); + } + return of(LogicalType.GEOMETRY(geometryType)); + } + + @Override + public Optional visit(LogicalTypeAnnotation.GeographyLogicalTypeAnnotation geographyLogicalType) { + GeographyType geographyType = new GeographyType(); + if (geographyLogicalType.getCrs() != null) { + geographyType.setCrs(geographyLogicalType.getCrs()); + } + if (geographyLogicalType.getEdgeAlgorithm() != null) { + String algorithm = geographyLogicalType.getEdgeAlgorithm(); + if (algorithm != null) { + geographyType.setAlgorithm(EdgeInterpolationAlgorithm.valueOf(algorithm)); + } + } + return of(LogicalType.GEOGRAPHY(geographyType)); + } } private void addRowGroup( @@ -583,6 +609,11 @@ private void addRowGroup( metaData.setSize_statistics(toParquetSizeStatistics(columnMetaData.getSizeStatistics())); } + if (columnMetaData.getGeospatialStatistics() != null) { + metaData.setGeospatial_statistics( + toParquetGeometryStatistics(columnMetaData.getGeospatialStatistics())); + } + if (!encryptMetaData) { columnChunk.setMeta_data(metaData); } else { @@ -770,6 +801,37 @@ public static Statistics toParquetStatistics( return formatStats; } + 
private static GeospatialStatistics toParquetStatistics( + org.apache.parquet.column.statistics.geometry.GeospatialStatistics stats) { + GeospatialStatistics formatStats = new GeospatialStatistics(); + + if (stats.getBoundingBox() != null) { + formatStats.setBbox(toParquetBoundingBox(stats.getBoundingBox())); + } + List geometryTypes = new ArrayList<>(stats.getGeometryTypes().getTypes()); + Collections.sort(geometryTypes); + formatStats.setGeospatial_types(geometryTypes); + + return formatStats; + } + + private static BoundingBox toParquetBoundingBox(org.apache.parquet.column.statistics.geometry.BoundingBox bbox) { + BoundingBox formatBbox = new BoundingBox(); + formatBbox.setXmin(bbox.getXMin()); + formatBbox.setXmax(bbox.getXMax()); + formatBbox.setYmin(bbox.getYMin()); + formatBbox.setYmax(bbox.getYMax()); + if (bbox.getZMin() <= bbox.getZMax()) { + formatBbox.setZmin(bbox.getZMin()); + formatBbox.setZmax(bbox.getZMax()); + } + if (bbox.getMMin() <= bbox.getMMax()) { + formatBbox.setMmin(bbox.getMMin()); + formatBbox.setMmax(bbox.getMMax()); + } + return formatBbox; + } + private static boolean withinLimit(org.apache.parquet.column.statistics.Statistics stats, int truncateLength) { if (stats.isSmallerThan(MAX_STATS_SIZE)) { return true; @@ -875,6 +937,61 @@ public org.apache.parquet.column.statistics.Statistics fromParquetStatistics( return fromParquetStatisticsInternal(createdBy, statistics, type, expectedOrder); } + private GeospatialStatistics toParquetGeometryStatistics( + org.apache.parquet.column.statistics.geometry.GeospatialStatistics geospatialStatistics) { + if (geospatialStatistics == null) { + return null; + } + + GeospatialStatistics formatStats = new GeospatialStatistics(); + + if (geospatialStatistics.getBoundingBox() != null) { + formatStats.setBbox(toParquetBoundingBox(geospatialStatistics.getBoundingBox())); + } + + if (geospatialStatistics.getGeometryTypes() != null) { + List geometryTypes = + new 
ArrayList<>(geospatialStatistics.getGeometryTypes().getTypes()); + Collections.sort(geometryTypes); + formatStats.setGeospatial_types(geometryTypes); + } + + return formatStats; + } + + static org.apache.parquet.column.statistics.geometry.GeospatialStatistics fromParquetStatistics( + GeospatialStatistics formatGeomStats, PrimitiveType type) { + org.apache.parquet.column.statistics.geometry.BoundingBox bbox = null; + if (formatGeomStats != null && formatGeomStats.isSetBbox()) { + BoundingBox formatBbox = formatGeomStats.getBbox(); + bbox = new org.apache.parquet.column.statistics.geometry.BoundingBox( + formatBbox.getXmin(), + formatBbox.getXmax(), + formatBbox.getYmin(), + formatBbox.getYmax(), + formatBbox.isSetZmin() ? formatBbox.getZmin() : Double.NaN, + formatBbox.isSetZmax() ? formatBbox.getZmax() : Double.NaN, + formatBbox.isSetMmin() ? formatBbox.getMmin() : Double.NaN, + formatBbox.isSetMmax() ? formatBbox.getMmax() : Double.NaN); + } + org.apache.parquet.column.statistics.geometry.GeometryTypes geometryTypes = null; + if (formatGeomStats != null && formatGeomStats.isSetGeospatial_types()) { + geometryTypes = new GeometryTypes(new HashSet<>(formatGeomStats.getGeospatial_types())); + } + + // get the logical type annotation data from the type + LogicalTypeAnnotation logicalType = type.getLogicalTypeAnnotation(); + if (logicalType instanceof LogicalTypeAnnotation.GeometryLogicalTypeAnnotation) { + LogicalTypeAnnotation.GeometryLogicalTypeAnnotation geometryLogicalType = + (LogicalTypeAnnotation.GeometryLogicalTypeAnnotation) logicalType; + return new org.apache.parquet.column.statistics.geometry.GeospatialStatistics( + geometryLogicalType.getCrs(), geometryLogicalType.getMetadata(), bbox, geometryTypes); + } + return new org.apache.parquet.column.statistics.geometry.GeospatialStatistics( + // this case should not happen in normal cases + null, null, bbox, geometryTypes); + } + + /** * Sort order for page and column statistics. 
Types are associated with sort * orders (e.g., UTF8 columns should use UNSIGNED) and column stats are @@ -1032,6 +1149,12 @@ public Optional visit( LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) { return of(SortOrder.SIGNED); } + + @Override + public Optional visit( + LogicalTypeAnnotation.GeometryLogicalTypeAnnotation geometryLogicalType) { + return of(SortOrder.UNKNOWN); + } }) .orElse(defaultSortOrder(primitive.getPrimitiveTypeName())); } @@ -1142,7 +1265,8 @@ LogicalTypeAnnotation getLogicalTypeAnnotation(ConvertedType type, SchemaElement } LogicalTypeAnnotation getLogicalTypeAnnotation(LogicalType type) { - switch (type.getSetField()) { + LogicalType._Fields setField = type.getSetField(); + switch (setField) { case MAP: return LogicalTypeAnnotation.mapType(); case BSON: @@ -1175,6 +1299,17 @@ LogicalTypeAnnotation getLogicalTypeAnnotation(LogicalType type) { return LogicalTypeAnnotation.uuidType(); case FLOAT16: return LogicalTypeAnnotation.float16Type(); + case GEOMETRY: + GeometryType geometry = type.getGEOMETRY(); + return LogicalTypeAnnotation.geometryType(geometry.getCrs(), null); + case GEOGRAPHY: + GeographyType geography = type.getGEOGRAPHY(); + if (geography.getAlgorithm() != null) { + return LogicalTypeAnnotation.geographyType( + geography.getCrs(), geography.getAlgorithm().name(), null); + } else { + return LogicalTypeAnnotation.geographyType(geography.getCrs(), null, null); + } default: throw new RuntimeException("Unknown logical type " + type); } @@ -1611,7 +1746,8 @@ public ColumnChunkMetaData buildColumnChunkMetaData( metaData.num_values, metaData.total_compressed_size, metaData.total_uncompressed_size, - fromParquetSizeStatistics(metaData.size_statistics, type)); + fromParquetSizeStatistics(metaData.size_statistics, type), + fromParquetStatistics(metaData.geospatial_statistics, type)); } public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws IOException { @@ -2274,6 +2410,7 @@ public static 
ColumnIndex toParquetColumnIndex( if (defLevelHistogram != null && !defLevelHistogram.isEmpty()) { parquetColumnIndex.setDefinition_level_histograms(defLevelHistogram); } + return parquetColumnIndex; } @@ -2290,7 +2427,8 @@ public static org.apache.parquet.internal.column.columnindex.ColumnIndex fromPar parquetColumnIndex.getMin_values(), parquetColumnIndex.getMax_values(), parquetColumnIndex.getRepetition_level_histograms(), - parquetColumnIndex.getDefinition_level_histograms()); + parquetColumnIndex.getDefinition_level_histograms(), + null); } public static OffsetIndex toParquetOffsetIndex( diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index f0a912f599..75dd4b816f 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -55,8 +55,10 @@ import org.apache.parquet.column.EncodingStats; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.statistics.BinaryStatistics; import org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.column.statistics.geometry.GeospatialStatistics; import org.apache.parquet.column.values.bloomfilter.BloomFilter; import org.apache.parquet.crypto.AesCipher; import org.apache.parquet.crypto.ColumnEncryptionProperties; @@ -1382,6 +1384,11 @@ public void endColumn() throws IOException { currentColumnIndexes.add(columnIndexBuilder.build()); } currentOffsetIndexes.add(offsetIndexBuilder.build(currentChunkFirstDataPage)); + // calculate the geometryStatistics from the BinaryStatistics + GeospatialStatistics geospatialStatistics = null; + if (currentStatistics instanceof BinaryStatistics) + geospatialStatistics = ((BinaryStatistics) 
currentStatistics).getGeospatialStatistics(); + currentBlock.addColumn(ColumnChunkMetaData.get( currentChunkPath, currentChunkType, @@ -1394,7 +1401,8 @@ public void endColumn() throws IOException { currentChunkValueCount, compressedLength, uncompressedLength, - currentSizeStatistics)); + currentSizeStatistics, + geospatialStatistics)); this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength); this.uncompressedLength = 0; this.compressedLength = 0; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java index 14a949b0e0..954beb36b3 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java @@ -31,6 +31,7 @@ import org.apache.parquet.column.statistics.BooleanStatistics; import org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.column.statistics.geometry.GeospatialStatistics; import org.apache.parquet.crypto.AesCipher; import org.apache.parquet.crypto.InternalColumnDecryptionSetup; import org.apache.parquet.crypto.InternalFileDecryptor; @@ -39,6 +40,7 @@ import org.apache.parquet.format.ColumnMetaData; import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.internal.hadoop.metadata.IndexReference; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.Types; @@ -145,6 +147,35 @@ public static ColumnChunkMetaData get( totalUncompressedSize); } + public static ColumnChunkMetaData get( + ColumnPath path, + PrimitiveType type, + CompressionCodecName codec, + EncodingStats encodingStats, + Set 
encodings, + Statistics statistics, + long firstDataPage, + long dictionaryPageOffset, + long valueCount, + long totalSize, + long totalUncompressedSize, + SizeStatistics sizeStatistics) { + return get( + path, + type, + codec, + encodingStats, + encodings, + statistics, + firstDataPage, + dictionaryPageOffset, + valueCount, + totalSize, + totalUncompressedSize, + sizeStatistics, + null); + } + public static ColumnChunkMetaData get( ColumnPath path, PrimitiveType type, @@ -169,6 +200,7 @@ public static ColumnChunkMetaData get( valueCount, totalSize, totalUncompressedSize, + null, null); } @@ -199,7 +231,25 @@ public static ColumnChunkMetaData get( long valueCount, long totalSize, long totalUncompressedSize, - SizeStatistics sizeStatistics) { + SizeStatistics sizeStatistics, + GeospatialStatistics geospatialStats) { + + LogicalTypeAnnotation logicalType = type.getLogicalTypeAnnotation(); + if (logicalType instanceof LogicalTypeAnnotation.GeometryLogicalTypeAnnotation) { + return new GeometryColumnChunkMetaData( + path, + type, + codec, + encodingStats, + encodings, + statistics, + firstDataPage, + dictionaryPageOffset, + valueCount, + totalSize, + totalUncompressedSize, + geospatialStats); + } // to save space we store those always positive longs in ints when they fit. 
if (positiveLongFitsInAnInt(firstDataPage) @@ -395,6 +445,12 @@ public SizeStatistics getSizeStatistics() { throw new UnsupportedOperationException("SizeStatistics is not implemented"); } + /** @return the geospatial statistics for this column; this implementation returns null */ + @JsonIgnore + public GeospatialStatistics getGeospatialStatistics() { + return null; + } + /** * Method should be considered private * @@ -850,4 +906,105 @@ public SizeStatistics getSizeStatistics() { public boolean isEncrypted() { return true; } + + public GeospatialStatistics getGeospatialStatistics() { + return shadowColumnChunkMetaData.getGeospatialStatistics(); + } +} + +class GeometryColumnChunkMetaData extends ColumnChunkMetaData { + + private final long firstDataPageOffset; + private final long dictionaryPageOffset; + private final long valueCount; + private final long totalSize; + private final long totalUncompressedSize; + private final Statistics statistics; + private final GeospatialStatistics geospatialStatistics; + + /** + * @param path column identifier + * @param type type of the column + * @param codec compression codec name, passed to ColumnChunkProperties.get + * @param encodings encodings, passed to ColumnChunkProperties.get (encodingStats goes to the super constructor) + * @param statistics statistics returned by getStatistics() + * @param firstDataPageOffset offset returned by getFirstDataPageOffset() + * @param dictionaryPageOffset offset returned by getDictionaryPageOffset() + * @param valueCount count returned by getValueCount() + * @param totalSize size returned by getTotalSize() + * @param totalUncompressedSize size returned by getTotalUncompressedSize() + * @param geospatialStatistics geospatial statistics returned by getGeospatialStatistics() + */ + GeometryColumnChunkMetaData( + ColumnPath path, + PrimitiveType type, + CompressionCodecName codec, + EncodingStats encodingStats, + Set encodings, + Statistics statistics, + long firstDataPageOffset, + long dictionaryPageOffset, + long valueCount, + long totalSize, + long totalUncompressedSize, + GeospatialStatistics geospatialStatistics) { + super(encodingStats, ColumnChunkProperties.get(path, type, codec, encodings)); + this.statistics = statistics; + this.firstDataPageOffset = firstDataPageOffset; + this.dictionaryPageOffset = dictionaryPageOffset; + this.valueCount = valueCount; + this.totalSize = totalSize; + this.totalUncompressedSize = totalUncompressedSize; + this.geospatialStatistics = 
geospatialStatistics; + } + + /** + * @return the offset of the first data page of this column chunk + */ + public long getFirstDataPageOffset() { + return firstDataPageOffset; + } + + /** + * @return the location of the dictionary page if any + */ + public long getDictionaryPageOffset() { + return dictionaryPageOffset; + } + + /** + * @return count of values in this block of the column + */ + public long getValueCount() { + return valueCount; + } + + /** + * @return the totalUncompressedSize + */ + public long getTotalUncompressedSize() { + return totalUncompressedSize; + } + + /** + * @return the totalSize + */ + public long getTotalSize() { + return totalSize; + } + + public SizeStatistics getSizeStatistics() { + return null; + } + + /** + * @return the stats for this column + */ + public Statistics getStatistics() { + return statistics; + } + + public GeospatialStatistics getGeospatialStatistics() { + return geospatialStatistics; + } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java index 9535b4335d..8031175e1b 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java @@ -508,7 +508,8 @@ private void processBlock( chunk.getValueCount(), chunk.getTotalSize(), chunk.getTotalUncompressedSize(), - chunk.getSizeStatistics()); + chunk.getSizeStatistics(), + chunk.getGeospatialStatistics()); } ColumnDescriptor descriptorOriginal = outSchema.getColumns().get(outColumnIdx); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestGeometryTypeRoundTrip.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestGeometryTypeRoundTrip.java new file mode 100644 index 0000000000..71ec709ee1 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestGeometryTypeRoundTrip.java @@ -0,0 
+1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.statistics; + +import static org.apache.parquet.schema.LogicalTypeAnnotation.geometryType; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.Preconditions; +import org.apache.parquet.column.statistics.geometry.GeospatialStatistics; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.GroupFactory; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.io.LocalInputFile; +import org.apache.parquet.io.LocalOutputFile; +import org.apache.parquet.io.api.Binary; 
+import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Types; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.locationtech.jts.geom.Coordinate; +import org.locationtech.jts.geom.GeometryFactory; +import org.locationtech.jts.io.WKBWriter; + +public class TestGeometryTypeRoundTrip { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + private Path newTempPath() throws IOException { + File file = temp.newFile(); + Preconditions.checkArgument(file.delete(), "Could not remove temp file"); + return file.toPath(); + } + + @Test + public void testEPSG4326BasicReadWriteGeometryValue() throws Exception { + GeometryFactory geomFactory = new GeometryFactory(); + + // Writer that serializes JTS Geometry objects to Well-Known Binary (WKB) format; only write() is used here. + WKBWriter wkbWriter = new WKBWriter(); + + // OGC:CRS84 (WGS 84): Uses the order longitude, latitude + Binary[] points = { + Binary.fromConstantByteArray(wkbWriter.write(geomFactory.createPoint(new Coordinate(1.0, 1.0)))), + Binary.fromConstantByteArray(wkbWriter.write(geomFactory.createPoint(new Coordinate(2.0, 2.0)))) + }; + + // A message type that represents a message with a geometry column. 
+ MessageType schema = Types.buildMessage() + .required(BINARY) + .as(geometryType()) // OGC:CRS84: Uses the order longitude, latitude + .named("geometry") + .named("msg"); + + Configuration conf = new Configuration(); + GroupWriteSupport.setSchema(schema, conf); + GroupFactory factory = new SimpleGroupFactory(schema); + Path path = newTempPath(); + try (ParquetWriter writer = ExampleParquetWriter.builder(new LocalOutputFile(path)) + .withConf(conf) + .withDictionaryEncoding(false) + .build()) { + for (Binary value : points) { + writer.write(factory.newGroup().append("geometry", value)); + } + } + + try (ParquetFileReader reader = ParquetFileReader.open(new LocalInputFile(path))) { + Assert.assertEquals(2, reader.getRecordCount()); + + ParquetMetadata footer = reader.getFooter(); + Assert.assertNotNull(footer); + + ColumnChunkMetaData columnChunkMetaData = + reader.getRowGroups().get(0).getColumns().get(0); + Assert.assertNotNull(columnChunkMetaData); + + GeospatialStatistics geospatialStatistics = columnChunkMetaData.getGeospatialStatistics(); + Assert.assertNotNull(geospatialStatistics); + + Assert.assertEquals(1.0, geospatialStatistics.getBoundingBox().getXMin(), 0.0); + Assert.assertEquals(2.0, geospatialStatistics.getBoundingBox().getXMax(), 0.0); + Assert.assertEquals(1.0, geospatialStatistics.getBoundingBox().getYMin(), 0.0); + Assert.assertEquals(2.0, geospatialStatistics.getBoundingBox().getYMax(), 0.0); + + ColumnIndex columnIndex = reader.readColumnIndex(columnChunkMetaData); + Assert.assertNotNull(columnIndex); + } + } + + @Test + public void testBasicReadWriteGeometryValueWithCovering() throws Exception { + GeometryFactory geomFactory = new GeometryFactory(); + + // Writer that serializes JTS Geometry objects to Well-Known Binary (WKB) format; only write() is used here. + WKBWriter wkbWriter = new WKBWriter(); + + // Sample points for the column declared below with CRS "OGC:CRS84" (order: longitude, latitude). 
+ Binary[] points = { + Binary.fromConstantByteArray(wkbWriter.write(geomFactory.createPoint(new Coordinate(1.0, 1.0)))), + Binary.fromConstantByteArray(wkbWriter.write(geomFactory.createPoint(new Coordinate(2.0, 2.0)))) + }; + + // A message type that represents a message with a geometry column. + MessageType schema = Types.buildMessage() + .required(BINARY) + .as(geometryType("OGC:CRS84", null)) // OGC:CRS84: Uses the order longitude, latitude + .named("geometry") + .named("msg"); + + Configuration conf = new Configuration(); + GroupWriteSupport.setSchema(schema, conf); + GroupFactory factory = new SimpleGroupFactory(schema); + Path path = newTempPath(); + try (ParquetWriter writer = ExampleParquetWriter.builder(new LocalOutputFile(path)) + .withConf(conf) + .withDictionaryEncoding(false) + .build()) { + for (Binary value : points) { + writer.write(factory.newGroup().append("geometry", value)); + } + } + + try (ParquetFileReader reader = ParquetFileReader.open(new LocalInputFile(path))) { + Assert.assertEquals(2, reader.getRecordCount()); + + ParquetMetadata footer = reader.getFooter(); + Assert.assertNotNull(footer); + + ColumnChunkMetaData columnChunkMetaData = + reader.getRowGroups().get(0).getColumns().get(0); + Assert.assertNotNull(columnChunkMetaData); + + GeospatialStatistics geospatialStatistics = columnChunkMetaData.getGeospatialStatistics(); + Assert.assertNotNull(geospatialStatistics); + + ColumnIndex columnIndex = reader.readColumnIndex(columnChunkMetaData); + Assert.assertNotNull(columnIndex); + } + } +} diff --git a/pom.xml b/pom.xml index 4ec8da95ec..4c1946127a 100644 --- a/pom.xml +++ b/pom.xml @@ -92,9 +92,8 @@ 1.3.2 2.30.0 shaded.parquet - - 3.3.0 - 2.10.0 + 3.3.6 + 2.11.0-SNAPSHOT 1.15.0 thrift ${thrift.executable} @@ -111,8 +110,9 @@ 0.1.1 1.10.19 2.0.9 - 0.27ea0 + 0.26ea0 3.5.0 + 1.19.0 2.3