Commit b7a7dd7
Release 0.5.12.
Signed-off-by: Simeon H.K. Fitch <[email protected]>
metasim committed Mar 27, 2018
1 parent 9b04956 commit b7a7dd7
Showing 16 changed files with 334 additions and 49 deletions.
13 changes: 8 additions & 5 deletions project/ProjectPlugin.scala
@@ -22,7 +22,7 @@ object ProjectPlugin extends AutoPlugin {
override def trigger: PluginTrigger = allRequirements

val versions = Map(
-   "geotrellis" -> "1.2.0-RC1",
+   "geotrellis" -> "1.2.0-RC2",
"spark" -> "2.1.0"
)

@@ -39,11 +39,14 @@
scalaVersion := "2.11.11",
scalacOptions ++= Seq("-feature", "-deprecation"),
cancelable in Global := true,
- // resolvers ++= Seq(
-сколько  "locationtech-releases" at "https://repo.locationtech.org/content/groups/releases"
- // ),
+ resolvers ++= Seq(
+   "locationtech-releases" at "https://repo.locationtech.org/content/groups/releases"
+ ),
libraryDependencies ++= Seq(
  "com.chuusai" %% "shapeless" % "2.3.2",
+ //"org.locationtech.sfcurve" %% "sfcurve-zorder" % "0.2.0",
+ //"org.locationtech.geomesa" %% "geomesa-jts-spark" % "astraea.1",
+ "org.locationtech.geomesa" %% "geomesa-z3" % "1.3.5",
spark("core") % Provided,
spark("mllib") % Provided,
spark("sql") % Provided,
@@ -145,7 +148,7 @@ object ProjectPlugin extends AutoPlugin {
geotrellis("raster") % Tut
),
fork in (Tut, runner) := true,
- javaOptions in (Tut, runner) := Seq("-Xmx6G")
+ javaOptions in (Tut, runner) := Seq("-Xmx8G")
)

def buildInfoSettings: Seq[Def.Setting[_]] = Seq(
107 changes: 107 additions & 0 deletions src/main/scala/astraea/spark/rasterframes/RFSpatialColumnMethods.scala
@@ -0,0 +1,107 @@
/*
* This software is licensed under the Apache 2 license, quoted below.
*
* Copyright 2017 Astraea, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* [http://www.apache.org/licenses/LICENSE-2.0]
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/

package astraea.spark.rasterframes

import geotrellis.proj4.LatLng
import geotrellis.spark.SpatialKey
import geotrellis.spark.tiling.MapKeyTransform
import geotrellis.util.MethodExtensions
import geotrellis.vector.Extent
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{asc, udf}
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
import org.locationtech.geomesa.curve.Z2SFC

/**
* RasterFrame extension methods associated with adding spatially descriptive columns.
*
* @author sfitch
* @since 12/15/17
*/
trait RFSpatialColumnMethods extends MethodExtensions[RasterFrame] {
/** Returns the key-space to map-space coordinate transform. */
def mapTransform: MapKeyTransform = self.tileLayerMetadata.widen.mapTransform

private def keyCol2Extent: Row ⇒ Extent = {
val transform = self.sparkSession.sparkContext.broadcast(mapTransform)
(r: Row) ⇒ transform.value.keyToExtent(SpatialKey(r.getInt(0), r.getInt(1)))
}

private def keyCol2LatLng: Row ⇒ (Double, Double) = {
val transform = self.sparkSession.sparkContext.broadcast(mapTransform)
val crs = self.tileLayerMetadata.widen.crs
(r: Row) ⇒ {
val center = transform.value.keyToExtent(SpatialKey(r.getInt(0), r.getInt(1))).center.reproject(crs, LatLng)
(center.x, center.y)
}
}

/**
* Append a column containing the extent of the row's spatial key.
* Coordinates are in native CRS.
* @param colName name of column to append. Defaults to "extent"
* @return updated RasterFrame
*/
def withExtent(colName: String = "extent"): RasterFrame = {
val key2Extent = udf(keyCol2Extent)
self.withColumn(colName, key2Extent(self.spatialKeyColumn)).certify
}

/**
* Append a column containing the center of the row's spatial key.
* Coordinate is in native CRS.
* @param colName name of column to append. Defaults to "center"
* @return updated RasterFrame
*/
def withCenter(colName: String = "center"): RasterFrame = {
val key2Center = udf(keyCol2Extent andThen (_.center) andThen (c ⇒ (c.x, c.y)))
self.withColumn(colName, key2Center(self.spatialKeyColumn).cast(RFSpatialColumnMethods.PointStructType)).certify
}

/**
* Append a column containing the center of the row's spatial key.
* Coordinate is in (longitude, latitude) (EPSG:4326).
* @param colName name of column to append. Defaults to "center"
* @return updated RasterFrame
*/
def withCenterLatLng(colName: String = "center"): RasterFrame = {
val key2Center = udf(keyCol2LatLng)
self.withColumn(colName, key2Center(self.spatialKeyColumn).cast(RFSpatialColumnMethods.LngLatStructType)).certify
}

/**
* Appends a spatial index column
* @param colName name of new column to create. Defaults to `spatial_index`
* @param applyOrdering if true, adds `.orderBy(asc(colName))` to result. Defaults to `true`
* @return RasterFrame with index column.
*/
def withSpatialIndex(colName: String = "spatial_index", applyOrdering: Boolean = true): RasterFrame = {
val zindex = udf(keyCol2LatLng andThen (p ⇒ Z2SFC.index(p._1, p._2).z))
self.withColumn(colName, zindex(self.spatialKeyColumn)) match {
case rf if applyOrdering ⇒ rf.orderBy(asc(colName)).certify
case rf ⇒ rf.certify
}
}
}

object RFSpatialColumnMethods {
private val PointStructType = StructType(Seq(StructField("x", DoubleType), StructField("y", DoubleType)))
private val LngLatStructType = StructType(Seq(StructField("longitude", DoubleType), StructField("latitude", DoubleType)))
}
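
A minimal usage sketch of the methods this new trait adds (not part of the commit), assuming an existing RasterFrame `rf` created from a GeoTrellis layer, with a `spark` session in scope; the column names shown are the methods' defaults:

```scala
import astraea.spark.rasterframes._

// Assumes `spark: SparkSession` with RasterFrames support enabled and an
// existing RasterFrame `rf` (e.g. built from a GeoTrellis layer via toRF).
import spark.implicits._

val enriched = rf
  .withExtent()        // "extent": the tile footprint in the layer's native CRS
  .withCenterLatLng()  // "center": a (longitude, latitude) struct in EPSG:4326
  .withSpatialIndex()  // "spatial_index": Z2 curve index, rows ordered by it

enriched.select($"spatial_index", $"center").show(5, false)
```

Because `withSpatialIndex` orders rows by the Z2 index by default, tiles that are near one another in map space tend to land in the same partitions, which is the usual motivation for space-filling-curve indexing.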
src/main/scala/astraea/spark/rasterframes/RasterFrameMethods.scala
@@ -25,10 +25,10 @@ import geotrellis.util.{LazyLogging, MethodExtensions}
import geotrellis.vector.ProjectedExtent
import org.apache.spark.annotation.Experimental
import org.apache.spark.rdd.RDD
+ import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.gt.types.TileUDT
import org.apache.spark.sql.types.{Metadata, StructField}
- import org.apache.spark.sql.{Column, DataFrame, Dataset, TypedColumn}
import spray.json._

import scala.reflect.runtime.universe._
@@ -38,7 +38,7 @@
* @author sfitch
* @since 7/18/17
*/
- trait RasterFrameMethods extends MethodExtensions[RasterFrame] with LazyLogging {
+ trait RasterFrameMethods extends MethodExtensions[RasterFrame] with RFSpatialColumnMethods with LazyLogging {
type TileColumn = TypedColumn[Any, Tile]

private val _df = self
@@ -210,12 +210,12 @@ trait RasterFrameMethods extends MethodExtensions[RasterFrame] with LazyLogging
* Convert from RasterFrame to a GeoTrellis [[TileLayerRDD]]
* @param tileCol column of tiles to become the layer's values
*/
- def toTileLayerRDD(tileCol: TileColumn): Either[TileLayerRDD[SpatialKey], TileLayerRDD[SpaceTimeKey]] =
+ def toTileLayerRDD(tileCol: Column): Either[TileLayerRDD[SpatialKey], TileLayerRDD[SpaceTimeKey]] =
tileLayerMetadata.fold(
-   tlm ⇒ Left(ContextRDD(self.select(spatialKeyColumn, tileCol).rdd, tlm)),
+   tlm ⇒ Left(ContextRDD(self.select(spatialKeyColumn, tileCol.as[Tile]).rdd, tlm)),
tlm ⇒ {
val rdd = self
-   .select(spatialKeyColumn, temporalKeyColumn.get, tileCol)
+   .select(spatialKeyColumn, temporalKeyColumn.get, tileCol.as[Tile])
.rdd
.map { case (sk, tk, v) ⇒ (SpaceTimeKey(sk, tk), v) }
Right(ContextRDD(rdd, tlm))
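A brief sketch (illustrative, not from the commit) of what the loosened signature permits, assuming a RasterFrame `rf` with a tile column named "tile":

```scala
// The parameter is now a plain Column rather than TypedColumn[Any, Tile],
// so any column expression may be passed; it is cast to Tile internally.
val layer = rf.toTileLayerRDD(rf("tile"))

layer match {
  case Left(spatial)   => println(s"Spatial layer with ${spatial.count} tiles")
  case Right(temporal) => println(s"Space-time layer with ${temporal.count} tiles")
}
```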
src/main/scala/astraea/spark/rasterframes/encoders/CRSEncoder.scala
@@ -23,7 +23,9 @@ import geotrellis.proj4.CRS
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
- import org.apache.spark.sql.types.{ObjectType, StringType, StructField, StructType}
+ import org.apache.spark.sql.catalyst.expressions._
+ import org.apache.spark.sql.catalyst.expressions.objects._
+ import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

import scala.reflect.classTag
@@ -36,8 +38,7 @@
*/
object CRSEncoder {
def apply(): ExpressionEncoder[CRS] = {
- import org.apache.spark.sql.catalyst.expressions._
- import org.apache.spark.sql.catalyst.expressions.objects._

val ctType = ScalaReflection.dataTypeFor[CRS]
val schema = StructType(Seq(StructField("crsProj4", StringType, false)))
val inputObject = BoundReference(0, ctType, nullable = false)
Expand All @@ -48,7 +49,7 @@ object CRSEncoder {
classOf[UTF8String],
StringType,
"fromString",
- Invoke(inputObject, "toProj4String", intermediateType, Nil) :: Nil
+ InvokeSafely(inputObject, "toProj4String", intermediateType) :: Nil
)

val inputRow = GetColumnByOrdinal(0, schema)
@@ -57,7 +58,7 @@
CRSEncoder.getClass,
ctType,
"fromString",
- Invoke(inputRow, "toString", intermediateType, Nil) :: Nil
+ InvokeSafely(inputRow, "toString", intermediateType) :: Nil
)

ExpressionEncoder[CRS](schema, flat = false, Seq(serializer), deserializer, classTag[CRS])
src/main/scala/astraea/spark/rasterframes/encoders/CellTypeEncoder.scala
@@ -48,12 +48,12 @@ object CellTypeEncoder {
classOf[UTF8String],
StringType,
"fromString",
- Invoke(inputObject, "name", intermediateType, Nil) :: Nil
+ InvokeSafely(inputObject, "name", intermediateType) :: Nil
)

val inputRow = GetColumnByOrdinal(0, schema)
val deserializer: Expression =
- StaticInvoke(CellType.getClass, ctType, "fromName", Invoke(inputRow, "toString", intermediateType, Nil) :: Nil)
+ StaticInvoke(CellType.getClass, ctType, "fromName", InvokeSafely(inputRow, "toString", intermediateType) :: Nil)

ExpressionEncoder[CellType](schema, flat = false, Seq(serializer), deserializer, classTag[CellType])
}
src/main/scala/astraea/spark/rasterframes/encoders/DelegatingSubfieldEncoder.scala
@@ -47,7 +47,7 @@ trait DelegatingSubfieldEncoder {
case (name, encoder) ⇒
val enc = encoder.serializer.map(_.transform {
case r: BoundReference if r != inputObject ⇒
-   Invoke(inputObject, name, r.dataType)
+   InvokeSafely(inputObject, name, r.dataType)
})
Literal(name) :: CreateStruct(enc) :: Nil
})
31 changes: 31 additions & 0 deletions src/main/scala/astraea/spark/rasterframes/encoders/package.scala
@@ -19,6 +19,10 @@

package astraea.spark.rasterframes

+ import org.apache.spark.sql.catalyst.expressions.Expression
+ import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, InvokeLike}
+ import org.apache.spark.sql.types.DataType

import scala.reflect.ClassTag
import scala.reflect.runtime.universe._

@@ -36,4 +40,31 @@ package object encoders {
ClassTag[T](typeTag[T].mirror.runtimeClass(typeTag[T].tpe))
}

/** Spark 2.1.0 -> 2.2.0 compatibility shim */
def InvokeSafely(targetObject: Expression, functionName: String, dataType: DataType): InvokeLike = {
val ctor = classOf[Invoke].getConstructors.head
val TRUE = Boolean.box(true)
if(ctor.getParameterTypes.length == 5) {
// In Spark 2.1.0 the signature looks like this:
// case class Invoke(
// targetObject: Expression,
// functionName: String,
// dataType: DataType,
// arguments: Seq[Expression] = Nil,
// propagateNull: Boolean = true) extends InvokeLike
ctor.newInstance(targetObject, functionName, dataType, Nil, TRUE).asInstanceOf[InvokeLike]
}
else {
// In spark 2.2.0 the signature looks like this:
// case class Invoke(
// targetObject: Expression,
// functionName: String,
// dataType: DataType,
// arguments: Seq[Expression] = Nil,
// propagateNull: Boolean = true,
// returnNullable : Boolean = true) extends InvokeLike
ctor.newInstance(targetObject, functionName, dataType, Nil, TRUE, TRUE).asInstanceOf[InvokeLike]
}
}

}
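
The shim's reflection trick, selecting among constructor arities at runtime, is broadly applicable whenever a case class gains a parameter between library versions (here, `Invoke` going from 5 parameters in Spark 2.1.0 to 6 in 2.2.0). Below is a standalone toy sketch of the same pattern; it is not part of the commit, and `Greeter` is invented purely for illustration:

```scala
// Toy demonstration of arity-based constructor selection, the pattern
// InvokeSafely uses to bridge the Invoke signature change across Spark versions.
case class Greeter(name: String, punctuation: String)

object ArityShimDemo extends App {
  val ctor = classOf[Greeter].getConstructors.head
  val greeter =
    if (ctor.getParameterTypes.length == 2)
      // "Newer" two-argument signature: supply both values.
      ctor.newInstance("world", "!").asInstanceOf[Greeter]
    else
      // Hypothetical older one-argument signature.
      ctor.newInstance("world").asInstanceOf[Greeter]
  println(greeter) // prints: Greeter(world,!)
}
```

Deferring the choice to runtime lets the same jar run against either Spark version; a hard-coded constructor call compiled against one version would fail against the other with a `NoSuchMethodError`.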
src/main/scala/astraea/spark/rasterframes/functions/package.scala
@@ -18,6 +18,7 @@ package astraea.spark.rasterframes
import geotrellis.raster.histogram.Histogram
import geotrellis.raster.mapalgebra.local._
import geotrellis.raster._
+ import geotrellis.raster.render.ascii.AsciiArtEncoder

import scala.reflect.runtime.universe._

@@ -210,7 +211,7 @@ package object functions {
private[rasterframes] val localDivide: (Tile, Tile) ⇒ Tile = safeEval(Divide.apply)

/** Render tile as ASCII string. */
- private[rasterframes] val renderAscii: (Tile) ⇒ String = safeEval(_.renderAscii())
+ private[rasterframes] val renderAscii: (Tile) ⇒ String = safeEval(_.renderAscii(AsciiArtEncoder.Palette.NARROW))

/** Constructor for constant tiles */
private[rasterframes] val makeConstantTile: (Number, Int, Int, String) ⇒ Tile = (value, cols, rows, cellTypeName) ⇒ {
3 changes: 2 additions & 1 deletion src/main/tut/apps/geotrellis-ops.md
@@ -32,7 +32,8 @@ Here's an example downsampling a tile and rendering each tile as a matrix of numbers.

```tut
val downsample = udf((t: Tile) => t.resample(4, 4))
- val downsampled = rf.select(renderAscii(downsample($"tile")) as "minime")
+ val render = udf((t: Tile) => "\n" + t.asciiDraw() + "\n")
+ val downsampled = rf.select(render(downsample($"tile")) as "minime")
downsampled.show(5, false)
```

(Diffs for the remaining changed files are not rendered here.)
