Skip to content

Commit

Permalink
[BEAM-10529] update KafkaIO Xlang integration test to publish and rec…
Browse files Browse the repository at this point in the history
…eive null keys (apache#17319)

* [BEAM-10529] update test to publish and receive null keys

* [BEAM-10529] add test with a populated key to kafka xlang_kafkaio_it_test.py
  • Loading branch information
johnjcasey authored May 18, 2022
1 parent 857f8d3 commit 4ed320c
Showing 1 changed file with 22 additions and 7 deletions.
29 changes: 22 additions & 7 deletions sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,21 @@ def process(


class CrossLanguageKafkaIO(object):
def __init__(self, bootstrap_servers, topic, expansion_service=None):
def __init__(
self, bootstrap_servers, topic, null_key, expansion_service=None):
self.bootstrap_servers = bootstrap_servers
self.topic = topic
self.null_key = null_key
self.expansion_service = expansion_service
self.sum_counter = Metrics.counter('source', 'elements_sum')

def build_write_pipeline(self, pipeline):
_ = (
pipeline
| 'Generate' >> beam.Create(range(NUM_RECORDS)) # pylint: disable=bad-option-value
| 'MakeKV' >> beam.Map(lambda x:
(b'', str(x).encode())).with_output_types(
typing.Tuple[bytes, bytes])
| 'MakeKV' >> beam.Map(
lambda x: (None if self.null_key else b'key', str(x).encode())).
with_output_types(typing.Tuple[typing.Optional[bytes], bytes])
| 'WriteToKafka' >> WriteToKafka(
producer_config={'bootstrap.servers': self.bootstrap_servers},
topic=self.topic,
Expand Down Expand Up @@ -112,13 +114,26 @@ def run_xlang_kafkaio(self, pipeline):
os.environ.get('LOCAL_KAFKA_JAR'),
"LOCAL_KAFKA_JAR environment var is not provided.")
class CrossLanguageKafkaIOTest(unittest.TestCase):
def test_kafkaio(self):
kafka_topic = 'xlang_kafkaio_test_{}'.format(uuid.uuid4())
def test_kafkaio_populated_key(self):
kafka_topic = 'xlang_kafkaio_test_populated_key_{}'.format(uuid.uuid4())
local_kafka_jar = os.environ.get('LOCAL_KAFKA_JAR')
with self.local_kafka_service(local_kafka_jar) as kafka_port:
bootstrap_servers = '{}:{}'.format(
self.get_platform_localhost(), kafka_port)
pipeline_creator = CrossLanguageKafkaIO(bootstrap_servers, kafka_topic)
pipeline_creator = CrossLanguageKafkaIO(
bootstrap_servers, kafka_topic, False)

self.run_kafka_write(pipeline_creator)
self.run_kafka_read(pipeline_creator)

def test_kafkaio_null_key(self):
kafka_topic = 'xlang_kafkaio_test_null_key_{}'.format(uuid.uuid4())
local_kafka_jar = os.environ.get('LOCAL_KAFKA_JAR')
with self.local_kafka_service(local_kafka_jar) as kafka_port:
bootstrap_servers = '{}:{}'.format(
self.get_platform_localhost(), kafka_port)
pipeline_creator = CrossLanguageKafkaIO(
bootstrap_servers, kafka_topic, True)

self.run_kafka_write(pipeline_creator)
self.run_kafka_read(pipeline_creator)
Expand Down

0 comments on commit 4ed320c

Please sign in to comment.