Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Applying pre-partitioning to DataSources. #397

Merged
merged 1 commit into from
Oct 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ case class RasterSourceRelation(
catalog.schema.fields.filter(f => !catalogTable.bandColumnNames.contains(f.name))
}

/** Default partition count used when pre-partitioning scans (see the
  * `.repartition(defaultNumPartitions)` call in `buildScan`); delegates to
  * Spark SQL's configured shuffle partition count for this session. */
protected def defaultNumPartitions: Int =
sqlContext.sparkSession.sessionState.conf.numShufflePartitions

override def schema: StructType = {
val tileSchema = schemaOf[ProjectedRasterTile]
val paths = for {
Expand All @@ -84,10 +87,11 @@ case class RasterSourceRelation(
override def buildScan(): RDD[Row] = {
import sqlContext.implicits._

// The general transformaion is:
// The general transformation is:
// input -> path -> src -> ref -> tile
// Each step is broken down for readability
val inputs: DataFrame = sqlContext.table(catalogTable.tableName)
.repartition(defaultNumPartitions)

// Basically renames the input columns to have the '_path' suffix
val pathsAliasing = for {
Expand All @@ -112,7 +116,7 @@ case class RasterSourceRelation(

val df = if (lazyTiles) {
// Expand RasterSource into multiple columns per band, and multiple rows per tile
// There's some unintentional fragililty here in that the structure of the expression
// There's some unintentional fragility here in that the structure of the expression
// is expected to line up with our column structure here.
val refs = RasterSourceToRasterRefs(subtileDims, bandIndexes, srcs: _*) as refColNames

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import org.locationtech.rasterframes.util._
* @since 8/24/18
*/
trait CachedDatasetRelation extends ResourceCacheSupport { self: BaseRelation ⇒
/** Default partition count for datasets produced by implementors; delegates to
  * Spark SQL's configured shuffle partition count for this session.
  * NOTE(review): duplicated in `RasterSourceRelation` — consider hoisting to a
  * shared trait to keep the two definitions from drifting. */
protected def defaultNumPartitions: Int =
sqlContext.sparkSession.sessionState.conf.numShufflePartitions
protected def cacheFile: HadoopPath
protected def constructDataset: Dataset[Row]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ case class L8CatalogRelation(sqlContext: SQLContext, sceneListPath: HadoopPath)
.select(schema.map(f ⇒ col(f.name)): _*)
.orderBy(ACQUISITION_DATE.name, PATH.name, ROW.name)
.distinct() // The scene file contains duplicates.
.repartition(8, col(PATH.name), col(ROW.name))
.repartition(defaultNumPartitions, col(PATH.name), col(ROW.name))


}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ case class MODISCatalogRelation(sqlContext: SQLContext, sceneList: HadoopPath)
$"${GID.name}") ++ bandCols: _*
)
.orderBy(ACQUISITION_DATE.name, GID.name)
.repartition(8, col(GRANULE_ID.name))
.repartition(defaultNumPartitions, col(GRANULE_ID.name))
}
}

Expand Down