-
Notifications
You must be signed in to change notification settings - Fork 13.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-36851][table] Introduce TemporalProcessTimeJoinOperator in TemporalJoin with Async State API #25777
base: master
Are you sure you want to change the base?
[FLINK-36851][table] Introduce TemporalProcessTimeJoinOperator in TemporalJoin with Async State API #25777
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ | |
import org.apache.flink.streaming.api.transformations.TwoInputTransformation; | ||
import org.apache.flink.table.api.TableException; | ||
import org.apache.flink.table.api.ValidationException; | ||
import org.apache.flink.table.api.config.ExecutionConfigOptions; | ||
import org.apache.flink.table.data.RowData; | ||
import org.apache.flink.table.planner.codegen.CodeGenUtils; | ||
import org.apache.flink.table.planner.codegen.CodeGeneratorContext; | ||
|
@@ -50,6 +51,7 @@ | |
import org.apache.flink.table.runtime.operators.join.FlinkJoinType; | ||
import org.apache.flink.table.runtime.operators.join.temporal.TemporalProcessTimeJoinOperator; | ||
import org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator; | ||
import org.apache.flink.table.runtime.operators.join.temporal.asyncprocessing.AsyncStateTemporalProcessTimeJoinOperator; | ||
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; | ||
import org.apache.flink.table.types.logical.RowType; | ||
import org.apache.flink.util.Preconditions; | ||
|
@@ -72,6 +74,7 @@ | |
name = "stream-exec-temporal-join", | ||
version = 1, | ||
producedTransformations = StreamExecTemporalJoin.TEMPORAL_JOIN_TRANSFORMATION, | ||
consumedOptions = {"table.exec.async-state.enabled"}, | ||
minPlanVersion = FlinkVersion.v1_15, | ||
minStateVersion = FlinkVersion.v1_15) | ||
public class StreamExecTemporalJoin extends ExecNodeBase<RowData> | ||
|
@@ -276,12 +279,21 @@ private TwoInputStreamOperator<RowData, RowData, RowData> createJoinOperator( | |
isLeftOuterJoin); | ||
} else { | ||
if (isTemporalFunctionJoin) { | ||
return new TemporalProcessTimeJoinOperator( | ||
InternalTypeInfo.of(rightInputType), | ||
generatedJoinCondition, | ||
minRetentionTime, | ||
maxRetentionTime, | ||
isLeftOuterJoin); | ||
if (config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED)) { | ||
return new AsyncStateTemporalProcessTimeJoinOperator( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The Jira mentions SQL and table, and the component in the PR title mentions [TABLE], but the fix and test appear to be datastream only. Is there an intent that this affects the Table API and SQL? Could you please update the documentation, as these are new changes to the API? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thanks for the review. This PR completes a part of FLIP-473. Its intention is similar to #25320. The original API has not been changed; it will only switch the TemporalProcessTimeJoinOperator under the flink-table module to be executed by AsyncStateTemporalProcessTimeJoinOperator when table.exec.async-state.enabled is set to true. Is there anything else that needs to be modified? If so, please let me know. Thank you. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
ok thanks for the clarification |
||
InternalTypeInfo.of(rightInputType), | ||
generatedJoinCondition, | ||
minRetentionTime, | ||
maxRetentionTime, | ||
isLeftOuterJoin); | ||
} else { | ||
return new TemporalProcessTimeJoinOperator( | ||
InternalTypeInfo.of(rightInputType), | ||
generatedJoinCondition, | ||
minRetentionTime, | ||
maxRetentionTime, | ||
isLeftOuterJoin); | ||
} | ||
} else { | ||
// The existing TemporalProcessTimeJoinOperator has already supported temporal table | |
// join. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.table.runtime.operators.join.temporal.asyncprocessing; | ||
|
||
import org.apache.flink.api.common.functions.DefaultOpenContext; | ||
import org.apache.flink.api.common.functions.util.FunctionUtils; | ||
import org.apache.flink.api.common.state.v2.StateFuture; | ||
import org.apache.flink.api.common.state.v2.ValueState; | ||
import org.apache.flink.runtime.state.VoidNamespace; | ||
import org.apache.flink.runtime.state.v2.ValueStateDescriptor; | ||
import org.apache.flink.streaming.api.operators.InternalTimer; | ||
import org.apache.flink.streaming.api.operators.TimestampedCollector; | ||
import org.apache.flink.streaming.api.watermark.Watermark; | ||
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; | ||
import org.apache.flink.table.data.GenericRowData; | ||
import org.apache.flink.table.data.RowData; | ||
import org.apache.flink.table.data.util.RowDataUtil; | ||
import org.apache.flink.table.data.utils.JoinedRowData; | ||
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; | ||
import org.apache.flink.table.runtime.generated.JoinCondition; | ||
import org.apache.flink.table.runtime.operators.join.temporal.utils.TemporalProcessTimeJoinHelper; | ||
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; | ||
|
||
/** | ||
* The operator to temporal join a stream on processing time in async state. | ||
* | ||
* <p>For temporal TableFunction join (LATERAL TemporalTableFunction(o.proctime)) and temporal table | ||
* join (FOR SYSTEM_TIME AS OF), they can reuse same processing-time operator implementation, the | ||
* differences between them are: (1) The temporal TableFunction join only supports single column in | ||
* primary key but temporal table join supports arbitrary columns in primary key. (2) The temporal | ||
* TableFunction join only supports inner join, temporal table join supports both inner join and | ||
* left outer join. | ||
*/ | ||
public class AsyncStateTemporalProcessTimeJoinOperator | ||
extends BaseTwoInputAsyncStateStreamOperatorWithStateRetention { | ||
|
||
private static final long serialVersionUID = 1L; | ||
|
||
private final boolean isLeftOuterJoin; | ||
private final InternalTypeInfo<RowData> rightType; | ||
private final GeneratedJoinCondition generatedJoinCondition; | ||
|
||
private transient ValueState<RowData> rightState; | ||
private transient JoinCondition joinCondition; | ||
|
||
private transient JoinedRowData outRow; | ||
private transient GenericRowData rightNullRow; | ||
private transient TimestampedCollector<RowData> collector; | ||
|
||
private transient AsyncStateTemporalProcessTimeJoinHelper temporalProcessTimeJoinHelper; | ||
|
||
public AsyncStateTemporalProcessTimeJoinOperator( | ||
InternalTypeInfo<RowData> rightType, | ||
GeneratedJoinCondition generatedJoinCondition, | ||
long minRetentionTime, | ||
long maxRetentionTime, | ||
boolean isLeftOuterJoin) { | ||
super(minRetentionTime, maxRetentionTime); | ||
this.rightType = rightType; | ||
this.generatedJoinCondition = generatedJoinCondition; | ||
this.isLeftOuterJoin = isLeftOuterJoin; | ||
} | ||
|
||
@Override | ||
public void open() throws Exception { | ||
super.open(); | ||
this.joinCondition = | ||
generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader()); | ||
FunctionUtils.setFunctionRuntimeContext(joinCondition, getRuntimeContext()); | ||
FunctionUtils.openFunction(joinCondition, DefaultOpenContext.INSTANCE); | ||
|
||
ValueStateDescriptor<RowData> rightStateDesc = | ||
new ValueStateDescriptor<>("right", rightType); | ||
this.rightState = getRuntimeContext().getValueState(rightStateDesc); | ||
this.collector = new TimestampedCollector<>(output); | ||
this.outRow = new JoinedRowData(); | ||
this.rightNullRow = new GenericRowData(rightType.toRowSize()); | ||
// consider watermark from left stream only. | ||
super.processWatermark2(Watermark.MAX_WATERMARK); | ||
this.temporalProcessTimeJoinHelper = new AsyncStateTemporalProcessTimeJoinHelper(); | ||
} | ||
|
||
@Override | ||
public void processElement1(StreamRecord<RowData> element) throws Exception { | ||
RowData leftSideRow = element.getValue(); | ||
// RowData rightSideRow = rightState.value(); | ||
StateFuture<RowData> rightSideRowFuture = rightState.asyncValue(); | ||
|
||
rightSideRowFuture.thenAccept( | ||
rightSideRow -> { | ||
temporalProcessTimeJoinHelper.processElement1(leftSideRow, rightSideRow); | ||
if (rightSideRow != null) { | ||
// register a cleanup timer only if the rightSideRow is not null | ||
registerProcessingCleanupTimer(); | ||
} | ||
}); | ||
} | ||
|
||
@Override | ||
public void processElement2(StreamRecord<RowData> element) throws Exception { | ||
if (RowDataUtil.isAccumulateMsg(element.getValue())) { | ||
StateFuture<Void> updateFuture = rightState.asyncUpdate(element.getValue()); | ||
updateFuture.thenAccept(Void -> registerProcessingCleanupTimer()); | ||
} else { | ||
StateFuture<Void> clearFuture = rightState.asyncClear(); | ||
clearFuture.thenAccept(Void -> cleanupLastTimer()); | ||
} | ||
} | ||
|
||
@Override | ||
public void close() throws Exception { | ||
FunctionUtils.closeFunction(joinCondition); | ||
super.close(); | ||
} | ||
|
||
/** | ||
* The method to be called when a cleanup timer fires. | ||
* | ||
* @param time The timestamp of the fired timer. | ||
*/ | ||
@Override | ||
public StateFuture<Void> cleanupState(long time) { | ||
return rightState.asyncClear(); | ||
} | ||
|
||
/** Invoked when an event-time timer fires. */ | ||
@Override | ||
public void onEventTime(InternalTimer<Object, VoidNamespace> timer) throws Exception {} | ||
|
||
private class AsyncStateTemporalProcessTimeJoinHelper extends TemporalProcessTimeJoinHelper { | ||
public AsyncStateTemporalProcessTimeJoinHelper() { | ||
super(isLeftOuterJoin, joinCondition, outRow, rightNullRow, collector); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From the other PR, As a heads up, changes like this should update consumedOptions for the JSON serialization.
(Sorry for splitting the conversation across PRs.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The settings related to this configuration have been updated in #25320.
I don't quite understand what you mean by 'update consumedOptions for the JSON serialization'. I would greatly appreciate it if you could give me some more tips.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The annotations here: https://github.com/apache/flink/pull/25777/files#diff-76717e3a4ed9896387a0961a4ff297ccd8a7dd4ef19b49605cf0755fb1d0d1dbR73-R78 provide information about how to serialize this node to JSON.
Check out other examples and you'll see a field named
consumedOptions
which tells which options are used by the node. If I understand correctly, you are adding a new option which is used by this class. As such, you need to advertise it in the annotation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, thanks. I have added the corresponding explanation for consumedOptions.