[YAML] Kafka Read Provider (apache#28865)
* [YAML] Kafka provider
ffernandez92 authored Oct 27, 2023
1 parent 3b89039 commit e98e37f
Showing 4 changed files with 92 additions and 3 deletions.
KafkaReadSchemaTransformConfiguration.java
@@ -39,7 +39,7 @@ public abstract class KafkaReadSchemaTransformConfiguration {

public static final Set<String> VALID_START_OFFSET_VALUES = Sets.newHashSet("earliest", "latest");

public static final String VALID_FORMATS_STR = "AVRO,JSON";
public static final String VALID_FORMATS_STR = "RAW,AVRO,JSON";
public static final Set<String> VALID_DATA_FORMATS =
Sets.newHashSet(VALID_FORMATS_STR.split(","));

KafkaReadSchemaTransformProvider.java
@@ -49,6 +49,7 @@
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
@@ -112,16 +113,56 @@ protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configuration) {
consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);

String format = configuration.getFormat();

if (format != null && format.equals("RAW")) {
if (inputSchema != null) {
throw new IllegalArgumentException(
"To read from Kafka in RAW format, you can't provide a schema.");
}
Schema rawSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build();
SerializableFunction<byte[], Row> valueMapper = getRawBytesToRowFunction(rawSchema);
return new SchemaTransform() {
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
KafkaIO.Read<byte[], byte[]> kafkaRead =
KafkaIO.readBytes()
.withConsumerConfigUpdates(consumerConfigs)
.withConsumerFactoryFn(new ConsumerFactoryWithGcsTrustStores())
.withTopic(configuration.getTopic())
.withBootstrapServers(configuration.getBootstrapServers());
if (isTest) {
kafkaRead = kafkaRead.withMaxReadTime(Duration.standardSeconds(testTimeoutSecs));
}

PCollection<byte[]> kafkaValues =
input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create());

PCollectionTuple outputTuple =
kafkaValues.apply(
ParDo.of(new ErrorFn("Kafka-read-error-counter", valueMapper))
.withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));

return PCollectionRowTuple.of(
"output",
outputTuple.get(OUTPUT_TAG).setRowSchema(rawSchema),
"errors",
outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA));
}
};
}

if (inputSchema != null && !inputSchema.isEmpty()) {
assert Strings.isNullOrEmpty(configuration.getConfluentSchemaRegistryUrl())
: "To read from Kafka, a schema must be provided directly or though Confluent "
+ "Schema Registry, but not both.";

final Schema beamSchema =
- Objects.equals(configuration.getFormat(), "JSON")
+ Objects.equals(format, "JSON")
? JsonUtils.beamSchemaFromJsonSchema(inputSchema)
: AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(inputSchema));
SerializableFunction<byte[], Row> valueMapper =
- Objects.equals(configuration.getFormat(), "JSON")
+ Objects.equals(format, "JSON")
? JsonUtils.getJsonBytesToRowFunction(beamSchema)
: AvroUtils.getAvroBytesToRowFunction(beamSchema);
return new SchemaTransform() {
@@ -193,6 +234,15 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
}
}

public static SerializableFunction<byte[], Row> getRawBytesToRowFunction(Schema rawSchema) {
return new SimpleFunction<byte[], Row>() {
@Override
public Row apply(byte[] input) {
return Row.withSchema(rawSchema).addValue(input).build();
}
};
}

@Override
public String identifier() {
return "beam:schematransform:org.apache.beam:kafka_read:v1";
KafkaReadSchemaTransformProviderTest.java
@@ -147,4 +147,22 @@ public void testBuildTransformWithJsonSchema() throws IOException {
StandardCharsets.UTF_8))
.build());
}

@Test
public void testBuildTransformWithRawFormat() throws IOException {
ServiceLoader<SchemaTransformProvider> serviceLoader =
ServiceLoader.load(SchemaTransformProvider.class);
List<SchemaTransformProvider> providers =
StreamSupport.stream(serviceLoader.spliterator(), false)
.filter(provider -> provider.getClass() == KafkaReadSchemaTransformProvider.class)
.collect(Collectors.toList());
KafkaReadSchemaTransformProvider kafkaProvider =
(KafkaReadSchemaTransformProvider) providers.get(0);
kafkaProvider.from(
KafkaReadSchemaTransformConfiguration.builder()
.setTopic("anytopic")
.setBootstrapServers("anybootstrap")
.setFormat("RAW")
.build());
}
}
21 changes: 21 additions & 0 deletions sdks/python/apache_beam/yaml/standard_io.yaml
@@ -46,6 +46,27 @@
config:
gradle_target: 'sdks:java:extensions:sql:expansion-service:shadowJar'

- type: renaming
transforms:
'ReadFromKafka': 'ReadFromKafka'
config:
mappings:
'ReadFromKafka':
'schema': 'schema'
'consumer_config': 'consumerConfigUpdates'
'format': 'format'
'topic': 'topic'
'bootstrap_servers': 'bootstrapServers'
'confluent_schema_registry_url': 'confluentSchemaRegistryUrl'
'confluent_schema_registry_subject': 'confluentSchemaRegistrySubject'
'auto_offset_reset_config': 'autoOffsetResetConfig'
underlying_provider:
type: beamJar
transforms:
'ReadFromKafka': 'beam:schematransform:org.apache.beam:kafka_read:v1'
config:
gradle_target: 'sdks:java:io:expansion-service:shadowJar'

- type: python
transforms:
'ReadFromBigQuery': 'apache_beam.yaml.yaml_io.read_from_bigquery'

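For context on how the mapping above is consumed, a minimal Beam YAML pipeline using the new ReadFromKafka entry might look like the sketch below. The config keys (format, topic, bootstrap_servers, auto_offset_reset_config) are the ones mapped in standard_io.yaml; the topic name, broker address, and the downstream transform are hypothetical placeholders. With format RAW no schema is provided, and each Kafka record value is emitted as a row with a single bytes "payload" field.

pipeline:
  transforms:
    - type: ReadFromKafka
      config:
        format: 'RAW'
        topic: 'my_topic'                     # hypothetical topic name
        bootstrap_servers: 'localhost:9092'   # hypothetical broker address
        auto_offset_reset_config: 'earliest'
    - type: LogForTesting                     # hypothetical downstream transform
      input: ReadFromKafka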