From 2c4b85b3ce3ba3aaada7b619c1f5cc086c270055 Mon Sep 17 00:00:00 2001 From: liugddx Date: Wed, 9 Oct 2024 07:59:46 +0800 Subject: [PATCH 01/13] 1 --- .../parse/MultipleTableJobConfigParser.java | 24 ------------------- 1 file changed, 24 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index cb7f118a6e4..0547e1a5225 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -565,30 +565,6 @@ private static T findLast(LinkedHashMap map) { jarUrls.addAll(getSinkPluginJarPaths(sinkConfig)); List> sinkActions = new ArrayList<>(); - // union - if (inputVertices.size() > 1) { - Set inputActions = - inputVertices.stream() - .flatMap(Collection::stream) - .map(Tuple2::_2) - .collect(Collectors.toCollection(LinkedHashSet::new)); - checkProducedTypeEquals(inputActions); - Tuple2 inputActionSample = inputVertices.get(0).get(0); - SinkAction sinkAction = - createSinkAction( - inputActionSample._1(), - inputActions, - readonlyConfig, - classLoader, - jarUrls, - new HashSet<>(), - factoryId, - inputActionSample._2().getParallelism(), - configIndex); - sinkActions.add(sinkAction); - return sinkActions; - } - // TODO move it into tryGenerateMultiTableSink when we don't support sink template // sink template for (Tuple2 tuple : inputVertices.get(0)) { From 41432ba86fb0d0b217df355866d1683dfc1b47d4 Mon Sep 17 00:00:00 2001 From: liugddx Date: Fri, 11 Oct 2024 20:05:40 +0800 Subject: [PATCH 02/13] 1 --- .../parse/MultipleTableJobConfigParser.java | 24 +++++++++++++++++++ .../server/rest/servlet/BaseServlet.java | 17 +++++++++++-- 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index 0547e1a5225..cb7f118a6e4 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -565,6 +565,30 @@ private static T findLast(LinkedHashMap map) { jarUrls.addAll(getSinkPluginJarPaths(sinkConfig)); List> sinkActions = new ArrayList<>(); + // union + if (inputVertices.size() > 1) { + Set inputActions = + inputVertices.stream() + .flatMap(Collection::stream) + .map(Tuple2::_2) + .collect(Collectors.toCollection(LinkedHashSet::new)); + checkProducedTypeEquals(inputActions); + Tuple2 inputActionSample = inputVertices.get(0).get(0); + SinkAction sinkAction = + createSinkAction( + inputActionSample._1(), + inputActions, + readonlyConfig, + classLoader, + jarUrls, + new HashSet<>(), + factoryId, + inputActionSample._2().getParallelism(), + configIndex); + sinkActions.add(sinkAction); + return sinkActions; + } + // TODO move it into tryGenerateMultiTableSink when we don't support sink template // sink template for (Tuple2 tuple : inputVertices.get(0)) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java index ce5fc74d3c8..15cd4eaecce 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java @@ -57,6 +57,7 @@ import com.hazelcast.internal.json.JsonObject; import com.hazelcast.internal.json.JsonValue; import com.hazelcast.internal.serialization.Data; +import com.hazelcast.internal.util.JsonUtil; import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject; import com.hazelcast.spi.impl.NodeEngineImpl; @@ -74,7 +75,6 @@ import java.util.stream.Stream; import java.util.stream.StreamSupport; -import static com.hazelcast.internal.util.JsonUtil.toJsonObject; import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES; import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES_PER_SECONDS; import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT; @@ -188,7 +188,7 @@ nodeEngine, new GetJobStatusOperation(jobId)) .add(RestConstant.JOB_STATUS, jobStatus.toString()) .add( RestConstant.ENV_OPTIONS, - toJsonObject(logicalDag.getJobConfig().getEnvOptions())) + JsonUtil.toJsonObject(logicalDag.getJobConfig().getEnvOptions())) .add( RestConstant.CREATE_TIME, DateTimeUtils.toString( @@ -575,4 +575,17 @@ private void submitJob( jobImmutableInformation.isStartWithSavePoint()); voidPassiveCompletableFuture.join(); } + + private JsonObject toJsonObject(Map jobMetrics) { + JsonObject members = new JsonObject(); + jobMetrics.forEach( + (key, value) -> { + if (value instanceof Map) { + members.add(key, toJsonObject((Map) value)); + } else { + members.add(key, value.toString()); + } + }); + return members; + } } From a223e2637805ec56b1a63051302ce221cbaceaf4 Mon Sep 17 00:00:00 2001 From: liugddx Date: Fri, 11 Oct 2024 22:40:21 +0800 Subject: [PATCH 03/13] 1 --- .../sink/multitablesink/MultiTableSink.java | 14 ++++-- .../seatunnel/engine/server/dag/DAGUtils.java | 8 +++- .../metrics/TaskMetricsCalcContext.java | 3 +- .../server/task/SeaTunnelSourceCollector.java | 2 +- .../server/task/flow/SinkFlowLifeCycle.java | 44 +++++++++++++++++-- 5 files changed, 59 insertions(+), 12 deletions(-) diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java index 2c3e9c6582b..b8967a5a654 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java @@ -158,17 +158,23 @@ public Optional> getCommitInfoSerializer() { return Optional.of(new MultiTableSinkAggregatedCommitter(aggCommitters)); } - public List getSinkTables() { + public List> getSinkTables() { + + List> tablePaths = new ArrayList<>(); - List tablePaths = new ArrayList<>(); List values = new ArrayList<>(sinks.values()); for (int i = 0; i < values.size(); i++) { + Map tablePath = new HashMap<>(); if (values.get(i).getWriteCatalogTable().isPresent()) { - tablePaths.add( + tablePath.put( + TablePath.of(sinks.keySet().toArray(new String[0])[i]), ((CatalogTable) values.get(i).getWriteCatalogTable().get()).getTablePath()); } else { - tablePaths.add(TablePath.of(sinks.keySet().toArray(new String[0])[i])); + tablePath.put( + TablePath.of(sinks.keySet().toArray(new String[0])[i]), TablePath.DEFAULT); } + + tablePaths.add(tablePath); } return tablePaths; } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java index e7b41de73d9..962e31a3ac9 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java @@ -159,8 +159,12 @@ private static List getTablePaths(Action action) { } else if (action instanceof SinkAction) { SeaTunnelSink seaTunnelSink = ((SinkAction) action).getSink(); if (seaTunnelSink instanceof MultiTableSink) { - List sinkTablePaths = - new ArrayList<>(((MultiTableSink) seaTunnelSink).getSinkTables()); + List sinkTablePaths = new ArrayList<>(); + List> tables = + ((MultiTableSink) seaTunnelSink).getSinkTables(); + for (Map table : tables) { + sinkTablePaths.addAll(table.values()); + } tablePaths.addAll(sinkTablePaths); } else { Optional catalogTable = seaTunnelSink.getWriteCatalogTable(); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/TaskMetricsCalcContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/TaskMetricsCalcContext.java index eab9ecbd348..6890421f9f1 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/TaskMetricsCalcContext.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/TaskMetricsCalcContext.java @@ -122,14 +122,13 @@ private void initializeMetrics( } } - public void updateMetrics(Object data) { + public void updateMetrics(Object data, String tableId) { count.inc(); QPS.markEvent(); if (data instanceof SeaTunnelRow) { SeaTunnelRow row = (SeaTunnelRow) data; bytes.inc(row.getBytesSize()); bytesPerSeconds.markEvent(row.getBytesSize()); - String tableId = row.getTableId(); if (StringUtils.isNotBlank(tableId)) { String tableName = TablePath.of(tableId).getFullName(); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java index 7f2e34bdcb8..53d206874a5 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java @@ -107,7 +107,7 @@ public void collect(T row) { "Unsupported row type: " + rowType.getClass().getName()); } flowControlGate.audit((SeaTunnelRow) row); - taskMetricsCalcContext.updateMetrics(row); + taskMetricsCalcContext.updateMetrics(row, tableId); } sendRecordToNext(new Record<>(row)); emptyThisPollNext = false; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index bce6e9f6376..5b5954b6695 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -23,12 +23,15 @@ import org.apache.seatunnel.api.sink.MultiTableResourceManager; import org.apache.seatunnel.api.sink.SinkCommitter; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SinkWriter.Context; import org.apache.seatunnel.api.sink.SupportResourceShare; import org.apache.seatunnel.api.sink.event.WriterCloseEvent; import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.type.Record; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener; import org.apache.seatunnel.engine.core.dag.actions.SinkAction; @@ -54,10 +57,12 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky; @@ -70,7 +75,7 @@ public class SinkFlowLifeCycle sinkAction; private SinkWriter writer; - private SinkWriter.Context writerContext; + private Context writerContext; private transient Optional> commitInfoSerializer; private transient Optional> writerStateSerializer; @@ -117,7 +122,11 @@ public SinkFlowLifeCycle( List sinkTables = new ArrayList<>(); boolean isMulti = sinkAction.getSink() instanceof MultiTableSink; if (isMulti) { - sinkTables = ((MultiTableSink) sinkAction.getSink()).getSinkTables(); + List> tablesMaps = + ((MultiTableSink) sinkAction.getSink()).getSinkTables(); + for (Map tableMap : tablesMaps) { + sinkTables.addAll(tableMap.values()); + } } this.taskMetricsCalcContext = new TaskMetricsCalcContext(metricsContext, PluginType.SINK, isMulti, sinkTables); @@ -246,8 +255,37 @@ public void received(Record record) { if (prepareClose) { return; } + AtomicReference tableId = new AtomicReference<>(); writer.write((T) record.getData()); - taskMetricsCalcContext.updateMetrics(record.getData()); + if (record.getData() instanceof SeaTunnelRow) { + if (this.sinkAction.getSink() instanceof MultiTableSink) { + List> tables = + ((MultiTableSink) this.sinkAction.getSink()).getSinkTables(); + tables.forEach( + tablePathTablePathMap -> { + tablePathTablePathMap.forEach( + (k, v) -> { + if (k.equals( + TablePath.of( + ((SeaTunnelRow) record.getData()) + .getTableId()))) { + tableId.set(v.getFullName()); + } + }); + }); + } else { + Optional writeCatalogTable = + this.sinkAction.getSink().getWriteCatalogTable(); + tableId.set( + writeCatalogTable + .map( + catalogTable -> + catalogTable.getTablePath().getFullName()) + .orElseGet(TablePath.DEFAULT::getFullName)); + } + + taskMetricsCalcContext.updateMetrics(record.getData(), tableId.get()); + } } } catch (Exception e) { throw new RuntimeException(e); From e96d28434ec9838a0aa55b41f248309f116f07e3 Mon Sep 17 00:00:00 2001 From: liugddx Date: Sat, 12 Oct 2024 22:30:18 +0800 Subject: [PATCH 04/13] 1 --- .../sink/multitablesink/MultiTableSink.java | 14 +++------- .../seatunnel/engine/server/dag/DAGUtils.java | 8 ++---- .../server/task/flow/SinkFlowLifeCycle.java | 28 ++++++++++++++----- 3 files changed, 27 insertions(+), 23 deletions(-) diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java index b8967a5a654..2c3e9c6582b 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java @@ -158,23 +158,17 @@ public Optional> getCommitInfoSerializer() { return Optional.of(new MultiTableSinkAggregatedCommitter(aggCommitters)); } - public List> getSinkTables() { - - List> tablePaths = new ArrayList<>(); + public List getSinkTables() { + List tablePaths = new ArrayList<>(); List values = new ArrayList<>(sinks.values()); for (int i = 0; i < values.size(); i++) { - Map tablePath = new HashMap<>(); if (values.get(i).getWriteCatalogTable().isPresent()) { - tablePath.put( - TablePath.of(sinks.keySet().toArray(new String[0])[i]), + tablePaths.add( ((CatalogTable) values.get(i).getWriteCatalogTable().get()).getTablePath()); } else { - tablePath.put( - TablePath.of(sinks.keySet().toArray(new String[0])[i]), TablePath.DEFAULT); + tablePaths.add(TablePath.of(sinks.keySet().toArray(new String[0])[i])); } - - tablePaths.add(tablePath); } return tablePaths; } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java index 962e31a3ac9..e7b41de73d9 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java @@ -159,12 +159,8 @@ private static List getTablePaths(Action action) { } else if (action instanceof SinkAction) { SeaTunnelSink seaTunnelSink = ((SinkAction) action).getSink(); if (seaTunnelSink instanceof MultiTableSink) { - List sinkTablePaths = new ArrayList<>(); - List> tables = - ((MultiTableSink) seaTunnelSink).getSinkTables(); - for (Map table : tables) { - sinkTablePaths.addAll(table.values()); - } + List sinkTablePaths = + new ArrayList<>(((MultiTableSink) seaTunnelSink).getSinkTables()); tablePaths.addAll(sinkTablePaths); } else { Optional catalogTable = seaTunnelSink.getWriteCatalogTable(); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index 5b5954b6695..0873a6106ce 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -56,6 +56,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -102,6 +103,8 @@ public class SinkFlowLifeCycle> tablesMaps = new ArrayList<>(); + public SinkFlowLifeCycle( SinkAction sinkAction, TaskLocation taskLocation, @@ -122,10 +125,23 @@ public SinkFlowLifeCycle( List sinkTables = new ArrayList<>(); boolean isMulti = sinkAction.getSink() instanceof MultiTableSink; if (isMulti) { - List> tablesMaps = - ((MultiTableSink) sinkAction.getSink()).getSinkTables(); - for (Map tableMap : tablesMaps) { - sinkTables.addAll(tableMap.values()); + sinkTables = ((MultiTableSink) sinkAction.getSink()).getSinkTables(); + String[] keys = + ((MultiTableSink) sinkAction.getSink()) + .getSinks() + .keySet() + .toArray(new String[0]); + Map tableMap = new HashMap<>(); + for (int i = 0; i < ((MultiTableSink) sinkAction.getSink()).getSinks().size(); i++) { + tableMap.put(TablePath.of(keys[i]), sinkTables.get(i)); + } + tablesMaps.add(tableMap); + } else { + Optional catalogTable = sinkAction.getSink().getWriteCatalogTable(); + if (catalogTable.isPresent()) { + sinkTables.add(catalogTable.get().getTablePath()); + } else { + sinkTables.add(TablePath.DEFAULT); } } this.taskMetricsCalcContext = @@ -259,9 +275,7 @@ public void received(Record record) { writer.write((T) record.getData()); if (record.getData() instanceof SeaTunnelRow) { if (this.sinkAction.getSink() instanceof MultiTableSink) { - List> tables = - ((MultiTableSink) this.sinkAction.getSink()).getSinkTables(); - tables.forEach( + tablesMaps.forEach( tablePathTablePathMap -> { tablePathTablePathMap.forEach( (k, v) -> { From dcb7b883855930d3db8496b2d39fbe0db715dec9 Mon Sep 17 00:00:00 2001 From: liugddx Date: Sun, 13 Oct 2024 00:10:36 +0800 Subject: [PATCH 05/13] 1 --- .../server/task/flow/SinkFlowLifeCycle.java | 31 ++++++++++++------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index 0873a6106ce..a38373c1ac9 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -275,18 +275,25 @@ public void received(Record record) { writer.write((T) record.getData()); if (record.getData() instanceof SeaTunnelRow) { if (this.sinkAction.getSink() instanceof MultiTableSink) { - tablesMaps.forEach( - tablePathTablePathMap -> { - tablePathTablePathMap.forEach( - (k, v) -> { - if (k.equals( - TablePath.of( - ((SeaTunnelRow) record.getData()) - .getTableId()))) { - tableId.set(v.getFullName()); - } - }); - }); + if (((SeaTunnelRow) record.getData()).getTableId().isEmpty()) { + tableId.set(TablePath.DEFAULT.getFullName()); + } else { + tablesMaps.forEach( + tablePathTablePathMap -> { + tablePathTablePathMap.forEach( + (k, v) -> { + if (k.equals( + TablePath.of( + ((SeaTunnelRow) + record + .getData()) + .getTableId()))) { + tableId.set(v.getFullName()); + } + }); + }); + } + } else { Optional writeCatalogTable = this.sinkAction.getSink().getWriteCatalogTable(); From 22a177236e8b4dac7c1f6ae57b16281c6622625f Mon Sep 17 00:00:00 2001 From: liugddx Date: Sun, 13 Oct 2024 08:46:46 +0800 Subject: [PATCH 06/13] 1 --- .../seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index a38373c1ac9..59034670237 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -276,7 +276,7 @@ public void received(Record record) { if (record.getData() instanceof SeaTunnelRow) { if (this.sinkAction.getSink() instanceof MultiTableSink) { if (((SeaTunnelRow) record.getData()).getTableId().isEmpty()) { - tableId.set(TablePath.DEFAULT.getFullName()); + tableId.set(((SeaTunnelRow) record.getData()).getTableId()); } else { tablesMaps.forEach( tablePathTablePathMap -> { From 5569945ab7cece024f7b9e819583f85ca2a057f9 Mon Sep 17 00:00:00 2001 From: liugddx Date: Sun, 13 Oct 2024 22:03:13 +0800 Subject: [PATCH 07/13] 1 --- .../seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index 59034670237..3a2df302632 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -275,7 +275,8 @@ public void received(Record record) { writer.write((T) record.getData()); if (record.getData() instanceof SeaTunnelRow) { if (this.sinkAction.getSink() instanceof MultiTableSink) { - if (((SeaTunnelRow) record.getData()).getTableId().isEmpty()) { + if (((SeaTunnelRow) record.getData()).getTableId() == null + || ((SeaTunnelRow) record.getData()).getTableId().isEmpty()) { tableId.set(((SeaTunnelRow) record.getData()).getTableId()); } else { tablesMaps.forEach( From 18f7f0284048d46feb6abe7da3cc6f68f45765ff Mon Sep 17 00:00:00 2001 From: liugddx Date: Sun, 13 Oct 2024 22:16:37 +0800 Subject: [PATCH 08/13] 1 --- .../java/org/apache/seatunnel/engine/e2e/RestApiIT.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java index 8e5b15cc3d2..386c47aae8a 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java @@ -369,6 +369,14 @@ public void testGetJobInfoByJobId() { .body( "jobDag.vertexInfoMap[1].tablePaths[0]", equalTo("fake")) + .body( + "metrics.TableSourceReceivedCount.fake", + equalTo("5")) + .body( + "metrics.TableSinkWriteCount.fake", + equalTo("5")) + .body("metrics.SinkWriteCount", equalTo("5")) + .body("metrics.SourceReceivedCount", equalTo("5")) .body("jobName", equalTo("fake_to_console")) .body("jobStatus", equalTo("FINISHED")); }); From 23eba00ea2c1517e73c44c9ab3e7dafda4d6ef05 Mon Sep 17 00:00:00 2001 From: liugddx Date: Sun, 13 Oct 2024 23:02:00 +0800 Subject: [PATCH 09/13] 1 --- .../seatunnel/engine/e2e/RestApiIT.java | 134 ++++++++++++++++++ .../rest/RestHttpGetCommandProcessor.java | 10 +- .../server/rest/servlet/BaseServlet.java | 10 +- 3 files changed, 152 insertions(+), 2 deletions(-) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java index 386c47aae8a..abc7798534c 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java @@ -280,6 +280,49 @@ public void testGetRunningJobs() { + RestConstant.RUNNING_JOBS_URL) .then() .statusCode(200) + .body( + "[0].jobDag.jobId", + equalTo( + Long.toString( + clientJobProxy + .getJobId()))) + .body("[0].jobDag.pipelineEdges", hasKey("1")) + .body( + "[0].jobDag.pipelineEdges['1']", + hasSize(1)) + .body( + "[0].jobDag.pipelineEdges['1'][0].inputVertexId", + equalTo("1")) + .body( + "[0].jobDag.pipelineEdges['1'][0].targetVertexId", + equalTo("2")) + .body("[0].jobDag.vertexInfoMap", hasSize(2)) + .body( + "[0].jobDag.vertexInfoMap[0].vertexId", + equalTo(1)) + .body( + "[0].jobDag.vertexInfoMap[0].type", + equalTo("source")) + .body( + "[0].jobDag.vertexInfoMap[0].vertexName", + equalTo( + "pipeline-1 [Source[0]-FakeSource]")) + .body( + "[0].jobDag.vertexInfoMap[0].tablePaths[0]", + equalTo("fake")) + .body( + "[0].jobDag.vertexInfoMap[1].vertexId", + equalTo(2)) + .body( + "[0].jobDag.vertexInfoMap[1].type", + equalTo("sink")) + .body( + "[0].jobDag.vertexInfoMap[1].vertexName", + equalTo( + "pipeline-1 [Sink[0]-LocalFile-MultiTableSink]")) + .body( + "[0].jobDag.vertexInfoMap[1].tablePaths[0]", + equalTo("fake")) .body("[0].jobName", equalTo("fake_to_file")) .body("[0].jobStatus", equalTo("RUNNING")); @@ -293,6 +336,49 @@ public void testGetRunningJobs() { + RestConstant.RUNNING_JOBS_URL) .then() .statusCode(200) + .body( + "[0].jobDag.jobId", + equalTo( + Long.toString( + clientJobProxy + .getJobId()))) + .body("[0].jobDag.pipelineEdges", hasKey("1")) + .body( + "[0].jobDag.pipelineEdges['1']", + hasSize(1)) + .body( + "[0].jobDag.pipelineEdges['1'][0].inputVertexId", + equalTo("1")) + .body( + "[0].jobDag.pipelineEdges['1'][0].targetVertexId", + equalTo("2")) + .body("[0].jobDag.vertexInfoMap", hasSize(2)) + .body( + "[0].jobDag.vertexInfoMap[0].vertexId", + equalTo(1)) + .body( + "[0].jobDag.vertexInfoMap[0].type", + equalTo("source")) + .body( + "[0].jobDag.vertexInfoMap[0].vertexName", + equalTo( + "pipeline-1 [Source[0]-FakeSource]")) + .body( + "[0].jobDag.vertexInfoMap[0].tablePaths[0]", + equalTo("fake")) + .body( + "[0].jobDag.vertexInfoMap[1].vertexId", + equalTo(2)) + .body( + "[0].jobDag.vertexInfoMap[1].type", + equalTo("sink")) + .body( + "[0].jobDag.vertexInfoMap[1].vertexName", + equalTo( + "pipeline-1 [Sink[0]-LocalFile-MultiTableSink]")) + .body( + "[0].jobDag.vertexInfoMap[1].tablePaths[0]", + equalTo("fake")) .body("[0].jobName", equalTo("fake_to_file")) .body("[0].jobStatus", equalTo("RUNNING")); })); @@ -314,6 +400,54 @@ public void testGetJobInfoByJobId() { + batchJobProxy.getJobId()) .then() .statusCode(200) + .body( + "jobDag.jobId", + equalTo( + Long.toString( + batchJobProxy.getJobId()))) + .body("jobDag.pipelineEdges", hasKey("1")) + .body("jobDag.pipelineEdges['1']", hasSize(1)) + .body( + "jobDag.pipelineEdges['1'][0].inputVertexId", + equalTo("1")) + .body( + "jobDag.pipelineEdges['1'][0].targetVertexId", + equalTo("2")) + .body("jobDag.vertexInfoMap", hasSize(2)) + .body( + "jobDag.vertexInfoMap[0].vertexId", + equalTo(1)) + .body( + "jobDag.vertexInfoMap[0].type", + equalTo("source")) + .body( + "jobDag.vertexInfoMap[0].vertexName", + equalTo( + "pipeline-1 [Source[0]-FakeSource]")) + .body( + "jobDag.vertexInfoMap[0].tablePaths[0]", + equalTo("fake")) + .body( + "jobDag.vertexInfoMap[1].vertexId", + equalTo(2)) + .body( + "jobDag.vertexInfoMap[1].type", + equalTo("sink")) + .body( + "jobDag.vertexInfoMap[1].vertexName", + equalTo( + "pipeline-1 [Sink[0]-console-MultiTableSink]")) + .body( + "jobDag.vertexInfoMap[1].tablePaths[0]", + equalTo("fake")) + .body( + "metrics.TableSourceReceivedCount.fake", + equalTo("5")) + .body( + "metrics.TableSinkWriteCount.fake", + equalTo("5")) + .body("metrics.SinkWriteCount", equalTo("5")) + .body("metrics.SourceReceivedCount", equalTo("5")) .body("jobName", equalTo("fake_to_console")) .body("jobStatus", equalTo("FINISHED")); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java index d052629f2e0..8935ee11ae8 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java @@ -35,6 +35,7 @@ import org.apache.seatunnel.engine.core.job.JobInfo; import org.apache.seatunnel.engine.core.job.JobStatus; import org.apache.seatunnel.engine.server.SeaTunnelServer; +import org.apache.seatunnel.engine.server.dag.DAGUtils; import org.apache.seatunnel.engine.server.log.Log4j2HttpGetCommandProcessor; import org.apache.seatunnel.engine.server.master.JobHistoryService.JobState; import org.apache.seatunnel.engine.server.operation.GetClusterHealthMetricsOperation; @@ -692,6 +693,13 @@ private JsonObject convertToJson(JobInfo jobInfo, long jobId) { jobStatus = seaTunnelServer.getCoordinatorService().getJobStatus(jobId); } + JobDAGInfo jobDAGInfo = + DAGUtils.getJobDAGInfo( + logicalDag, + jobImmutableInformation, + getSeaTunnelServer(false).getSeaTunnelConfig().getEngineConfig(), + true); + jobInfoJson .add(RestConstant.JOB_ID, String.valueOf(jobId)) .add(RestConstant.JOB_NAME, logicalDag.getJobConfig().getName()) @@ -704,7 +712,7 @@ private JsonObject convertToJson(JobInfo jobInfo, long jobId) { DateTimeUtils.toString( jobImmutableInformation.getCreateTime(), DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS)) - .add(RestConstant.JOB_DAG, logicalDag.getLogicalDagAsJson()) + .add(RestConstant.JOB_DAG, jobDAGInfo.toJsonObject()) .add( RestConstant.PLUGIN_JARS_URLS, (JsonValue) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java index 15cd4eaecce..c1f126f4226 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java @@ -37,6 +37,7 @@ import org.apache.seatunnel.engine.core.job.JobStatus; import org.apache.seatunnel.engine.server.CoordinatorService; import org.apache.seatunnel.engine.server.SeaTunnelServer; +import org.apache.seatunnel.engine.server.dag.DAGUtils; import org.apache.seatunnel.engine.server.master.JobHistoryService; import org.apache.seatunnel.engine.server.operation.CancelJobOperation; import org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation; @@ -182,6 +183,13 @@ nodeEngine, new GetJobStatusOperation(jobId)) jobStatus = seaTunnelServer.getCoordinatorService().getJobStatus(jobId); } + JobDAGInfo jobDAGInfo = + DAGUtils.getJobDAGInfo( + logicalDag, + jobImmutableInformation, + getSeaTunnelServer(false).getSeaTunnelConfig().getEngineConfig(), + true); + jobInfoJson .add(RestConstant.JOB_ID, String.valueOf(jobId)) .add(RestConstant.JOB_NAME, logicalDag.getJobConfig().getName()) @@ -194,7 +202,7 @@ nodeEngine, new GetJobStatusOperation(jobId)) DateTimeUtils.toString( jobImmutableInformation.getCreateTime(), DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS)) - .add(RestConstant.JOB_DAG, logicalDag.getLogicalDagAsJson()) + .add(RestConstant.JOB_DAG, jobDAGInfo.toJsonObject()) .add( RestConstant.PLUGIN_JARS_URLS, (JsonValue) From 0527a92df0381235b48dca33084b15da0b26908e Mon Sep 17 00:00:00 2001 From: liugddx Date: Mon, 14 Oct 2024 22:19:12 +0800 Subject: [PATCH 10/13] 1 --- .../server/task/flow/SinkFlowLifeCycle.java | 42 ++++++++----------- 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index 3a2df302632..fef51859509 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -63,7 +63,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky; @@ -103,7 +102,8 @@ public class SinkFlowLifeCycle> tablesMaps = new ArrayList<>(); + /** Mapping relationship between upstream tablepath and downstream tablepath. */ + private final Map tablesMaps = new HashMap<>(); public SinkFlowLifeCycle( SinkAction sinkAction, @@ -126,16 +126,14 @@ public SinkFlowLifeCycle( boolean isMulti = sinkAction.getSink() instanceof MultiTableSink; if (isMulti) { sinkTables = ((MultiTableSink) sinkAction.getSink()).getSinkTables(); - String[] keys = + String[] upstreamTablePaths = ((MultiTableSink) sinkAction.getSink()) .getSinks() .keySet() .toArray(new String[0]); - Map tableMap = new HashMap<>(); for (int i = 0; i < ((MultiTableSink) sinkAction.getSink()).getSinks().size(); i++) { - tableMap.put(TablePath.of(keys[i]), sinkTables.get(i)); + tablesMaps.put(TablePath.of(upstreamTablePaths[i]), sinkTables.get(i)); } - tablesMaps.add(tableMap); } else { Optional catalogTable = sinkAction.getSink().getWriteCatalogTable(); if (catalogTable.isPresent()) { @@ -271,42 +269,36 @@ public void received(Record record) { if (prepareClose) { return; } - AtomicReference tableId = new AtomicReference<>(); + String tableId = ""; writer.write((T) record.getData()); if (record.getData() instanceof SeaTunnelRow) { if (this.sinkAction.getSink() instanceof MultiTableSink) { if (((SeaTunnelRow) record.getData()).getTableId() == null || ((SeaTunnelRow) record.getData()).getTableId().isEmpty()) { - tableId.set(((SeaTunnelRow) record.getData()).getTableId()); + tableId = ((SeaTunnelRow) record.getData()).getTableId(); } else { - tablesMaps.forEach( - tablePathTablePathMap -> { - tablePathTablePathMap.forEach( - (k, v) -> { - if (k.equals( - TablePath.of( - ((SeaTunnelRow) - record - .getData()) - .getTableId()))) { - tableId.set(v.getFullName()); - } - }); - }); + + tableId = + tablesMaps + .get( + TablePath.of( + ((SeaTunnelRow) record.getData()) + .getTableId())) + .getFullName(); } } else { Optional writeCatalogTable = this.sinkAction.getSink().getWriteCatalogTable(); - tableId.set( + tableId = writeCatalogTable .map( catalogTable -> catalogTable.getTablePath().getFullName()) - .orElseGet(TablePath.DEFAULT::getFullName)); + .orElseGet(TablePath.DEFAULT::getFullName); } - taskMetricsCalcContext.updateMetrics(record.getData(), tableId.get()); + taskMetricsCalcContext.updateMetrics(record.getData(), tableId); } } } catch (Exception e) { From 5152c0f536fde025a5b819f1fbb91b8bf514f4dd Mon Sep 17 00:00:00 2001 From: liugddx Date: Tue, 15 Oct 2024 08:22:13 +0800 Subject: [PATCH 11/13] 1 --- .../engine/server/task/flow/SinkFlowLifeCycle.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index fef51859509..3a9a5c78c30 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -278,13 +278,15 @@ public void received(Record record) { tableId = ((SeaTunnelRow) record.getData()).getTableId(); } else { + TablePath tablePath = + tablesMaps.get( + TablePath.of( + ((SeaTunnelRow) record.getData()) + .getTableId())); tableId = - tablesMaps - .get( - TablePath.of( - ((SeaTunnelRow) record.getData()) - .getTableId())) - .getFullName(); + tablePath != null + ? tablePath.getFullName() + : TablePath.DEFAULT.getFullName(); } } else { From b1b6b260719a2f81073331d5efed893944b2d127 Mon Sep 17 00:00:00 2001 From: liugddx Date: Tue, 15 Oct 2024 13:12:22 +0800 Subject: [PATCH 12/13] 1 --- docs/en/seatunnel-engine/rest-api-v1.md | 18 +++++++++++++--- docs/en/seatunnel-engine/rest-api-v2.md | 18 +++++++++++++--- docs/zh/seatunnel-engine/rest-api-v1.md | 21 +++++++++++++++---- docs/zh/seatunnel-engine/rest-api-v2.md | 20 +++++++++++++----- .../seatunnel/engine/e2e/RestApiIT.java | 18 ++++++++++++++++ .../seatunnel/engine/core/job/JobDAGInfo.java | 4 ++++ .../seatunnel/engine/server/dag/DAGUtils.java | 10 +++++++-- .../rest/RestHttpGetCommandProcessor.java | 3 --- .../server/rest/servlet/BaseServlet.java | 8 +++---- .../server/rest/servlet/JobInfoServlet.java | 14 ++++++------- 10 files changed, 103 insertions(+), 31 deletions(-) diff --git a/docs/en/seatunnel-engine/rest-api-v1.md b/docs/en/seatunnel-engine/rest-api-v1.md index ec9d8f13b9b..aaefcbc5faf 100644 --- a/docs/en/seatunnel-engine/rest-api-v1.md +++ b/docs/en/seatunnel-engine/rest-api-v1.md @@ -121,10 +121,19 @@ network: }, "createTime": "", "jobDag": { - "vertices": [ + "jobId": "", + "envOptions": [], + "vertexInfoMap": [ + { + "vertexId": 1, + "type": "", + "vertexName": "", + "tablePaths": [ + "" + ] + } ], - "edges": [ - ] + "pipelineEdges": {} }, "pluginJarsUrls": [ ], @@ -162,6 +171,7 @@ network: "createTime": "", "jobDag": { "jobId": "", + "envOptions": [], "vertexInfoMap": [ { "vertexId": 1, @@ -227,6 +237,7 @@ This API has been deprecated, please use /hazelcast/rest/maps/job-info/:jobId in "createTime": "", "jobDag": { "jobId": "", + "envOptions": [], "vertexInfoMap": [ { "vertexId": 1, @@ -307,6 +318,7 @@ When we can't get the job info, the response will be: "finishTime": "", "jobDag": { "jobId": "", + "envOptions": [], "vertexInfoMap": [ { "vertexId": 1, diff --git a/docs/en/seatunnel-engine/rest-api-v2.md b/docs/en/seatunnel-engine/rest-api-v2.md index e5b9d5d718d..1e7cf10d4e6 100644 --- a/docs/en/seatunnel-engine/rest-api-v2.md +++ b/docs/en/seatunnel-engine/rest-api-v2.md @@ -88,10 +88,19 @@ seatunnel: }, "createTime": "", "jobDag": { - "vertices": [ + "jobId": "", + "envOptions": [], + "vertexInfoMap": [ + { + "vertexId": 1, + "type": "", + "vertexName": "", + "tablePaths": [ + "" + ] + } ], - "edges": [ - ] + "pipelineEdges": {} }, "pluginJarsUrls": [ ], @@ -129,6 +138,7 @@ seatunnel: "createTime": "", "jobDag": { "jobId": "", + "envOptions": [], "vertexInfoMap": [ { "vertexId": 1, @@ -194,6 +204,7 @@ This API has been deprecated, please use /job-info/:jobId instead "createTime": "", "jobDag": { "jobId": "", + "envOptions": [], "vertexInfoMap": [ { "vertexId": 1, @@ -274,6 +285,7 @@ When we can't get the job info, the response will be: "finishTime": "", "jobDag": { "jobId": "", + "envOptions": [], "vertexInfoMap": [ { "vertexId": 1, diff --git a/docs/zh/seatunnel-engine/rest-api-v1.md b/docs/zh/seatunnel-engine/rest-api-v1.md index 5154922ec07..a59c6bbde5f 100644 --- a/docs/zh/seatunnel-engine/rest-api-v1.md +++ b/docs/zh/seatunnel-engine/rest-api-v1.md @@ -119,10 +119,19 @@ network: }, "createTime": "", "jobDag": { - "vertices": [ + "jobId": "", + "envOptions": [], + "vertexInfoMap": [ + { + "vertexId": 1, + "type": "", + "vertexName": "", + "tablePaths": [ + "" + ] + } ], - "edges": [ - ] + "pipelineEdges": {} }, "pluginJarsUrls": [ ], @@ -160,6 +169,7 @@ network: "createTime": "", "jobDag": { "jobId": "", + "envOptions": [], "vertexInfoMap": [ { "vertexId": 1, @@ -239,6 +249,7 @@ network: "createTime": "", "jobDag": { "jobId": "", + "envOptions": [], "vertexInfoMap": [ { "vertexId": 1, @@ -305,6 +316,7 @@ network: "finishTime": "", "jobDag": { "jobId": "", + "envOptions": [], "vertexInfoMap": [ { "vertexId": 1, @@ -316,7 +328,8 @@ network: } ], "pipelineEdges": {} - }, "metrics": "" + }, + "metrics": "" } ] ``` diff --git a/docs/zh/seatunnel-engine/rest-api-v2.md b/docs/zh/seatunnel-engine/rest-api-v2.md index df884fa18ec..75a03f93fdb 100644 --- a/docs/zh/seatunnel-engine/rest-api-v2.md +++ b/docs/zh/seatunnel-engine/rest-api-v2.md @@ -80,14 +80,21 @@ seatunnel: "jobId": "", "jobName": "", "jobStatus": "", - "envOptions": { - }, "createTime": "", "jobDag": { - "vertices": [ + "jobId": "", + "envOptions": [], + "vertexInfoMap": [ + { + "vertexId": 1, + "type": "", + "vertexName": "", + "tablePaths": [ + "" + ] + } ], - "edges": [ - ] + "pipelineEdges": {} }, "pluginJarsUrls": [ ], @@ -125,6 +132,7 @@ seatunnel: "createTime": "", "jobDag": { "jobId": "", + "envOptions": [], "vertexInfoMap": [ { "vertexId": 1, @@ -204,6 +212,7 @@ seatunnel: "createTime": "", "jobDag": { "jobId": "", + "envOptions": [], "vertexInfoMap": [ { "vertexId": 1, @@ -270,6 +279,7 @@ seatunnel: "finishTime": "", "jobDag": { "jobId": "", + "envOptions": [], "vertexInfoMap": [ { "vertexId": 1, diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java index abc7798534c..2abc6ba175f 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java @@ -323,6 +323,12 @@ public void testGetRunningJobs() { .body( "[0].jobDag.vertexInfoMap[1].tablePaths[0]", equalTo("fake")) + .body( + "[0].jobDag.envOptions.'job.mode'", + equalTo("STREAMING")) + .body( + "[0].jobDag.envOptions.'checkpoint.interval'", + equalTo("5000")) .body("[0].jobName", equalTo("fake_to_file")) .body("[0].jobStatus", equalTo("RUNNING")); @@ -379,6 +385,12 @@ public void testGetRunningJobs() { .body( "[0].jobDag.vertexInfoMap[1].tablePaths[0]", equalTo("fake")) + .body( + "[0].jobDag.envOptions.'job.mode'", + equalTo("STREAMING")) + .body( + "[0].jobDag.envOptions.'checkpoint.interval'", + equalTo("5000")) .body("[0].jobName", equalTo("fake_to_file")) .body("[0].jobStatus", equalTo("RUNNING")); })); @@ -448,6 +460,9 @@ public void testGetJobInfoByJobId() { equalTo("5")) .body("metrics.SinkWriteCount", equalTo("5")) .body("metrics.SourceReceivedCount", equalTo("5")) + .body( + "[0].jobDag.envOptions.'job.mode'", + equalTo("BATCH")) .body("jobName", equalTo("fake_to_console")) .body("jobStatus", equalTo("FINISHED")); @@ -511,6 +526,9 @@ public void testGetJobInfoByJobId() { equalTo("5")) .body("metrics.SinkWriteCount", equalTo("5")) .body("metrics.SourceReceivedCount", equalTo("5")) + .body( + "[0].jobDag.envOptions.'job.mode'", + equalTo("BATCH")) .body("jobName", equalTo("fake_to_console")) .body("jobStatus", equalTo("FINISHED")); }); diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java index ee6326acbde..aea57beef33 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java @@ -21,6 +21,7 @@ import com.hazelcast.internal.json.JsonArray; import com.hazelcast.internal.json.JsonObject; +import com.hazelcast.internal.util.JsonUtil; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -34,6 +35,7 @@ @Data public class JobDAGInfo implements Serializable { Long jobId; + Map envOptions; Map> pipelineEdges; Map vertexInfoMap; @@ -54,6 +56,8 @@ public JsonObject toJsonObject() { JsonObject jsonObject = new JsonObject(); jsonObject.add("jobId", jobId.toString()); jsonObject.add("pipelineEdges", pipelineEdgesJsonObject); + jsonObject.add("envOptions", JsonUtil.toJsonObject(envOptions)); + JsonArray vertexInfoMapString = new JsonArray(); for (Map.Entry entry : vertexInfoMap.entrySet()) { JsonObject vertexInfoJsonObj = new JsonObject(); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java index e7b41de73d9..bbe6d77e00f 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java @@ -86,7 +86,10 @@ public static JobDAGInfo getJobDAGInfo( }); }); return new JobDAGInfo( - jobImmutableInformation.getJobId(), pipelineWithEdges, vertexInfoMap); + jobImmutableInformation.getJobId(), + logicalDag.getJobConfig().getEnvOptions(), + pipelineWithEdges, + vertexInfoMap); } else { // Generate LogicalPlan DAG List edges = @@ -130,7 +133,10 @@ public static JobDAGInfo getJobDAGInfo( }, Collectors.toList())); return new JobDAGInfo( - jobImmutableInformation.getJobId(), pipelineWithEdges, vertexInfoMap); + jobImmutableInformation.getJobId(), + logicalDag.getJobConfig().getEnvOptions(), + pipelineWithEdges, + vertexInfoMap); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java index 8935ee11ae8..b860dbc1c74 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java @@ -704,9 +704,6 @@ private JsonObject convertToJson(JobInfo jobInfo, long jobId) { .add(RestConstant.JOB_ID, String.valueOf(jobId)) .add(RestConstant.JOB_NAME, logicalDag.getJobConfig().getName()) .add(RestConstant.JOB_STATUS, jobStatus.toString()) - .add( - RestConstant.ENV_OPTIONS, - JsonUtil.toJsonObject(logicalDag.getJobConfig().getEnvOptions())) .add( RestConstant.CREATE_TIME, DateTimeUtils.toString( diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java index c1f126f4226..5553e2c85ec 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java @@ -218,7 +218,7 @@ nodeEngine, new GetJobStatusOperation(jobId)) .add( RestConstant.IS_START_WITH_SAVE_POINT, jobImmutableInformation.isStartWithSavePoint()) - .add(RestConstant.METRICS, toJsonObject(getJobMetrics(jobMetrics))); + .add(RestConstant.METRICS, metricsToJsonObject(getJobMetrics(jobMetrics))); return jobInfoJson; } @@ -296,7 +296,7 @@ protected JsonObject getJobInfoJson( DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS)) .add(RestConstant.JOB_DAG, jobDAGInfo.toJsonObject()) .add(RestConstant.PLUGIN_JARS_URLS, new JsonArray()) - .add(RestConstant.METRICS, toJsonObject(getJobMetrics(jobMetrics))); + .add(RestConstant.METRICS, metricsToJsonObject(getJobMetrics(jobMetrics))); } private Map getJobMetrics(String jobMetrics) { @@ -584,12 +584,12 @@ private void submitJob( voidPassiveCompletableFuture.join(); } - private JsonObject toJsonObject(Map jobMetrics) { + private JsonObject metricsToJsonObject(Map jobMetrics) { JsonObject members = new JsonObject(); jobMetrics.forEach( (key, value) -> { if (value instanceof Map) { - members.add(key, toJsonObject((Map) value)); + members.add(key, metricsToJsonObject((Map) value)); } else { members.add(key, value.toString()); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/JobInfoServlet.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/JobInfoServlet.java index 16683e68265..d41635a9eb2 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/JobInfoServlet.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/JobInfoServlet.java @@ -48,18 +48,18 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) if (jobId != null && jobId.length() > 1) { jobId = jobId.substring(1); } else { - jobId = ""; + throw new IllegalArgumentException("The jobId must not be empty."); } IMap jobInfoMap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_INFO); - JobInfo jobInfo = (JobInfo) jobInfoMap.get(Long.valueOf(jobId)); + Object jobInfo = jobInfoMap.get(Long.valueOf(jobId)); IMap finishedJobStateMap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_STATE); - JobState finishedJobState = (JobState) finishedJobStateMap.get(Long.valueOf(jobId)); - if (!jobId.isEmpty() && jobInfo != null) { - writeJson(resp, convertToJson(jobInfo, Long.parseLong(jobId))); - } else if (!jobId.isEmpty() && finishedJobState != null) { + Object finishedJobState = finishedJobStateMap.get(Long.valueOf(jobId)); + if (jobInfo != null) { + writeJson(resp, convertToJson((JobInfo) jobInfo, Long.parseLong(jobId))); + } else if (finishedJobState != null) { JobMetrics finishedJobMetrics = (JobMetrics) nodeEngine @@ -75,7 +75,7 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) writeJson( resp, getJobInfoJson( - finishedJobState, + (JobState) finishedJobState, finishedJobMetrics.toJsonString(), finishedJobDAGInfo)); } else { From 50dcf4ce0908f1da828c532b7cf89fd1fe2d8662 Mon Sep 17 00:00:00 2001 From: liugddx Date: Tue, 15 Oct 2024 15:17:51 +0800 Subject: [PATCH 13/13] 1 --- .../test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java index 2abc6ba175f..0dd90edddad 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java @@ -461,7 +461,7 @@ public void testGetJobInfoByJobId() { .body("metrics.SinkWriteCount", equalTo("5")) .body("metrics.SourceReceivedCount", equalTo("5")) .body( - "[0].jobDag.envOptions.'job.mode'", + "jobDag.envOptions.'job.mode'", equalTo("BATCH")) .body("jobName", equalTo("fake_to_console")) .body("jobStatus", equalTo("FINISHED")); @@ -527,7 +527,7 @@ public void testGetJobInfoByJobId() { .body("metrics.SinkWriteCount", equalTo("5")) .body("metrics.SourceReceivedCount", equalTo("5")) .body( - "[0].jobDag.envOptions.'job.mode'", + "jobDag.envOptions.'job.mode'", equalTo("BATCH")) .body("jobName", equalTo("fake_to_console")) .body("jobStatus", equalTo("FINISHED"));