From 96aa7dc4a6af5e67cbce25ec0424da3d626c44d4 Mon Sep 17 00:00:00 2001 From: "Simeon H.K. Fitch" Date: Fri, 18 Oct 2019 11:47:52 -0400 Subject: [PATCH] Applying pre-partitioning to DataSources. --- .../datasource/raster/RasterSourceRelation.scala | 8 ++++++-- .../experimental/datasource/CachedDatasetRelation.scala | 2 ++ .../datasource/awspds/L8CatalogRelation.scala | 4 +++- .../datasource/awspds/MODISCatalogRelation.scala | 2 +- 4 files changed, 12 insertions(+), 4 deletions(-) diff --git a/datasource/src/main/scala/org/locationtech/rasterframes/datasource/raster/RasterSourceRelation.scala b/datasource/src/main/scala/org/locationtech/rasterframes/datasource/raster/RasterSourceRelation.scala index 6af519f56..9b381d3a6 100644 --- a/datasource/src/main/scala/org/locationtech/rasterframes/datasource/raster/RasterSourceRelation.scala +++ b/datasource/src/main/scala/org/locationtech/rasterframes/datasource/raster/RasterSourceRelation.scala @@ -69,6 +69,9 @@ case class RasterSourceRelation( catalog.schema.fields.filter(f => !catalogTable.bandColumnNames.contains(f.name)) } + protected def defaultNumPartitions: Int = + sqlContext.sparkSession.sessionState.conf.numShufflePartitions + override def schema: StructType = { val tileSchema = schemaOf[ProjectedRasterTile] val paths = for { @@ -84,10 +87,11 @@ case class RasterSourceRelation( override def buildScan(): RDD[Row] = { import sqlContext.implicits._ - // The general transformaion is: + // The general transformation is: // input -> path -> src -> ref -> tile // Each step is broken down for readability val inputs: DataFrame = sqlContext.table(catalogTable.tableName) + .repartition(defaultNumPartitions) // Basically renames the input columns to have the '_path' suffix val pathsAliasing = for { @@ -112,7 +116,7 @@ case class RasterSourceRelation( val df = if (lazyTiles) { // Expand RasterSource into multiple columns per band, and multiple rows per tile - // There's some unintentional fragililty here in that the structure of the expression + // There's some unintentional fragility here in that the structure of the expression // is expected to line up with our column structure here. val refs = RasterSourceToRasterRefs(subtileDims, bandIndexes, srcs: _*) as refColNames diff --git a/experimental/src/main/scala/org/locationtech/rasterframes/experimental/datasource/CachedDatasetRelation.scala b/experimental/src/main/scala/org/locationtech/rasterframes/experimental/datasource/CachedDatasetRelation.scala index 1fac7699a..06947080d 100644 --- a/experimental/src/main/scala/org/locationtech/rasterframes/experimental/datasource/CachedDatasetRelation.scala +++ b/experimental/src/main/scala/org/locationtech/rasterframes/experimental/datasource/CachedDatasetRelation.scala @@ -33,6 +33,8 @@ import org.locationtech.rasterframes.util._ * @since 8/24/18 */ trait CachedDatasetRelation extends ResourceCacheSupport { self: BaseRelation ⇒ + protected def defaultNumPartitions: Int = + sqlContext.sparkSession.sessionState.conf.numShufflePartitions protected def cacheFile: HadoopPath protected def constructDataset: Dataset[Row] diff --git a/experimental/src/main/scala/org/locationtech/rasterframes/experimental/datasource/awspds/L8CatalogRelation.scala b/experimental/src/main/scala/org/locationtech/rasterframes/experimental/datasource/awspds/L8CatalogRelation.scala index 9a14c86f3..049617de6 100644 --- a/experimental/src/main/scala/org/locationtech/rasterframes/experimental/datasource/awspds/L8CatalogRelation.scala +++ b/experimental/src/main/scala/org/locationtech/rasterframes/experimental/datasource/awspds/L8CatalogRelation.scala @@ -68,7 +68,9 @@ case class L8CatalogRelation(sqlContext: SQLContext, sceneListPath: HadoopPath) .select(schema.map(f ⇒ col(f.name)): _*) .orderBy(ACQUISITION_DATE.name, PATH.name, ROW.name) .distinct() // The scene file contains duplicates. - .repartition(8, col(PATH.name), col(ROW.name)) + .repartition(defaultNumPartitions, col(PATH.name), col(ROW.name)) + + } } diff --git a/experimental/src/main/scala/org/locationtech/rasterframes/experimental/datasource/awspds/MODISCatalogRelation.scala b/experimental/src/main/scala/org/locationtech/rasterframes/experimental/datasource/awspds/MODISCatalogRelation.scala index 30b3ba234..6e76acc36 100644 --- a/experimental/src/main/scala/org/locationtech/rasterframes/experimental/datasource/awspds/MODISCatalogRelation.scala +++ b/experimental/src/main/scala/org/locationtech/rasterframes/experimental/datasource/awspds/MODISCatalogRelation.scala @@ -64,7 +64,7 @@ case class MODISCatalogRelation(sqlContext: SQLContext, sceneList: HadoopPath) $"${GID.name}") ++ bandCols: _* ) .orderBy(ACQUISITION_DATE.name, GID.name) - .repartition(8, col(GRANULE_ID.name)) + .repartition(defaultNumPartitions, col(GRANULE_ID.name)) } }