Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Outputting Raw Feature Filter information: Part 1 #237

Merged
merged 18 commits into from
Mar 26, 2019
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 22 additions & 16 deletions core/src/main/scala/com/salesforce/op/ModelInsights.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ package com.salesforce.op
import com.salesforce.op.evaluators._
import com.salesforce.op.features._
import com.salesforce.op.features.types._
import com.salesforce.op.filters.FeatureDistribution
import com.salesforce.op.filters.{ExclusionReasons, FeatureDistribution, RawFeatureFilterResults}
import com.salesforce.op.stages._
import com.salesforce.op.stages.impl.feature.TransmogrifierDefaults
import com.salesforce.op.stages.impl.preparators._
Expand Down Expand Up @@ -89,7 +89,6 @@ case class ModelInsights
if (pretty) writePretty(this) else write(this)
}


/**
* High level model summary in a compact print friendly format containing:
* selected model info, model evaluation results and feature correlations/contributions/cramersV values.
Expand Down Expand Up @@ -327,17 +326,20 @@ case class Discrete(domain: Seq[String], prob: Seq[Double]) extends LabelInfo
/**
* Summary of feature insights for all features derived from a given input (raw) feature
*
* @param featureName name of raw feature insights are about
* @param featureType type of raw feature insights are about
* @param derivedFeatures sequence containing insights for each feature derived from the raw feature
* @param distributions distribution information for the raw feature (if calculated in RawFeatureFilter)
* @param featureName name of raw feature insights are about
* @param featureType type of raw feature insights are about
* @param derivedFeatures sequence containing insights for each feature derived from the raw feature
* @param distributions distribution information for the raw feature (if calculated in RawFeatureFilter)
* @param exclusionReasons exclusion reasons for the raw feature (if calculated in RawFeatureFilter)
*
*/
case class FeatureInsights
(
featureName: String,
featureType: String,
derivedFeatures: Seq[Insights],
distributions: Seq[FeatureDistribution] = Seq.empty
distributions: Seq[FeatureDistribution] = Seq.empty,
exclusionReasons: Seq[ExclusionReasons] = Seq.empty
)

/**
Expand Down Expand Up @@ -422,18 +424,21 @@ case object ModelInsights {
/**
* Function to extract the model summary info from the stages used to create the selected model output feature
*
* @param stages stages used to make the feature
* @param rawFeatures raw features in the workflow
* @param trainingParams parameters used to create the workflow model
* @return model insight summary
* @param stages stages used to make the feature
* @param rawFeatures raw features in the workflow
* @param trainingParams parameters used to create the workflow model
* @param blacklistedFeatures blacklisted features from use in DAG
* @param blacklistedMapKeys blacklisted map keys from use in DAG
* @param rawFeatureFilterResults results of raw feature filter
* @return model insight summary
*/
private[op] def extractFromStages(
stages: Array[OPStage],
rawFeatures: Array[features.OPFeature],
trainingParams: OpParams,
blacklistedFeatures: Array[features.OPFeature],
blacklistedMapKeys: Map[String, Set[String]],
rawFeatureDistributions: Array[FeatureDistribution]
rawFeatureFilterResults: RawFeatureFilterResults
): ModelInsights = {
val sanityCheckers = stages.collect { case s: SanityCheckerModel => s }
val sanityChecker = sanityCheckers.lastOption
Expand Down Expand Up @@ -480,7 +485,7 @@ case object ModelInsights {
ModelInsights(
label = getLabelSummary(label, checkerSummary),
features = getFeatureInsights(vectorInput, checkerSummary, model, rawFeatures,
blacklistedFeatures, blacklistedMapKeys, rawFeatureDistributions),
blacklistedFeatures, blacklistedMapKeys, rawFeatureFilterResults),
selectedModelInfo = getModelInfo(model),
trainingParams = trainingParams,
stageInfo = getStageInfo(stages)
Expand Down Expand Up @@ -529,7 +534,7 @@ case object ModelInsights {
rawFeatures: Array[features.OPFeature],
blacklistedFeatures: Array[features.OPFeature],
blacklistedMapKeys: Map[String, Set[String]],
rawFeatureDistributions: Array[FeatureDistribution]
rawFeatureFilterResults: RawFeatureFilterResults = RawFeatureFilterResults()
): Seq[FeatureInsights] = {
val featureInsights = (vectorInfo, summary) match {
case (Some(v), Some(s)) =>
Expand Down Expand Up @@ -617,9 +622,10 @@ case object ModelInsights {
val ftype = allFeatures.find(_.name == fname)
.map(_.typeName)
.getOrElse("")
val distributions = rawFeatureDistributions.filter(_.name == fname)
val distributions = rawFeatureFilterResults.rawFeatureDistributions.filter(_.name == fname)
val exclusionReasons = rawFeatureFilterResults.exclusionReasons.filter(_.name == fname)
FeatureInsights(featureName = fname, featureType = ftype, derivedFeatures = seq.map(_._2),
distributions = distributions)
distributions = distributions, exclusionReasons = exclusionReasons)
}.toSeq
}

Expand Down
10 changes: 6 additions & 4 deletions core/src/main/scala/com/salesforce/op/OpWorkflow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,12 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
"The RawFeatureFilter training reader will be used to generate the data for training")
}
checkReadersAndFeatures()
val FilteredRawData(cleanedData, featuresToDrop, mapKeysToDrop, featureDistributions) =

