Skip to content

Commit

Permalink
Add support for ignoring text that looks like IDs in SmartTextMapVectorizer (#455)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jauntbox authored Jan 21, 2020
1 parent ac83ad7 commit b7e07e3
Show file tree
Hide file tree
Showing 6 changed files with 249 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -314,42 +314,49 @@ private[op] trait MapHashingFun extends HashingFun {

protected def makeVectorColumnMetadata
(
features: Array[TransientFeature],
hashFeatures: Array[TransientFeature],
ignoreFeatures: Array[TransientFeature],
params: HashingFunctionParams,
allKeys: Seq[Seq[String]],
hashKeys: Seq[Seq[String]],
ignoreKeys: Seq[Seq[String]],
shouldTrackNulls: Boolean,
shouldTrackLen: Boolean
): Array[OpVectorColumnMetadata] = {
val numHashes = params.numFeatures
val numFeatures = allKeys.map(_.length).sum
val numFeatures = hashKeys.map(_.length).sum
val hashColumns =
if (isSharedHashSpace(params, Some(numFeatures))) {
(0 until numHashes).map { i =>
OpVectorColumnMetadata(
parentFeatureName = features.map(_.name),
parentFeatureType = features.map(_.typeName),
parentFeatureName = hashFeatures.map(_.name),
parentFeatureType = hashFeatures.map(_.typeName),
grouping = None,
indicatorValue = None
)
}.toArray
} else {
for {
(keys, f) <- allKeys.toArray.zip(features)
// Need to filter out empty key sequences since the hashFeatures only contain a map feature if one of their
// keys is to be hashed, but hashKeys contains a sequence per map (whether it's empty or not)
(keys, f) <- hashKeys.filter(_.nonEmpty).zip(hashFeatures)
key <- keys
i <- 0 until numHashes
} yield f.toColumnMetaData().copy(grouping = Option(key))
}
}.toArray

// All columns get null tracking or text length tracking, whether their contents are hashed or ignored
val allTextKeys = hashKeys.zip(ignoreKeys).map{ case(h, i) => h ++ i }
val allTextFeatures = hashFeatures ++ ignoreFeatures
val nullColumns = if (shouldTrackNulls) {
for {
(keys, f) <- allKeys.toArray.zip(features)
(keys, f) <- allTextKeys.toArray.zip(allTextFeatures)
key <- keys
} yield f.toColumnMetaData(isNull = true).copy(grouping = Option(key))
} else Array.empty[OpVectorColumnMetadata]

val lenColumns = if (shouldTrackLen) {
for {
(keys, f) <- allKeys.toArray.zip(features)
(keys, f) <- allTextKeys.toArray.zip(allTextFeatures)
key <- keys
} yield f.toColumnMetaData(descriptorValue = OpVectorColumnMetadata.TextLenString).copy(grouping = Option(key))
} else Array.empty[OpVectorColumnMetadata]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import com.salesforce.op.utils.spark.RichDataset._
import com.salesforce.op.utils.spark.{OpVectorColumnMetadata, OpVectorMetadata}
import com.twitter.algebird.Monoid._
import com.twitter.algebird.Operators._
import com.twitter.algebird.Monoid
import com.twitter.algebird.{Monoid, Semigroup}
import com.twitter.algebird.macros.caseclass
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{Dataset, Encoder}
Expand All @@ -63,7 +63,7 @@ class SmartTextMapVectorizer[T <: OPMap[String]]
with PivotParams with CleanTextFun with SaveOthersParams
with TrackNullsParam with MinSupportParam with TextTokenizerParams with TrackTextLenParam
with HashingVectorizerParams with MapHashingFun with OneHotFun with MapStringPivotHelper
with MapVectorizerFuns[String, OPMap[String]] with MaxCardinalityParams {
with MapVectorizerFuns[String, OPMap[String]] with MaxCardinalityParams with MinLengthStdDevParams {

private implicit val textMapStatsSeqEnc: Encoder[Array[TextMapStats]] = ExpressionEncoder[Array[TextMapStats]]()

Expand All @@ -73,7 +73,7 @@ class SmartTextMapVectorizer[T <: OPMap[String]]
): TextMapStats = {
val keyValueCounts = textMap.map{ case (k, v) =>
cleanTextFn(k, shouldCleanKeys) ->
TextStats(Map(cleanTextFn(v, shouldCleanValues) -> 1), Map(cleanTextFn(v, shouldCleanValues).length -> 1))
TextStats(Map(cleanTextFn(v, shouldCleanValues) -> 1L), Map(cleanTextFn(v, shouldCleanValues).length -> 1L))
}
TextMapStats(keyValueCounts)
}
Expand Down Expand Up @@ -104,40 +104,65 @@ class SmartTextMapVectorizer[T <: OPMap[String]]
)
} else Array.empty[OpVectorColumnMetadata]

val textColumns = if (args.textFeatureInfo.flatten.nonEmpty) {
val allTextFeatureInfo = args.hashFeatureInfo.zip(args.ignoreFeatureInfo).map{ case (h, i) => h ++ i }
val allTextColumns = if (allTextFeatureInfo.flatten.nonEmpty) {
val (mapFeatures, mapFeatureInfo) =
inN.toSeq.zip(args.textFeatureInfo).filter{ case (tf, featureInfoSeq) => featureInfoSeq.nonEmpty }.unzip
inN.toSeq.zip(allTextFeatureInfo).filter{ case (tf, featureInfoSeq) => featureInfoSeq.nonEmpty }.unzip
val allKeys = mapFeatureInfo.map(_.map(_.key))

// Careful when zipping sequences like hashKeys (length = number of maps, always) and
// hashFeatures (length <= number of maps, depending on which ones contain keys to hash)
val hashKeys = args.hashFeatureInfo.map(
_.filter(_.vectorizationMethod == TextVectorizationMethod.Hash).map(_.key)
)
val ignoreKeys = args.ignoreFeatureInfo.map(
_.filter(_.vectorizationMethod == TextVectorizationMethod.Ignore).map(_.key)
)

val hashFeatures = inN.toSeq.zip(args.hashFeatureInfo).filter {
case (tf, featureInfoSeq) => featureInfoSeq.nonEmpty
}.map(_._1)
val ignoreFeatures = inN.toSeq.zip(args.ignoreFeatureInfo).filter{
case (tf, featureInfoSeq) => featureInfoSeq.nonEmpty
}.map(_._1)

makeVectorColumnMetadata(
features = mapFeatures.toArray,
hashFeatures = hashFeatures.toArray,
ignoreFeatures = ignoreFeatures.toArray,
params = makeHashingParams(),
allKeys = allKeys,
hashKeys = hashKeys,
ignoreKeys = ignoreKeys,
shouldTrackNulls = args.shouldTrackNulls,
shouldTrackLen = $(trackTextLen)
)
} else Array.empty[OpVectorColumnMetadata]

val columns = categoricalColumns ++ textColumns
val columns = categoricalColumns ++ allTextColumns
OpVectorMetadata(getOutputFeatureName, columns, Transmogrifier.inputFeaturesToHistory(inN, stageName))
}

def makeSmartTextMapVectorizerModelArgs(aggregatedStats: Array[TextMapStats]): SmartTextMapVectorizerModelArgs = {
val maxCard = $(maxCardinality)
val minLenStdDev = $(minLengthStdDev)
val minSup = $(minSupport)
val shouldCleanKeys = $(cleanKeys)
val shouldCleanValues = $(cleanText)
val shouldTrackNulls = $(trackNulls)

val allFeatureInfo = aggregatedStats.toSeq.map { textMapStats =>
textMapStats.keyValueCounts.toSeq.map { case (k, textStats) =>
val isCat = textStats.valueCounts.size <= maxCard
val topVals = if (isCat) {
val vecMethod: TextVectorizationMethod = textStats match {
case _ if textStats.valueCounts.size <= maxCard => TextVectorizationMethod.Pivot
case _ if textStats.lengthStdDev < minLenStdDev => TextVectorizationMethod.Ignore
case _ => TextVectorizationMethod.Hash
}
val topVals = if (vecMethod == TextVectorizationMethod.Pivot) {
textStats.valueCounts
.filter { case (_, count) => count >= minSup }
.toSeq.sortBy(v => -v._2 -> v._1)
.take($(topK)).map(_._1).toArray
} else Array.empty[String]
SmartTextFeatureInfo(key = k, isCategorical = isCat, topValues = topVals)
SmartTextFeatureInfo(key = k, vectorizationMethod = vecMethod, topValues = topVals)
}
}

Expand Down Expand Up @@ -197,11 +222,15 @@ private[op] object TextMapStats {
/**
* Info about each feature within a text map
*
* @param key name of a feature
* @param isCategorical indicate whether a feature is categorical or not
* @param topValues most common values of a feature (only for categoricals)
* @param key name of a feature
* @param vectorizationMethod method to use for text vectorization (either pivot, hashing, or ignoring)
* @param topValues most common values of a feature (only for categoricals)
*/
case class SmartTextFeatureInfo(key: String, isCategorical: Boolean, topValues: Array[String]) extends JsonLike
case class SmartTextFeatureInfo(
// Key of the feature inside its text map
key: String,
// How this key is vectorized; the values used in this file are Pivot, Hash and Ignore
vectorizationMethod: TextVectorizationMethod,
// As built by makeSmartTextMapVectorizerModelArgs: the top values for Pivot features, an empty array otherwise.
// NOTE(review): being an Array, this field is compared by reference in the generated equals/hashCode.
topValues: Array[String]
) extends JsonLike


/**
Expand All @@ -221,11 +250,22 @@ case class SmartTextMapVectorizerModelArgs
shouldTrackNulls: Boolean,
hashingParams: HashingFunctionParams
) extends JsonLike {
val (categoricalFeatureInfo, textFeatureInfo) = allFeatureInfo.map{ featureInfoSeq =>
featureInfoSeq.partition{_ .isCategorical }
}.unzip
val categoricalKeys = categoricalFeatureInfo.map(featureInfoSeq => featureInfoSeq.map(_.key))
val textKeys = textFeatureInfo.map(featureInfoSeq => featureInfoSeq.map(_.key))
// Partition allFeatureInfo into separate SmartTextFeatureInfo sequences corresponding to each vectorization type
val (categoricalFeatureInfo, hashFeatureInfo, ignoreFeatureInfo) = allFeatureInfo.map{ featureInfoSeq =>
val groups = featureInfoSeq.groupBy(_.vectorizationMethod)
val catGroup = groups.getOrElse(TextVectorizationMethod.Pivot, Seq.empty)
val hashGroup = groups.getOrElse(TextVectorizationMethod.Hash, Seq.empty)
val ignoreGroup = groups.getOrElse(TextVectorizationMethod.Ignore, Seq.empty)
(catGroup, hashGroup, ignoreGroup)
}.unzip3

// Seq[Seq[String]] corresponding to the keys in each map that are treated with each vectorization type
val categoricalKeys = categoricalFeatureInfo.map(_.map(_.key))
val hashKeys = hashFeatureInfo.map(_.map(_.key))
val ignoreKeys = ignoreFeatureInfo.map(_.map(_.key))

// Combined keys for hashed and ignored features (everything that's not pivoted)
val textKeys = hashKeys.zip(ignoreKeys).map{ case (hk, ik) => hk ++ ik }
}


Expand All @@ -240,40 +280,73 @@ final class SmartTextMapVectorizerModel[T <: OPMap[String]] private[op]
with MapHashingFun
with TextMapPivotVectorizerModelFun[OPMap[String]] {

/**
* Result of splitting one input row (a sequence of maps) by vectorization method, as built by partitionRow.
*
* For each method, the maps sequence and the keys sequence line up index-for-index: a map is only included
* when its key list for that method is non-empty, and each included map is restricted to exactly those keys.
*
* @param categoricalMaps Maps that have at least one key treated as a categorical, filtered down to those keys
* @param categoricalKeys For each map in categoricalMaps, the keys that correspond to categorical features
* @param hashMaps Maps that have at least one key that should be hashed, filtered down to those keys
* @param hashKeys For each map in hashMaps, the keys that correspond to hashed features
* @param ignoreMaps Maps that have at least one key whose text should be ignored, filtered down to those keys
* @param ignoreKeys For each map in ignoreMaps, the keys that correspond to ignored features
*/
case class PartitionResult(
categoricalMaps: Seq[OPMap[String]],
categoricalKeys: Seq[Seq[String]],
hashMaps: Seq[OPMap[String]],
hashKeys: Seq[Seq[String]],
ignoreMaps: Seq[OPMap[String]],
ignoreKeys: Seq[Seq[String]]
)

// Pivot function applied to the categorical part of every row; it is built once from the fitted model args.
// Maps without any categorical key are dropped via filter(_.nonEmpty). partitionRow skips the same maps
// (its `keys.nonEmpty` guard), presumably so that both sequences line up by position - confirm against pivotFn.
private val categoricalPivotFn = pivotFn(
topValues = args.categoricalFeatureInfo.filter(_.nonEmpty).map(_.map(info => info.key -> info.topValues)),
shouldCleanKeys = args.shouldCleanKeys,
shouldCleanValues = args.shouldCleanValues,
shouldTrackNulls = args.shouldTrackNulls
)

/**
* Splits one row of input maps into its categorical, hashed and ignored parts.
*
* For each vectorization method the row is zipped by position with the per-map key lists held in args.
* Maps whose key list is empty are skipped; the remaining maps are filtered to contain only the listed keys.
*
* @param row one map per input feature; paired by position with the key lists in args, so it is assumed
*            to be in the same order as those lists (TODO confirm with the caller)
* @return the filtered maps and their key lists for each vectorization method
*/
private def partitionRow(row: Seq[OPMap[String]]):
(Seq[OPMap[String]], Seq[Seq[String]], Seq[OPMap[String]], Seq[Seq[String]]) = {
private def partitionRow(row: Seq[OPMap[String]]): PartitionResult = {
val (rowCategorical, keysCategorical) =
row.view.zip(args.categoricalKeys).collect { case (elements, keys) if keys.nonEmpty =>
val filtered = elements.value.filter { case (k, v) => keys.contains(k) }
(TextMap(filtered), keys)
}.unzip

val (rowText, keysText) =
row.view.zip(args.textKeys).collect { case (elements, keys) if keys.nonEmpty =>
val (rowHashedText, keysHashedText) =
row.view.zip(args.hashKeys).collect { case (elements, keys) if keys.nonEmpty =>
val filtered = elements.value.filter { case (k, v) => keys.contains(k) }
(TextMap(filtered), keys)
}.unzip

(rowCategorical.toList, keysCategorical.toList, rowText.toList, keysText.toList)
val (rowIgnoredText, keysIgnoredText) =
row.view.zip(args.ignoreKeys).collect { case (elements, keys) if keys.nonEmpty =>
val filtered = elements.value.filter { case (k, v) => keys.contains(k) }
(TextMap(filtered), keys)
}.unzip

// The pipelines above start from row.view; toList converts each side to a List before it is returned
PartitionResult(rowCategorical.toList, keysCategorical.toList, rowHashedText.toList, keysHashedText.toList,
rowIgnoredText.toList, keysIgnoredText.toList)
}

/**
* Builds the output vector for one row of text maps.
*
* Categorical keys are pivoted. Hashed keys are tokenized and hashed. Ignored keys are tokenized but never
* hashed. Null indicators (when args.shouldTrackNulls) and text lengths (when trackTextLen is set) are computed
* over the hashed and ignored keys together. The pieces are combined in the order:
* categorical, hash, text length, null indicators.
*/
def transformFn: Seq[T] => OPVector = row => {
val (rowCategorical, keysCategorical, rowText, keysText) = partitionRow(row)
implicit val textListMonoid: Monoid[TextList] = TextList.monoid

val PartitionResult(rowCategorical, keysCategorical, rowHash, keysHash, rowIgnore, keysIgnore) = partitionRow(row)
// NOTE(review): `+` on these Seqs is not a standard-library operator; it presumably comes from the algebird
// imports. Confirm how it combines the two sequences (concatenation vs. per-position merge), because the
// resulting key order has to match the order of the null/length columns in the vector metadata - TODO confirm.
val keysText = keysHash + keysIgnore // Go algebird!
val categoricalVector = categoricalPivotFn(rowCategorical)
val rowTextTokenized = rowText.map(_.value.map { case (k, v) => k -> tokenize(v.toText).tokens })
val textVector = hash(rowTextTokenized, keysText, args.hashingParams)

val rowHashTokenized = rowHash.map(_.value.map { case (k, v) => k -> tokenize(v.toText).tokens })
val rowIgnoreTokenized = rowIgnore.map(_.value.map { case (k, v) => k -> tokenize(v.toText).tokens })
// Same combination as keysText above, so keys and tokenized maps stay paired position by position
val rowTextTokenized = rowHashTokenized + rowIgnoreTokenized // Go go algebird!
// Only the keys marked for hashing contribute hashed columns
val hashVector = hash(rowHashTokenized, keysHash, args.hashingParams)

// All columns get null tracking or text length tracking, whether their contents are hashed or ignored
val textNullIndicatorsVector =
if (args.shouldTrackNulls) getNullIndicatorsVector(keysText, rowTextTokenized) else OPVector.empty
val textLenVector = if ($(trackTextLen)) getLenVector(keysText, rowTextTokenized) else OPVector.empty

categoricalVector.combine(textVector, textLenVector, textNullIndicatorsVector)
categoricalVector.combine(hashVector, textLenVector, textNullIndicatorsVector)
}

private def getNullIndicatorsVector(keysSeq: Seq[Seq[String]], inputs: Seq[Map[String, TextList]]): OPVector = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class SmartTextVectorizer[T <: Text](uid: String = UID[SmartTextVectorizer[T]])(
val (vectorizationMethods, topValues) = aggregatedStats.map { stats =>
val vecMethod: TextVectorizationMethod = stats match {
case _ if stats.valueCounts.size <= maxCard => TextVectorizationMethod.Pivot
case _ if stats.lengthStdDev <= minLenStdDev => TextVectorizationMethod.Ignore
case _ if stats.lengthStdDev < minLenStdDev => TextVectorizationMethod.Ignore
case _ => TextVectorizationMethod.Hash
}
val topValues = stats.valueCounts
Expand Down Expand Up @@ -225,8 +225,6 @@ private[op] object TextStats {
* Arguments for [[SmartTextVectorizerModel]]
*
* @param vectorizationMethods method to use for text vectorization (either pivot, hashing, or ignoring)
* @param isCategorical is feature a categorical or not
* @param isIgnorable is a text feature that we think is ignorable? high cardinality + low length variance
* @param topValues top values to each feature
* @param shouldCleanText should clean text value
* @param shouldTrackNulls should track nulls
Expand Down
Loading

0 comments on commit b7e07e3

Please sign in to comment.