diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala
index ae0d21563358f..08cc581d31c38 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala
@@ -30,9 +30,8 @@ private[v1] class AllJobsResource(uiRoot: UIRoot) {
 
   @GET
   def jobsList(
-    @PathParam("appId") appId: String,
-    @QueryParam("status") statuses: JList[JobExecutionStatus]
-  ): Seq[JobData] = {
+      @PathParam("appId") appId: String,
+      @QueryParam("status") statuses: JList[JobExecutionStatus]): Seq[JobData] = {
     uiRoot.withSparkUI(appId) { ui =>
       val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] =
         AllJobsResource.getStatusToJobs(ui)
@@ -69,10 +68,9 @@ private[v1] object AllJobsResource {
   }
 
   def convertJobData(
-    job: JobUIData,
-    listener: JobProgressListener,
-    includeStageDetails: Boolean
-  ): JobData = {
+      job: JobUIData,
+      listener: JobProgressListener,
+      includeStageDetails: Boolean): JobData = {
     listener.synchronized {
       val lastStageInfo = listener.stageIdToInfo.get(job.stageIds.max)
       val lastStageData = lastStageInfo.flatMap { s =>
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
index 4920c4d7b0fbf..11873215e8c87 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
@@ -26,9 +26,7 @@ import org.apache.spark.ui.storage.StorageListener
 private[v1] class AllRDDResource(uiRoot: UIRoot) {
 
   @GET
-  def jobsList(
-    @PathParam("appId") appId: String
-  ): Seq[RDDStorageInfo] = {
+  def jobsList(@PathParam("appId") appId: String): Seq[RDDStorageInfo] = {
     uiRoot.withSparkUI(appId) { ui =>
       val storageStatusList = ui.storageListener.storageStatusList
       val rddInfos = ui.storageListener.rddInfoList
@@ -55,11 +53,10 @@ private[spark] object AllRDDResource {
   }
 
   def getRDDStorageInfo(
-    rddId: Int,
-    rddInfo: RDDInfo,
-    storageStatusList: Seq[StorageStatus],
-    includeDetails: Boolean
-  ): RDDStorageInfo = {
+      rddId: Int,
+      rddInfo: RDDInfo,
+      storageStatusList: Seq[StorageStatus],
+      includeDetails: Boolean): RDDStorageInfo = {
     val workers = storageStatusList.map { (rddId, _) }
     val blockLocations = StorageUtils.getRddBlockLocations(rddId, storageStatusList)
     val blocks = storageStatusList
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
index 8341554434209..edfa7f1b1b86a 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -22,7 +22,6 @@ import javax.ws.rs.core.MediaType
 
 import org.apache.spark.executor.{InputMetrics => InternalInputMetrics, OutputMetrics => InternalOutputMetrics, ShuffleReadMetrics => InternalShuffleReadMetrics, ShuffleWriteMetrics => InternalShuffleWriteMetrics, TaskMetrics => InternalTaskMetrics}
 import org.apache.spark.scheduler.{AccumulableInfo => InternalAccumulableInfo, StageInfo}
-import org.apache.spark.status.api._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.ui.jobs.UIData.{StageUIData, TaskUIData}
 import org.apache.spark.util.Distribution
@@ -32,8 +31,8 @@ private[v1] class AllStagesResource(uiRoot: UIRoot) {
 
   @GET
   def stageList(
@PathParam("appId") appId: String, - @QueryParam("status") statuses: JList[StageStatus] + @PathParam("appId") appId: String, + @QueryParam("status") statuses: JList[StageStatus] ): Seq[StageData] = { uiRoot.withSparkUI(appId) { ui => val listener = ui.jobProgressListener @@ -60,11 +59,10 @@ private[v1] class AllStagesResource(uiRoot: UIRoot) { private[v1] object AllStagesResource { def stageUiToStageData( - status: StageStatus, - stageInfo: StageInfo, - stageUiData: StageUIData, - includeDetails: Boolean - ): StageData = { + status: StageStatus, + stageInfo: StageInfo, + stageUiData: StageUIData, + includeDetails: Boolean): StageData = { val taskData = if (includeDetails) { Some(stageUiData.taskData.map { case (k, v) => k -> convertTaskData(v) } ) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala index d7db4b19292b2..0e05bd7c27395 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala @@ -28,10 +28,10 @@ private[v1] class ApplicationListResource(uiRoot: UIRoot) { @GET def appList( - @QueryParam("status") status: JList[ApplicationStatus], - @DefaultValue("2010-01-01") @QueryParam("minDate") minDate: SimpleDateParam, - @DefaultValue("3000-01-01") @QueryParam("maxDate") maxDate: SimpleDateParam - ): Iterator[ApplicationInfo] = { + @QueryParam("status") status: JList[ApplicationStatus], + @DefaultValue("2010-01-01") @QueryParam("minDate") minDate: SimpleDateParam, + @DefaultValue("3000-01-01") @QueryParam("maxDate") maxDate: SimpleDateParam) + : Iterator[ApplicationInfo] = { val allApps = uiRoot.getApplicationInfoList val adjStatus = { if (status.isEmpty) { diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala index 442ee9ebbb525..ffc15812794d0 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala @@ -25,9 +25,7 @@ import org.apache.spark.ui.exec.ExecutorsPage private[v1] class ExecutorListResource(uiRoot: UIRoot) { @GET - def jobsList( - @PathParam("appId") appId: String - ): Seq[ExecutorSummary] = { + def jobsList(@PathParam("appId") appId: String): Seq[ExecutorSummary] = { uiRoot.withSparkUI(appId) { ui => val listener = ui.executorsListener val storageStatusList = listener.storageStatusList diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala b/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala index 299092668df68..202a5191ad57d 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala @@ -52,21 +52,21 @@ private[v1] class JacksonMessageWriter extends MessageBodyWriter[Object]{ mapper.setDateFormat(JacksonMessageWriter.makeISODateFormat) override def isWriteable( - aClass: Class[_], - `type`: Type, - annotations: Array[Annotation], - mediaType: MediaType): Boolean = { - true + aClass: Class[_], + `type`: Type, + annotations: Array[Annotation], + mediaType: MediaType): Boolean = { + true } override def writeTo( - t: Object, - aClass: Class[_], - `type`: Type, - annotations: Array[Annotation], - mediaType: MediaType, - 
-    multivaluedMap: MultivaluedMap[String, AnyRef],
-    outputStream: OutputStream): Unit = {
+      t: Object,
+      aClass: Class[_],
+      `type`: Type,
+      annotations: Array[Annotation],
+      mediaType: MediaType,
+      multivaluedMap: MultivaluedMap[String, AnyRef],
+      outputStream: OutputStream): Unit = {
     t match {
       case ErrorWrapper(err) => outputStream.write(err.getBytes("utf-8"))
       case _ => mapper.writeValue(outputStream, t)
@@ -74,11 +74,11 @@ private[v1] class JacksonMessageWriter extends MessageBodyWriter[Object]{
   }
 
   override def getSize(
-    t: Object,
-    aClass: Class[_],
-    `type`: Type,
-    annotations: Array[Annotation],
-    mediaType: MediaType): Long = {
+      t: Object,
+      aClass: Class[_],
+      `type`: Type,
+      annotations: Array[Annotation],
+      mediaType: MediaType): Long = {
     -1L
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala
index 40e6f641a1df7..92ce8c0a4ad01 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala
@@ -26,10 +26,7 @@ private[v1] class OneJobResource(uiRoot: UIRoot) {
 
   @GET
-  def jobsList(
-    @PathParam("appId") appId: String,
-    @PathParam("jobId") jobId: Int
-  ): JobData = {
+  def jobsList(@PathParam("appId") appId: String, @PathParam("jobId") jobId: Int): JobData = {
     uiRoot.withSparkUI(appId) { ui =>
       val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] =
         AllJobsResource.getStatusToJobs(ui)
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala
index c15a04f1b4793..656de3a7814c8 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala
@@ -24,9 +24,8 @@ private[v1] class OneRDDResource(uiRoot: UIRoot) {
 
   @GET
   def rddData(
-    @PathParam("appId") appId: String,
-    @PathParam("rddId") rddId: Int
-  ): RDDStorageInfo = {
+      @PathParam("appId") appId: String,
+      @PathParam("rddId") rddId: Int): RDDStorageInfo = {
     uiRoot.withSparkUI(appId) { ui =>
       AllRDDResource.getRDDStorageInfo(rddId, ui.storageListener, true).getOrElse(
         throw new NotFoundException(s"no rdd found w/ id $rddId")
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala
index 79011b3983214..78a60175352cc 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala
@@ -32,9 +32,8 @@ private[v1] class OneStageResource(uiRoot: UIRoot) {
   @GET
   @Path("")
   def stageData(
-    @PathParam("appId") appId: String,
-    @PathParam("stageId") stageId: Int
-  ): Seq[StageData] = {
+      @PathParam("appId") appId: String,
+      @PathParam("stageId") stageId: Int): Seq[StageData] = {
     withStage(appId, stageId){ stageAttempts =>
       stageAttempts.map { stage =>
         AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui,
@@ -46,10 +45,9 @@ private[v1] class OneStageResource(uiRoot: UIRoot) {
   @GET
   @Path("/{attemptId: \\d+}")
   def oneAttemptData(
-    @PathParam("appId") appId: String,
-    @PathParam("stageId") stageId: Int,
-    @PathParam("attemptId") attemptId: Int
-  ): StageData = {
+      @PathParam("appId") appId: String,
+      @PathParam("stageId") stageId: Int,
+      @PathParam("attemptId") attemptId: Int): StageData = {
     withStageAttempt(appId, stageId, attemptId) { stage =>
       AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui,
         includeDetails = true)
@@ -59,11 +57,11 @@ private[v1] class OneStageResource(uiRoot: UIRoot) {
   @GET
   @Path("/{attemptId: \\d+}/taskSummary")
   def stageData(
-    @PathParam("appId") appId: String,
-    @PathParam("stageId") stageId: Int,
-    @PathParam("attemptId") attemptId: Int,
-    @DefaultValue("0.05,0.25,0.5,0.75,0.95") @QueryParam("quantiles") quantileString: String
-  ): TaskMetricDistributions = {
+      @PathParam("appId") appId: String,
+      @PathParam("stageId") stageId: Int,
+      @PathParam("attemptId") attemptId: Int,
+      @DefaultValue("0.05,0.25,0.5,0.75,0.95") @QueryParam("quantiles") quantileString: String)
+  : TaskMetricDistributions = {
     withStageAttempt(appId, stageId, attemptId) { stage =>
       val quantiles = quantileString.split(",").map { s =>
         try {
@@ -80,13 +78,12 @@ private[v1] class OneStageResource(uiRoot: UIRoot) {
   @GET
   @Path("/{attemptId: \\d+}/taskList")
   def taskList(
-    @PathParam("appId") appId: String,
-    @PathParam("stageId") stageId: Int,
-    @PathParam("attemptId") attemptId: Int,
-    @DefaultValue("0") @QueryParam("offset") offset: Int,
-    @DefaultValue("20") @QueryParam("length") length: Int,
-    @DefaultValue("ID") @QueryParam("sortBy") sortBy: TaskSorting
-  ): Seq[TaskData] = {
+      @PathParam("appId") appId: String,
+      @PathParam("stageId") stageId: Int,
+      @PathParam("attemptId") attemptId: Int,
+      @DefaultValue("0") @QueryParam("offset") offset: Int,
+      @DefaultValue("20") @QueryParam("length") length: Int,
+      @DefaultValue("ID") @QueryParam("sortBy") sortBy: TaskSorting): Seq[TaskData] = {
     withStageAttempt(appId, stageId, attemptId) { stage =>
       val tasks = stage.ui.taskData.values.map{AllStagesResource.convertTaskData}.toIndexedSeq
         .sorted(OneStageResource.ordering(sortBy))
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 5705ac2fa9217..fc4b7364527fb 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -23,221 +23,202 @@ import scala.collection.Map
 
 import org.apache.spark.JobExecutionStatus
 
 class ApplicationInfo(
-  val id: String,
-  val name: String,
-  val startTime: Date,
-  val endTime: Date,
-  val sparkUser: String,
-  val completed: Boolean = false
-)
+    val id: String,
+    val name: String,
+    val startTime: Date,
+    val endTime: Date,
+    val sparkUser: String,
+    val completed: Boolean = false)
 
 class ExecutorStageSummary(
-  val taskTime : Long,
-  val failedTasks : Int,
-  val succeededTasks : Int,
-  val inputBytes : Long,
-  val outputBytes : Long,
-  val shuffleRead : Long,
-  val shuffleWrite : Long,
-  val memoryBytesSpilled : Long,
-  val diskBytesSpilled : Long
-)
+    val taskTime : Long,
+    val failedTasks : Int,
+    val succeededTasks : Int,
+    val inputBytes : Long,
+    val outputBytes : Long,
+    val shuffleRead : Long,
+    val shuffleWrite : Long,
+    val memoryBytesSpilled : Long,
+    val diskBytesSpilled : Long)
 
 class ExecutorSummary(
-  val id: String,
-  val hostPort: String,
-  val rddBlocks: Int,
-  val memoryUsed: Long,
-  val diskUsed: Long,
-  val activeTasks: Int,
-  val failedTasks: Int,
-  val completedTasks: Int,
-  val totalTasks: Int,
-  val totalDuration: Long,
-  val totalInputBytes: Long,
-  val totalShuffleRead: Long,
-  val totalShuffleWrite: Long,
-  val maxMemory: Long,
-  val executorLogs: Map[String, String]
-)
+    val id: String,
+    val hostPort: String,
+    val rddBlocks: Int,
+    val memoryUsed: Long,
+    val diskUsed: Long,
+    val activeTasks: Int,
+    val failedTasks: Int,
+    val completedTasks: Int,
+    val totalTasks: Int,
+    val totalDuration: Long,
+    val totalInputBytes: Long,
+    val totalShuffleRead: Long,
+    val totalShuffleWrite: Long,
+    val maxMemory: Long,
+    val executorLogs: Map[String, String])
 
 class JobData(
-  val jobId: Int,
-  val name: String,
-  val description: Option[String],
-  val submissionTime: Option[Date],
-  val completionTime: Option[Date],
-  val stageIds: Seq[Int],
-  val jobGroup: Option[String],
-  val status: JobExecutionStatus,
-  val numTasks: Int,
-  val numActiveTasks: Int,
-  val numCompletedTasks: Int,
-  val numSkippedTasks: Int,
-  val numFailedTasks: Int,
-  val numActiveStages: Int,
-  val numCompletedStages: Int,
-  val numSkippedStages: Int,
-  val numFailedStages: Int
-)
+    val jobId: Int,
+    val name: String,
+    val description: Option[String],
+    val submissionTime: Option[Date],
+    val completionTime: Option[Date],
+    val stageIds: Seq[Int],
+    val jobGroup: Option[String],
+    val status: JobExecutionStatus,
+    val numTasks: Int,
+    val numActiveTasks: Int,
+    val numCompletedTasks: Int,
+    val numSkippedTasks: Int,
+    val numFailedTasks: Int,
+    val numActiveStages: Int,
+    val numCompletedStages: Int,
+    val numSkippedStages: Int,
+    val numFailedStages: Int)
 
 // Q: should Tachyon size go in here as well? currently the UI only shows it on the overall storage
 // page ... does anybody pay attention to it?
 class RDDStorageInfo(
-  val id: Int,
-  val name: String,
-  val numPartitions: Int,
-  val numCachedPartitions: Int,
-  val storageLevel: String,
-  val memoryUsed: Long,
-  val diskUsed: Long,
-  val dataDistribution: Option[Seq[RDDDataDistribution]],
-  val partitions: Option[Seq[RDDPartitionInfo]]
-)
+    val id: Int,
+    val name: String,
+    val numPartitions: Int,
+    val numCachedPartitions: Int,
+    val storageLevel: String,
+    val memoryUsed: Long,
+    val diskUsed: Long,
+    val dataDistribution: Option[Seq[RDDDataDistribution]],
+    val partitions: Option[Seq[RDDPartitionInfo]])
 
 class RDDDataDistribution(
-  val address: String,
-  val memoryUsed: Long,
-  val memoryRemaining: Long,
-  val diskUsed: Long
-)
+    val address: String,
+    val memoryUsed: Long,
+    val memoryRemaining: Long,
+    val diskUsed: Long)
 
 class RDDPartitionInfo(
-  val blockName: String,
-  val storageLevel: String,
-  val memoryUsed: Long,
-  val diskUsed: Long,
-  val executors: Seq[String]
-)
+    val blockName: String,
+    val storageLevel: String,
+    val memoryUsed: Long,
+    val diskUsed: Long,
+    val executors: Seq[String])
 
 class StageData(
-  val status: StageStatus,
-  val stageId: Int,
-  val attemptId: Int,
-  val numActiveTasks: Int ,
-  val numCompleteTasks: Int,
-  val numFailedTasks: Int,
-
-  val executorRunTime: Long,
-
-  val inputBytes: Long,
-  val inputRecords: Long,
-  val outputBytes: Long,
-  val outputRecords: Long,
-  val shuffleReadBytes: Long,
-  val shuffleReadRecords: Long,
-  val shuffleWriteBytes: Long,
-  val shuffleWriteRecords: Long,
-  val memoryBytesSpilled: Long,
-  val diskBytesSpilled: Long,
-
-  val name: String,
-  val details: String,
-  val schedulingPool: String,
-
-  val accumulatorUpdates: Seq[AccumulableInfo],
-  val tasks: Option[Map[Long, TaskData]],
-  val executorSummary:Option[Map[String,ExecutorStageSummary]]
-)
+    val status: StageStatus,
+    val stageId: Int,
+    val attemptId: Int,
+    val numActiveTasks: Int ,
+    val numCompleteTasks: Int,
+    val numFailedTasks: Int,
+
+    val executorRunTime: Long,
+
+    val inputBytes: Long,
+    val inputRecords: Long,
+    val outputBytes: Long,
+    val outputRecords: Long,
+    val shuffleReadBytes: Long,
+    val shuffleReadRecords: Long,
+    val shuffleWriteBytes: Long,
+    val shuffleWriteRecords: Long,
+    val memoryBytesSpilled: Long,
+    val diskBytesSpilled: Long,
+
+    val name: String,
+    val details: String,
+    val schedulingPool: String,
+
+    val accumulatorUpdates: Seq[AccumulableInfo],
+    val tasks: Option[Map[Long, TaskData]],
+    val executorSummary:Option[Map[String,ExecutorStageSummary]])
 
 class TaskData(
-  val taskId: Long,
-  val index: Int,
-  val attempt: Int,
-  val launchTime: Date,
-  val executorId: String,
-  val host: String,
-  val taskLocality: String,
-  val speculative: Boolean,
-  val accumulatorUpdates: Seq[AccumulableInfo],
-  val errorMessage: Option[String] = None,
-  val taskMetrics: Option[TaskMetrics] = None
-)
+    val taskId: Long,
+    val index: Int,
+    val attempt: Int,
+    val launchTime: Date,
+    val executorId: String,
+    val host: String,
+    val taskLocality: String,
+    val speculative: Boolean,
+    val accumulatorUpdates: Seq[AccumulableInfo],
+    val errorMessage: Option[String] = None,
+    val taskMetrics: Option[TaskMetrics] = None)
 
 class TaskMetrics(
-  val executorDeserializeTime: Long,
-  val executorRunTime: Long,
-  val resultSize: Long,
-  val jvmGcTime: Long,
-  val resultSerializationTime: Long,
-  val memoryBytesSpilled: Long,
-  val diskBytesSpilled: Long,
-  val inputMetrics: Option[InputMetrics],
-  val outputMetrics: Option[OutputMetrics],
-  val shuffleReadMetrics: Option[ShuffleReadMetrics],
-  val shuffleWriteMetrics: Option[ShuffleWriteMetrics]
-)
+    val executorDeserializeTime: Long,
+    val executorRunTime: Long,
+    val resultSize: Long,
+    val jvmGcTime: Long,
+    val resultSerializationTime: Long,
+    val memoryBytesSpilled: Long,
+    val diskBytesSpilled: Long,
+    val inputMetrics: Option[InputMetrics],
+    val outputMetrics: Option[OutputMetrics],
+    val shuffleReadMetrics: Option[ShuffleReadMetrics],
+    val shuffleWriteMetrics: Option[ShuffleWriteMetrics])
 
 class InputMetrics(
-  val bytesRead: Long,
-  val recordsRead: Long
-)
+    val bytesRead: Long,
+    val recordsRead: Long)
 
 class OutputMetrics(
-  val bytesWritten: Long,
-  val recordsWritten: Long
-)
+    val bytesWritten: Long,
+    val recordsWritten: Long)
 
 class ShuffleReadMetrics(
-  val remoteBlocksFetched: Int,
-  val localBlocksFetched: Int,
-  val fetchWaitTime: Long,
-  val remoteBytesRead: Long,
-  val totalBlocksFetched: Int,
-  val recordsRead: Long
-)
+    val remoteBlocksFetched: Int,
+    val localBlocksFetched: Int,
+    val fetchWaitTime: Long,
+    val remoteBytesRead: Long,
+    val totalBlocksFetched: Int,
+    val recordsRead: Long)
 
 class ShuffleWriteMetrics(
-  val bytesWritten: Long,
-  val writeTime: Long,
-  val recordsWritten: Long
-)
+    val bytesWritten: Long,
+    val writeTime: Long,
+    val recordsWritten: Long)
 
 class TaskMetricDistributions(
-  val quantiles: IndexedSeq[Double],
-
-  val executorDeserializeTime: IndexedSeq[Double],
-  val executorRunTime: IndexedSeq[Double],
-  val resultSize: IndexedSeq[Double],
-  val jvmGcTime: IndexedSeq[Double],
-  val resultSerializationTime: IndexedSeq[Double],
-  val memoryBytesSpilled: IndexedSeq[Double],
-  val diskBytesSpilled: IndexedSeq[Double],
-
-  val inputMetrics: Option[InputMetricDistributions],
-  val outputMetrics: Option[OutputMetricDistributions],
-  val shuffleReadMetrics: Option[ShuffleReadMetricDistributions],
-  val shuffleWriteMetrics: Option[ShuffleWriteMetricDistributions]
-)
+    val quantiles: IndexedSeq[Double],
+
+    val executorDeserializeTime: IndexedSeq[Double],
+    val executorRunTime: IndexedSeq[Double],
+    val resultSize: IndexedSeq[Double],
+    val jvmGcTime: IndexedSeq[Double],
+    val resultSerializationTime: IndexedSeq[Double],
+    val memoryBytesSpilled: IndexedSeq[Double],
+    val diskBytesSpilled: IndexedSeq[Double],
+
+    val inputMetrics: Option[InputMetricDistributions],
+    val outputMetrics: Option[OutputMetricDistributions],
+    val shuffleReadMetrics: Option[ShuffleReadMetricDistributions],
+    val shuffleWriteMetrics: Option[ShuffleWriteMetricDistributions])
 
 class InputMetricDistributions(
-  val bytesRead: IndexedSeq[Double],
-  val recordsRead: IndexedSeq[Double]
-)
+    val bytesRead: IndexedSeq[Double],
+    val recordsRead: IndexedSeq[Double])
 
 class OutputMetricDistributions(
-  val bytesWritten: IndexedSeq[Double],
-  val recordsWritten: IndexedSeq[Double]
-)
+    val bytesWritten: IndexedSeq[Double],
+    val recordsWritten: IndexedSeq[Double])
 
 class ShuffleReadMetricDistributions(
-  val readBytes: IndexedSeq[Double],
-  val readRecords: IndexedSeq[Double],
-  val remoteBlocksFetched: IndexedSeq[Double],
-  val localBlocksFetched: IndexedSeq[Double],
-  val fetchWaitTime: IndexedSeq[Double],
-  val remoteBytesRead: IndexedSeq[Double],
-  val totalBlocksFetched: IndexedSeq[Double]
-)
+    val readBytes: IndexedSeq[Double],
+    val readRecords: IndexedSeq[Double],
+    val remoteBlocksFetched: IndexedSeq[Double],
+    val localBlocksFetched: IndexedSeq[Double],
+    val fetchWaitTime: IndexedSeq[Double],
+    val remoteBytesRead: IndexedSeq[Double],
+    val totalBlocksFetched: IndexedSeq[Double])
 
 class ShuffleWriteMetricDistributions(
-  val writeBytes: IndexedSeq[Double],
-  val writeRecords: IndexedSeq[Double],
-  val writeTime: IndexedSeq[Double]
-)
+    val writeBytes: IndexedSeq[Double],
+    val writeRecords: IndexedSeq[Double],
+    val writeTime: IndexedSeq[Double])
 
 class AccumulableInfo (
-  val id: Long,
-  val name: String,
-  val update: Option[String],
-  val value: String)
+    val id: Long,
+    val name: String,
+    val update: Option[String],
+    val value: String)
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index e9eb1f11a41b2..ec711480ebf30 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -26,7 +26,7 @@ import org.apache.spark.scheduler._
  * :: DeveloperApi ::
  * A SparkListener that maintains executor storage status.
  *
- * Unlike JobProgressListener, this class is thread-safe, so users do not need to synchronize
+ * This class is thread-safe (unlike JobProgressListener)
  */
 @DeveloperApi
 class StorageStatusListener extends SparkListener {
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index 490955fcfa6a3..0351749700962 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -36,7 +36,7 @@ private[ui] class StorageTab(parent: SparkUI) extends SparkUITab(parent, "storag
  * :: DeveloperApi ::
  * A SparkListener that prepares information to be displayed on the BlockManagerUI.
  *
- * Unlike JobProgressListener, this class is thread-safe, so users do not need to synchronize
+ * This class is thread-safe (unlike JobProgressListener)
  */
 @DeveloperApi
 class StorageListener(storageStatusListener: StorageStatusListener) extends SparkListener {
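The signature convention this patch converges on, sketched below with hypothetical names (neither method exists in the patch): continuation parameters are indented four spaces past `def`/`class`, the closing paren stays attached to the last parameter together with the result type, and only when that line would overflow does the result type drop to its own line, led by the colon (as in `appList` and the taskSummary variant of `stageData` above).

```scala
// A minimal sketch of the two declaration styles applied in this patch;
// names are illustrative only, not taken from the Spark sources.
object StyleSketch {

  // Common case: 4-space continuation indent, closing paren and result
  // type attached to the last parameter line.
  def stageSummary(
      appId: String,
      stageId: Int,
      includeDetails: Boolean): Seq[String] = {
    Seq(s"$appId/$stageId details=$includeDetails")
  }

  // Overflow case: the result type no longer fits after the last
  // parameter, so it starts the next line with the colon.
  def quantileSummary(
      appId: String,
      quantileString: String = "0.05,0.25,0.5,0.75,0.95")
  : IndexedSeq[Double] = {
    quantileString.split(",").map(_.toDouble).toIndexedSeq
  }
}
```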