Merge pull request #82 from meta-soul/flink_cdc_test
Fix flink cdc write event order
moresun authored Sep 6, 2022
2 parents 4872054 + 9807837 commit 1746d20
Showing 18 changed files with 181 additions and 121 deletions.
2 changes: 1 addition & 1 deletion lakesoul-common/pom.xml
@@ -14,7 +14,7 @@

<properties>
<postgresql.version>42.4.1</postgresql.version>
-<spark.version>3.1.1</spark.version>
+<spark.version>3.1.2</spark.version>
<scala.binary.version>2.12</scala.binary.version>
<local.scope>provided</local.scope>
</properties>
@@ -109,9 +109,9 @@ public static DataBaseProperty getDBInfo() {
}
DataBaseProperty dataBaseProperty = new DataBaseProperty();
dataBaseProperty.setDriver(properties.getProperty("lakesoul.pg.driver", "org.postgresql.Driver"));
-dataBaseProperty.setUrl(properties.getProperty("lakesoul.pg.url", "jdbc:postgresql://127.0.0.1:5433/test_lakesoul_meta?stringtype=unspecified"));
-dataBaseProperty.setUsername(properties.getProperty("lakesoul.pg.username", "yugabyte"));
-dataBaseProperty.setPassword(properties.getProperty("lakesoul.pg.password", "yugabyte"));
+dataBaseProperty.setUrl(properties.getProperty("lakesoul.pg.url", "jdbc:postgresql://127.0.0.1:5432/lakesoul_test?stringtype=unspecified"));
+dataBaseProperty.setUsername(properties.getProperty("lakesoul.pg.username", "lakesoul_test"));
+dataBaseProperty.setPassword(properties.getProperty("lakesoul.pg.password", "lakesoul_test"));
return dataBaseProperty;
}

8 changes: 7 additions & 1 deletion lakesoul-flink/pom.xml
@@ -13,7 +13,7 @@
<version>2.1.0-lakesoul-flink-1.14-SNAPSHOT</version>
<properties>
<flink.version>1.14.3</flink.version>
-<spark.version>3.1.1</spark.version>
+<spark.version>3.1.2</spark.version>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.10</scala.version>
<log4j.version>2.17.2</log4j.version>
@@ -91,6 +91,12 @@
<version>${flink.version}</version>
<scope>${local.scope}</scope>
</dependency>
+<dependency>
+<groupId>org.apache.flink</groupId>
+<artifactId>flink-runtime-web_2.12</artifactId>
+<version>${flink.version}</version>
+<scope>${local.scope}</scope>
+</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
@@ -49,10 +49,10 @@ public class LakSoulFileWriter<IN> extends LakesSoulAbstractStreamingWriter<IN,
private final Configuration flinkConf;
private final OutputFileConfig outputFileConfig;
private final LakeSoulKeyGen keyGen;
-private transient Set<String> currentNewBuckets;
-private transient TreeMap<Long, Set<String>> newBuckets;
-private transient Set<String> committableBuckets;
-private transient Map<String, Long> inProgressBuckets;
+//private transient Set<String> currentNewBuckets;
+// private transient TreeMap<Long, Set<String>> newBuckets;
+// private transient Set<String> committableBuckets;
+//private transient Map<String, Long> inProgressBuckets;
private transient PartitionCommitPredicate partitionCommitPredicate;

public LakSoulFileWriter(long bucketCheckInterval,
@@ -71,16 +71,12 @@ public void initializeState(StateInitializationContext context) throws Exception
super.initializeState(context);
this.partitionCommitPredicate =
PartitionCommitPredicate.create(flinkConf, getUserCodeClassloader(), partitionKeyList);
-this.currentNewBuckets = new HashSet<>();
-this.newBuckets = new TreeMap<>();
-this.committableBuckets = new HashSet<>();
-this.inProgressBuckets = new HashMap<>();
ClassLoader userCodeClassLoader = getContainingTask().getUserCodeClassLoader();
RecordComparator recordComparator = this.keyGen.getComparator().newInstance(userCodeClassLoader);
this.keyGen.setCompareFunction(recordComparator);
}

-@Override
+/* @Override
protected void partitionCreated(String partition) {
this.currentNewBuckets.add(partition);
this.inProgressBuckets.putIfAbsent(partition, getProcessingTimeService().getCurrentProcessingTime());
@@ -90,7 +86,7 @@ protected void partitionCreated(String partition) {
protected void partitionInactive(String partition) {
this.committableBuckets.add(partition);
this.inProgressBuckets.remove(partition);
-}
+}*/

@Override
protected void onPartFileOpened(String s, Path newPath) {
@@ -33,6 +33,9 @@
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+
+import java.util.*;

public abstract class LakesSoulAbstractStreamingWriter<IN, OUT> extends AbstractStreamOperator<OUT>
implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {
@@ -43,6 +46,10 @@ public abstract class LakesSoulAbstractStreamingWriter<IN, OUT> extends Abstract
protected transient LakeSoulBuckets<IN, String> buckets;
private transient LakeSoulFileSinkHelper<IN> helper;
protected transient long currentWatermark;
+protected transient Set<String> currentNewBuckets;
+protected transient Map<String, Long> inProgressBuckets;
+protected transient Set<String> committableBuckets;
+protected transient TreeMap<Long, Set<String>> newBuckets;

public LakesSoulAbstractStreamingWriter(long bucketCheckInterval,
LakeSoulBucketsBuilder<IN, String, ? extends LakeSoulBucketsBuilder<IN, String, ?>> bucketsBuilder) {
@@ -51,9 +58,15 @@ public LakesSoulAbstractStreamingWriter(long bucketCheckInterval,
setChainingStrategy(ChainingStrategy.ALWAYS);
}

-protected abstract void partitionCreated(String partition);
+protected void partitionCreated(String partition){
+this.currentNewBuckets.add(partition);
+this.inProgressBuckets.putIfAbsent(partition, getProcessingTimeService().getCurrentProcessingTime());
+};

-protected abstract void partitionInactive(String partition);
+protected void partitionInactive(String partition){
+this.committableBuckets.add(partition);
+this.inProgressBuckets.remove(partition);
+};

protected abstract void onPartFileOpened(String partition, Path newPath);

@@ -67,8 +80,12 @@ protected void commitUpToCheckpoint(long checkpointId) throws Exception {
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
-buckets = bucketsBuilder.createBuckets(getRuntimeContext().getIndexOfThisSubtask());
+this.currentNewBuckets = new HashSet<>();
+this.committableBuckets = new HashSet<>();
+this.inProgressBuckets = new HashMap<>();
+this.newBuckets = new TreeMap<>();

+buckets = bucketsBuilder.createBuckets(getRuntimeContext().getIndexOfThisSubtask());
// Set listener before the initialization of LakeSoulBuckets.
buckets.setBucketLifeCycleListener(
new LakeSoulBucketLifeCycleListener<IN, String>() {
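The partition-lifecycle bookkeeping that used to live in LakSoulFileWriter now sits in the abstract writer, and the four tracking collections are created before bucketsBuilder.createBuckets(...) runs, so the bucket-lifecycle listener registered a few lines below never touches a null set. A rough sketch of that bookkeeping pattern, with the Flink operator plumbing stripped away (the class and method shape here are illustrative, not part of the commit):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative stand-in for the writer's partition bookkeeping: a partition is
// tracked as "in progress" when first seen and becomes "committable" once inactive.
class PartitionTracker {
    private final Set<String> currentNewBuckets = new HashSet<>();
    private final Set<String> committableBuckets = new HashSet<>();
    private final Map<String, Long> inProgressBuckets = new HashMap<>();

    void partitionCreated(String partition, long nowMillis) {
        currentNewBuckets.add(partition);
        inProgressBuckets.putIfAbsent(partition, nowMillis);
    }

    void partitionInactive(String partition) {
        committableBuckets.add(partition);
        inProgressBuckets.remove(partition);
    }

    Set<String> committable() {
        return committableBuckets;
    }

    public static void main(String[] args) {
        PartitionTracker tracker = new PartitionTracker();
        tracker.partitionCreated("date=2022-09-06", System.currentTimeMillis());
        tracker.partitionInactive("date=2022-09-06");
        System.out.println(tracker.committable()); // [date=2022-09-06]
    }
}
```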
@@ -56,7 +56,8 @@ public class LakeSoulBucket<IN, BucketID> {
private final NavigableMap<Long, InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverablesPerCheckpoint;
private final NavigableMap<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablesPerCheckpoint;
private final OutputFileConfig outputFileConfig;
-private PriorityQueue<RowData> sortQueue;
+private PriorityQueue<LakeSoulCDCElement> sortQueue;
+private LakeSoulCDCComparator lakeCompare;
private AtomicLong bucketRowCount;
private final LakeSoulKeyGen keyGen;
@Nullable
@@ -87,7 +88,9 @@ private LakeSoulBucket(int subtaskIndex,
this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
LakeSoulRollingPolicyImpl lakesoulRollingPolicy = (LakeSoulRollingPolicyImpl) rollingPolicy;
this.keyGen = lakesoulRollingPolicy.getKeyGen();
-this.sortQueue = new PriorityQueue<>(keyGen.getCompareFunction());
+lakeCompare=new LakeSoulCDCComparator(keyGen.getCompareFunction());
+this.sortQueue = new PriorityQueue<>(lakeCompare);
+//this.sortQueue = new PriorityQueue<>(keyGen.getCompareFunction());
bucketRowCount = new AtomicLong(0L);
}

@@ -182,8 +185,7 @@ void write(IN element, long currentTime) throws IOException {
this.inProgressPart = this.rollPartFile(currentTime);
bucketRowCount = new AtomicLong(0L);
}
-sortQueue.add((RowData) element);
-
+sortQueue.add(new LakeSoulCDCElement((RowData) element, currentTime));
}

private boolean checkRollingPolicy() {
@@ -197,7 +199,8 @@ private boolean checkRollingPolicy() {

void sortWrite(long currentTime) throws IOException {
while (!sortQueue.isEmpty()) {
-RowData poll = sortQueue.poll();
+LakeSoulCDCElement le = sortQueue.poll();
+RowData poll = le.element;
this.inProgressPart.write((IN) poll, currentTime);
}
}
@@ -223,7 +226,8 @@ private Path assembleNewPartPath() {
return new Path(this.bucketPath,
this.outputFileConfig.getPartPrefix()
+ '-' + subTask + '-' + uuid + '_' + this.subtaskIndex
-+ '.' + 'c' + count + this.outputFileConfig.getPartSuffix());
++ '.' + 'c' + count + this.outputFileConfig.getPartSuffix()
++ ".parquet");
}

InProgressFileWriter.PendingFileRecoverable closePartFile(long currentTime) throws IOException {
@@ -0,0 +1,44 @@
package org.apache.flink.lakeSoul.sink.bucket;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.RecordComparator;

import java.io.Serializable;
import java.util.Comparator;

public class LakeSoulCDCComparator implements Comparator<LakeSoulCDCElement>, Serializable {
private RecordComparator rc;
public LakeSoulCDCComparator(RecordComparator rc){
this.rc=rc;
}
@Override
public int compare(LakeSoulCDCElement E1, LakeSoulCDCElement E2) {
int res=rc.compare(E1.element,E2.element);
if(res!=0){
return res;
}else{
res=compareLong(E1.timedata,E2.timedata);
if(res!=0){
return res;
}else{
return compareEvent(E1.element,E2.element);
}
}
}
public int compareLong(long e1,long e2){
long res=e1-e2;
if(res==0){
return 0;
}else{
if(res<0){
return -1;
}else{
return 1;
}
}
}
public int compareEvent(RowData e1,RowData e2){
return Byte.compare(e1.getRowKind().toByteValue(),e2.getRowKind().toByteValue());
}

}
@@ -0,0 +1,30 @@
/*
*
* Copyright [2022] [DMetaSoul Team]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package org.apache.flink.lakeSoul.sink.bucket;

import org.apache.flink.table.data.RowData;

public class LakeSoulCDCElement {
public RowData element;
public long timedata;
public LakeSoulCDCElement(RowData rd, long td){
this.element=rd;
this.timedata=td;
}
}
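Together, LakeSoulCDCElement and LakeSoulCDCComparator define the ordering the bucket's priority queue now uses: rows compare by the generated primary-key comparator first, then by the capture time stored in timedata, and finally by the RowKind byte value (INSERT < UPDATE_BEFORE < UPDATE_AFTER < DELETE), so for one key a delete is never flushed ahead of the change events it follows. A minimal sketch of that ordering, assuming a hand-written RecordComparator over a single int key column in place of Flink's code-generated one:

```java
import org.apache.flink.lakeSoul.sink.bucket.LakeSoulCDCComparator;
import org.apache.flink.lakeSoul.sink.bucket.LakeSoulCDCElement;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.types.RowKind;

import java.util.PriorityQueue;

public class CdcOrderingSketch {

    // Build a one-column RowData carrying the given change kind.
    private static RowData row(int key, RowKind kind) {
        GenericRowData r = GenericRowData.of(key);
        r.setRowKind(kind);
        return r;
    }

    public static void main(String[] args) {
        // Hand-written stand-in for the generated primary-key comparator.
        RecordComparator byKey = (r1, r2) -> Integer.compare(r1.getInt(0), r2.getInt(0));

        PriorityQueue<LakeSoulCDCElement> queue =
                new PriorityQueue<>(new LakeSoulCDCComparator(byKey));

        // Same key, same capture time: RowKind decides the order (INSERT before DELETE).
        queue.add(new LakeSoulCDCElement(row(1, RowKind.DELETE), 100L));
        queue.add(new LakeSoulCDCElement(row(1, RowKind.INSERT), 100L));
        // Same key, later capture time: drained after the two events above.
        queue.add(new LakeSoulCDCElement(row(1, RowKind.INSERT), 200L));
        // Smaller key: drained first regardless of time or kind.
        queue.add(new LakeSoulCDCElement(row(0, RowKind.UPDATE_AFTER), 300L));

        while (!queue.isEmpty()) {
            LakeSoulCDCElement e = queue.poll();
            System.out.println(e.element.getInt(0) + " " + e.element.getRowKind() + " @" + e.timedata);
        }
        // Expected drain order: key 0 UPDATE_AFTER, then key 1 INSERT@100, DELETE@100, INSERT@200.
    }
}
```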
@@ -21,11 +21,12 @@

import org.apache.flink.api.common.functions.Partitioner;

-public class DataPartitioner<String> implements Partitioner<String> {
+public class DataPartitioner<Long> implements Partitioner<Long> {

@Override
-public int partition(String key, int numPartitions) {
-int hash = Integer.parseInt((java.lang.String) key) % numPartitions;
-return hash < 0 ? (hash + numPartitions) % numPartitions : hash;
+public int partition(Long key, int numPartitions) {
+long hash = (long) key;
+int part = (int) (hash % (long) numPartitions);
+return part < 0 ? (part + numPartitions) % numPartitions : part;
}
}
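With the key type switched from String to Long, the partitioner no longer parses the key; it takes the long hash modulo numPartitions and folds a negative remainder back into the valid range. A quick sketch of that arithmetic (the wrapper class and sample values are illustrative only):

```java
public class PartitionIndexSketch {
    // Same arithmetic as the new DataPartitioner#partition.
    static int partition(long key, int numPartitions) {
        int part = (int) (key % (long) numPartitions);
        return part < 0 ? (part + numPartitions) % numPartitions : part;
    }

    public static void main(String[] args) {
        System.out.println(partition(7L, 4));  // 3
        System.out.println(partition(-7L, 4)); // -7 % 4 == -3, folded back to 1
        System.out.println(partition(-8L, 4)); // 0
    }
}
```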
@@ -151,7 +151,6 @@ private DataStreamSink<?> createStreamingSink(DataStream<RowData> dataStream,
TypeInformation.of(DataFileMetaData.class),
lakeSoulFileWriter).name("DataWrite")
.setParallelism(bucketParallelism);

//metadata upload Task
DataStream<Void> commitStream = writeResultStream.transform(
MetaDataCommit.class.getSimpleName(),