Skip to content

Commit

Permalink
[bugfix] [Zeta] Fix the problem of class loader not releasing when us…
Browse files Browse the repository at this point in the history
…ing REST API to submit jobs (#6477)
  • Loading branch information
liugddx authored Mar 12, 2024
1 parent bccf9a1 commit 7c0ea2e
Show file tree
Hide file tree
Showing 17 changed files with 168 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
import org.apache.seatunnel.engine.e2e.SeaTunnelContainer;
import org.apache.seatunnel.engine.server.rest.RestConstant;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
Expand All @@ -31,22 +34,36 @@
import org.testcontainers.utility.DockerLoggerFactory;
import org.testcontainers.utility.MountableFile;

import io.restassured.response.Response;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static io.restassured.RestAssured.given;
import static org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
import static org.hamcrest.Matchers.equalTo;

public abstract class ClassLoaderITBase extends SeaTunnelContainer {

private static final String CONF_FILE = "/classloader/fake_to_inmemory.conf";

private static final String http = "http://";

private static final String colon = ":";

abstract boolean cacheMode();

private static final Path config = Paths.get(SEATUNNEL_HOME, "config");

private static final Path binPath = Paths.get(SEATUNNEL_HOME, "bin", SERVER_SHELL);

abstract String seatunnelConfigFileName();

@Test
Expand All @@ -65,6 +82,96 @@ public void testFakeSourceToInMemorySink() throws IOException, InterruptedExcept
}
}

@Test
public void testFakeSourceToInMemorySinkForRestApi() throws IOException, InterruptedException {
LOG.info("test classloader with cache mode: {}", cacheMode());
ContainerUtil.copyConnectorJarToContainer(
server,
CONF_FILE,
getConnectorModulePath(),
getConnectorNamePrefix(),
getConnectorType(),
SEATUNNEL_HOME);
Awaitility.await()
.atMost(2, TimeUnit.MINUTES)
.untilAsserted(
() -> {
Response response =
given().get(
http
+ server.getHost()
+ colon
+ server.getFirstMappedPort()
+ "/hazelcast/rest/cluster");
response.then().statusCode(200);
Thread.sleep(10000);
Assertions.assertEquals(
1, response.jsonPath().getList("members").size());
});
for (int i = 0; i < 10; i++) {
// load in memory sink which already leak thread with classloader
given().body(
"{\n"
+ "\t\"env\": {\n"
+ "\t\t\"parallelism\": 10,\n"
+ "\t\t\"job.mode\": \"BATCH\"\n"
+ "\t},\n"
+ "\t\"source\": [\n"
+ "\t\t{\n"
+ "\t\t\t\"plugin_name\": \"FakeSource\",\n"
+ "\t\t\t\"result_table_name\": \"fake\",\n"
+ "\t\t\t\"parallelism\": 10,\n"
+ "\t\t\t\"schema\": {\n"
+ "\t\t\t\t\"fields\": {\n"
+ "\t\t\t\t\t\"name\": \"string\",\n"
+ "\t\t\t\t\t\"age\": \"int\",\n"
+ "\t\t\t\t\t\"score\": \"double\"\n"
+ "\t\t\t\t}\n"
+ "\t\t\t}\n"
+ "\t\t}\n"
+ "\t],\n"
+ "\t\"transform\": [],\n"
+ "\t\"sink\": [\n"
+ "\t\t{\n"
+ "\t\t\t\"plugin_name\": \"InMemory\",\n"
+ "\t\t\t\"source_table_name\": \"fake\"\n"
+ "\t\t}\n"
+ "\t]\n"
+ "}")
.header("Content-Type", "application/json; charset=utf-8")
.post(
http
+ server.getHost()
+ colon
+ server.getFirstMappedPort()
+ RestConstant.SUBMIT_JOB_URL)
.then()
.statusCode(200);

Awaitility.await()
.atMost(2, TimeUnit.MINUTES)
.untilAsserted(
() ->
given().get(
http
+ server.getHost()
+ colon
+ server.getFirstMappedPort()
+ RestConstant.FINISHED_JOBS_INFO
+ "/FINISHED")
.then()
.statusCode(200)
.body("[0].jobStatus", equalTo("FINISHED")));
Thread.sleep(5000);
Assertions.assertTrue(containsDaemonThread());
if (cacheMode()) {
Assertions.assertEquals(3, getClassLoaderCount());
} else {
Assertions.assertEquals(2 + i, getClassLoaderCount());
}
}
}

