From aebe878f14adad0353fb7f686484dc0345e2847a Mon Sep 17 00:00:00 2001 From: shekhar gupta Date: Mon, 23 Jan 2017 14:06:24 -0800 Subject: [PATCH 1/4] Fixes the issue where Spark fetcher fails to fetch client mode applications. --- .../spark/fetchers/SparkRestClient.scala | 38 +++++++++++-------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala b/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala index 48adb9d78..d80e1d9ff 100644 --- a/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala +++ b/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala @@ -68,22 +68,16 @@ class SparkRestClient(sparkConf: SparkConf) { // Limit scope of async. async { - val lastAttemptId = applicationInfo.attempts.maxBy { _.startTime }.attemptId - lastAttemptId match { - case Some(attemptId) => { - val attemptTarget = appTarget.path(attemptId) - val futureJobDatas = async { getJobDatas(attemptTarget) } - val futureStageDatas = async { getStageDatas(attemptTarget) } - val futureExecutorSummaries = async { getExecutorSummaries(attemptTarget) } - SparkRestDerivedData( - applicationInfo, - await(futureJobDatas), - await(futureStageDatas), - await(futureExecutorSummaries) - ) - } - case None => throw new IllegalArgumentException("Spark REST API has no attempt information") - } + val attemptTarget = getAttemptTarget(applicationInfo, appTarget) + val futureJobDatas = async { getJobDatas(attemptTarget) } + val futureStageDatas = async { getStageDatas(attemptTarget) } + val futureExecutorSummaries = async { getExecutorSummaries(attemptTarget) } + SparkRestDerivedData( + applicationInfo, + await(futureJobDatas), + await(futureStageDatas), + await(futureExecutorSummaries) + ) } } @@ -98,6 +92,18 @@ class SparkRestClient(sparkConf: SparkConf) { } } + private def getAttemptTarget(applicationInfo: ApplicationInfo, appTarget: WebTarget): WebTarget = { + val lastAttemptId = applicationInfo.attempts.maxBy {_.startTime}.attemptId + lastAttemptId match { + case Some(attemptId) => { + appTarget.path(attemptId) + } + case None => { + appTarget + } + } + } + private def getJobDatas(attemptTarget: WebTarget): Seq[JobData] = { val target = attemptTarget.path("jobs") try { From 0ba8ce162ad596857ee5cd77bc5241cc8fb4efca Mon Sep 17 00:00:00 2001 From: shekhar gupta Date: Mon, 23 Jan 2017 17:49:54 -0800 Subject: [PATCH 2/4] Removes pattern matching --- .../spark/fetchers/SparkRestClient.scala | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala b/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala index d80e1d9ff..4a6112248 100644 --- a/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala +++ b/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala @@ -68,7 +68,8 @@ class SparkRestClient(sparkConf: SparkConf) { // Limit scope of async. async { - val attemptTarget = getAttemptTarget(applicationInfo, appTarget) + val lastAttemptId = applicationInfo.attempts.maxBy {_.startTime}.attemptId + val attemptTarget = lastAttemptId.map(appTarget.path).getOrElse(appTarget) val futureJobDatas = async { getJobDatas(attemptTarget) } val futureStageDatas = async { getStageDatas(attemptTarget) } val futureExecutorSummaries = async { getExecutorSummaries(attemptTarget) } @@ -92,18 +93,6 @@ class SparkRestClient(sparkConf: SparkConf) { } } - private def getAttemptTarget(applicationInfo: ApplicationInfo, appTarget: WebTarget): WebTarget = { - val lastAttemptId = applicationInfo.attempts.maxBy {_.startTime}.attemptId - lastAttemptId match { - case Some(attemptId) => { - appTarget.path(attemptId) - } - case None => { - appTarget - } - } - } - private def getJobDatas(attemptTarget: WebTarget): Seq[JobData] = { val target = attemptTarget.path("jobs") try { From 9aa7bd2478dde93f1b115fa5966702866349e5a1 Mon Sep 17 00:00:00 2001 From: shekhar gupta Date: Sun, 29 Jan 2017 23:10:02 -0800 Subject: [PATCH 3/4] Adds unit test --- .../spark/fetchers/SparkRestClientTest.scala | 95 ++++++++++++++++++- 1 file changed, 94 insertions(+), 1 deletion(-) diff --git a/test/com/linkedin/drelephant/spark/fetchers/SparkRestClientTest.scala b/test/com/linkedin/drelephant/spark/fetchers/SparkRestClientTest.scala index 7f325739d..1e4ee8e4a 100644 --- a/test/com/linkedin/drelephant/spark/fetchers/SparkRestClientTest.scala +++ b/test/com/linkedin/drelephant/spark/fetchers/SparkRestClientTest.scala @@ -45,7 +45,7 @@ class SparkRestClientTest extends AsyncFunSpec with Matchers { an[IllegalArgumentException] should be thrownBy(new SparkRestClient(new SparkConf())) } - it("returns the desired data from the Spark REST API") { + it("returns the desired data from the Spark REST API for cluster mode") { import ExecutionContext.Implicits.global val fakeJerseyServer = new FakeJerseyServer() { override def configure(): Application = super.configure() match { @@ -78,6 +78,40 @@ class SparkRestClientTest extends AsyncFunSpec with Matchers { assertion } } + + it("returns the desired data from the Spark REST API for client mode") { + import ExecutionContext.Implicits.global + val fakeJerseyServer = new FakeJerseyServer() { + override def configure(): Application = super.configure() match { + case resourceConfig: ResourceConfig => + resourceConfig + .register(classOf[FetchClientModeDataFixtures.ApiResource]) + .register(classOf[FetchClientModeDataFixtures.ApplicationResource]) + .register(classOf[FetchClientModeDataFixtures.JobsResource]) + .register(classOf[FetchClientModeDataFixtures.StagesResource]) + .register(classOf[FetchClientModeDataFixtures.ExecutorsResource]) + case config => config + } + } + + fakeJerseyServer.setUp() + + val historyServerUri = fakeJerseyServer.target.getUri + + val sparkConf = new SparkConf().set("spark.yarn.historyServer.address", s"${historyServerUri.getHost}:${historyServerUri.getPort}") + val sparkRestClient = new SparkRestClient(sparkConf) + + sparkRestClient.fetchData(FetchDataFixtures.APP_ID) map { restDerivedData => + restDerivedData.applicationInfo.id should be(FetchDataFixtures.APP_ID) + restDerivedData.applicationInfo.name should be(FetchDataFixtures.APP_NAME) + restDerivedData.jobDatas should not be(None) + restDerivedData.stageDatas should not be(None) + restDerivedData.executorSummaries should not be(None) + } andThen { case assertion: Try[Assertion] => + fakeJerseyServer.tearDown() + assertion + } + } } } @@ -174,6 +208,65 @@ object SparkRestClientTest { } } + object FetchClientModeDataFixtures { + val APP_ID = "application_1" + val APP_NAME = "app" + + @Path("/api/v1") + class ApiResource { + @Path("applications/{appId}") + def getApplication(): ApplicationResource = new ApplicationResource() + + @Path("applications/{appId}/jobs") + def getJobs(): JobsResource = new JobsResource() + + @Path("applications/{appId}/stages") + def getStages(): StagesResource = new StagesResource() + + @Path("applications/{appId}/executors") + def getExecutors(): ExecutorsResource = new ExecutorsResource() + } + + @Produces(Array(MediaType.APPLICATION_JSON)) + class ApplicationResource { + @GET + def getApplication(@PathParam("appId") appId: String): ApplicationInfo = { + val t2 = System.currentTimeMillis + val t1 = t2 - 1 + val duration = 8000000L + new ApplicationInfo( + APP_ID, + APP_NAME, + Seq( + newFakeApplicationAttemptInfo(None, startTime = new Date(t2 - duration), endTime = new Date(t2)), + newFakeApplicationAttemptInfo(None, startTime = new Date(t1 - duration), endTime = new Date(t1)) + ) + ) + } + } + + @Produces(Array(MediaType.APPLICATION_JSON)) + class JobsResource { + @GET + def getJobs(@PathParam("appId") appId: String): Seq[JobData] = + Seq.empty + } + + @Produces(Array(MediaType.APPLICATION_JSON)) + class StagesResource { + @GET + def getStages(@PathParam("appId") appId: String): Seq[StageData] = + Seq.empty + } + + @Produces(Array(MediaType.APPLICATION_JSON)) + class ExecutorsResource { + @GET + def getExecutors(@PathParam("appId") appId: String): Seq[ExecutorSummary] = + Seq.empty + } + } + def newFakeApplicationAttemptInfo( attemptId: Option[String], startTime: Date, From f8c93105774af5107e346a35d8e9f0468ef0813b Mon Sep 17 00:00:00 2001 From: shekhar gupta Date: Sun, 29 Jan 2017 23:38:28 -0800 Subject: [PATCH 4/4] Adds unit test --- .../spark/fetchers/SparkRestClientTest.scala | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/test/com/linkedin/drelephant/spark/fetchers/SparkRestClientTest.scala b/test/com/linkedin/drelephant/spark/fetchers/SparkRestClientTest.scala index 1e4ee8e4a..a346f9225 100644 --- a/test/com/linkedin/drelephant/spark/fetchers/SparkRestClientTest.scala +++ b/test/com/linkedin/drelephant/spark/fetchers/SparkRestClientTest.scala @@ -45,17 +45,17 @@ class SparkRestClientTest extends AsyncFunSpec with Matchers { an[IllegalArgumentException] should be thrownBy(new SparkRestClient(new SparkConf())) } - it("returns the desired data from the Spark REST API for cluster mode") { + it("returns the desired data from the Spark REST API for cluster mode application") { import ExecutionContext.Implicits.global val fakeJerseyServer = new FakeJerseyServer() { override def configure(): Application = super.configure() match { case resourceConfig: ResourceConfig => resourceConfig - .register(classOf[FetchDataFixtures.ApiResource]) - .register(classOf[FetchDataFixtures.ApplicationResource]) - .register(classOf[FetchDataFixtures.JobsResource]) - .register(classOf[FetchDataFixtures.StagesResource]) - .register(classOf[FetchDataFixtures.ExecutorsResource]) + .register(classOf[FetchClusterModeDataFixtures.ApiResource]) + .register(classOf[FetchClusterModeDataFixtures.ApplicationResource]) + .register(classOf[FetchClusterModeDataFixtures.JobsResource]) + .register(classOf[FetchClusterModeDataFixtures.StagesResource]) + .register(classOf[FetchClusterModeDataFixtures.ExecutorsResource]) case config => config } } @@ -67,9 +67,9 @@ class SparkRestClientTest extends AsyncFunSpec with Matchers { val sparkConf = new SparkConf().set("spark.yarn.historyServer.address", s"${historyServerUri.getHost}:${historyServerUri.getPort}") val sparkRestClient = new SparkRestClient(sparkConf) - sparkRestClient.fetchData(FetchDataFixtures.APP_ID) map { restDerivedData => - restDerivedData.applicationInfo.id should be(FetchDataFixtures.APP_ID) - restDerivedData.applicationInfo.name should be(FetchDataFixtures.APP_NAME) + sparkRestClient.fetchData(FetchClusterModeDataFixtures.APP_ID) map { restDerivedData => + restDerivedData.applicationInfo.id should be(FetchClusterModeDataFixtures.APP_ID) + restDerivedData.applicationInfo.name should be(FetchClusterModeDataFixtures.APP_NAME) restDerivedData.jobDatas should not be(None) restDerivedData.stageDatas should not be(None) restDerivedData.executorSummaries should not be(None) @@ -79,7 +79,7 @@ class SparkRestClientTest extends AsyncFunSpec with Matchers { } } - it("returns the desired data from the Spark REST API for client mode") { + it("returns the desired data from the Spark REST API for client mode application") { import ExecutionContext.Implicits.global val fakeJerseyServer = new FakeJerseyServer() { override def configure(): Application = super.configure() match { @@ -101,9 +101,9 @@ class SparkRestClientTest extends AsyncFunSpec with Matchers { val sparkConf = new SparkConf().set("spark.yarn.historyServer.address", s"${historyServerUri.getHost}:${historyServerUri.getPort}") val sparkRestClient = new SparkRestClient(sparkConf) - sparkRestClient.fetchData(FetchDataFixtures.APP_ID) map { restDerivedData => - restDerivedData.applicationInfo.id should be(FetchDataFixtures.APP_ID) - restDerivedData.applicationInfo.name should be(FetchDataFixtures.APP_NAME) + sparkRestClient.fetchData(FetchClusterModeDataFixtures.APP_ID) map { restDerivedData => + restDerivedData.applicationInfo.id should be(FetchClusterModeDataFixtures.APP_ID) + restDerivedData.applicationInfo.name should be(FetchClusterModeDataFixtures.APP_NAME) restDerivedData.jobDatas should not be(None) restDerivedData.stageDatas should not be(None) restDerivedData.executorSummaries should not be(None) @@ -149,7 +149,7 @@ object SparkRestClientTest { override def getContext(cls: Class[_]): ObjectMapper = objectMapper } - object FetchDataFixtures { + object FetchClusterModeDataFixtures { val APP_ID = "application_1" val APP_NAME = "app"