Skip to content

Commit

Permalink
Spark metrics aggregator fix (#237)
Browse files Browse the repository at this point in the history
* Fix SparkMetricsAggregator to not produce negative ResourceUsage
  • Loading branch information
shankar37 authored and akshayrai committed Apr 18, 2017
1 parent c8a7009 commit b7e04ab
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 22 deletions.
47 changes: 29 additions & 18 deletions app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,25 +51,36 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: Aggregator
executorMemoryBytes <- executorMemoryBytesOf(data)
} {
val applicationDurationMillis = applicationDurationMillisOf(data)
val totalExecutorTaskTimeMillis = totalExecutorTaskTimeMillisOf(data)

val resourcesAllocatedForUse =
aggregateresourcesAllocatedForUse(executorInstances, executorMemoryBytes, applicationDurationMillis)
val resourcesActuallyUsed = aggregateresourcesActuallyUsed(executorMemoryBytes, totalExecutorTaskTimeMillis)

val resourcesActuallyUsedWithBuffer = resourcesActuallyUsed.doubleValue() * (1.0 + allocatedMemoryWasteBufferPercentage)
val resourcesWastedMBSeconds = (resourcesActuallyUsedWithBuffer < resourcesAllocatedForUse.doubleValue()) match {
case true => resourcesAllocatedForUse.doubleValue() - resourcesActuallyUsedWithBuffer
case false => 0.0
if( applicationDurationMillis < 0) {
logger.warn(s"applicationDurationMillis is negative. Skipping Metrics Aggregation:${applicationDurationMillis}")
} else {
val totalExecutorTaskTimeMillis = totalExecutorTaskTimeMillisOf(data)

val resourcesAllocatedForUse =
aggregateresourcesAllocatedForUse(executorInstances, executorMemoryBytes, applicationDurationMillis)
val resourcesActuallyUsed = aggregateresourcesActuallyUsed(executorMemoryBytes, totalExecutorTaskTimeMillis)

val resourcesActuallyUsedWithBuffer = resourcesActuallyUsed.doubleValue() * (1.0 + allocatedMemoryWasteBufferPercentage)
val resourcesWastedMBSeconds = (resourcesActuallyUsedWithBuffer < resourcesAllocatedForUse.doubleValue()) match {
case true => resourcesAllocatedForUse.doubleValue() - resourcesActuallyUsedWithBuffer
case false => 0.0
}
//allocated is the total used resource from the cluster.
if (resourcesAllocatedForUse.isValidLong) {
hadoopAggregatedData.setResourceUsed(resourcesAllocatedForUse.toLong)
} else {
logger.warn(s"resourcesAllocatedForUse/resourcesWasted exceeds Long.MaxValue")
logger.warn(s"ResourceUsed: ${resourcesAllocatedForUse}")
logger.warn(s"executorInstances: ${executorInstances}")
logger.warn(s"executorMemoryBytes:${executorMemoryBytes}")
logger.warn(s"applicationDurationMillis:${applicationDurationMillis}")
logger.warn(s"totalExecutorTaskTimeMillis:${totalExecutorTaskTimeMillis}")
logger.warn(s"resourcesActuallyUsedWithBuffer:${resourcesActuallyUsedWithBuffer}")
logger.warn(s"resourcesWastedMBSeconds:${resourcesWastedMBSeconds}")
logger.warn(s"allocatedMemoryWasteBufferPercentage:${allocatedMemoryWasteBufferPercentage}")
}
hadoopAggregatedData.setResourceWasted(resourcesWastedMBSeconds.toLong)
}
//allocated is the total used resource from the cluster.
if (resourcesAllocatedForUse.isValidLong) {
hadoopAggregatedData.setResourceUsed(resourcesAllocatedForUse.toLong)
} else {
logger.info(s"resourcesAllocatedForUse exceeds Long.MaxValue: ${resourcesAllocatedForUse }")
}

hadoopAggregatedData.setResourceWasted(resourcesWastedMBSeconds.toLong)
}

private def aggregateresourcesActuallyUsed(executorMemoryBytes: Long, totalExecutorTaskTimeMillis: BigInt): BigInt = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ class SparkMetricsAggregatorTest extends FunSpec with Matchers {
new ApplicationInfo(appId, name = "app", Seq(applicationAttemptInfo))
}

val executorSummaries = Seq(
newFakeExecutorSummary(id = "1", totalDuration = 1000000L),
newFakeExecutorSummary(id = "2", totalDuration = 3000000L)
)
val restDerivedData = {
val executorSummaries = Seq(
newFakeExecutorSummary(id = "1", totalDuration = 1000000L),
newFakeExecutorSummary(id = "2", totalDuration = 3000000L)
)
SparkRestDerivedData(
applicationInfo,
jobDatas = Seq.empty,
Expand Down Expand Up @@ -105,6 +105,31 @@ class SparkMetricsAggregatorTest extends FunSpec with Matchers {
// The Spark aggregator leaves total delay untouched, so it should remain at its default of 0.
// NOTE(review): `result` is a shared fixture defined earlier in this suite — confirm it is the
// aggregator output for the standard (non-negative duration) test data.
it("doesn't calculate total delay") {
result.getTotalDelay should be(0L)
}
// Regression test for the negative-duration guard: when the application's reported duration
// is negative, aggregation is expected to be skipped, leaving ResourceUsed at 0 instead of
// producing a negative/garbage value.
it("sets resourceused as 0 when duration is negative") {
//make the duration negative
val applicationInfo = {
val applicationAttemptInfo = {
val now = System.currentTimeMillis
// Negative duration: startTime (now - duration) ends up AFTER endTime (now),
// which makes the derived application duration negative.
val duration = -8000000L
newFakeApplicationAttemptInfo(Some("1"), startTime = new Date(now - duration), endTime = new Date(now))
}
new ApplicationInfo(appId, name = "app", Seq(applicationAttemptInfo))
}
// Build REST-derived data from the malformed attempt info; executorSummaries comes from
// the shared suite fixture.
val restDerivedData = SparkRestDerivedData(
applicationInfo,
jobDatas = Seq.empty,
stageDatas = Seq.empty,
executorSummaries = executorSummaries
)

val data = SparkApplicationData(appId, restDerivedData, Some(logDerivedData))

// Run the aggregator end-to-end on the negative-duration data.
val aggregator = new SparkMetricsAggregator(aggregatorConfigurationData)
aggregator.aggregate(data)

// Aggregation was skipped, so ResourceUsed stays at its default of 0.
val result = aggregator.getResult
result.getResourceUsed should be(0L)
}
}

describe("when it doesn't have log-derived data") {
Expand Down

0 comments on commit b7e04ab

Please sign in to comment.