private int getClassLoaderCount() throws IOException, InterruptedException {
Map<String, Integer> objects = ContainerUtil.getJVMLiveObject(server);
String className =
Expand All @@ -79,7 +186,7 @@ private boolean containsDaemonThread() throws IOException, InterruptedException
}

@Override
@BeforeAll
@BeforeEach
public void startUp() throws Exception {
server =
new GenericContainer<>(getDockerImage())
Expand All @@ -96,7 +203,7 @@ public void startUp() throws Exception {
"seatunnel-engine:" + JDK_DOCKER_IMAGE)))
.waitingFor(Wait.forListeningPort());
copySeaTunnelStarterToContainer(server);
server.setPortBindings(Collections.singletonList("5801:5801"));
server.setExposedPorts(Collections.singletonList(5801));

server.withCopyFileToContainer(
MountableFile.forHostPath(
Expand Down Expand Up @@ -148,4 +255,10 @@ public void startUp() throws Exception {
+ "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/plugin-mapping.properties"),
Paths.get(SEATUNNEL_HOME, "connectors", "plugin-mapping.properties").toString());
}

@AfterEach
@Override
public void tearDown() throws Exception {
super.tearDown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ protected MultipleTableJobConfigParser getJobConfigParser() {

@Override
protected LogicalDag getLogicalDag() {
ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse(null);
actions.addAll(immutablePair.getLeft());
// Enable upload connector jar package to engine server, automatically upload connector Jar
// packages and dependent third-party Jar packages to the server before job execution.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void testLogicalGenerator() {

IdGenerator idGenerator = new IdGenerator();
ImmutablePair<List<Action>, Set<URL>> immutablePair =
new MultipleTableJobConfigParser(filePath, idGenerator, jobConfig).parse();
new MultipleTableJobConfigParser(filePath, idGenerator, jobConfig).parse(null);

LogicalDagGenerator logicalDagGenerator =
new LogicalDagGenerator(immutablePair.getLeft(), jobConfig, idGenerator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void testSimpleJobParse() {
jobConfig.setJobContext(new JobContext());
MultipleTableJobConfigParser jobConfigParser =
new MultipleTableJobConfigParser(filePath, new IdGenerator(), jobConfig);
ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse(null);
List<Action> actions = parse.getLeft();
Assertions.assertEquals(1, actions.size());
Assertions.assertEquals("Sink[0]-LocalFile-MultiTableSink", actions.get(0).getName());
Expand All @@ -71,7 +71,7 @@ public void testComplexJobParse() {
jobConfig.setJobContext(new JobContext());
MultipleTableJobConfigParser jobConfigParser =
new MultipleTableJobConfigParser(filePath, new IdGenerator(), jobConfig);
ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse(null);
List<Action> actions = parse.getLeft();
Assertions.assertEquals(1, actions.size());

Expand Down Expand Up @@ -102,7 +102,7 @@ public void testMultipleSinkName() {
jobConfig.setJobContext(new JobContext());
MultipleTableJobConfigParser jobConfigParser =
new MultipleTableJobConfigParser(filePath, new IdGenerator(), jobConfig);
ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse(null);
List<Action> actions = parse.getLeft();
Assertions.assertEquals(2, actions.size());

Expand All @@ -122,7 +122,7 @@ public void testMultipleTableSourceWithMultiTableSinkParse() throws IOException
Config config = ConfigBuilder.of(Paths.get(filePath));
MultipleTableJobConfigParser jobConfigParser =
new MultipleTableJobConfigParser(config, new IdGenerator(), jobConfig);
ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse(null);
List<Action> actions = parse.getLeft();
Assertions.assertEquals(1, actions.size());
Assertions.assertEquals("Sink[0]-console-MultiTableSink", actions.get(0).getName());
Expand All @@ -142,7 +142,7 @@ public void testDuplicatedTransformInOnePipeline() {
Config config = ConfigBuilder.of(Paths.get(filePath));
MultipleTableJobConfigParser jobConfigParser =
new MultipleTableJobConfigParser(config, new IdGenerator(), jobConfig);
ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse(null);
List<Action> actions = parse.getLeft();
Assertions.assertEquals("Transform[0]-sql", actions.get(0).getUpstream().get(0).getName());
Assertions.assertEquals("Transform[1]-sql", actions.get(1).getUpstream().get(0).getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.engine.server.service.classloader;
package org.apache.seatunnel.engine.core.classloader;

import java.net.URL;
import java.util.Collection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.engine.server.service.classloader;
package org.apache.seatunnel.engine.core.classloader;

import org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@
import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
import org.apache.seatunnel.engine.common.loader.ClassLoaderUtil;
import org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.actions.SinkConfig;
Expand Down Expand Up @@ -149,7 +151,7 @@ public MultipleTableJobConfigParser(
new JobConfigParser(idGenerator, commonPluginJars, isStartWithSavePoint);
}

public ImmutablePair<List<Action>, Set<URL>> parse() {
public ImmutablePair<List<Action>, Set<URL>> parse(ClassLoaderService classLoaderService) {
List<? extends Config> sourceConfigs =
TypesafeConfigUtils.getConfigList(
seaTunnelJobConfig, "source", Collections.emptyList());
Expand All @@ -165,8 +167,15 @@ public ImmutablePair<List<Action>, Set<URL>> parse() {
connectorJars.addAll(commonPluginJars);
}
ClassLoader parentClassLoader = Thread.currentThread().getContextClassLoader();
ClassLoader classLoader =
new SeaTunnelChildFirstClassLoader(connectorJars, parentClassLoader);

ClassLoader classLoader;
if (classLoaderService == null) {
classLoader = new SeaTunnelChildFirstClassLoader(connectorJars, parentClassLoader);
} else {
classLoader =
classLoaderService.getClassLoader(
Long.parseLong(jobConfig.getJobContext().getJobId()), connectorJars);
}
try {
Thread.currentThread().setContextClassLoader(classLoader);
ConfigParserUtil.checkGraph(sourceConfigs, transformConfigs, sinkConfigs);
Expand Down Expand Up @@ -196,6 +205,11 @@ public ImmutablePair<List<Action>, Set<URL>> parse() {
return new ImmutablePair<>(sinkActions, factoryUrls);
} finally {
Thread.currentThread().setContextClassLoader(parentClassLoader);
if (classLoaderService != null) {
classLoaderService.releaseClassLoader(
Long.parseLong(jobConfig.getJobContext().getJobId()), connectorJars);
}
ClassLoaderUtil.recycleClassLoaderFromThread(classLoader);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@
* limitations under the License.
*/

package org.apache.seatunnel.engine.server.service.classloader;
package org.apache.seatunnel.engine.core.classloader;

import org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;

import org.apache.curator.shaded.com.google.common.collect.Lists;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import com.google.common.collect.Lists;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collections;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
* limitations under the License.
*/

package org.apache.seatunnel.engine.server.service.classloader;

import org.apache.curator.shaded.com.google.common.collect.Lists;
package org.apache.seatunnel.engine.core.classloader;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import com.google.common.collect.Lists;

import java.net.MalformedURLException;
import java.net.URL;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
* limitations under the License.
*/

package org.apache.seatunnel.engine.server.service.classloader;

import org.apache.curator.shaded.com.google.common.collect.Lists;
package org.apache.seatunnel.engine.core.classloader;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import com.google.common.collect.Lists;

import java.net.MalformedURLException;
import java.net.URL;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
import org.apache.seatunnel.engine.core.classloader.DefaultClassLoaderService;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.service.classloader.ClassLoaderService;
import org.apache.seatunnel.engine.server.service.classloader.DefaultClassLoaderService;
import org.apache.seatunnel.engine.server.service.jar.ConnectorPackageService;
import org.apache.seatunnel.engine.server.service.slot.DefaultSlotService;
import org.apache.seatunnel.engine.server.service.slot.SlotService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.seatunnel.engine.common.exception.JobNotFoundException;
import org.apache.seatunnel.engine.common.loader.ClassLoaderUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
import org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
Expand All @@ -42,7 +43,6 @@
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.execution.TaskTracker;
import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
import org.apache.seatunnel.engine.server.service.classloader.ClassLoaderService;
import org.apache.seatunnel.engine.server.service.jar.ServerConnectorPackageClient;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
Expand Down
Loading

0 comments on commit 7c0ea2e

Please sign in to comment.