Skip to content

Commit

Permalink
[SPARK-33841][CORE] Fix issue with jobs disappearing intermittently f…
Browse files Browse the repository at this point in the history
…rom the SHS under high load

### What changes were proposed in this pull request?

Mark SHS event log entries that were `processing` at the beginning of the `checkForLogs` run as not stale and check for this mark before deleting an event log. This fixes the issue when a particular job was displayed in the SHS and disappeared after some time, but then, in several minutes showed up again.

### Why are the changes needed?

The issue is caused by [SPARK-29043](https://issues.apache.org/jira/browse/SPARK-29043), which is designated to improve the concurrent performance of the History Server. The [change](https://github.com/apache/spark/pull/25797/files#) breaks the ["app deletion" logic](https://github.com/apache/spark/pull/25797/files#diff-128a6af0d78f4a6180774faedb335d6168dfc4defff58f5aa3021fc1bd767bc0R563) because of missing proper synchronization for `processing` event log entries. Since SHS now [filters out](https://github.com/apache/spark/pull/25797/files#diff-128a6af0d78f4a6180774faedb335d6168dfc4defff58f5aa3021fc1bd767bc0R462) all `processing` event log entries, such entries do not have a chance to be [updated with the new `lastProcessed`](https://github.com/apache/spark/pull/25797/files#diff-128a6af0d78f4a6180774faedb335d6168dfc4defff58f5aa3021fc1bd767bc0R472) time and thus any entity that completes processing right after [filtering](https://github.com/apache/spark/pull/25797/files#diff-128a6af0d78f4a6180774faedb335d6168dfc4defff58f5aa3021fc1bd767bc0R462) and before [the check for stale entities](https://github.com/apache/spark/pull/25797/files#diff-128a6af0d78f4a6180774faedb335d6168dfc4defff58f5aa3021fc1bd767bc0R560) will be identified as stale and will be deleted from the UI until the next `checkForLogs` run. This is because [updated `lastProcessed` time is used as criteria](https://github.com/apache/spark/pull/25797/files#diff-128a6af0d78f4a6180774faedb335d6168dfc4defff58f5aa3021fc1bd767bc0R557), and event log entries that missed to be updated with a new time, will match that criteria.

The issue can be reproduced by generating a big number of event logs and uploading them to the SHS event log directory on S3. Essentially, around 236(26.7 MB) copies of an event log directory were created using [shs-monitor](https://github.com/vladhlinsky/shs-monitor/tree/spark-master) script. Strange behavior of SHS counting the total number of applications was noticed - at first, the number was increasing as expected, but with the next page refresh, the total number of applications decreased. No errors were logged by SHS.

58 entities are displayed at `17:35:35`:
![1-58-entries-at-17-35](https://user-images.githubusercontent.com/61428392/102648949-1129e400-4171-11eb-9463-ed1454a8f6b2.png)
25 entities are displayed at `17:36:40`:
![2-25-entries-at-17-36](https://user-images.githubusercontent.com/61428392/102648974-1c7d0f80-4171-11eb-95d8-78c2bb37a168.png)

### Does this PR introduce _any_ user-facing change?

Yes, SHS users won't face the behavior when the number of displayed applications decreases periodically.

### How was this patch tested?

Tested using [shs-monitor](https://github.com/vladhlinsky/shs-monitor/tree/spark-master) script:
* Build SHS with the proposed change
* Download Hadoop AWS and AWS Java SDK
* Prepare S3 bucket and user for programmatic access, grant required roles to the user. Get access key and secret key
* Configure SHS to read event logs from S3
* Start [monitor](https://github.com/vladhlinsky/shs-monitor/blob/spark-master/monitor.sh) script to query SHS API
* Run 5 [producers](https://github.com/vladhlinsky/shs-monitor/blob/spark-master/producer.sh) for ~5 mins, create 125(14.2 MB) event log directory copies
* Wait for SHS to load all the applications
* Verify that the number of loaded applications increases continuously over time

For more details, please refer to the [shs-monitor](https://github.com/vladhlinsky/shs-monitor/tree/spark-master) repository.
> This version of the reproduction uses event log directories instead of single files, since recent optimization
> [SPARK-33790](https://issues.apache.org/jira/browse/SPARK-33790) makes it hard to reproduce the issue with single event log files.

Closes #30845 from vladhlinsky/SPARK-33841.

Authored-by: Vlad Glinsky <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
vhlinskyi authored and dongjoon-hyun committed Dec 18, 2020
1 parent c17c76d commit 554600c
Showing 1 changed file with 20 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -474,9 +474,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val newLastScanTime = clock.getTimeMillis()
logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")

// Mark entries that are processing as not stale. Such entries do not have a chance to be
// updated with the new 'lastProcessed' time and thus any entity that completes processing
// right after this check and before the check for stale entities will be identified as stale
// and will be deleted from the UI until the next 'checkForLogs' run.
val notStale = mutable.HashSet[String]()
val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
.filter { entry => isAccessible(entry.getPath) }
.filter { entry => !isProcessing(entry.getPath) }
.filter { entry =>
if (isProcessing(entry.getPath)) {
notStale.add(entry.getPath.toString())
false
} else {
true
}
}
.flatMap { entry => EventLogFileReader(fs, entry) }
.filter { reader =>
try {
Expand Down Expand Up @@ -576,12 +588,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
.last(newLastScanTime - 1)
.asScala
.toList
stale.filterNot(isProcessing).foreach { log =>
log.appId.foreach { appId =>
cleanAppData(appId, log.attemptId, log.logPath)
listing.delete(classOf[LogInfo], log.logPath)
stale.filterNot(isProcessing)
.filterNot(info => notStale.contains(info.logPath))
.foreach { log =>
log.appId.foreach { appId =>
cleanAppData(appId, log.attemptId, log.logPath)
listing.delete(classOf[LogInfo], log.logPath)
}
}
}

lastScanTime.set(newLastScanTime)
} catch {
Expand Down

0 comments on commit 554600c

Please sign in to comment.