Skip to content

Commit

Permalink
add the support for the sparklens in spark-3.0.0 and later version of…
Browse files Browse the repository at this point in the history
… spark with scala-2.12

  Author:    SaurabhChawla
  Date:      Fri Mar 19 22:37:13 2021 +0530
  Committer: Saurabh Chawla <[email protected]>
  • Loading branch information
SaurabhChawla100 authored and Saurabh Chawla committed Mar 19, 2021
1 parent 8c17881 commit 479468d
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 16 deletions.
25 changes: 20 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,31 @@ Note: Apart from the console based report, you can also get an UI based report s
`--conf spark.sparklens.report.email=<email>` along with other relevant confs mentioned below.
This functionality is available in Sparklens 0.3.2 and above.

Use the following arguments to `spark-submit` or `spark-shell`:
Use the following arguments to `spark-submit` or `spark-shell` for spark-3.0.0 and latest version of spark:
```
--packages qubole:sparklens:0.4.0-s_2.12
--conf spark.extraListeners=com.qubole.sparklens.QuboleJobListener
```

Use the following arguments to `spark-submit` or `spark-shell` for spark-2.4.x and lower version of spark:
```
--packages qubole:sparklens:0.3.2-s_2.11
--conf spark.extraListeners=com.qubole.sparklens.QuboleJobListener
```


#### 2. Run from Sparklens offline data ####

You can choose not to run sparklens inside the app, but at a later time. Run your app as above
with additional configuration parameters:
with additional configuration parameters
For spark-3.0.0 and latest version of spark:
```
--packages qubole:sparklens:0.4.0-s_2.12
--conf spark.extraListeners=com.qubole.sparklens.QuboleJobListener
--conf spark.sparklens.reporting.disabled=true
```

For spark-2.4.x and lower version of spark:
```
--packages qubole:sparklens:0.3.2-s_2.11
--conf spark.extraListeners=com.qubole.sparklens.QuboleJobListener
Expand All @@ -111,7 +126,7 @@ with additional configuration parameters:
This will not run reporting, but instead create a Sparklens JSON file for the application which is
stored in the **spark.sparklens.data.dir** directory (by default, **/tmp/sparklens/**). Note that this will be stored on HDFS by default. To save this file to s3, please set **spark.sparklens.data.dir** to s3 path. This data file can now be used to run Sparklens reporting independently, using `spark-submit` command as follows:

`./bin/spark-submit --packages qubole:sparklens:0.3.2-s_2.11 --class com.qubole.sparklens.app.ReporterApp qubole-dummy-arg <filename>`
`./bin/spark-submit --packages qubole:sparklens:0.4.0-s_2.12 --class com.qubole.sparklens.app.ReporterApp qubole-dummy-arg <filename>`

`<filename>` should be replaced by the full path of sparklens json file. If the file is on s3 use the full s3 path. For files on local file system, use file:// prefix with the local file location. HDFS is supported as well.

Expand All @@ -124,11 +139,11 @@ running via `sparklens-json-file` above) with another option specifying that is
event history file. This file can be in any of the formats the event history files supports, i.e. **text, snappy, lz4
or lzf**. Note the extra `source=history` parameter in this example:

`./bin/spark-submit --packages qubole:sparklens:0.3.2-s_2.11 --class com.qubole.sparklens.app.ReporterApp qubole-dummy-arg <filename> source=history`
`./bin/spark-submit --packages qubole:sparklens:0.4.0-s_2.12 --class com.qubole.sparklens.app.ReporterApp qubole-dummy-arg <filename> source=history`

It is also possible to convert an event history file to a Sparklens json file using the following command:

`./bin/spark-submit --packages qubole:sparklens:0.3.2-s_2.11 --class com.qubole.sparklens.app.EventHistoryToSparklensJson qubole-dummy-arg <srcDir> <targetDir>`
`./bin/spark-submit --packages qubole:sparklens:0.4.0-s_2.12 --class com.qubole.sparklens.app.EventHistoryToSparklensJson qubole-dummy-arg <srcDir> <targetDir>`

EventHistoryToSparklensJson is designed to work on local file system only. Please make sure that the source and target directories are on local file system.

Expand Down
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
name := "sparklens"
organization := "com.qubole"

scalaVersion := "2.11.8"
scalaVersion := "2.12.10"

crossScalaVersions := Seq("2.10.6", "2.11.8")
crossScalaVersions := Seq("2.10.6", "2.11.12", "2.12.10")

spName := "qubole/sparklens"

sparkVersion := "2.0.0"
sparkVersion := "3.0.1"

spAppendScalaVersion := true

Expand Down
3 changes: 2 additions & 1 deletion src/main/scala/com/qubole/sparklens/QuboleJobListener.scala
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener {
if (stageCompleted.stageInfo.failureReason.isDefined) {
//stage failed
val si = stageCompleted.stageInfo
failedStages += s""" Stage ${si.stageId} attempt ${si.attemptId} in job ${stageIDToJobID(si.stageId)} failed.
// attempt-id is deprecated and attemptNumber is used to get attempt-id from spark-3.0.0
failedStages += s""" Stage ${si.stageId} attempt ${si.attemptNumber} in job ${stageIDToJobID(si.stageId)} failed.
Stage tasks: ${si.numTasks}
"""
stageTimeSpan.finalUpdate()
Expand Down
17 changes: 11 additions & 6 deletions src/main/scala/com/qubole/sparklens/helper/HDFSConfigHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@ import org.apache.spark.deploy.SparkHadoopUtil
object HDFSConfigHelper {

def getHadoopConf(sparkConfOptional:Option[SparkConf]): Configuration = {
if (sparkConfOptional.isDefined) {
SparkHadoopUtil.get.newConfiguration(sparkConfOptional.get)
}else {
val sparkConf = new SparkConf()
SparkHadoopUtil.get.newConfiguration(sparkConf)
}
// After Spark 3.0.0 SparkHadoopUtil is made private to make it work only within the spark
// using reflection code here to access the newConfiguration method of the SparkHadoopUtil
val sparkHadoopUtilClass = Class.forName("org.apache.spark.deploy.SparkHadoopUtil")
val sparkHadoopUtil = sparkHadoopUtilClass.newInstance()
val newConfigurationMethod = sparkHadoopUtilClass.getMethod("newConfiguration", classOf[SparkConf])
if (sparkConfOptional.isDefined) {
newConfigurationMethod.invoke(sparkHadoopUtil, sparkConfOptional.get).asInstanceOf[Configuration]
} else {
val sparkConf = new SparkConf()
newConfigurationMethod.invoke(sparkHadoopUtil, sparkConf).asInstanceOf[Configuration]
}
}
}
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "0.3.2"
version in ThisBuild := "0.4.0"

0 comments on commit 479468d

Please sign in to comment.