Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36851][table] Introduce TemporalProcessTimeJoinOperator in TemporalJoin with Async State API #25777

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.CodeGenUtils;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
Expand All @@ -50,6 +51,7 @@
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.temporal.TemporalProcessTimeJoinOperator;
import org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator;
import org.apache.flink.table.runtime.operators.join.temporal.asyncprocessing.AsyncStateTemporalProcessTimeJoinOperator;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
Expand All @@ -72,6 +74,7 @@
name = "stream-exec-temporal-join",
version = 1,
producedTransformations = StreamExecTemporalJoin.TEMPORAL_JOIN_TRANSFORMATION,
consumedOptions = {"table.exec.async-state.enabled"},
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
public class StreamExecTemporalJoin extends ExecNodeBase<RowData>
Expand Down Expand Up @@ -276,12 +279,21 @@ private TwoInputStreamOperator<RowData, RowData, RowData> createJoinOperator(
isLeftOuterJoin);
} else {
if (isTemporalFunctionJoin) {
return new TemporalProcessTimeJoinOperator(
InternalTypeInfo.of(rightInputType),
generatedJoinCondition,
minRetentionTime,
maxRetentionTime,
isLeftOuterJoin);
if (config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the other PR, As a heads up, changes like this should update consumedOptions for the JSON serialization.

(Sorry for splitting the conversation across PRs.)

Copy link
Author

@Au-Miner Au-Miner Dec 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The settings related to this configuration have been updated in #25320.

I don't quite understand the meaning of what you mean by 'update consumedOptions for the JSON serialization'. I would greatly appreciate it if you could give me some more tips.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The annotations here: https://github.com/apache/flink/pull/25777/files#diff-76717e3a4ed9896387a0961a4ff297ccd8a7dd4ef19b49605cf0755fb1d0d1dbR73-R78 show provide information about how to serialize this node to JSON.

Checkout other examples and you'll see a field named consumedOptions which tells which options are used the node.

If I understand, you are adding a new option which is used by this class. As such, you need to advertise it in the annotation.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, thanks. I have added the corresponding explanation for consumedOptions.

return new AsyncStateTemporalProcessTimeJoinOperator(
Copy link
Contributor

@davidradl davidradl Dec 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Jira mentions SQL and table and the component in the PR title mentions [TABLE], but the fix and test appear to be datastream only. Is there an intent that this effects table API and SQL?

Please could you update the documentation -as this is new changes to the API.

Copy link
Author

@Au-Miner Au-Miner Dec 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for review. This PR is for completing a part of Flip-473. Its intention is similar to #25320 .

The original API has not been changed, it will only switch the TemporalProcessTimeJoinOperator under the FLINK-TABLE module to be executed by Asynchronous StateTemporalProcessTimeJoinOperator when table. exec.async-state.enable is set to true.

Is there anything else that needs to be modified? If so, please let me know. Thank you

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for review. This PR is for completing a part of Flip-473. Its intention is similar to #25320 .

The original API has not been changed, it will only switch the TemporalProcessTimeJoinOperator under the FLINK-TABLE module to be executed by Asynchronous StateTemporalProcessTimeJoinOperator when table. exec.async-state.enable is set to true.

Is there anything else that needs to be modified? If so, please let me know. Thank you

ok thanks for the clarification

InternalTypeInfo.of(rightInputType),
generatedJoinCondition,
minRetentionTime,
maxRetentionTime,
isLeftOuterJoin);
} else {
return new TemporalProcessTimeJoinOperator(
InternalTypeInfo.of(rightInputType),
generatedJoinCondition,
minRetentionTime,
maxRetentionTime,
isLeftOuterJoin);
}
} else {
// The exsiting TemporalProcessTimeJoinOperator has already supported temporal table
// join.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.table.planner.factories.TestValuesTableFactory.{getResultsAsStrings, registerData}
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode}
import org.apache.flink.table.utils.LegacyRowExtension
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension
import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension, Parameters}
import org.apache.flink.types.Row

import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
Expand All @@ -34,11 +34,22 @@ import org.junit.jupiter.api.extension.{ExtendWith, RegisterExtension}

import java.time.LocalDateTime
import java.time.format.DateTimeParseException
import java.util

import scala.collection.JavaConversions._

@ExtendWith(Array(classOf[ParameterizedTestExtension]))
class TemporalJoinITCase(state: StateBackendMode) extends StreamingWithStateTestBase(state) {
class TemporalJoinITCase(state: StateBackendMode, enableAsyncState: Boolean)
extends StreamingWithStateTestBase(state) {

@BeforeEach
override def before(): Unit = {
super.before()

tEnv.getConfig.set(
ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED,
Boolean.box(enableAsyncState))
}

@RegisterExtension private val _: EachCallbackWrapper[LegacyRowExtension] =
new EachCallbackWrapper[LegacyRowExtension](new LegacyRowExtension)
Expand Down Expand Up @@ -795,3 +806,14 @@ class TemporalJoinITCase(state: StateBackendMode) extends StreamingWithStateTest
TestValuesTableFactory.changelogRow(kind, objects.toArray: _*)
}
}

