Skip to content

Commit

Permalink
Fix bug in handling multiple actions
Browse files Browse the repository at this point in the history
  • Loading branch information
manojlds committed Oct 27, 2017
1 parent 9ec9fe0 commit c3eb35f
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 13 deletions.
5 changes: 3 additions & 2 deletions src/main/scala/sparkplug/SparkPlug.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ case class SparkPlug(

def plug(in: DataFrame, rules: List[PlugRule])
: Either[List[PlugRuleValidationError], DataFrame] = {

val validationResult = Option(isValidateRulesEnabled)
.filter(identity)
.map(_ => validate(in.schema, rules))
Expand Down Expand Up @@ -161,12 +162,12 @@ case class SparkPlugBuilder(
  /** Returns a builder copy with rule validation turned on (schema checks run before plugging). */
  def enableRulesValidation = copy(isValidateRulesEnabled = true)
def enableCheckpointing(checkpointDir: String,
rulesPerStage: Int,
numberOfParitions: Int) =
numberOfPartitions: Int) =
copy(
checkpointDetails = Some(
SparkPlugCheckpointDetails(checkpointDir,
rulesPerStage,
numberOfParitions)))
numberOfPartitions)))

  /** Returns a builder copy with accumulators enabled.
    *
    * NOTE(review): also forces `isPlugDetailsEnabled = true` — presumably accumulators
    * require plug-detail tracking; confirm against SparkPlug's run path.
    */
  def enableAccumulators =
    copy(isAccumulatorsEnabled = true, isPlugDetailsEnabled = true)
Expand Down
22 changes: 15 additions & 7 deletions src/main/scala/sparkplug/models/PlugRule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,20 @@ case class PlugRule(name: String,
plugDetailsColumn: String) = {
val notEqualsBuilder = new ListBuffer[String]
val builder = new StringBuilder
convertedActions(schema).foldLeft(builder)((builder, action) => {
val actionKey = action.key
val actionValue = action.value
notEqualsBuilder.append(s"not($actionKey <=> $actionValue)")
updateField(schema, builder, actionKey, actionValue)
})

val convertedActions = convertActions(schema)
convertedActions.zipWithIndex.foldLeft(builder) {
case (b, (action, i)) =>
val actionKey = action.key
val actionValue = action.value
notEqualsBuilder.append(s"not($actionKey <=> $actionValue)")
val builderWithField = updateField(schema, b, actionKey, actionValue)
if (i < convertedActions.length - 1) {
builderWithField.append(",")
} else {
builderWithField
}
}

if (addPlugDetails) {
val notEqualCondition = s"(${notEqualsBuilder.mkString(" or ")})"
Expand Down Expand Up @@ -134,7 +142,7 @@ case class PlugRule(name: String,
}
}

private def convertedActions(schema: StructType) = {
private def convertActions(schema: StructType) = {
val fields = buildFieldsMap(schema).toMap
actions
.map(
Expand Down
13 changes: 9 additions & 4 deletions src/test/scala/sparkplug/SparkPlugSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ trait SpecAccumulatorsSparkListener extends ScalaFutures {
info.name
.filter(_.startsWith(accumulatorsNamespace))
.foreach((s: String) => {
accumulators = accumulators ++ Map(s -> info.value.get.asInstanceOf[Long])
accumulators = accumulators ++ Map(
s -> info.value.get.asInstanceOf[Long])
})
}
}
Expand Down Expand Up @@ -168,11 +169,13 @@ class SparkPlugSpec
PlugRule("rule1",
"version1",
"title like '%iPhone%'",
Seq(PlugAction("price", "1000"))),
Seq(PlugAction("price", "1000"),
PlugAction("title", "Apple iPhone"))),
PlugRule("rule2",
"version1",
"title like '%Galaxy%'",
Seq(PlugAction("price", "700")))
Seq(PlugAction("price", "700"),
PlugAction("title", "Apple iPhone")))
)

import spark.implicits._
Expand Down Expand Up @@ -302,7 +305,8 @@ class SparkPlugSpec
PlugRule("rule1",
"version1",
"title like '%iPhone%'",
Seq(PlugAction("price.minPrice", "1000.0"))),
Seq(PlugAction("price.minPrice", "1000.0"),
PlugAction("title", "Apple iPhone"))),
PlugRule("rule2",
"version1",
"title like '%Galaxy%'",
Expand All @@ -318,6 +322,7 @@ class SparkPlugSpec
sparkPlug.plug(df, rules).right.get.as[TestRowWithStruct].collect()
output.length should be(3)
output(0).price.get.minPrice should be(1000.0)
output(0).title should be("Apple iPhone")
output(1).price.get.availability should be("available")
output(2).price.isDefined should be(false)
}
Expand Down

0 comments on commit c3eb35f

Please sign in to comment.