Skip to content

Commit

Permalink
StorageListener & StorageStatusListener need to synchronize internally to be thread-safe
Browse files Browse the repository at this point in the history
  • Loading branch information
squito committed Apr 28, 2015
1 parent 31c79ce commit 52bbae8
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ private[v1] class AllRDDResource(uiRoot: UIRoot) {
@PathParam("appId") appId: String
): Seq[RDDStorageInfo] = {
uiRoot.withSparkUI(appId) { ui =>
// should all access on storageListener also be synchronized?
val storageStatusList = ui.storageListener.storageStatusList
val rddInfos = ui.storageListener.rddInfoList
rddInfos.map{rddInfo =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@ import org.apache.spark.scheduler._
/**
* :: DeveloperApi ::
* A SparkListener that maintains executor storage status.
*
* Unlike JobProgressListener, this class is thread-safe, so users do not need to synchronize
* externally when accessing it.
*/
@DeveloperApi
class StorageStatusListener extends SparkListener {
// This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()

def storageStatusList: Seq[StorageStatus] = executorIdToStorageStatus.values.toSeq
def storageStatusList: Seq[StorageStatus] = synchronized {
executorIdToStorageStatus.values.toSeq
}

/** Update storage status list to reflect updated block statuses */
private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ private[ui] class StorageTab(parent: SparkUI) extends SparkUITab(parent, "storag
/**
* :: DeveloperApi ::
* A SparkListener that prepares information to be displayed on the BlockManagerUI.
*
* Unlike JobProgressListener, this class is thread-safe, so users do not need to synchronize
* externally when accessing it.
*/
@DeveloperApi
class StorageListener(storageStatusListener: StorageStatusListener) extends SparkListener {
Expand All @@ -43,7 +45,9 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Spar
def storageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList

/** Filter RDD info to include only those with cached partitions */
def rddInfoList: Seq[RDDInfo] = _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq
def rddInfoList: Seq[RDDInfo] = synchronized {
_rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq
}

/** Update the storage info of the RDDs whose blocks are among the given updated blocks */
private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)]): Unit = {
Expand Down

0 comments on commit 52bbae8

Please sign in to comment.