diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java index d9b0ad23706..bdc6163c3be 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java @@ -20,9 +20,12 @@ import org.apache.seatunnel.common.utils.FileUtils; import org.apache.seatunnel.e2e.common.util.ContainerUtil; import org.apache.seatunnel.engine.e2e.SeaTunnelContainer; +import org.apache.seatunnel.engine.server.rest.RestConstant; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.testcontainers.containers.Container; import org.testcontainers.containers.GenericContainer; @@ -31,22 +34,36 @@ import org.testcontainers.utility.DockerLoggerFactory; import org.testcontainers.utility.MountableFile; +import io.restassured.response.Response; + import java.io.File; import java.io.IOException; import java.net.URL; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import static io.restassured.RestAssured.given; import static org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH; +import static org.hamcrest.Matchers.equalTo; public abstract class ClassLoaderITBase extends SeaTunnelContainer { private static final String CONF_FILE = "/classloader/fake_to_inmemory.conf"; + private static final String http = "http://"; + + private static final String colon = ":"; + abstract boolean cacheMode(); + private static final Path config = Paths.get(SEATUNNEL_HOME, "config"); + + private static final Path binPath = Paths.get(SEATUNNEL_HOME, "bin", SERVER_SHELL); + abstract String seatunnelConfigFileName(); @Test @@ -65,6 +82,96 @@ public void testFakeSourceToInMemorySink() throws IOException, InterruptedExcept } } + @Test + public void testFakeSourceToInMemorySinkForRestApi() throws IOException, InterruptedException { + LOG.info("test classloader with cache mode: {}", cacheMode()); + ContainerUtil.copyConnectorJarToContainer( + server, + CONF_FILE, + getConnectorModulePath(), + getConnectorNamePrefix(), + getConnectorType(), + SEATUNNEL_HOME); + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> { + Response response = + given().get( + http + + server.getHost() + + colon + + server.getFirstMappedPort() + + "/hazelcast/rest/cluster"); + response.then().statusCode(200); + Thread.sleep(10000); + Assertions.assertEquals( + 1, response.jsonPath().getList("members").size()); + }); + for (int i = 0; i < 10; i++) { + // load in memory sink which already leak thread with classloader + given().body( + "{\n" + + "\t\"env\": {\n" + + "\t\t\"parallelism\": 10,\n" + + "\t\t\"job.mode\": \"BATCH\"\n" + + "\t},\n" + + "\t\"source\": [\n" + + "\t\t{\n" + + "\t\t\t\"plugin_name\": \"FakeSource\",\n" + + "\t\t\t\"result_table_name\": \"fake\",\n" + + "\t\t\t\"parallelism\": 10,\n" + + "\t\t\t\"schema\": {\n" + + "\t\t\t\t\"fields\": {\n" + + "\t\t\t\t\t\"name\": \"string\",\n" + + "\t\t\t\t\t\"age\": \"int\",\n" + + "\t\t\t\t\t\"score\": \"double\"\n" + + "\t\t\t\t}\n" + + "\t\t\t}\n" + + "\t\t}\n" + + "\t],\n" + + "\t\"transform\": [],\n" + + "\t\"sink\": [\n" + + "\t\t{\n" + + "\t\t\t\"plugin_name\": \"InMemory\",\n" + + "\t\t\t\"source_table_name\": \"fake\"\n" + + "\t\t}\n" + + "\t]\n" + + "}") + .header("Content-Type", "application/json; charset=utf-8") + .post( + http + + server.getHost() + + colon + + server.getFirstMappedPort() + + RestConstant.SUBMIT_JOB_URL) + .then() + .statusCode(200); + + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + given().get( + http + + server.getHost() + + colon + + server.getFirstMappedPort() + + RestConstant.FINISHED_JOBS_INFO + + "/FINISHED") + .then() + .statusCode(200) + .body("[0].jobStatus", equalTo("FINISHED"))); + Thread.sleep(5000); + Assertions.assertTrue(containsDaemonThread()); + if (cacheMode()) { + Assertions.assertEquals(3, getClassLoaderCount()); + } else { + Assertions.assertEquals(2 + i, getClassLoaderCount()); + } + } + } + private int getClassLoaderCount() throws IOException, InterruptedException { Map objects = ContainerUtil.getJVMLiveObject(server); String className = @@ -79,7 +186,7 @@ private boolean containsDaemonThread() throws IOException, InterruptedException } @Override - @BeforeAll + @BeforeEach public void startUp() throws Exception { server = new GenericContainer<>(getDockerImage()) @@ -96,7 +203,7 @@ public void startUp() throws Exception { "seatunnel-engine:" + JDK_DOCKER_IMAGE))) .waitingFor(Wait.forListeningPort()); copySeaTunnelStarterToContainer(server); - server.setPortBindings(Collections.singletonList("5801:5801")); + server.setExposedPorts(Collections.singletonList(5801)); server.withCopyFileToContainer( MountableFile.forHostPath( @@ -148,4 +255,10 @@ public void startUp() throws Exception { + "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/plugin-mapping.properties"), Paths.get(SEATUNNEL_HOME, "connectors", "plugin-mapping.properties").toString()); } + + @AfterEach + @Override + public void tearDown() throws Exception { + super.tearDown(); + } } diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java index 8e0f0c689bd..d92f9722dc3 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java @@ -85,7 +85,7 @@ protected MultipleTableJobConfigParser getJobConfigParser() { @Override protected LogicalDag getLogicalDag() { - ImmutablePair, Set> immutablePair = getJobConfigParser().parse(); + ImmutablePair, Set> immutablePair = getJobConfigParser().parse(null); actions.addAll(immutablePair.getLeft()); // Enable upload connector jar package to engine server, automatically upload connector Jar // packages and dependent third-party Jar packages to the server before job execution. diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java index fc9f2cb72f7..3cd9ed7604c 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java +++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java @@ -49,7 +49,7 @@ public void testLogicalGenerator() { IdGenerator idGenerator = new IdGenerator(); ImmutablePair, Set> immutablePair = - new MultipleTableJobConfigParser(filePath, idGenerator, jobConfig).parse(); + new MultipleTableJobConfigParser(filePath, idGenerator, jobConfig).parse(null); LogicalDagGenerator logicalDagGenerator = new LogicalDagGenerator(immutablePair.getLeft(), jobConfig, idGenerator); diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java index 319e7515496..abc81903dee 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java +++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java @@ -51,7 +51,7 @@ public void testSimpleJobParse() { jobConfig.setJobContext(new JobContext()); MultipleTableJobConfigParser jobConfigParser = new MultipleTableJobConfigParser(filePath, new IdGenerator(), jobConfig); - ImmutablePair, Set> parse = jobConfigParser.parse(); + ImmutablePair, Set> parse = jobConfigParser.parse(null); List actions = parse.getLeft(); Assertions.assertEquals(1, actions.size()); Assertions.assertEquals("Sink[0]-LocalFile-MultiTableSink", actions.get(0).getName()); @@ -71,7 +71,7 @@ public void testComplexJobParse() { jobConfig.setJobContext(new JobContext()); MultipleTableJobConfigParser jobConfigParser = new MultipleTableJobConfigParser(filePath, new IdGenerator(), jobConfig); - ImmutablePair, Set> parse = jobConfigParser.parse(); + ImmutablePair, Set> parse = jobConfigParser.parse(null); List actions = parse.getLeft(); Assertions.assertEquals(1, actions.size()); @@ -102,7 +102,7 @@ public void testMultipleSinkName() { jobConfig.setJobContext(new JobContext()); MultipleTableJobConfigParser jobConfigParser = new MultipleTableJobConfigParser(filePath, new IdGenerator(), jobConfig); - ImmutablePair, Set> parse = jobConfigParser.parse(); + ImmutablePair, Set> parse = jobConfigParser.parse(null); List actions = parse.getLeft(); Assertions.assertEquals(2, actions.size()); @@ -122,7 +122,7 @@ public void testMultipleTableSourceWithMultiTableSinkParse() throws IOException Config config = ConfigBuilder.of(Paths.get(filePath)); MultipleTableJobConfigParser jobConfigParser = new MultipleTableJobConfigParser(config, new IdGenerator(), jobConfig); - ImmutablePair, Set> parse = jobConfigParser.parse(); + ImmutablePair, Set> parse = jobConfigParser.parse(null); List actions = parse.getLeft(); Assertions.assertEquals(1, actions.size()); Assertions.assertEquals("Sink[0]-console-MultiTableSink", actions.get(0).getName()); @@ -142,7 +142,7 @@ public void testDuplicatedTransformInOnePipeline() { Config config = ConfigBuilder.of(Paths.get(filePath)); MultipleTableJobConfigParser jobConfigParser = new MultipleTableJobConfigParser(config, new IdGenerator(), jobConfig); - ImmutablePair, Set> parse = jobConfigParser.parse(); + ImmutablePair, Set> parse = jobConfigParser.parse(null); List actions = parse.getLeft(); Assertions.assertEquals("Transform[0]-sql", actions.get(0).getUpstream().get(0).getName()); Assertions.assertEquals("Transform[1]-sql", actions.get(1).getUpstream().get(0).getName()); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/classloader/ClassLoaderService.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/classloader/ClassLoaderService.java similarity index 96% rename from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/classloader/ClassLoaderService.java rename to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/classloader/ClassLoaderService.java index 2a596c39769..b832882733c 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/classloader/ClassLoaderService.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/classloader/ClassLoaderService.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.seatunnel.engine.server.service.classloader; +package org.apache.seatunnel.engine.core.classloader; import java.net.URL; import java.util.Collection; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/classloader/DefaultClassLoaderService.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/classloader/DefaultClassLoaderService.java similarity index 98% rename from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/classloader/DefaultClassLoaderService.java rename to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/classloader/DefaultClassLoaderService.java index 6c647a82d26..36c7ae2f029 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/classloader/DefaultClassLoaderService.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/classloader/DefaultClassLoaderService.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.seatunnel.engine.server.service.classloader; +package org.apache.seatunnel.engine.core.classloader; import org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader; diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index 0a32b0cf00c..395f8b4a1ac 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -46,8 +46,10 @@ import org.apache.seatunnel.core.starter.utils.ConfigBuilder; import org.apache.seatunnel.engine.common.config.JobConfig; import org.apache.seatunnel.engine.common.exception.JobDefineCheckException; +import org.apache.seatunnel.engine.common.loader.ClassLoaderUtil; import org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader; import org.apache.seatunnel.engine.common.utils.IdGenerator; +import org.apache.seatunnel.engine.core.classloader.ClassLoaderService; import org.apache.seatunnel.engine.core.dag.actions.Action; import org.apache.seatunnel.engine.core.dag.actions.SinkAction; import org.apache.seatunnel.engine.core.dag.actions.SinkConfig; @@ -149,7 +151,7 @@ public MultipleTableJobConfigParser( new JobConfigParser(idGenerator, commonPluginJars, isStartWithSavePoint); } - public ImmutablePair, Set> parse() { + public ImmutablePair, Set> parse(ClassLoaderService classLoaderService) { List sourceConfigs = TypesafeConfigUtils.getConfigList( seaTunnelJobConfig, "source", Collections.emptyList()); @@ -165,8 +167,15 @@ public ImmutablePair, Set> parse() { connectorJars.addAll(commonPluginJars); } ClassLoader parentClassLoader = Thread.currentThread().getContextClassLoader(); - ClassLoader classLoader = - new SeaTunnelChildFirstClassLoader(connectorJars, parentClassLoader); + + ClassLoader classLoader; + if (classLoaderService == null) { + classLoader = new SeaTunnelChildFirstClassLoader(connectorJars, parentClassLoader); + } else { + classLoader = + classLoaderService.getClassLoader( + Long.parseLong(jobConfig.getJobContext().getJobId()), connectorJars); + } try { Thread.currentThread().setContextClassLoader(classLoader); ConfigParserUtil.checkGraph(sourceConfigs, transformConfigs, sinkConfigs); @@ -196,6 +205,11 @@ public ImmutablePair, Set> parse() { return new ImmutablePair<>(sinkActions, factoryUrls); } finally { Thread.currentThread().setContextClassLoader(parentClassLoader); + if (classLoaderService != null) { + classLoaderService.releaseClassLoader( + Long.parseLong(jobConfig.getJobContext().getJobId()), connectorJars); + } + ClassLoaderUtil.recycleClassLoaderFromThread(classLoader); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/service/classloader/AbstractClassLoaderServiceTest.java b/seatunnel-engine/seatunnel-engine-core/src/test/java/org/apache/seatunnel/engine/core/classloader/AbstractClassLoaderServiceTest.java similarity index 97% rename from seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/service/classloader/AbstractClassLoaderServiceTest.java rename to seatunnel-engine/seatunnel-engine-core/src/test/java/org/apache/seatunnel/engine/core/classloader/AbstractClassLoaderServiceTest.java index 87360951c23..779ab63c5a5 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/service/classloader/AbstractClassLoaderServiceTest.java +++ b/seatunnel-engine/seatunnel-engine-core/src/test/java/org/apache/seatunnel/engine/core/classloader/AbstractClassLoaderServiceTest.java @@ -15,17 +15,17 @@ * limitations under the License. */ -package org.apache.seatunnel.engine.server.service.classloader; +package org.apache.seatunnel.engine.core.classloader; import org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader; -import org.apache.curator.shaded.com.google.common.collect.Lists; - import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import com.google.common.collect.Lists; + import java.net.MalformedURLException; import java.net.URL; import java.util.Collections; diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/service/classloader/ClassLoaderServiceCacheModeTest.java b/seatunnel-engine/seatunnel-engine-core/src/test/java/org/apache/seatunnel/engine/core/classloader/ClassLoaderServiceCacheModeTest.java similarity index 95% rename from seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/service/classloader/ClassLoaderServiceCacheModeTest.java rename to seatunnel-engine/seatunnel-engine-core/src/test/java/org/apache/seatunnel/engine/core/classloader/ClassLoaderServiceCacheModeTest.java index 60f249b950c..7c6c569c84e 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/service/classloader/ClassLoaderServiceCacheModeTest.java +++ b/seatunnel-engine/seatunnel-engine-core/src/test/java/org/apache/seatunnel/engine/core/classloader/ClassLoaderServiceCacheModeTest.java @@ -15,13 +15,13 @@ * limitations under the License. */ -package org.apache.seatunnel.engine.server.service.classloader; - -import org.apache.curator.shaded.com.google.common.collect.Lists; +package org.apache.seatunnel.engine.core.classloader; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import com.google.common.collect.Lists; + import java.net.MalformedURLException; import java.net.URL; diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/service/classloader/ClassLoaderServiceTest.java b/seatunnel-engine/seatunnel-engine-core/src/test/java/org/apache/seatunnel/engine/core/classloader/ClassLoaderServiceTest.java similarity index 95% rename from seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/service/classloader/ClassLoaderServiceTest.java rename to seatunnel-engine/seatunnel-engine-core/src/test/java/org/apache/seatunnel/engine/core/classloader/ClassLoaderServiceTest.java index af72f6ee9bd..0e2fe90af2b 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/service/classloader/ClassLoaderServiceTest.java +++ b/seatunnel-engine/seatunnel-engine-core/src/test/java/org/apache/seatunnel/engine/core/classloader/ClassLoaderServiceTest.java @@ -15,13 +15,13 @@ * limitations under the License. */ -package org.apache.seatunnel.engine.server.service.classloader; - -import org.apache.curator.shaded.com.google.common.collect.Lists; +package org.apache.seatunnel.engine.core.classloader; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import com.google.common.collect.Lists; + import java.net.MalformedURLException; import java.net.URL; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java index 4bb041e211c..e9dcdca779f 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java @@ -21,10 +21,10 @@ import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; +import org.apache.seatunnel.engine.core.classloader.ClassLoaderService; +import org.apache.seatunnel.engine.core.classloader.DefaultClassLoaderService; import org.apache.seatunnel.engine.server.execution.ExecutionState; import org.apache.seatunnel.engine.server.execution.TaskGroupLocation; -import org.apache.seatunnel.engine.server.service.classloader.ClassLoaderService; -import org.apache.seatunnel.engine.server.service.classloader.DefaultClassLoaderService; import org.apache.seatunnel.engine.server.service.jar.ConnectorPackageService; import org.apache.seatunnel.engine.server.service.slot.DefaultSlotService; import org.apache.seatunnel.engine.server.service.slot.SlotService; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java index 68fa111db99..e4ff187bb22 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java @@ -27,6 +27,7 @@ import org.apache.seatunnel.engine.common.exception.JobNotFoundException; import org.apache.seatunnel.engine.common.loader.ClassLoaderUtil; import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; +import org.apache.seatunnel.engine.core.classloader.ClassLoaderService; import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier; import org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException; import org.apache.seatunnel.engine.server.execution.ExecutionState; @@ -42,7 +43,6 @@ import org.apache.seatunnel.engine.server.execution.TaskLocation; import org.apache.seatunnel.engine.server.execution.TaskTracker; import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext; -import org.apache.seatunnel.engine.server.service.classloader.ClassLoaderService; import org.apache.seatunnel.engine.server.service.jar.ServerConnectorPackageClient; import org.apache.seatunnel.engine.server.task.SeaTunnelTask; import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java index 08d75891201..81a1047c749 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.api.common.metrics.JobMetrics; import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.engine.common.Constant; +import org.apache.seatunnel.engine.core.classloader.ClassLoaderService; import org.apache.seatunnel.engine.core.dag.logical.LogicalDag; import org.apache.seatunnel.engine.core.job.JobDAGInfo; import org.apache.seatunnel.engine.core.job.JobImmutableInformation; @@ -35,7 +36,6 @@ import org.apache.seatunnel.engine.server.operation.GetClusterHealthMetricsOperation; import org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation; import org.apache.seatunnel.engine.server.operation.GetJobStatusOperation; -import org.apache.seatunnel.engine.server.service.classloader.ClassLoaderService; import org.apache.seatunnel.engine.server.utils.NodeEngineUtil; import com.hazelcast.cluster.Address; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java index 6f09e0aadf1..777cb609a02 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java @@ -124,8 +124,10 @@ private void handleSubmitJob(HttpPostCommand httpPostCommand, String uri) boolean startWithSavePoint = Boolean.parseBoolean(requestParams.get(RestConstant.IS_START_WITH_SAVE_POINT)); + SeaTunnelServer seaTunnelServer = getSeaTunnelServer(); RestJobExecutionEnvironment restJobExecutionEnvironment = new RestJobExecutionEnvironment( + seaTunnelServer, jobConfig, config, textCommandService.getNode(), @@ -135,7 +137,6 @@ private void handleSubmitJob(HttpPostCommand httpPostCommand, String uri) : null); JobImmutableInformation jobImmutableInformation = restJobExecutionEnvironment.build(); Long jobId = jobImmutableInformation.getJobId(); - SeaTunnelServer seaTunnelServer = getSeaTunnelServer(); if (seaTunnelServer == null) { NodeEngineUtil.sendOperationToMasterNode( diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java index c1aa84dd6d2..a166d0a4d5a 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java @@ -27,6 +27,7 @@ import org.apache.seatunnel.engine.core.job.AbstractJobEnvironment; import org.apache.seatunnel.engine.core.job.JobImmutableInformation; import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser; +import org.apache.seatunnel.engine.server.SeaTunnelServer; import org.apache.commons.lang3.tuple.ImmutablePair; @@ -47,13 +48,17 @@ public class RestJobExecutionEnvironment extends AbstractJobEnvironment { private final Long jobId; + private final SeaTunnelServer seaTunnelServer; + public RestJobExecutionEnvironment( + SeaTunnelServer seaTunnelServer, JobConfig jobConfig, Config seaTunnelJobConfig, Node node, boolean isStartWithSavePoint, Long jobId) { super(jobConfig, isStartWithSavePoint); + this.seaTunnelServer = seaTunnelServer; this.seaTunnelJobConfig = seaTunnelJobConfig; this.nodeEngine = node.getNodeEngine(); this.jobConfig.setJobContext( @@ -73,7 +78,8 @@ public Long getJobId() { @Override protected LogicalDag getLogicalDag() { - ImmutablePair, Set> immutablePair = getJobConfigParser().parse(); + ImmutablePair, Set> immutablePair = + getJobConfigParser().parse(seaTunnelServer.getClassLoaderService()); actions.addAll(immutablePair.getLeft()); jarUrls.addAll(commonPluginJars); jarUrls.addAll(immutablePair.getRight()); diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/ConnectorPackageServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/ConnectorPackageServiceTest.java index a5c0569d608..a61715aeafc 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/ConnectorPackageServiceTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/ConnectorPackageServiceTest.java @@ -199,7 +199,7 @@ public void testRestoreWhenMasterNodeSwitch() throws InterruptedException, IOExc fillJobConfig(jobConfig, envOptions); List commonPluginJars = new ArrayList<>(searchPluginJars()); commonPluginJars.addAll( - new ArrayList( + new ArrayList<>( Common.getThirdPartyJars( jobConfig .getEnvOptions() @@ -220,7 +220,8 @@ public void testRestoreWhenMasterNodeSwitch() throws InterruptedException, IOExc MultipleTableJobConfigParser multipleTableJobConfigParser = new MultipleTableJobConfigParser( filePath, new IdGenerator(), jobConfig, commonPluginJars, false); - ImmutablePair, Set> immutablePair = multipleTableJobConfigParser.parse(); + ImmutablePair, Set> immutablePair = + multipleTableJobConfigParser.parse(null); Set commonJarIdentifiers = new HashSet<>(); // Upload commonPluginJar diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java index 6dac20a34d0..79d487d50a7 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java @@ -119,7 +119,7 @@ public static LogicalDag createTestLogicalPlan( IdGenerator idGenerator = new IdGenerator(); ImmutablePair, Set> immutablePair = - new MultipleTableJobConfigParser(filePath, idGenerator, jobConfig).parse(); + new MultipleTableJobConfigParser(filePath, idGenerator, jobConfig).parse(null); LogicalDagGenerator logicalDagGenerator = new LogicalDagGenerator(immutablePair.getLeft(), jobConfig, idGenerator);