object TemporalJoinITCase {

@Parameters(name = "StateBackend={0}, EnableAsyncState={1}")
def parameters(): util.Collection[Array[java.lang.Object]] = {
Seq[Array[AnyRef]](
Array(HEAP_BACKEND, Boolean.box(false)),
Array(HEAP_BACKEND, Boolean.box(true)),
Array(ROCKSDB_BACKEND, Boolean.box(false)))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public static JoinRecordAsyncStateView create(
inputSideSpec.getUniqueKeySelector(),
ttlConfig);
}

} else {
return new InputSideHasNoUniqueKey(ctx, stateName, recordType, ttlConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
import java.util.Optional;

/**
* An abstract {@link TwoInputStreamOperator} that allows its subclasses to clean up their state
* based on a TTL. This TTL should be specified in the provided {@code minRetentionTime} and {@code
* maxRetentionTime}.
* An abstract {@link TwoInputStreamOperator} that allows its subclasses to clean up their state in
* sync state based on a TTL. This TTL should be specified in the provided {@code minRetentionTime}
* and {@code maxRetentionTime}.
*
* <p>For each known key, this operator registers a timer (in processing time) to fire after the TTL
* expires. When the timer fires, the subclass can decide which state to cleanup and what further
Expand Down Expand Up @@ -115,7 +115,7 @@ protected void registerProcessingCleanupTimer() throws IOException {
Optional<Long> currentCleanupTime =
Optional.ofNullable(latestRegisteredCleanupTimer.value());

if (!currentCleanupTime.isPresent()
if (currentCleanupTime.isEmpty()
|| (currentProcessingTime + minRetentionTime) > currentCleanupTime.get()) {

updateCleanupTimer(currentProcessingTime, currentCleanupTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.generated.JoinCondition;
import org.apache.flink.table.runtime.operators.join.temporal.utils.TemporalProcessTimeJoinHelper;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;

/**
* The operator to temporal join a stream on processing time.
* The operator to temporal join a stream on processing time in sync state.
*
* <p>For temporal TableFunction join (LATERAL TemporalTableFunction(o.proctime)) and temporal table
* join (FOR SYSTEM_TIME AS OF), they can reuse same processing-time operator implementation, the
Expand All @@ -60,6 +61,8 @@ public class TemporalProcessTimeJoinOperator extends BaseTwoInputStreamOperatorW
private transient GenericRowData rightNullRow;
private transient TimestampedCollector<RowData> collector;

private transient SyncStateTemporalProcessTimeJoinHelper temporalProcessTimeJoinHelper;

public TemporalProcessTimeJoinOperator(
InternalTypeInfo<RowData> rightType,
GeneratedJoinCondition generatedJoinCondition,
Expand Down Expand Up @@ -88,38 +91,21 @@ public void open() throws Exception {
this.rightNullRow = new GenericRowData(rightType.toRowSize());
// consider watermark from left stream only.
super.processWatermark2(Watermark.MAX_WATERMARK);
this.temporalProcessTimeJoinHelper = new SyncStateTemporalProcessTimeJoinHelper();
}

@Override
public void processElement1(StreamRecord<RowData> element) throws Exception {
RowData leftSideRow = element.getValue();
RowData rightSideRow = rightState.value();

if (rightSideRow == null) {
if (isLeftOuterJoin) {
collectJoinedRow(leftSideRow, rightNullRow);
} else {
return;
}
} else {
if (joinCondition.apply(leftSideRow, rightSideRow)) {
collectJoinedRow(leftSideRow, rightSideRow);
} else {
if (isLeftOuterJoin) {
collectJoinedRow(leftSideRow, rightNullRow);
}
}
temporalProcessTimeJoinHelper.processElement1(leftSideRow, rightSideRow);
if (rightSideRow != null) {
// register a cleanup timer only if the rightSideRow is not null
registerProcessingCleanupTimer();
}
}

private void collectJoinedRow(RowData leftRow, RowData rightRow) {
outRow.setRowKind(leftRow.getRowKind());
outRow.replace(leftRow, rightRow);
collector.collect(outRow);
}

@Override
public void processElement2(StreamRecord<RowData> element) throws Exception {
if (RowDataUtil.isAccumulateMsg(element.getValue())) {
Expand Down Expand Up @@ -150,4 +136,10 @@ public void cleanupState(long time) {
/** Invoked when an event-time timer fires. */
@Override
public void onEventTime(InternalTimer<Object, VoidNamespace> timer) throws Exception {}

private class SyncStateTemporalProcessTimeJoinHelper extends TemporalProcessTimeJoinHelper {
public SyncStateTemporalProcessTimeJoinHelper() {
super(isLeftOuterJoin, joinCondition, outRow, rightNullRow, collector);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.operators.join.temporal.asyncprocessing;

import org.apache.flink.api.common.functions.DefaultOpenContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.generated.JoinCondition;
import org.apache.flink.table.runtime.operators.join.temporal.utils.TemporalProcessTimeJoinHelper;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;

/**
* The operator to temporal join a stream on processing time in async state.
*
* <p>For temporal TableFunction join (LATERAL TemporalTableFunction(o.proctime)) and temporal table
* join (FOR SYSTEM_TIME AS OF), they can reuse same processing-time operator implementation, the
* differences between them are: (1) The temporal TableFunction join only supports single column in
* primary key but temporal table join supports arbitrary columns in primary key. (2) The temporal
* TableFunction join only supports inner join, temporal table join supports both inner join and
* left outer join.
*/
public class AsyncStateTemporalProcessTimeJoinOperator
extends BaseTwoInputAsyncStateStreamOperatorWithStateRetention {

private static final long serialVersionUID = 1L;

private final boolean isLeftOuterJoin;
private final InternalTypeInfo<RowData> rightType;
private final GeneratedJoinCondition generatedJoinCondition;

private transient ValueState<RowData> rightState;
private transient JoinCondition joinCondition;

private transient JoinedRowData outRow;
private transient GenericRowData rightNullRow;
private transient TimestampedCollector<RowData> collector;

private transient AsyncStateTemporalProcessTimeJoinHelper temporalProcessTimeJoinHelper;

public AsyncStateTemporalProcessTimeJoinOperator(
InternalTypeInfo<RowData> rightType,
GeneratedJoinCondition generatedJoinCondition,
long minRetentionTime,
long maxRetentionTime,
boolean isLeftOuterJoin) {
super(minRetentionTime, maxRetentionTime);
this.rightType = rightType;
this.generatedJoinCondition = generatedJoinCondition;
this.isLeftOuterJoin = isLeftOuterJoin;
}

@Override
public void open() throws Exception {
super.open();
this.joinCondition =
generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader());
FunctionUtils.setFunctionRuntimeContext(joinCondition, getRuntimeContext());
FunctionUtils.openFunction(joinCondition, DefaultOpenContext.INSTANCE);

ValueStateDescriptor<RowData> rightStateDesc =
new ValueStateDescriptor<>("right", rightType);
this.rightState = getRuntimeContext().getValueState(rightStateDesc);
this.collector = new TimestampedCollector<>(output);
this.outRow = new JoinedRowData();
this.rightNullRow = new GenericRowData(rightType.toRowSize());
// consider watermark from left stream only.
super.processWatermark2(Watermark.MAX_WATERMARK);
this.temporalProcessTimeJoinHelper = new AsyncStateTemporalProcessTimeJoinHelper();
}

@Override
public void processElement1(StreamRecord<RowData> element) throws Exception {
RowData leftSideRow = element.getValue();
// RowData rightSideRow = rightState.value();
StateFuture<RowData> rightSideRowFuture = rightState.asyncValue();

rightSideRowFuture.thenAccept(
rightSideRow -> {
temporalProcessTimeJoinHelper.processElement1(leftSideRow, rightSideRow);
if (rightSideRow != null) {
// register a cleanup timer only if the rightSideRow is not null
registerProcessingCleanupTimer();
}
});
}

@Override
public void processElement2(StreamRecord<RowData> element) throws Exception {
if (RowDataUtil.isAccumulateMsg(element.getValue())) {
StateFuture<Void> updateFuture = rightState.asyncUpdate(element.getValue());
updateFuture.thenAccept(Void -> registerProcessingCleanupTimer());
} else {
StateFuture<Void> clearFuture = rightState.asyncClear();
clearFuture.thenAccept(Void -> cleanupLastTimer());
}
}

@Override
public void close() throws Exception {
FunctionUtils.closeFunction(joinCondition);
super.close();
}

/**
* The method to be called when a cleanup timer fires.
*
* @param time The timestamp of the fired timer.
*/
@Override
public StateFuture<Void> cleanupState(long time) {
return rightState.asyncClear();
}

/** Invoked when an event-time timer fires. */
@Override
public void onEventTime(InternalTimer<Object, VoidNamespace> timer) throws Exception {}

private class AsyncStateTemporalProcessTimeJoinHelper extends TemporalProcessTimeJoinHelper {
public AsyncStateTemporalProcessTimeJoinHelper() {
super(isLeftOuterJoin, joinCondition, outRow, rightNullRow, collector);
}
}
}
Loading