Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug]: KinesisIO source on FlinkRunner initializes the same splits twice #31313

Open
2 of 16 tasks
hlteoh37 opened this issue May 16, 2024 · 28 comments · May be fixed by #33606
Open
2 of 16 tasks

[Bug]: KinesisIO source on FlinkRunner initializes the same splits twice #31313

hlteoh37 opened this issue May 16, 2024 · 28 comments · May be fixed by #33606
Assignees
Labels

Comments

@hlteoh37
Copy link
Contributor

hlteoh37 commented May 16, 2024

What happened?

Bug description

Setup details:

  • FlinkRunner (Flink 1.15.4) (also replicated with Flink 1.18.2)
  • KinesisIO (from beam-sdks-java-io-amazon-web-services2)
  • Beam version 2.56.0
  • Pipeline attached mode (false)

Bug details:

  • When restoring from snapshot on Flink, the org.apache.beam.sdk.io.aws2.kinesis.KinesisReader is assigned the same splits twice, once with snapshot state, and once without. This leads to duplicate data being processed.

Replication steps:

  1. Start a Flink job with KinesisIO source.
  2. Stop the Flink job with a savepoint.
  3. Start the same Flink job from savepoint.

Logs:

  • From Flink Taskmanager (worker node) log dump below, we can see that splits for shardId-000000000000 to shardId-000000000003 are first initialized with checkpoint state AFTER_SEQUENCE_NUMBER (correct).
  • Following that, we see that they are initialized without checkpoint state with AT_TIMESTAMP (not correct).
