Flink 1.18.1 upgrade #48

Draft: wants to merge 30 commits into base branch 4.0

Commits (30)
All 30 commits are authored by jeffxiang.

ecf7d2c  WIP 1.18 upgrade initial commit - added dynamic PSC source and finish… (Nov 26, 2024)
034ab2c  Finish sink code changes (Nov 27, 2024)
39f067f  Finish source changes; add pause + resume API's to PscConsumer in cor… (Nov 28, 2024)
f5c45ae  Finish streaming.connectors.psc (mainly table) code changes (Dec 3, 2024)
9e01a18  Finish first round code changes for sink tests (Dec 3, 2024)
54a6c4f  Finish code changes for source tests (Dec 4, 2024)
78c7b73  WIP finish test code changes for streaming.connectors.psc top level (Dec 4, 2024)
d588c71  WIP finish test code changes for PscDynamicTableFactoryTest (Dec 4, 2024)
fdecfdc  Finish table tests refactor (Dec 5, 2024)
4004bef  Finish test refactor (Dec 5, 2024)
16d3155  Compiles (Dec 5, 2024)
153d685  All dynamic.source tests pass except DynamicPscSourceITTest specific … (Dec 10, 2024)
b50fcbf  WIP fixing basic cluster read dynamic source (Dec 11, 2024)
bf4b858  Remove extra debugging logic (Dec 11, 2024)
d325fb4  Fixed DynamicPscSourceITTest (Dec 11, 2024)
5c95746  Finish fixing dynamic.source tests (Dec 12, 2024)
d8d23e8  WIP all sink tests pass except one test checkProducerLeak() (Dec 12, 2024)
8b6cea8  Finish source tests (Dec 12, 2024)
a789999  WIP finish fixing streaming.connectors.psc internals, shuffle, table … (Dec 12, 2024)
9f95a6b  WIP fixing PscSerializerUpgradeTest (Dec 12, 2024)
bf362ae  Generated test files for migration/snapshot (Dec 12, 2024)
18ced74  Everything should pass except testBrokerFailure (Dec 12, 2024)
7406f35  Fix testIteratorFor and testTransactionalProducerWithKafkaBackendCann… (Dec 13, 2024)
e30306f  Force close pscProducer in testTransactionalProducerWithKafkaBackendC… (Dec 13, 2024)
3bd478d  Update log4j settings to prevent flood of build logs (Dec 13, 2024)
c7aa188  Add repository io.confluent for kafka-schema-registry-client; change … (Dec 13, 2024)
70d83e2  Add thread.sleep(5s) in FlinkPscProducerITCase.testFlinkPscProducerFa… (Dec 13, 2024)
30a5b22  Wrap metrics() in sychronized block to try and prevent CME (Dec 13, 2024)
7195742  Split build to parallelize (Dec 13, 2024)
4c5f16b  Rename build workflow (Dec 13, 2024)
16 changes: 16 additions & 0 deletions .github/workflows/core-build.yml
@@ -0,0 +1,16 @@
name: PSC-Java Build

on: [pull_request]

jobs:
  build:

    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v1
    - name: Set up JDK 1.8
      uses: actions/setup-java@v1
      with:
        java-version: 1.8
    - name: Build with Maven
      run: mvn -B install --file pom.xml
@@ -1,4 +1,4 @@
-name: PSC-Java Build
+name: ITCase Tests

 on: [pull_request]

@@ -13,6 +13,6 @@ jobs:
       with:
         java-version: 1.8
     - name: Build with Maven
-      run: mvn -B install --file pom.xml
+      run: mvn -B install --file pom.xml -DskipTests
+    - name: Run psc-flink *ITCase tests
+      run: mvn clean test -pl psc-flink -Dtest=*ITCase
76 changes: 43 additions & 33 deletions psc-flink/pom.xml
@@ -10,8 +10,9 @@
     </parent>
     <artifactId>psc-flink</artifactId>
     <properties>
-        <flink.version>1.15.1</flink.version>
-        <kafka.version>2.8.1</kafka.version>
+        <flink.version>1.18.1</flink.version>
+        <flink.kafka.connector.version>3.2.0-1.18</flink.kafka.connector.version>
+        <kafka.version>3.4.0</kafka.version>
         <zookeeper.version>3.4.10</zookeeper.version>
         <curator.version>2.12.0</curator.version>
         <scala.version>2.12.7</scala.version>
@@ -21,6 +22,19 @@
         <javadoc.opts>-Xdoclint:none</javadoc.opts>
     </properties>

+    <!-- needed because io.confluent:kafka-schema-registry-client is not available in Maven Central -->
+    <repositories>
+        <repository>
+            <id>io-confluent</id>
+            <name>io-confluent</name>
+            <url>https://packages.confluent.io/maven</url>
+            <releases>
+                <enabled>true</enabled>
+                <updatePolicy>interval:60</updatePolicy>
+            </releases>
+        </repository>
+    </repositories>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.flink</groupId>
@@ -38,6 +52,18 @@
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-kafka</artifactId>
+            <version>${flink.kafka.connector.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.kafka</groupId>
+                    <artifactId>kafka-clients</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
             <version>${flink.version}</version>
             <exclusions>
                 <exclusion>
@@ -96,37 +122,6 @@

         <!-- test dependencies -->

-        <dependency>
-            <groupId>com.pinterest.psc</groupId>
-            <artifactId>psc-logging</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <version>${slf4j.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-            <version>${log4j.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>log4j</groupId>
-            <artifactId>apache-log4j-extras</artifactId>
-            <version>${log4j.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>net.logstash.log4j</groupId>
-            <artifactId>jsonevent-layout</artifactId>
-            <version>1.7</version>
-            <scope>test</scope>
-        </dependency>

         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_2.12</artifactId>
@@ -176,6 +171,14 @@
             </exclusions>
         </dependency>

+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
         <!-- force using the latest zkclient -->
         <dependency>
             <groupId>com.101tec</groupId>
@@ -221,6 +224,13 @@
             <scope>test</scope>
         </dependency>

+        <dependency>
+            <groupId>org.yaml</groupId>
+            <artifactId>snakeyaml</artifactId>
+            <version>2.2</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-streaming-java</artifactId>
@@ -50,7 +50,7 @@ public class PscFlinkConfiguration

     public static TopicUri validateAndGetBaseClusterUri(Properties properties) throws TopicUriSyntaxException {
         if (!properties.containsKey(CLUSTER_URI_CONFIG)) {
-            throw new IllegalArgumentException("Cluster URI is required for transactional producer");
+            throw new IllegalArgumentException("Cluster URI not found");
         }
         return BaseTopicUri.validate(properties.getProperty(CLUSTER_URI_CONFIG));
     }
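As context for the message change above, a minimal, hypothetical sketch of how validateAndGetBaseClusterUri() is typically invoked; the cluster URI literal is a placeholder (not a value from this PR), and the TopicUri package path is an assumption:

import com.pinterest.flink.connector.psc.PscFlinkConfiguration;
import com.pinterest.psc.common.TopicUri;                            // assumed package path
import com.pinterest.psc.exception.startup.TopicUriSyntaxException;

import java.util.Properties;

public class ClusterUriValidationExample {
    public static void main(String[] args) throws TopicUriSyntaxException {
        Properties props = new Properties();
        // Placeholder URI string; real cluster URIs follow the PSC URI scheme.
        props.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG,
                "plaintext:/rn:kafka:env:cloud_region::cluster:");
        // Throws IllegalArgumentException ("Cluster URI not found") if the key is absent,
        // or TopicUriSyntaxException if the value does not parse.
        TopicUri baseClusterUri = PscFlinkConfiguration.validateAndGetBaseClusterUri(props);
        System.out.println(baseClusterUri.getTopicUriAsString());
    }
}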
New file: ClusterMetadata.java (package com.pinterest.flink.connector.psc.dynamic.metadata)
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.pinterest.flink.connector.psc.dynamic.metadata;

import com.google.common.base.MoreObjects;
import com.pinterest.flink.connector.psc.PscFlinkConfiguration;
import com.pinterest.psc.exception.startup.TopicUriSyntaxException;
import org.apache.flink.annotation.Experimental;

import java.io.Serializable;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * {@link ClusterMetadata} provides readers with information about which topics to read and how to
 * connect to a cluster.
 */
@Experimental
public class ClusterMetadata implements Serializable {
    private final Set<String> topics;
    private final Set<String> topicUris;
    private final Properties properties;
    private final String clusterUriString;

    /**
     * Constructs the {@link ClusterMetadata} with the required properties.
     *
     * @param topics the topics belonging to a cluster.
     * @param properties the properties to access a cluster.
     */
    public ClusterMetadata(Set<String> topics, Properties properties) {
        this.topics = topics;
        this.properties = properties;
        try {
            this.clusterUriString = PscFlinkConfiguration.validateAndGetBaseClusterUri(properties).getTopicUriAsString();
        } catch (TopicUriSyntaxException e) {
            throw new RuntimeException("Invalid cluster.uri", e);
        }
        this.topicUris = topics.stream().map(t -> clusterUriString + t).collect(Collectors.toSet());
    }

    /**
     * Get the topics.
     *
     * @return the topics.
     */
    public Set<String> getTopics() {
        return topics;
    }

    /**
     * Get the topic URIs, i.e. each topic prefixed with the base cluster URI string.
     *
     * @return the topic URIs.
     */
    public Set<String> getTopicUris() {
        return topicUris;
    }

    /**
     * Get the properties.
     *
     * @return the properties.
     */
    public Properties getProperties() {
        return properties;
    }

    /**
     * Get the cluster URI string.
     *
     * @return the cluster URI string.
     */
    public String getClusterUriString() {
        return clusterUriString;
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
                .add("topics", topics)
                .add("properties", properties)
                .toString();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        ClusterMetadata that = (ClusterMetadata) o;
        return Objects.equals(topics, that.topics) && Objects.equals(properties, that.properties);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topics, properties);
    }
}
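A short, hypothetical usage sketch of ClusterMetadata; the topic names and cluster URI below are placeholders, and the URI must satisfy validateAndGetBaseClusterUri():

import com.pinterest.flink.connector.psc.PscFlinkConfiguration;
import com.pinterest.flink.connector.psc.dynamic.metadata.ClusterMetadata;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;

public class ClusterMetadataExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder cluster URI; validation happens in the ClusterMetadata constructor.
        props.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG,
                "plaintext:/rn:kafka:env:cloud_region::cluster:");

        ClusterMetadata metadata = new ClusterMetadata(
                new HashSet<>(Arrays.asList("topic-a", "topic-b")), props);

        // Each topic name is prefixed with the validated base cluster URI string.
        metadata.getTopicUris().forEach(System.out::println);
    }
}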
New file: PscMetadataService.java (package com.pinterest.flink.connector.psc.dynamic.metadata)
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.pinterest.flink.connector.psc.dynamic.metadata;

import org.apache.flink.annotation.Experimental;

import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import java.util.Set;

/** Metadata service that returns PubSub cluster details. */
@Experimental
public interface PscMetadataService extends AutoCloseable, Serializable {
    /**
     * Get current metadata for all streams.
     *
     * @return set of all streams
     */
    Set<PscStream> getAllStreams();

    /**
     * Get current metadata for queried streams.
     *
     * @param streamIds stream full names
     * @return map of stream name to metadata
     */
    Map<String, PscStream> describeStreams(Collection<String> streamIds);

    /**
     * Check if the cluster is active.
     *
     * @param clusterId cluster id
     * @return boolean whether the cluster is active
     */
    boolean isClusterActive(String clusterId);
}
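For illustration, a minimal in-memory implementation sketch of the interface above; StaticPscMetadataService is a hypothetical name, and because PscStream construction is not shown in this PR, instances are assumed to be supplied by the caller (the sketch lives in the same package, so PscStream resolves without an import):

package com.pinterest.flink.connector.psc.dynamic.metadata;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical static metadata service: cluster/stream metadata never changes after construction. */
public class StaticPscMetadataService implements PscMetadataService {
    private final Map<String, PscStream> streamsById;   // stream id -> stream metadata
    private final Set<String> activeClusterIds;

    public StaticPscMetadataService(Map<String, PscStream> streamsById,
                                    Set<String> activeClusterIds) {
        this.streamsById = new HashMap<>(streamsById);
        this.activeClusterIds = new HashSet<>(activeClusterIds);
    }

    @Override
    public Set<PscStream> getAllStreams() {
        return new HashSet<>(streamsById.values());
    }

    @Override
    public Map<String, PscStream> describeStreams(Collection<String> streamIds) {
        Map<String, PscStream> result = new HashMap<>();
        for (String id : streamIds) {
            PscStream stream = streamsById.get(id);
            if (stream != null) {
                result.put(id, stream);
            }
        }
        return result;
    }

    @Override
    public boolean isClusterActive(String clusterId) {
        return activeClusterIds.contains(clusterId);
    }

    @Override
    public void close() {
        // Nothing to release for a static, in-memory service.
    }
}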