From a647f88ba6bf3ada14ccbee6a7f9018102f1be16 Mon Sep 17 00:00:00 2001 From: Xianhua Liu Date: Wed, 10 Apr 2024 03:19:32 +0000 Subject: [PATCH] [#30870]: support consumer polling timeout in KafkaIO expansion service --- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 20 +++++++++++++++++-- .../sdk/io/kafka/KafkaIOExternalTest.java | 5 ++++- sdks/python/apache_beam/io/kafka.py | 9 +++++++-- 3 files changed, 29 insertions(+), 5 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index c56071e85adb..d8c0df2a16e8 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -831,6 +831,17 @@ static void setupExternalBuilder( // We can expose dynamic read to external build when ReadFromKafkaDoFn is the default // implementation. builder.setDynamicRead(false); + + if(config.consumerPollingTimeoutSeconds != null) { + if(config.consumerPollingTimeoutSeconds <= 0) { + throw new IllegalArgumentException("consumerPollingTimeoutSeconds should be > 0."); + } + builder.setConsumerPollingTimeout( + Duration.standardSeconds(config.consumerPollingTimeoutSeconds)); + } else { + builder.setConsumerPollingTimeout( + Duration.standardSeconds(2L)); + } } private static Coder resolveCoder(Class> deserializer) { @@ -893,6 +904,7 @@ public static class Configuration { private Long maxNumRecords; private Long maxReadTime; private Boolean commitOffsetInFinalize; + private Long consumerPollingTimeoutSeconds; private String timestampPolicy; public void setConsumerConfig(Map consumerConfig) { @@ -934,6 +946,10 @@ public void setCommitOffsetInFinalize(Boolean commitOffsetInFinalize) { public void setTimestampPolicy(String timestampPolicy) { this.timestampPolicy = timestampPolicy; } + + public void setConsumerPollingTimeoutSeconds(Long consumerPollingTimeoutSeconds) { + this.consumerPollingTimeoutSeconds = consumerPollingTimeoutSeconds; + } } } @@ -1342,7 +1358,7 @@ public Read withBadRecordErrorHandler(ErrorHandler badRecord /** * Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}. - * The default is 2 second. + * The default is 2 seconds. */ public Read withConsumerPollingTimeout(Duration duration) { checkState( @@ -2387,7 +2403,7 @@ public ReadSourceDescriptors withBadRecordErrorHandler( /** * Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}. - * The default is 2 second. + * The default is 2 seconds. */ public ReadSourceDescriptors withConsumerPollingTimeout(@Nullable Duration duration) { return toBuilder().setConsumerPollingTimeout(duration).build(); diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java index dd859af50864..920cfd7095a8 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java @@ -107,7 +107,8 @@ public void testConstructKafkaRead() throws Exception { Field.of("value_deserializer", FieldType.STRING), Field.of("start_read_time", FieldType.INT64), Field.of("commit_offset_in_finalize", FieldType.BOOLEAN), - Field.of("timestamp_policy", FieldType.STRING))) + Field.of("timestamp_policy", FieldType.STRING), + Field.of("consumer_polling_timeout_seconds", FieldType.INT64))) .withFieldValue("topics", topics) .withFieldValue("consumer_config", consumerConfig) .withFieldValue("key_deserializer", keyDeserializer) @@ -115,6 +116,7 @@ public void testConstructKafkaRead() throws Exception { .withFieldValue("start_read_time", startReadTime) .withFieldValue("commit_offset_in_finalize", false) .withFieldValue("timestamp_policy", "ProcessingTime") + .withFieldValue("consumer_polling_timeout_seconds", 5L) .build()); RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance(); @@ -265,6 +267,7 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception { expansionService.expand(request, observer); ExpansionApi.ExpansionResponse result = observer.result; RunnerApi.PTransform transform = result.getTransform(); + assertThat( transform.getSubtransformsList(), Matchers.hasItem(MatchesPattern.matchesPattern(".*KafkaIO-Read.*"))); diff --git a/sdks/python/apache_beam/io/kafka.py b/sdks/python/apache_beam/io/kafka.py index b96576b4efb3..362bb2f6f5c8 100644 --- a/sdks/python/apache_beam/io/kafka.py +++ b/sdks/python/apache_beam/io/kafka.py @@ -93,7 +93,8 @@ ('value_deserializer', str), ('start_read_time', typing.Optional[int]), ('max_num_records', typing.Optional[int]), ('max_read_time', typing.Optional[int]), - ('commit_offset_in_finalize', bool), ('timestamp_policy', str)]) + ('commit_offset_in_finalize', bool), ('timestamp_policy', str), + ('consumer_polling_timeout_seconds', typing.Optional[int])]) def default_io_expansion_service(append_args=None): @@ -134,6 +135,7 @@ def __init__( max_read_time=None, commit_offset_in_finalize=False, timestamp_policy=processing_time_policy, + consumer_polling_timeout_seconds=None, with_metadata=False, expansion_service=None, ): @@ -159,6 +161,8 @@ def __init__( :param commit_offset_in_finalize: Whether to commit offsets when finalizing. :param timestamp_policy: The built-in timestamp policy which is used for extracting timestamp from KafkaRecord. + :param consumer_polling_timeout_seconds: Kafka client polling request + timeout time in seconds. Default is 2 seconds. :param with_metadata: whether the returned PCollection should contain Kafka related metadata or not. If False (default), elements of the returned PCollection will be of type 'bytes' if True, elements of the @@ -186,7 +190,8 @@ def __init__( max_read_time=max_read_time, start_read_time=start_read_time, commit_offset_in_finalize=commit_offset_in_finalize, - timestamp_policy=timestamp_policy)), + timestamp_policy=timestamp_policy, + consumer_polling_timeout_seconds=consumer_polling_timeout_seconds)), expansion_service or default_io_expansion_service())