2024-05-16 12:29:43,263 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase [] - Adding splits [FlinkSourceSplit{splitIndex=0, beamSource=org.apache.beam.sdk.io.aws2.kinesis.KinesisSource@269f1286, splitState.isNull=false, checkpointMark=null}]
2024-05-16 12:29:43,264 WARN  org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded.FlinkUnboundedSourceReader [] - AutoWatermarkInterval is not set, watermarks will be emitted at a default interval of 200 ms
2024-05-16 12:29:43,264 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: KDS Source/Read(KinesisSource) -> Flat Map -> ParDo(Anonymous)/ParMultiDo(Anonymous) -> KDS Sink/ParDo(Anonymous)/ParMultiDo(Anonymous) (1/1)#0 (9e813ffa491b6a4d44e7860742f1576b) switched from INITIALIZING to RUNNING.
2024-05-16 12:29:43,265 WARN  org.apache.beam.sdk.coders.SerializableCoder                 [] - Can't verify serialized elements of type KinesisReaderCheckpoint have well defined equals method. This may produce incorrect results on some PipelineRunner implementations2024-05-16 12:29:43,265 INFO  org.apache.beam.sdk.io.aws2.kinesis.KinesisSource            [] - Got checkpoint mark [Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream,shard shardId-000000000000: 49651326501327272958725198009666143092350196858539212802 0, Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000001: 49651326500635949857570748692274909048439253764329701394 0, Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000002: 49651326503155934065004709107267236287428903985330782242 0, Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000003: 49651326499966927501614829993237864477127027156192329778 0]2024-05-16 12:29:43,266 INFO  org.apache.beam.sdk.io.aws2.kinesis.KinesisSource            [] - Creating new reader using [Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000000: 49651326501327272958725198009666143092350196858539212802 0, Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000001: 49651326500635949857570748692274909048439253764329701394 0, Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000002: 49651326503155934065004709107267236287428903985330782242 0, Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000003: 49651326499966927501614829993237864477127027156192329778 0]
2024-05-16 12:29:43,268 INFO  org.apache.beam.sdk.io.aws2.kinesis.KinesisReader            [] - Starting reader using [Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000000: 49651326501327272958725198009666143092350196858539212802 0, Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000001: 49651326500635949857570748692274909048439253764329701394 0, Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000002: 49651326503155934065004709107267236287428903985330782242 0, Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000003: 496513264999669275016148299932378644771270271561923297780]
2024-05-16 12:29:43,272 INFO  org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient  [] - Starting getIterator request GetShardIteratorRequest(StreamName=ExampleInputStream, ShardId=shardId-000000000000, ShardIteratorType=AT_SEQUENCE_NUMBER, StartingSequenceNumber=49651326501327272958725198009666143092350196858539212802)
2024-05-16 12:29:43,795 INFO  org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient  [] - Starting getIterator request GetShardIteratorRequest(StreamName=ExampleInputStream, ShardId=shardId-000000000001, ShardIteratorType=AT_SEQUENCE_NUMBER, StartingSequenceNumber=49651326500635949857570748692274909048439253764329701394)
2024-05-16 12:29:43,891 INFO  org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient  [] - Starting getIterator request GetShardIteratorRequest(StreamName=ExampleInputStream, ShardId=shardId-000000000002, ShardIteratorType=AT_SEQUENCE_NUMBER, StartingSequenceNumber=49651326503155934065004709107267236287428903985330782242)
2024-05-16 12:29:43,983 INFO  org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient  [] - Starting getIterator request GetShardIteratorRequest(StreamName=ExampleInputStream, ShardId=shardId-000000000003, ShardIteratorType=AT_SEQUENCE_NUMBER, StartingSequenceNumber=49651326499966927501614829993237864477127027156192329778)
2024-05-16 12:29:44,080 INFO  org.apache.beam.sdk.io.aws2.kinesis.ShardReadersPool         [] - Starting to read ExampleInputStream stream from [shardId-000000000000, shardId-000000000001, shardId-000000000002, shardId-000000000003] shards
2024-05-16 12:29:44,266 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase [] - Adding splits [FlinkSourceSplit{splitIndex=0, beamSource=org.apache.beam.sdk.io.aws2.kinesis.KinesisSource@2a249887, splitState.isNull=true, checkpointMark=null}]
2024-05-16 12:29:44,266 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase [] - Received NoMoreSplits signal from enumerator.
2024-05-16 12:29:44,898 INFO  org.apache.beam.sdk.io.aws2.kinesis.KinesisSource            [] - No checkpointMark specified, fall back to initial [Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000000: null null, Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000001: null null, Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000002: null null, Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000003: null null]
2024-05-16 12:29:44,898 INFO  org.apache.beam.sdk.io.aws2.kinesis.KinesisSource            [] - Creating new reader using [Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000000: null null, Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000001: null null, Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000002: null null, Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000003: null null]
2024-05-16 12:29:44,899 INFO  org.apache.beam.sdk.io.aws2.kinesis.KinesisReader            [] - Starting reader using [Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000000: null null, Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000001: null null, Checkpoint AT_TIMESTAMP for stream ExampleInputStream,shard shardId-000000000002: null null, Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000003: null null]
2024-05-16 12:29:44,899 INFO  org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient  [] - Starting getIterator request GetShardIteratorRequest(StreamName=ExampleInputStream, ShardId=shardId-000000000000, ShardIteratorType=AT_TIMESTAMP, Timestamp=2024-05-16T10:59:38.912Z)
2024-05-16 12:29:45,311 INFO  org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient  [] - Starting getIterator request GetShardIteratorRequest(StreamName=ExampleInputStream, ShardId=shardId-000000000001, ShardIteratorType=AT_TIMESTAMP, Timestamp=2024-05-16T10:59:38.912Z)
2024-05-16 12:29:45,412 INFO  org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient  [] - Starting getIterator request GetShardIteratorRequest(StreamName=ExampleInputStream, ShardId=shardId-000000000002, ShardIteratorType=AT_TIMESTAMP, Timestamp=2024-05-16T10:59:38.912Z)
2024-05-16 12:29:45,514 INFO  org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient  [] - Starting getIterator request GetShardIteratorRequest(StreamName=ExampleInputStream, ShardId=shardId-000000000003, ShardIteratorType=AT_TIMESTAMP, Timestamp=2024-05-16T10:59:38.912Z)
2024-05-16 12:29:45,620 INFO  org.apache.beam.sdk.io.aws2.kinesis.ShardReadersPool         [] - Starting to read ExampleInputStream stream from [shardId-000000000000, shardId-000000000001, shardId-000000000002, shardId-000000000003] shards

Issue Priority

Priority: 3 (minor)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@hlteoh37
Copy link
Contributor Author

