Skip to content

Commit

Permalink
[Improve][Zeta] Move SaveMode behavior to master (apache#6843)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored and chaorongzhi committed Aug 21, 2024
1 parent 46c5ab3 commit 8a735a1
Show file tree
Hide file tree
Showing 20 changed files with 469 additions and 29 deletions.
6 changes: 6 additions & 0 deletions docs/en/concept/JobEnvConfig.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ This parameter configures the parallelism of source and sink.

Used to control the default retry times when a job fails. The default value is 3, and it only works in the Zeta engine.

### savemode.execute.location

This parameter is used to specify the location of the savemode when the job is executed in the Zeta engine.
The default value is `CLUSTER`, which means that the savemode is executed on the cluster. If you want to execute the savemode on the client,
you can set it to `CLIENT`. Please use `CLUSTER` mode as much as possible, because when there are no problems with `CLUSTER` mode, we will remove `CLIENT` mode.

### shade.identifier

Specify the method of encryption, if you didn't have the requirement for encrypting or decrypting config files, this option can be ignored.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.sink.SaveModeExecuteLocation;
import org.apache.seatunnel.common.constants.JobMode;

import java.util.Map;
Expand Down Expand Up @@ -76,6 +77,12 @@ public interface EnvCommonOptions {
.noDefaultValue()
.withDescription("The timeout (in milliseconds) for a checkpoint.");

Option<SaveModeExecuteLocation> SAVEMODE_EXECUTE_LOCATION =
Options.key("savemode.execute.location")
.enumType(SaveModeExecuteLocation.class)
.defaultValue(SaveModeExecuteLocation.CLUSTER)
.withDescription("The location of save mode execute.");

Option<String> JARS =
Options.key("jars")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public static OptionRule getEnvOptionRules() {
EnvCommonOptions.CHECKPOINT_TIMEOUT,
EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND,
EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND,
EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION,
EnvCommonOptions.CUSTOM_PARAMETERS)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.sink;

public enum SaveModeExecuteLocation {
@Deprecated
CLIENT,
CLUSTER
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ public Optional<Serializer<DorisCommitInfo>> getCommitInfoSerializer() {

@Override
public Optional<SaveModeHandler> getSaveModeHandler() {
// Load the JDBC driver in to DriverManager
try {
Class.forName("com.mysql.cj.jdbc.Driver");
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
CatalogFactory catalogFactory =
discoverFactory(
Thread.currentThread().getContextClassLoader(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ public Optional<Serializer<XidInfo>> getCommitInfoSerializer() {

@Override
public Optional<SaveModeHandler> getSaveModeHandler() {
try {
Class.forName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName());
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
if (catalogTable != null) {
if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public JdbcSinkWriter(
public MultiTableResourceManager<ConnectionPoolManager> initMultiTableResourceManager(
int tableSize, int queueSize) {
HikariDataSource ds = new HikariDataSource();
try {
Class.forName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName());
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
ds.setIdleTimeout(30 * 1000);
ds.setMaximumPoolSize(queueSize);
ds.setJdbcUrl(jdbcSinkConfig.getJdbcConnectionConfig().getUrl());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.e2e.sink.inmemory;

import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;

import lombok.extern.slf4j.Slf4j;

import java.util.List;

@Slf4j
public class InMemorySaveModeHandler implements SaveModeHandler {

private final CatalogTable catalogTable;

public InMemorySaveModeHandler(CatalogTable catalogTable) {
this.catalogTable = catalogTable;
}

@Override
public void handleSchemaSaveMode() {
log.info("handle schema savemode with table path: {}", catalogTable.getTablePath());
}

@Override
public void handleDataSaveMode() {
log.info("handle data savemode with table path: {}", catalogTable.getTablePath());
}

@Override
public SchemaSaveMode getSchemaSaveMode() {
return SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST;
}

@Override
public DataSaveMode getDataSaveMode() {
return DataSaveMode.APPEND_DATA;
}

@Override
public TablePath getHandleTablePath() {
return catalogTable.getTablePath();
}

@Override
public Catalog getHandleCatalog() {
return new Catalog() {
@Override
public void open() throws CatalogException {}

@Override
public void close() throws CatalogException {}

@Override
public String name() {
return "InMemoryCatalog";
}

@Override
public String getDefaultDatabase() throws CatalogException {
return null;
}

@Override
public boolean databaseExists(String databaseName) throws CatalogException {
return false;
}

@Override
public List<String> listDatabases() throws CatalogException {
return null;
}

@Override
public List<String> listTables(String databaseName)
throws CatalogException, DatabaseNotExistException {
return null;
}

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
return false;
}

@Override
public CatalogTable getTable(TablePath tablePath)
throws CatalogException, TableNotExistException {
return null;
}

@Override
public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {}

@Override
public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {}

@Override
public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
throws DatabaseAlreadyExistException, CatalogException {}

@Override
public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException {}
};
}

@Override
public void close() throws Exception {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import java.io.IOException;
Expand All @@ -35,11 +38,14 @@ public class InMemorySink
InMemoryState,
InMemoryCommitInfo,
InMemoryAggregatedCommitInfo>,
SupportMultiTableSink {
SupportMultiTableSink,
SupportSaveMode {

private ReadonlyConfig config;
private CatalogTable catalogTable;

public InMemorySink(ReadonlyConfig config) {
public InMemorySink(CatalogTable catalogTable, ReadonlyConfig config) {
this.catalogTable = catalogTable;
this.config = config;
}

Expand Down Expand Up @@ -69,4 +75,9 @@ public Optional<Serializer<InMemoryCommitInfo>> getCommitInfoSerializer() {
public Optional<Serializer<InMemoryAggregatedCommitInfo>> getAggregatedCommitInfoSerializer() {
return Optional.of(new DefaultSerializer<>());
}

@Override
public Optional<SaveModeHandler> getSaveModeHandler() {
return Optional.of(new InMemorySaveModeHandler(catalogTable));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,6 @@ public OptionRule optionRule() {
@Override
public TableSink<SeaTunnelRow, InMemoryState, InMemoryCommitInfo, InMemoryAggregatedCommitInfo>
createSink(TableSinkFactoryContext context) {
return () -> new InMemorySink(context.getOptions());
return () -> new InMemorySink(context.getCatalogTable(), context.getOptions());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,51 @@ public void testMultiTableSinkFailedWithThrowable() throws IOException, Interrup
Assertions.assertNotEquals(0, execResult.getExitCode());
}

@Test
public void testSaveModeOnMasterOrClient() throws IOException, InterruptedException {
Container.ExecResult execResult =
executeJob(server, "/savemode/fake_to_inmemory_savemode.conf");
Assertions.assertEquals(0, execResult.getExitCode());
int serverLogLength = 0;
String serverLogs = server.getLogs();
Assertions.assertTrue(
serverLogs.contains(
"org.apache.seatunnel.e2e.sink.inmemory.InMemorySaveModeHandler - handle schema savemode with table path: test.table1"));
Assertions.assertTrue(
serverLogs.contains(
"org.apache.seatunnel.e2e.sink.inmemory.InMemorySaveModeHandler - handle data savemode with table path: test.table1"));
Assertions.assertTrue(
serverLogs.contains(
"org.apache.seatunnel.e2e.sink.inmemory.InMemorySaveModeHandler - handle schema savemode with table path: test.table2"));
Assertions.assertTrue(
serverLogs.contains(
"org.apache.seatunnel.e2e.sink.inmemory.InMemorySaveModeHandler - handle data savemode with table path: test.table2"));

// restore will not execute savemode
execResult = restoreJob(server, "/savemode/fake_to_inmemory_savemode.conf", "1");
Assertions.assertEquals(0, execResult.getExitCode());
// clear old logs
serverLogLength += serverLogs.length();
serverLogs = server.getLogs().substring(serverLogLength);
Assertions.assertFalse(serverLogs.contains("handle schema savemode with table path"));
Assertions.assertFalse(serverLogs.contains("handle data savemode with table path"));

// test savemode on client side
Container.ExecResult execResult2 =
executeJob(server, "/savemode/fake_to_inmemory_savemode_client.conf");
Assertions.assertEquals(0, execResult2.getExitCode());
// clear old logs
serverLogLength += serverLogs.length();
serverLogs = server.getLogs().substring(serverLogLength);
Assertions.assertFalse(serverLogs.contains("handle schema savemode with table path"));
Assertions.assertFalse(serverLogs.contains("handle data savemode with table path"));

Assertions.assertTrue(
execResult2.getStdout().contains("handle schema savemode with table path"));
Assertions.assertTrue(
execResult2.getStdout().contains("handle data savemode with table path"));
}

@Test
public void testJobFailedWillThrowException() throws IOException, InterruptedException {
Container.ExecResult execResult = executeSeaTunnelJob("/batch_slot_not_enough.conf");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public void testFakeSourceToInMemorySinkForRestApi() throws IOException, Interru
if (cacheMode()) {
Assertions.assertTrue(3 >= getClassLoaderCount());
} else {
Assertions.assertTrue(2 + i >= getClassLoaderCount());
Assertions.assertTrue(3 + 2 * i >= getClassLoaderCount());
}
}
}
Expand Down
Loading

0 comments on commit 8a735a1

Please sign in to comment.