val FilteredRawData(cleanedData, featuresToDrop, mapKeysToDrop, rawFeatureFilterResults) =
rf.generateFilteredRaw(rawFeatures, parameters)
setRawFeatureDistributions(featureDistributions.toArray)
setBlacklist(featuresToDrop, featureDistributions)

setRawFeatureFilterResults(rawFeatureFilterResults)
setBlacklist(featuresToDrop, rawFeatureFilterResults.rawFeatureDistributions)
setBlacklistMapKeys(mapKeysToDrop)
cleanedData
}
Expand Down Expand Up @@ -349,7 +351,7 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
.setParameters(getParameters())
.setBlacklist(getBlacklist())
.setBlacklistMapKeys(getBlacklistMapKeys())
.setRawFeatureDistributions(getRawFeatureDistributions())
.setRawFeatureFilterResults(getRawFeatureFilterResults())

reader.map(model.setReader).getOrElse(model)
}
Expand Down
27 changes: 17 additions & 10 deletions core/src/main/scala/com/salesforce/op/OpWorkflowCore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import com.salesforce.op.utils.stages.FitStagesUtil._
import com.salesforce.op.utils.stages.FitStagesUtil
import com.salesforce.op.features.{FeatureDistributionType, OPFeature}
import com.salesforce.op.features.types.FeatureType
import com.salesforce.op.filters.FeatureDistribution
import com.salesforce.op.filters.{FeatureDistribution, RawFeatureFilterResults}
import com.salesforce.op.readers.{CustomReader, Reader, ReaderKey}
import com.salesforce.op.stages.{FeatureGeneratorStage, OPStage, OpTransformer}
import com.salesforce.op.utils.spark.RichDataset._
Expand Down Expand Up @@ -74,8 +74,8 @@ private[op] trait OpWorkflowCore {
// map keys that were blacklisted from use in dag
private[op] var blacklistedMapKeys: Map[String, Set[String]] = Map[String, Set[String]]()

// raw feature distributions calculated in raw feature filter
private[op] var rawFeatureDistributions: Array[FeatureDistribution] = Array[FeatureDistribution]()
// raw feature filter results calculated in raw feature filter
private[op] var rawFeatureFilterResults: RawFeatureFilterResults = RawFeatureFilterResults()

// stages of the workflow
private[op] var stages: Array[OPStage] = Array[OPStage]()
Expand All @@ -93,8 +93,8 @@ private[op] trait OpWorkflowCore {
this
}

private[op] final def setRawFeatureDistributions(distributions: Array[FeatureDistribution]): this.type = {
rawFeatureDistributions = distributions
private[op] final def setRawFeatureFilterResults(results: RawFeatureFilterResults): this.type = {
rawFeatureFilterResults = results
this
}

Expand Down Expand Up @@ -198,21 +198,28 @@ private[op] trait OpWorkflowCore {
* Get raw feature distribution information computed on training and scoring data during raw feature filter
* @return sequence of feature distribution information
*/
final def getRawFeatureDistributions(): Array[FeatureDistribution] = rawFeatureDistributions
final def getRawFeatureDistributions(): Seq[FeatureDistribution] = rawFeatureFilterResults.rawFeatureDistributions

/**
* Get raw feature distribution information computed on training data during raw feature filter
* @return sequence of feature distribution information
*/
final def getRawTrainingFeatureDistributions(): Array[FeatureDistribution] =
rawFeatureDistributions.filter(_.`type` == FeatureDistributionType.Training)
final def getRawTrainingFeatureDistributions(): Seq[FeatureDistribution] =
rawFeatureFilterResults.rawFeatureDistributions.filter(_.`type` == FeatureDistributionType.Training)

/**
* Get raw feature distribution information computed on scoring data during raw feature filter
* @return sequence of feature distribution information
*/
final def getRawScoringFeatureDistributions(): Array[FeatureDistribution] =
rawFeatureDistributions.filter(_.`type` == FeatureDistributionType.Scoring)
final def getRawScoringFeatureDistributions(): Seq[FeatureDistribution] =
rawFeatureFilterResults.rawFeatureDistributions.filter(_.`type` == FeatureDistributionType.Scoring)

/**
* Get raw feature filter results (filter configuration, feature distributions, and feature exclusion reasons)
* @return raw feature filter results
*/
final def getRawFeatureFilterResults(): RawFeatureFilterResults = rawFeatureFilterResults


/**
* Determine if any of the raw features do not have a matching reader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class OpWorkflowModel(val uid: String = UID[OpWorkflowModel], val trainingParams
val parentStageIds = feature.traverse[Set[String]](Set.empty[String])((s, f) => s + f.originStage.uid)
val modelStages = stages.filter(s => parentStageIds.contains(s.uid))
ModelInsights.extractFromStages(modelStages, rawFeatures, trainingParams,
blacklistedFeatures, blacklistedMapKeys, rawFeatureDistributions)
blacklistedFeatures, blacklistedMapKeys, rawFeatureFilterResults)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@

package com.salesforce.op

import com.salesforce.op.OpWorkflowModelReadWriteShared.FieldNames
import com.salesforce.op.OpWorkflowModelReadWriteShared.FieldNames._
import com.salesforce.op.features.{FeatureJsonHelper, OPFeature, TransientFeature}
import com.salesforce.op.filters.FeatureDistribution
import com.salesforce.op.filters.{FeatureDistribution, RawFeatureFilterResults}
import com.salesforce.op.stages.OpPipelineStageReadWriteShared._
import com.salesforce.op.stages.{OpPipelineStageReader, _}
import org.apache.spark.ml.util.MLReader
Expand Down Expand Up @@ -81,19 +82,28 @@ class OpWorkflowModelReader(val workflow: OpWorkflow) extends MLReader[OpWorkflo
* @return workflow model instance
*/
def loadJson(json: JValue, path: String): Try[OpWorkflowModel] = {

for {
trainParams <- OpParams.fromString((json \ TrainParameters.entryName).extract[String])
params <- OpParams.fromString((json \ Parameters.entryName).extract[String])
model <- Try(new OpWorkflowModel(uid = (json \ Uid.entryName).extract[String], trainParams))
(stages, resultFeatures) <- Try(resolveFeaturesAndStages(json, path))
blacklist <- Try(resolveBlacklist(json))
results <- resolveRawFeatureFilterResults(json)
distributions <- resolveRawFeatureDistributions(json)
} yield model
.setStages(stages.filterNot(_.isInstanceOf[FeatureGeneratorStage[_, _]]))
.setFeatures(resultFeatures)
.setParameters(params)
.setBlacklist(blacklist)
.setRawFeatureDistributions(distributions)
.setRawFeatureFilterResults(
if (results.rawFeatureDistributions.nonEmpty) { // for backwards compatibility
results
}
else {
RawFeatureFilterResults(rawFeatureDistributions = distributions)
}
)
}

private def resolveBlacklist(json: JValue): Array[OPFeature] = {
Expand Down Expand Up @@ -154,12 +164,24 @@ class OpWorkflowModelReader(val workflow: OpWorkflow) extends MLReader[OpWorkflo
}
}

private def resolveRawFeatureDistributions(json: JValue): Try[Array[FeatureDistribution]] = {
if ((json \ RawFeatureDistributions.entryName) != JNothing) { // for backwards compatibility
val distString = (json \ RawFeatureDistributions.entryName).extract[String]
private def resolveRawFeatureDistributions(json: JValue): Try[Seq[FeatureDistribution]] = {
  // Older serialized models stored distributions directly under this key, before
  // RawFeatureFilterResults existed; read it here for backwards compatibility.
  val legacyEntryName = "rawFeatureDistributions"
  val entry = json \ legacyEntryName
  if (entry == JNothing) Success(Seq.empty)
  else FeatureDistribution.fromJson(entry.extract[String])
}
}

private def resolveRawFeatureFilterResults(json: JValue): Try[RawFeatureFilterResults] = {
  val entry = json \ RawFeatureFilterResultsFieldName.entryName
  // Models saved before this field existed fall back to empty results
  // for backwards compatibility.
  if (entry == JNothing) Success(RawFeatureFilterResults())
  else RawFeatureFilterResults.fromJson(entry.extract[String])
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
package com.salesforce.op

import com.salesforce.op.features.FeatureJsonHelper
import com.salesforce.op.filters.FeatureDistribution
import com.salesforce.op.filters.RawFeatureFilterResults
import com.salesforce.op.stages.{OpPipelineStageBase, OpPipelineStageWriter}
import enumeratum._
import org.apache.hadoop.fs.Path
Expand Down Expand Up @@ -81,7 +81,8 @@ class OpWorkflowModelWriter(val model: OpWorkflowModel) extends MLWriter {
(FN.AllFeatures.entryName -> allFeaturesJArray) ~
(FN.Parameters.entryName -> model.parameters.toJson(pretty = false)) ~
(FN.TrainParameters.entryName -> model.trainingParams.toJson(pretty = false)) ~
(FN.RawFeatureDistributions.entryName -> FeatureDistribution.toJson(model.getRawFeatureDistributions()))
(FN.RawFeatureFilterResultsFieldName.entryName ->
RawFeatureFilterResults.toJson(model.getRawFeatureFilterResults()))
}

private def resultFeaturesJArray(): JArray =
Expand Down Expand Up @@ -138,7 +139,7 @@ private[op] object OpWorkflowModelReadWriteShared {
case object AllFeatures extends FieldNames("allFeatures")
case object Parameters extends FieldNames("parameters")
case object TrainParameters extends FieldNames("trainParameters")
case object RawFeatureDistributions extends FieldNames("rawFeatureDistributions")
case object RawFeatureFilterResultsFieldName extends FieldNames("rawFeatureFilterResults")
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,16 +187,16 @@ object FeatureDistribution {
* @param fd feature distributions
* @return json array
*/
def toJson(fd: Array[FeatureDistribution]): String = Serialization.write[Array[FeatureDistribution]](fd)
def toJson(fd: Seq[FeatureDistribution]): String = Serialization.write[Seq[FeatureDistribution]](fd)

/**
* Feature distributions from json
*
* @param json feature distributions json
* @return feature distributions array
*/
def fromJson(json: String): Try[Array[FeatureDistribution]] = Try {
Serialization.read[Array[FeatureDistribution]](json)
def fromJson(json: String): Try[Seq[FeatureDistribution]] = Try {
Serialization.read[Seq[FeatureDistribution]](json)
}

/**
Expand Down
Loading