Adding dump of replication Flink code here:

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>org.apache.flink.example</groupId>
    <artifactId>beam</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <name>Apache Flink Beam Application</name>
    <packaging>jar</packaging>

    <properties>
        <flink.version>1.15.2</flink.version>
        <logback.version>1.4.14</logback.version>
        <main-class>org.apache.flink.example.BeamApplication</main-class>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.commons</groupId>
                    <artifactId>commons-math3</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-flink-1.15</artifactId>
            <version>2.56.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-amazon-web-services2</artifactId>
            <version>2.56.0</version>
        </dependency>
    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:flink-core</exclude>
                                    <exclude>org.apache.flink:flink-annotations</exclude>
                                    <exclude>org.apache.flink:flink-metrics-core</exclude>
                                    <exclude>org.apache.flink:flink-shaded-*</exclude>
                                    <exclude>org.apache.flink:flink-table-*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>${main-class}</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Java class

package org.apache.flink.example;


import io.opentelemetry.instrumentation.annotations.WithSpan;
import lombok.Data;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisPartitioner;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPropertyOrder;
import org.joda.time.Duration;
import software.amazon.awssdk.regions.Region;
import software.amazon.kinesis.common.InitialPositionInStream;

import java.nio.charset.StandardCharsets;
import java.time.Instant;

public class BeamApplication {

    @Data
    @JsonPropertyOrder({"timestamp", "location", "quantity"})
    public static final class Event {
        private Instant timestamp;
        private String location;
        private long quantity;
    }

    @WithSpan
    public static void main(final String... args) throws Exception {
        FlinkPipelineOptions options = PipelineOptionsFactory.create().as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);
        options.setAttachedMode(false);

        Pipeline pipeline = Pipeline.create(options);

        pipeline.apply("KDS Source", KinesisIO.read()
                .withStreamName("ExampleInputStream")
                        .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
                        .withInitialTimestampInStream(org.joda.time.Instant.now().minus(Duration.standardMinutes(30)))
                        .withClientConfiguration(ClientConfiguration.builder()
                                .region(Region.US_EAST_1)
                                .build()))
                .apply(ParDo.of(new DoFn<KinesisRecord, String>() {
                    @ProcessElement
                    public void processElement(@Element KinesisRecord record, OutputReceiver<String> out) {
                        System.out.println(record.toString());
                        out.output(record.toString());
                    }
                }))
                .apply("KDS Sink", KinesisIO.<String>write()
                        .withStreamName("ExampleOutputStream")
                        .withClientConfiguration(ClientConfiguration.builder()
                                .region(Region.US_EAST_1)
                                .build())
                        .withSerializer((SerializableFunction<String, byte[]>) input -> input.getBytes(StandardCharsets.UTF_8))
                        .withPartitioner(KinesisPartitioner.explicitRandomPartitioner(1))
                );


        pipeline.run();

    }
}

@akashk99
Copy link

@je-ik is this the same issue as this #30903

I noticed you fixed it and the problem statement seems to be similar, but please let me know if this is something different as I am getting duplicated data on 2.56 when restoring from a flink save point

@je-ik
Copy link
Contributor

je-ik commented May 24, 2024

I suppose this is (similar, but) different issue, probably caused by the same underlying bug. #30903 fixed Impulse only. Does using --experiments=beam_fn_api fix the issue?

@akashk99
Copy link

@je-ik Hi, yes that did fix the issue. thank you! For my understanding, what does this option do exactly? And should I expect any performance degradation?

@akashk99
Copy link

I am noticing actually a lot of back pressure using this approach despite downstream operators having low CPU usage. Is the fix to the root cause relatively straight forward in which case I can implement it in a forked version of the repo? or is it more involved?

@je-ik
Copy link
Contributor

je-ik commented May 24, 2024

I don't know the root cause, it seems that Flink does not send the snapshot state after restore from savepoint. I observed this on the Impulse (I suspected that it affects only bounded sources running in unbounded mode, but it seems it is not the case). It might be a Beam bug or a Flink bug.

Hi, yes that did fix the issue. thank you! For my understanding, what does this option do exactly? And should I expect any performance degradation?

The flag turns on different expansion for Read transform - it uses splittable DoFn (SDF), which uses Impulse which was fixed earlier. Performance should be similar to classical Read.

@akashk99
Copy link

Im surprised this is a bug considering restoring from a flink savepoint is a pretty common use case, is it possible there some configuration missing somewhere? I havent been able to find anyone else online experiencing this same issue but I was able to replicate it using both kinesis and kafka. Given how common of a use case it is, Im not 100% sure I believe this is in fact a bug and most likely some user error on my part.

