Use Spark job grouping to distinguish steps of the machine learning flow #467

Merged on Mar 24, 2020 (33 commits)

Commits (all by nicodv):
f2fad1c  WIP adding a job group util (Feb 25, 2020)
b0d89bc  complete JobGroupUtilTest (Feb 25, 2020)
babeaa3  introduce enum for job groups (Feb 25, 2020)
5fde259  add job group for reading/filtering (Feb 25, 2020)
5d90bf1  add OpStep enum (Feb 25, 2020)
c02da84  add scoring step (Feb 27, 2020)
6c7a6c7  cleanup (Feb 27, 2020)
99ea18b  add some job groups to OpWorkflowModel (Mar 4, 2020)
3df56f3  set job groups for feature engineering, sanity checker, cross-validation (Mar 4, 2020)
8c3c388  catch job group on job start; print job group with stage metrics (Mar 4, 2020)
a265437  fix test, rename test, docs (Mar 4, 2020)
03f2355  make jobgroup protected (Mar 4, 2020)
79ad0d7  add tagging to .computeDataUpTo (Mar 10, 2020)
9b3918a  always log job groups too (Mar 10, 2020)
673c3ca  refactor (Mar 10, 2020)
483e62e  move job group logic out of closure (Mar 10, 2020)
8e1b434  set OpStep.FeatureEngineering after data reading/filtering (Mar 11, 2020)
93821d6  add OpStep.Scoring for streaming scoring too (Mar 11, 2020)
7c82091  remove superfluous withJobGroup (Mar 11, 2020)
39ac685  tag saving model at lower level (Mar 11, 2020)
79bcdad  prune steps a bit (Mar 11, 2020)
e01aaa8  re-add removed variable (Mar 11, 2020)
da02414  remove superfluous job group set (Mar 12, 2020)
9cd9598  set scoring job grouping in better location (Mar 12, 2020)
cd00b0d  remove problematic score job group (Mar 16, 2020)
d7f02f4  move feature engineering job group to OpWorkflow, consistent with oth… (Mar 16, 2020)
e6bb4ae  cleanup (Mar 18, 2020)
65d371f  Merge branch 'master' into ndv/jobgroups (Mar 18, 2020)
7bf0111  add "other" to enum, moving OpStep to utils in the process; rename Sa… (Mar 19, 2020)
27ee0c8  use Spec (Mar 19, 2020)
9bb9bd0  docs (Mar 19, 2020)
1b00c0b  Merge remote-tracking branch 'origin/ndv/jobgroups' into ndv/jobgroups (Mar 19, 2020)
818ce67  fix test (Mar 19, 2020)
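The mechanism underneath the whole PR is Spark's job group API: SparkContext.setJobGroup(groupId, description) tags every job subsequently submitted from the calling thread, and the Spark UI's Jobs page (and any SparkListener) reports jobs per group. Here is a minimal, self-contained sketch of that mechanism, independent of this PR (the group names and toy dataset are illustrative stand-ins, not the PR's OpStep values):

import org.apache.spark.sql.SparkSession

object JobGroupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("job-group-sketch").getOrCreate()
    val sc = spark.sparkContext

    // Jobs triggered while this group is set are listed under
    // "DataReadingAndFiltering" on the Spark UI's Jobs page.
    sc.setJobGroup("DataReadingAndFiltering", "Reading and filtering the raw data")
    val data = spark.range(0, 1000000).filter("id % 2 = 0").cache()
    data.count() // an action, so it launches a tagged job

    // Setting a new group re-tags all subsequent jobs from this thread.
    sc.setJobGroup("FeatureEngineering", "Materializing engineered features")
    data.selectExpr("id * 2 AS feature").count()

    sc.clearJobGroup() // jobs from here on carry no group
    spark.stop()
  }
}

This is what lets each step of the flow (reading/filtering, feature engineering, cross-validation, scoring, model I/O, results saving) show up as its own group of jobs in the UI.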
core/src/main/scala/com/salesforce/op/OpWorkflow.scala (64 additions, 47 deletions)

@@ -38,6 +38,7 @@ import com.salesforce.op.stages.impl.feature.TimePeriod
 import com.salesforce.op.stages.impl.preparators.CorrelationType
 import com.salesforce.op.stages.impl.selector.ModelSelector
 import com.salesforce.op.utils.reflection.ReflectionUtils
+import com.salesforce.op.utils.spark.{JobGroupUtil, OpStep}
 import com.salesforce.op.utils.spark.RichDataset._
 import com.salesforce.op.utils.stages.FitStagesUtil
 import com.salesforce.op.utils.stages.FitStagesUtil.{CutDAG, FittedDAG, Layer, StagesDAG}

@@ -232,28 +233,30 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
    * @return Dataframe with all the features generated + persisted
    */
   protected def generateRawData()(implicit spark: SparkSession): DataFrame = {
-    (reader, rawFeatureFilter) match {
-      case (None, None) => throw new IllegalArgumentException(
-        "Data reader must be set either directly on the workflow or through the RawFeatureFilter")
-      case (Some(r), None) =>
-        checkReadersAndFeatures()
-        r.generateDataFrame(rawFeatures, parameters).persist()
-      case (rd, Some(rf)) =>
-        rd match {
-          case None => setReader(rf.trainingReader)
-          case Some(r) => if (r != rf.trainingReader) log.warn(
-            "Workflow data reader and RawFeatureFilter training reader do not match! " +
-              "The RawFeatureFilter training reader will be used to generate the data for training")
-        }
-        checkReadersAndFeatures()
-
-        val FilteredRawData(cleanedData, featuresToDrop, mapKeysToDrop, rawFeatureFilterResults) =
-          rf.generateFilteredRaw(rawFeatures, parameters)
-
-        setRawFeatureFilterResults(rawFeatureFilterResults)
-        setBlacklist(featuresToDrop, rawFeatureFilterResults.rawFeatureDistributions)
-        setBlacklistMapKeys(mapKeysToDrop)
-        cleanedData
+    JobGroupUtil.withJobGroup(OpStep.DataReadingAndFiltering) {
+      (reader, rawFeatureFilter) match {
+        case (None, None) => throw new IllegalArgumentException(
+          "Data reader must be set either directly on the workflow or through the RawFeatureFilter")
+        case (Some(r), None) =>
+          checkReadersAndFeatures()
+          r.generateDataFrame(rawFeatures, parameters).persist()
+        case (rd, Some(rf)) =>
+          rd match {
+            case None => setReader(rf.trainingReader)
+            case Some(r) => if (r != rf.trainingReader) log.warn(
+              "Workflow data reader and RawFeatureFilter training reader do not match! " +
+                "The RawFeatureFilter training reader will be used to generate the data for training")
+          }
+          checkReadersAndFeatures()
+
+          val FilteredRawData(cleanedData, featuresToDrop, mapKeysToDrop, rawFeatureFilterResults) =
+            rf.generateFilteredRaw(rawFeatures, parameters)
+
+          setRawFeatureFilterResults(rawFeatureFilterResults)
+          setBlacklist(featuresToDrop, rawFeatureFilterResults.rawFeatureDistributions)
+          setBlacklistMapKeys(mapKeysToDrop)
+          cleanedData
+      }
     }
   }

@@ -400,28 +403,34 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
 
     // doing regular workflow fit without workflow level CV
     if (!isWorkflowCV) {
-      FitStagesUtil.fitAndTransformDAG(
-        dag = dag,
-        train = train,
-        test = test,
-        hasTest = hasTest,
-        indexOfLastEstimator = indexOfLastEstimator,
-        persistEveryKStages = persistEveryKStages
-      ).transformers
+      // The cross-validation job group is handled in the appropriate Estimator
+      JobGroupUtil.withJobGroup(OpStep.FeatureEngineering) {
+        FitStagesUtil.fitAndTransformDAG(
+          dag = dag,
+          train = train,
+          test = test,
+          hasTest = hasTest,
+          indexOfLastEstimator = indexOfLastEstimator,
+          persistEveryKStages = persistEveryKStages
+        ).transformers
+      }
     } else {
       // doing workflow level CV/TS
       // Extract Model Selector and Split the DAG into
       val CutDAG(modelSelectorOpt, before, during, after) = FitStagesUtil.cutDAG(dag)
 
       log.info("Applying initial DAG before CV/TS. Stages: {}", before.flatMap(_.map(_._1.stageName)).mkString(", "))
-      val FittedDAG(beforeTrain, beforeTest, beforeTransformers) = FitStagesUtil.fitAndTransformDAG(
-        dag = before,
-        train = train,
-        test = test,
-        hasTest = hasTest,
-        indexOfLastEstimator = indexOfLastEstimator,
-        persistEveryKStages = persistEveryKStages
-      )
+      val FittedDAG(beforeTrain, beforeTest, beforeTransformers) =
+        JobGroupUtil.withJobGroup(OpStep.FeatureEngineering) {
+          FitStagesUtil.fitAndTransformDAG(
+            dag = before,
+            train = train,
+            test = test,
+            hasTest = hasTest,
+            indexOfLastEstimator = indexOfLastEstimator,
+            persistEveryKStages = persistEveryKStages
+          )
+        }
 
       // Break up catalyst (cause it chokes) by converting into rdd, persisting it and then back to dataframe
       val (trainRDD, testRDD) = (beforeTrain.rdd.persist(), beforeTest.rdd.persist())

@@ -437,19 +446,23 @@
       log.info("Estimate best Model with CV/TS. Stages included in CV are: {}, {}",
         during.flatMap(_.map(_._1.stageName)).mkString(", "), modelSelector.uid: Any
       )
-      modelSelector.findBestEstimator(trainFixed, Option(during))
+      JobGroupUtil.withJobGroup(OpStep.CrossValidation) {
+        modelSelector.findBestEstimator(trainFixed, Option(during))
+      }
       val remainingDAG: StagesDAG = (during :+ (Array(modelSelector -> distance): Layer)) ++ after
 
       log.info("Applying DAG after CV/TS. Stages: {}", remainingDAG.flatMap(_.map(_._1.stageName)).mkString(", "))
-      val fitted = FitStagesUtil.fitAndTransformDAG(
-        dag = remainingDAG,
-        train = trainFixed,
-        test = testFixed,
-        hasTest = hasTest,
-        indexOfLastEstimator = indexOfLastEstimator,
-        persistEveryKStages = persistEveryKStages,
-        fittedTransformers = beforeTransformers
-      ).transformers
+      val fitted = JobGroupUtil.withJobGroup(OpStep.FeatureEngineering) {
+        FitStagesUtil.fitAndTransformDAG(
+          dag = remainingDAG,
+          train = trainFixed,
+          test = testFixed,
+          hasTest = hasTest,
+          indexOfLastEstimator = indexOfLastEstimator,
+          persistEveryKStages = persistEveryKStages,
+          fittedTransformers = beforeTransformers
+        ).transformers
+      }
       trainRDD.unpersist()
       testRDD.unpersist()
       fitted

@@ -480,7 +493,11 @@
    * @param path to the trained workflow model
    * @return workflow model
    */
-  def loadModel(path: String): OpWorkflowModel = new OpWorkflowModelReader(Some(this)).load(path)
+  def loadModel(path: String)(implicit spark: SparkSession): OpWorkflowModel = {
+    JobGroupUtil.withJobGroup(OpStep.ModelIO) {
+      new OpWorkflowModelReader(Some(this)).load(path)
+    }
+  }
 
   /**
    * Returns a dataframe containing all the columns generated up to and including the feature input
core/src/main/scala/com/salesforce/op/OpWorkflowCore.scala (5 additions, 2 deletions)

@@ -37,6 +37,7 @@ import com.salesforce.op.features.types.FeatureType
 import com.salesforce.op.filters.{FeatureDistribution, RawFeatureFilterResults}
 import com.salesforce.op.readers.{CustomReader, Reader, ReaderKey}
 import com.salesforce.op.stages.{FeatureGeneratorStage, OPStage, OpTransformer}
+import com.salesforce.op.utils.spark.{JobGroupUtil, OpStep}
 import com.salesforce.op.utils.spark.RichDataset._
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.ml._

@@ -108,7 +109,7 @@ private[op] trait OpWorkflowCore {
 
   /**
    * Whether the cross-validation/train-validation-split will be done at workflow level
-   *g c
+   *
    * @return true if the cross-validation will be done at workflow level, false otherwise
    */
   final def isWorkflowCV: Boolean = isWorkflowCVEnabled

@@ -306,7 +307,9 @@ private[op] trait OpWorkflowCore {
   def computeDataUpTo(feature: OPFeature, path: String)
     (implicit spark: SparkSession): Unit = {
     val df = computeDataUpTo(feature)
-    df.saveAvro(path)
+    JobGroupUtil.withJobGroup(OpStep.ResultsSaving) {
+      df.saveAvro(path)
+    }
   }
 
   /**
core/src/main/scala/com/salesforce/op/OpWorkflowModel.scala (14 additions, 6 deletions)

@@ -35,6 +35,7 @@ import com.salesforce.op.features.types.FeatureType
 import com.salesforce.op.features.{Feature, FeatureLike, OPFeature}
 import com.salesforce.op.readers.DataFrameFieldNames._
 import com.salesforce.op.stages.{OPStage, OpPipelineStage, OpTransformer}
+import com.salesforce.op.utils.spark.{JobGroupUtil, OpStep}
 import com.salesforce.op.utils.spark.RichDataset._
 import com.salesforce.op.utils.spark.RichMetadata._
 import com.salesforce.op.utils.stages.FitStagesUtil

@@ -91,9 +92,11 @@ class OpWorkflowModel(val uid: String = UID[OpWorkflowModel], val trainingParams
    * @return Dataframe with all the features generated + persisted
    */
   protected def generateRawData()(implicit spark: SparkSession): DataFrame = {
-    require(reader.nonEmpty, "Data reader must be set")
-    checkReadersAndFeatures()
-    reader.get.generateDataFrame(rawFeatures, parameters).persist() // don't want to redo this
+    JobGroupUtil.withJobGroup(OpStep.DataReadingAndFiltering) {
+      require(reader.nonEmpty, "Data reader must be set")
+      checkReadersAndFeatures()
+      reader.get.generateDataFrame(rawFeatures, parameters).persist() // don't want to redo this
+    }
   }
 
   /**

@@ -217,8 +220,11 @@ class OpWorkflowModel(val uid: String = UID[OpWorkflowModel], val trainingParams
    * @param path path to save the model
    * @param overwrite should overwrite if the path exists
    */
-  def save(path: String, overwrite: Boolean = true): Unit =
-    OpWorkflowModelWriter.save(this, path = path, overwrite = overwrite)
+  def save(path: String, overwrite: Boolean = true)(implicit spark: SparkSession): Unit = {
+    JobGroupUtil.withJobGroup(OpStep.ModelIO) {
+      OpWorkflowModelWriter.save(this, path = path, overwrite = overwrite)
+    }
+  }
 
   /**
    * Gets the fitted stage that generates the input feature

@@ -416,7 +422,9 @@ class OpWorkflowModel(val uid: String = UID[OpWorkflowModel], val trainingParams
     if (persistScores) scores.persist()
 
     // Save the scores if a path was provided
-    path.foreach(scores.saveAvro(_))
+    JobGroupUtil.withJobGroup(OpStep.ResultsSaving) {
+      path.foreach(scores.saveAvro(_))
+    }
 
     scores -> metrics
   }
core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelector.scala

@@ -40,6 +40,7 @@ import com.salesforce.op.stages.impl.selector.ModelSelectorNames.ModelType
 import com.salesforce.op.stages.impl.tuning.{BestEstimator, _}
 import com.salesforce.op.stages.sparkwrappers.generic.SparkWrapperParams
 import com.salesforce.op.stages.sparkwrappers.specific.{OpPredictorWrapperModel, SparkModelConverter}
+import com.salesforce.op.utils.spark.{JobGroupUtil, OpStep}
 import com.salesforce.op.utils.spark.RichMetadata._
 import com.salesforce.op.utils.spark.RichParamMap._
 import com.salesforce.op.utils.stages.FitStagesUtil._

@@ -143,13 +144,14 @@ E <: Estimator[_] with OpPipelineStage2[RealNN, OPVector, Prediction]]
   final override def fit(dataset: Dataset[_]): SelectedModel = {
 
     implicit val spark = dataset.sparkSession
+    JobGroupUtil.setJobGroup(OpStep.CrossValidation)
     setInputSchema(dataset.schema).transformSchema(dataset.schema)
     require(!dataset.isEmpty, "Dataset cannot be empty")
     val data = dataset.select(labelColName, in2.name)
-    val (BestEstimator(name, estimator, summary), splitterSummary, datasetWithID) = bestEstimator.map{ e =>
+    val (BestEstimator(name, estimator, summary), splitterSummary, datasetWithID) = bestEstimator.map { e =>
       val PrevalidationVal(summary, dataOpt) = prepareForValidation(data, labelColName)
       (e, summary, dataOpt.getOrElse(data))
-    }.getOrElse{ findBestEstimator(data.toDF()) }
+    }.getOrElse { findBestEstimator(data.toDF()) }
 
     val preparedData = splitter.map(_.validationPrepare(datasetWithID)).getOrElse(datasetWithID)
     val bestModel = estimator.fit(preparedData).asInstanceOf[M]

@@ -158,7 +160,7 @@ E <: Estimator[_] with OpPipelineStage2[RealNN, OPVector, Prediction]]
     log.info(s"With parameters : ${bestEst.extractParamMap()}")
 
     // set input and output params
-    outputsColNamesMap.foreach{ case (pname, pvalue) => bestModel.set(bestModel.getParam(pname), pvalue) }
+    outputsColNamesMap.foreach { case (pname, pvalue) => bestModel.set(bestModel.getParam(pname), pvalue) }
 
     // get eval results for metadata
     val trainingEval = evaluate(bestModel.transform(preparedData))

@@ -183,12 +185,20 @@ E <: Estimator[_] with OpPipelineStage2[RealNN, OPVector, Prediction]]
     val meta = metadataSummary.toMetadata(skipUnsupported = true)
     setMetadata(meta.toSummaryMetadata())
 
-    new SelectedModel(bestModel.asInstanceOf[ModelType], outputsColNamesMap, uid = uid, operationName = operationName)
+    val selectedModel = new SelectedModel(
+      bestModel.asInstanceOf[ModelType],
+      outputsColNamesMap,
+      uid = uid,
+      operationName = operationName
+    )
       .setInput(in1.asFeatureLike[RealNN], in2.asFeatureLike[OPVector])
       .setParent(this)
      .setMetadata(getMetadata())
      .setOutputFeatureName(getOutputFeatureName)
      .setEvaluators(evaluators)
+    // Reset the job group to feature engineering.
+    JobGroupUtil.setJobGroup(OpStep.FeatureEngineering)
+    selectedModel
   }
 
 }
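A side note on the asymmetry here: ModelSelector.fit uses the sticky JobGroupUtil.setJobGroup rather than the scoped withJobGroup, presumably because the cross-validation jobs span most of the method body, and then hands the group back to OpStep.FeatureEngineering just before returning so the caller's remaining stages are tagged correctly. Stripped of the OP machinery, the pattern reduces to this sketch (group names only; not the PR's code):

import org.apache.spark.SparkContext

// Sticky job-group handoff: tag all subsequent jobs as cross-validation,
// then re-tag as feature engineering instead of clearing the group.
def selectModelSketch(sc: SparkContext)(crossValidate: => Unit): Unit = {
  sc.setJobGroup("CrossValidation", "Finding the best model via cross-validation")
  crossValidate // jobs triggered here are grouped under "CrossValidation"
  sc.setJobGroup("FeatureEngineering", "Applying the remaining stages")
}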
utils/src/main/scala/com/salesforce/op/utils/spark/JobGroupUtil.scala (new file, 60 lines)
/*
* Copyright (c) 2017, Salesforce.com, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* * Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package com.salesforce.op.utils.spark

import org.apache.spark.sql.SparkSession

/**
* Convenience methods for working with Spark's job groups.
*/
object JobGroupUtil {
  /**
   * Sets the Spark job group name and description for a wrapped code block.
   * The job group is cleared afterwards.
   *
   * @param step The OpStep with which to mark the Spark job group
   */
  def withJobGroup[R](step: OpStep)(block: => R)(implicit spark: SparkSession): R = {
    spark.sparkContext.setJobGroup(step.toString, step.entryDescription)
    val result = block
    spark.sparkContext.clearJobGroup()
    result
  }

  /**
   * Indefinitely sets the Spark job group name and description.
   *
   * @param step The OpStep with which to mark the Spark job group
   */
  def setJobGroup(step: OpStep)(implicit spark: SparkSession): Unit = {
    spark.sparkContext.setJobGroup(step.toString, step.entryDescription)
  }
}
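For callers, usage looks roughly like the following (a sketch, not code from this PR; OpStep.Scoring is one of the enum values the commits reference):

import com.salesforce.op.utils.spark.{JobGroupUtil, OpStep}
import org.apache.spark.sql.SparkSession

implicit val spark: SparkSession = SparkSession.builder()
  .master("local[*]").appName("job-group-demo").getOrCreate()

// Every action triggered inside the block is tagged with the step's name
// and entryDescription in the Spark UI; the group is cleared on the way out.
val rowCount: Long = JobGroupUtil.withJobGroup(OpStep.Scoring) {
  spark.range(100).count()
}

Two caveats follow from the implementation as written: the group is cleared only on the non-exceptional path (there is no try/finally around block), and Spark job groups apply per thread, so jobs submitted from other threads will not carry the tag.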