Skip to content
This repository was archived by the owner on Jan 20, 2022. It is now read-only.

Commit

Permalink
Fix #671
Browse files Browse the repository at this point in the history
  • Loading branch information
pankajroark committed Jul 2, 2016
1 parent 7422550 commit b502b7b
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,23 +126,6 @@ class TopologyTests extends WordSpec {
assert(TDistMap(1).get_common.get_parallelism_hint == 50)
}

"With same setting on multiple names we use the one for the node" in {
val fmNodeName = "flatMapper"
val smNodeName = "summer"
val p = Storm.source(TraversableSpout(sample[List[Int]]))
.flatMap(testFn).name(fmNodeName)
.sumByKey(TestStore.createStore[Int, Int]()._2).name(smNodeName)

val opts = Map(fmNodeName -> Options().set(SummerParallelism(10)),
smNodeName -> Options().set(SummerParallelism(20)))
val storm = Storm.local(opts)
val stormTopo = storm.plan(p).topology
val bolts = stormTopo.get_bolts

// Tail should use parallelism specified for the summer node
assert(bolts("Tail").get_common.get_parallelism_hint == 20)
}

"If the closes doesnt contain the option we keep going" in {
val nodeName = "super dooper node"
val otherNodeName = "super dooper node"
Expand Down Expand Up @@ -199,4 +182,22 @@ class TopologyTests extends WordSpec {

assert(TDistMap(0).get_common.get_parallelism_hint == 5)
}

"With same setting on multiple names we use the one for the node" in {
val fmNodeName = "flatMapper"
val smNodeName = "summer"
val p = Storm.source(TraversableSpout(sample[List[Int]]))
.flatMap(testFn).name(fmNodeName)
.sumByKey(TestStore.createStore[Int, Int]()._2).name(smNodeName)

val opts = Map(fmNodeName -> Options().set(SummerParallelism(10)),
smNodeName -> Options().set(SummerParallelism(20)))
val storm = Storm.local(opts)
val stormTopo = storm.plan(p).topology
val bolts = stormTopo.get_bolts

// Tail should use parallelism specified for the summer node
assert(bolts("Tail").get_common.get_parallelism_hint == 20)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird
private type Prod[T] = Producer[Storm, T]

private[storm] def get[T <: AnyRef: ClassTag](dag: Dag[Storm], node: StormNode): Option[(String, T)] = {
val producer = node.members.last
val producer = node.members.head
Options.getFirst[T](options, dag.producerToPriorityNames(producer))
}

Expand Down

0 comments on commit b502b7b

Please sign in to comment.