I can make do without savepoints by utilizing kafka offset commits and consumer groups to ensure no data is lost, but cant figure out a way to not lose data that is windowed but not triggered when the flink application is stopped. Maybe you know of a solution to that problem?

it seems like a lot of the subtasks arent being utilized when stripping IDs with beam_fn_api despite the number of shards being 20 and parallelism being 24 (in theory should only be 4 idle subtasks)

Screenshot 2024-05-24 at 12 48 32 PM

@je-ik
Copy link
Contributor

je-ik commented May 25, 2024

Im surprised this is a bug considering restoring from a flink savepoint is a pretty common use case, is it possible there some configuration missing somewhere? I havent been able to find anyone else online experiencing this same issue but I was able to replicate it using both kinesis and kafka. Given how common of a use case it is, Im not 100% sure I believe this is in fact a bug and most likely some user error on my part.

Can you please provide a minimal example and setup to reproduce the behavior?

I can make do without savepoints by utilizing kafka offset commits and consumer groups to ensure no data is lost, but cant figure out a way to not lose data that is windowed but not triggered when the flink application is stopped. Maybe you know of a solution to that problem?

You can drain the Pipeline, see https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/cli/#terminating-a-job

it seems like a lot of the subtasks arent being utilized when stripping IDs with beam_fn_api despite the number of shards being 20 and parallelism being 24 (in theory should only be 4 idle subtasks)

This is related to how Flink computes target splits. It is affected by maximal parallelism (which is computed automatically, if not specified). You can try increasing it via --maxParallelism=32768 (32768 is maximal value), this could make the assignment more balanced.

@akashk99
Copy link

Thanks for the suggestions, will give them a try. I believe the first comment of the ticket provides a simple pipeline that exhibits this behavior on the flink runner but if that doesn’t work, happy to provide another. The example also submits the job in detached mode which may be related, although have seen similar behavior without it. Appreciate your help looking into this, if there’s anything I can assist with, please let me know

@akashk99
Copy link

Just to mimic the local setup I used:

I ran flink/start-cluster.sh

used the flink run command with the -d flag

and then stopped the job with a savepoint ./flink/bin/flink stop -p flink/savepoints cf78a44e6b10ab7062d3c02bb7d4e052

and then restarted using run with the savepoint path.

When doing this, I looked inside the task manager logs and searched for Starting getIterator request and saw 6 logs for the same timestamp that my app restarted. 3 at sequence number and 3 at latest. I am not sure why the latest ones are showing up and didnt see anything in the source code that would cause this.

I also switched to kafka and noticed the same behavior so it seems to be related to the runner. I was unable to fix the performance issues with beam_fn_api and notice the backpressure was causing my data to come in waves. Looking at a cpu chart, it was very cyclic with peaks of 99% cpu and troughs of 8% cpu leading me to believe that this pipeline option was causing some sort of build up and then a rush of data causing the cpu to spike.

I can make do with kafka offset commits for now, but if there are any pointers on how to fix this in the beam source code, id be happy to take a look and even submit a PR to be included in version 2.57. Although still hoping the issue is somewhere on my end that can be fixed fairly easily

@je-ik
Copy link
Contributor

je-ik commented Jun 6, 2024

Hi @akashk99, just to be sure, do you observe the same behavior when not using flink run,, but running the job as "standard" Java app (java -cp <jar> class) and passing the Flink configuration using Beam command-line args (--runner=flink --flinkMaster=... --savepointPath=...)?

@akashk99
Copy link

akashk99 commented Jun 6, 2024

Hi @je-ik , was just able to reproduce the issue by manually running the jar file.

Started the job by running java -cp <jar> <class> --runner=flink --flinkMaster=... and then used flink stop with savepoint to take a savepoint. afterwards, I reran, java -cp <jar> <class> --runner=flink --flinkMaster=... --savepointPath=... with the snapshot I just took. After observing the task manager logs, I see:


Terminal Log: 
Jun 06, 2024 12:55:08 PM org.apache.flink.client.program.rest.RestClusterClient lambda$null$6
INFO: Successfully submitted job <jobName> (6651a9570e4c9d1df81539b07e6e91ce) to 'http://localhost:8081'.

