From fc840dd7ecbc4a63a35aaf72c222bc24ad2ab55e Mon Sep 17 00:00:00 2001 From: Philipp Dallig Date: Tue, 19 Oct 2021 15:51:50 +0200 Subject: [PATCH] Close interpretergroup session if interpreter process crashed --- .../zeppelin/integration/ZeppelinSparkClusterTest.java | 2 ++ .../apache/zeppelin/interpreter/InterpreterSetting.java | 8 ++++++++ .../zeppelin/interpreter/ManagedInterpreterGroup.java | 9 +++++++++ 3 files changed, 19 insertions(+) diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java index d3c087f313a..20e1b760cac 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java @@ -1141,6 +1141,8 @@ public void testFailtoLaunchSpark() throws IOException { // "Actual error message: " + p1.getReturn().message().get(0).getData()); // run it again, and get the same error + note.run(p.getId(), true); + assertEquals(Status.FINISHED, p.getStatus()); note.run(p1.getId(), true); assertEquals(Status.ERROR, p1.getStatus()); // depends on JVM language diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java index 26fc4e41236..d560518b5fa 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java @@ -460,6 +460,14 @@ public ManagedInterpreterGroup getOrCreateInterpreterGroup(ExecutionContext exec groupId, executionContext); ManagedInterpreterGroup intpGroup = createInterpreterGroup(groupId); interpreterGroups.put(groupId, intpGroup); + } else { + // Check for a crashed interpreter process and restart interpreterGroup in this case + ManagedInterpreterGroup interpreterGroup = interpreterGroups.get(groupId); + if (interpreterGroup.isInterpreterProcessCrashed()) { + interpreterGroup.close(); + interpreterGroups.remove(interpreterGroup.getId()); + return getOrCreateInterpreterGroup(executionContext); + } } return interpreterGroups.get(groupId); } finally { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java index 8f2c16c0743..8cda125be00 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java @@ -60,6 +60,15 @@ public InterpreterSetting getInterpreterSetting() { return interpreterSetting; } + public boolean isInterpreterProcessCrashed() { + if (remoteInterpreterProcess == null) { + return false; + } + synchronized (interpreterProcessCreationLock) { + return !remoteInterpreterProcess.isRunning(); + } + } + public RemoteInterpreterProcess getOrCreateInterpreterProcess(String userName, Properties properties) throws IOException {