Task Manager Logs:
2024-06-06 12:55:11,979 INFO  org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient  [] - Starting getIterator request GetShardIteratorRequest(StreamName=<streamName>, ShardId=shardId-000000000012, ShardIteratorType=AT_SEQUENCE_NUMBER, 

2024-06-06 12:55:13,279 INFO  org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient  [] - Starting getIterator request GetShardIteratorRequest(StreamName=<streamName>, ShardId=shardId-000000000012, ShardIteratorType=LATEST)

this was a few seconds after the job was submitted. I trimmed the output, but these two logs were there for all of my shards.

@dcasado
Copy link

dcasado commented Jul 9, 2024

Hi, we are seeing the same behavior on our pipeline.

Logs from a Task Manager

2024-07-08 14:29:13,867 WARN  org.apache.beam.sdk.coders.SerializableCoder                 [] - Can't verify serialized elements of type KinesisReaderCheckpoint have well defined equals method. This may produce incorrect results on some PipelineRunner implementations
2024-07-08 14:29:13,867 INFO  org.apache.beam.sdk.io.kinesis.KinesisSource                 [] - Creating new reader using [Checkpoint AFTER_SEQUENCE_NUMBER for stream <streamName>, shard shardId-000000002111: 49653574367537750625153914864609254626539224692753990642]
2024-07-08 14:29:13,945 INFO  org.apache.beam.sdk.io.kinesis.KinesisReader                 [] - Starting reader using [Checkpoint AFTER_SEQUENCE_NUMBER for stream <streamName>, shard shardId-000000002111: 49653574367537750625153914864609254626539224692753990642]
2024-07-08 14:29:14,122 INFO  org.apache.beam.sdk.io.kinesis.ShardReadersPool              [] - Starting to read <streamName> stream from [shardId-000000002111] shards
2024-07-08 14:29:14,772 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase [] - Adding splits [FlinkSourceSplit{splitIndex=2, beamSource=org.apache.beam.sdk.io.kinesis.KinesisSource@a09dd60, splitState.isNull=true, checkpointMark=null}]
2024-07-08 14:29:14,773 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase [] - Received NoMoreSplits signal from enumerator.
2024-07-08 14:29:14,775 INFO  org.apache.beam.sdk.io.kinesis.KinesisSource                 [] - Creating new reader using [Checkpoint LATEST for stream <streamName>, shard shardId-000000002111: null]
2024-07-08 14:29:14,778 INFO  org.apache.beam.sdk.io.kinesis.KinesisReader                 [] - Starting reader using [Checkpoint LATEST for stream <streamName>, shard shardId-000000002111: null]
2024-07-08 14:29:14,830 INFO  org.apache.beam.sdk.io.kinesis.ShardReadersPool              [] - Starting to read <streamName> stream from [shardId-000000002111] shards

In our case we are using:

  • FlinkRunner 1.17.2
  • Beam 2.56.0
  • KinesisIO (the deprecated one from beam-sdks-java-io-kinesis)

@weijiequ
Copy link

We met the same issue, is there any workaround available?

FlinkRunner 1.18
Beam 2.57.0
KafkaIO
"Adding splits [FlinkSourceSplit{splitIndex=0, beamSource=[org.apache.beam.sdk.io](http://org.apache.beam.sdk.io/).kafka.KafkaUnboundedSource@5420eacc, splitState.isNull=false, checkpointMark=null}]",
"Adding splits [FlinkSourceSplit{splitIndex=0, beamSource=[org.apache.beam.sdk.io](http://org.apache.beam.sdk.io/).kafka.KafkaUnboundedSource@174df24, splitState.isNull=true, checkpointMark=null}]",

@je-ik
Copy link
Contributor

je-ik commented Jan 14, 2025

@weijiequ Do you use --experiments=use_deprecated_read? And does the behavior change if you add/remove it?

@weijiequ
Copy link

weijiequ commented Jan 15, 2025

hi @je-ik , tried with --experiments=use_deprecated_read, the behavior is the same - duplicated Source.Reader are created.
With thread dump, we could observe multiple KafkaConsumerPoll-thread are running - reading from the same partition concurrently.
"KafkaConsumerPoll-thread" Id=153 RUNNABLE
"KafkaConsumerPoll-thread" Id=167 RUNNABLE
If run without savepoint, only one KafkaConsumerPoll-thread is running.

Taking further look into the stacktrace, the first time adding splits is at
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase.addSplits(FlinkSourceReaderBase.java:198) org.apache.flink.streaming.api.operators.SourceOperator.open(SourceOperator.java:344) org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107) org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753) org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728) org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693) org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:955) org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:924) org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:748) org.apache.flink.runtime.taskmanager.Task.run(Task.java:564) java.base/java.lang.Thread.run(Thread.java:829)

Then the normal code path of adding splits (the same as run without savepoint) will add a duplicate split again.
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase.addSplits(FlinkSourceReaderBase.java:198) org.apache.flink.streaming.api.operators.SourceOperator.handleAddSplitsEvent(SourceOperator.java:590) org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:567) org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:72) org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:80) ....

How about adding a duplicate check by split id here?
https://github.com/apache/beam/blob/release-2.57.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java#L241

@je-ik
Copy link
Contributor

je-ik commented Jan 15, 2025

I just find a possibly related SO question: https://stackoverflow.com/questions/79088562/flink-job-processes-kafka-messages-twice-after-jobmanager-failover-in-ha-mode

Could verify if the behavior is the same in your case? I.e. killing TMs after savepoint inicializes splits only once (because they now start from checkpoint, not savepoint)?

@weijiequ
Copy link

weijiequ commented Jan 15, 2025

Looks like very similar issue. We do normal shutdown with savepoint and then restore from savepoint. It causes the duplicate message issue (all new produced messages after restart will be consumed twice, existing messages are fine).
Similar to the SO post, we had also verified that the expected number of tasks are up and running. We additionally took a thread dump of the subtask then we found two running Kakfa poll threads. It's not the Kafka offset commit issue, we checked the status of the consumers - the committed offset is up to date. All new produced messages to the topic will be consumed twice by the task.
I didn't try to kill the TMs, it's possible that the issue could be gone by doing that as it's now restore from checkpoint.

@je-ik je-ik self-assigned this Jan 15, 2025
@je-ik je-ik added P1 and removed P3 labels Jan 15, 2025
@je-ik je-ik added this to the 2.63.0 Release milestone Jan 15, 2025
@je-ik je-ik added P2 and removed P1 labels Jan 15, 2025
@je-ik je-ik linked a pull request Jan 15, 2025 that will close this issue
3 tasks
@je-ik
Copy link
Contributor

je-ik commented Jan 15, 2025

@weijiequ I created PR that seems to fix the issue in my local setup. Can you apply the patch and verify it at your side, please?

#33606

@weijiequ
Copy link

@je-ik verified, it works, thank you!!
When starting without savepoint, the splits are added with "Starting source".
When starting with savepoint, this start is skipped thus no duplicate splits are added.

@weijiequ
Copy link

@je-ik After applying this change, I noticed a side-effect: when I scale out my job (for example, increasing the parallelism from 4 to 8) and then restart from a savepoint, the additional splits (indexes 4, 5, 6, 7) never start, while the original four splits (indexes 0, 1, 2, 3) continue running as expected.

Image

@je-ik
Copy link
Contributor

je-ik commented Jan 16, 2025

Good catch! 👍
The reason is that the splits are statically assigned to the workers after the initial split. Seems the implementation of addSplitsBack is wrong, I'll look into that.

@weijiequ
Copy link

Thanks @je-ik , once you have the updated patch, I can also verify on my local.

je-ik added a commit to je-ik/beam that referenced this issue Jan 16, 2025
je-ik added a commit to je-ik/beam that referenced this issue Jan 16, 2025
@je-ik
Copy link
Contributor

je-ik commented Jan 16, 2025

@weijiequ can you try setting --maxParallelism=32768, start and rescale the job again?

@weijiequ
Copy link

@je-ik Confirmed with --maxParallelism=32768 (set this before scale out), the additional new splits after scale out could up and run. So shall we apply this in our env or there will be a permanent fix for this as well?

@je-ik
Copy link
Contributor

je-ik commented Jan 17, 2025

You can actually use lower number. Something that can fit into maximal scale you can reach.

@weijiequ
Copy link

Got it, double confirm on the suggested solution at the moment

  1. applying the previous shared patch two days ago
  2. set --maxParallelism as a start option
    Am I right? @je-ik

@je-ik
Copy link
Contributor

je-ik commented Jan 17, 2025

Yes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging a pull request may close this issue.

5 participants