Updated Flink sample for HDInsight on AKS #130

Draft: wants to merge 31 commits into main.
README.md: 2 changes (1 addition, 1 deletion)
@@ -211,7 +211,7 @@ Implement a stream processing architecture using:
Implement a stream processing architecture using:

- Event Hubs Kafka (Ingest / Immutable Log)
- Flink on HDInsight or Azure Kubernetes Service (Stream Process)
- Flink on HDInsight (Stream Process)
- Event Hubs Kafka (Serve)
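
Taken together, these bullets describe a Kafka-in / Flink / Kafka-out pipeline against the Event Hubs Kafka endpoint. As a rough, self-contained sketch (not the sample's actual code, which appears later in this diff; the namespace, topics, and security settings are placeholders, and a real connection also needs sasl.jaas.config with the Event Hubs connection string):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class KafkaRelaySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder Event Hubs Kafka endpoint and auth settings.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "<namespace>.servicebus.windows.net:9093");
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("group.id", "flink-relay");

        // Read from the ingest topic and relay to the serve topic.
        env.addSource(new FlinkKafkaConsumer<>("ingest", new SimpleStringSchema(), props))
           .addSink(new FlinkKafkaProducer<>("serve", new SimpleStringSchema(), props));

        env.execute("event-hubs-kafka-relay-sketch");
    }
}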

### [Event Hubs Kafka + Azure Functions + Cosmos DB](https://github.com/Azure-Samples/streaming-at-scale/tree/main/eventhubskafka-functions-cosmosdb)
Binary file modified _doc/_images/flink-job-manager.png
assert/has-local-unzip.sh: 12 changes (12 additions, 0 deletions)
@@ -0,0 +1,12 @@
#!/bin/bash

# Strict mode, fail on any error
set -euo pipefail

HAS_UNZIP=$(command -v unzip || true)
if [ -z "$HAS_UNZIP" ]; then
echo "unzip not found"
echo "please install it using your package manager, for example, on Ubuntu:"
echo " sudo apt install unzip"
exit 1
fi
components/apache-flink/flink-kafka-consumer/pom.xml: 40 changes (25 additions, 15 deletions)
@@ -9,9 +9,10 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.9.1</flink.version>
<flink.version>1.16.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.binary.version>2.12</scala.binary.version>
<kafka.version>0.12</kafka.version>
<junit.version>4.13.1</junit.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
@@ -35,32 +36,32 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>


<!-- Add connector and adapter dependencies here. They must be in the default scope (compile). -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>

<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version>
</dependency>

<dependency>
Expand All @@ -70,16 +71,25 @@
<scope>test</scope>
</dependency>


<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>


<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<classifier>tests</classifier>
Next file (name not shown):
@@ -11,25 +11,26 @@
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.microsoft.samples.flink.StreamingJobCommon.getParams;

public class ComplexEventProcessingJob {
private static final int MAX_EVENT_DELAY = 60; // max delay for out of order events
private static final Logger LOG = LoggerFactory.getLogger(ComplexEventProcessingJob.class);

public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
ParameterTool params = getParams(args);

StreamExecutionEnvironment env = StreamingJobCommon.createStreamExecutionEnvironment(params);
JsonMapperSchema<SampleRecord> schema = new JsonMapperSchema(SampleRecord.class);
FlinkKafkaConsumerBase<ConsumerRecord<byte[], SampleRecord>> consumer = StreamingJobCommon.createKafkaConsumer(params, schema);
JsonMapperSchema<SampleTag> schema2 = new JsonMapperSchema(SampleTag.class);
FlinkKafkaProducer011<SampleTag> producer = StreamingJobCommon.createKafkaProducer(params, schema2);
FlinkKafkaProducer<SampleTag> producer = StreamingJobCommon.createKafkaProducer(params, schema2);

// setup streaming execution environment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
@@ -50,7 +51,6 @@ public static void main(String[] args) throws Exception {

}


static void buildStream(DataStream<ConsumerRecord<byte[], SampleRecord>> source, SinkFunction<SampleTag> producer, KeyedProcessFunction<String, SampleRecord, SampleTag> logic) {
DataStream<SampleTag> stream = source
.rebalance()
Next file (name not shown):
@@ -32,8 +32,6 @@ public void processElement(SampleRecord receivedRecord, Context context, Collect
state = new SampleState();
}

LOG.debug(String.format("DBG-3 %s, %d, %d", receivedRecord.deviceId, state.recordsSize(), state.tagsSize()));

// add latest record to the state
state.addRecord(receivedRecord);

Next file (name not shown):
@@ -15,11 +15,13 @@

import java.time.Instant;

import static com.microsoft.samples.flink.StreamingJobCommon.getParams;

public class SimpleRelayStreamingJob {
private static final int MAX_EVENT_DELAY = 60; // max delay for out of order events

public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
ParameterTool params = getParams(args);

StreamExecutionEnvironment env = StreamingJobCommon.createStreamExecutionEnvironment(params);
JsonMapperSchema<SampleRecord> schema = new JsonMapperSchema<>(SampleRecord.class);
Next file (name not shown):
@@ -3,13 +3,15 @@
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;


@@ -44,18 +46,17 @@ static <T> FlinkKafkaConsumerBase<T> createKafkaConsumer(ParameterTool params, K
LOG.info("Consuming from Kafka topic: {}", topicIn);

// Create Kafka consumer deserializing from JSON.
// Flink recommends using Kafka 0.11 consumer as Kafka 1.0 consumer is not stable.
return new FlinkKafkaConsumer011<>(topicIn, schema, properties);
return new FlinkKafkaConsumer<>(topicIn, schema, properties);
}

static <T> FlinkKafkaProducer011<T> createKafkaProducer(ParameterTool params, SerializationSchema<T> schema) {
static <T> FlinkKafkaProducer<T> createKafkaProducer(ParameterTool params, SerializationSchema<T> schema) {
Properties propertiesOut = new Properties();
setProperties(params, "kafka.out.", propertiesOut);
String topicOut = (String) propertiesOut.remove("topic");
if (topicOut == null) throw new IllegalArgumentException("Missing configuration value kafka.topic.out");
LOG.info("Writing into Kafka topic: {}", topicOut);

FlinkKafkaProducer011<T> kafkaOut = new FlinkKafkaProducer011<>(
FlinkKafkaProducer<T> kafkaOut = new FlinkKafkaProducer<>(
topicOut,
schema,
propertiesOut
@@ -70,6 +71,12 @@ private static void setProperties(ParameterTool params, String prefix, Propertie
});
}


static ParameterTool getParams(String[] args) throws IOException {
InputStream resourceAsStream = StreamingJobCommon.class.getClassLoader().getResourceAsStream("params.properties");
if (resourceAsStream != null) {
return ParameterTool.fromPropertiesFile(resourceAsStream);
}
return ParameterTool.fromArgs(args);
}
}
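
The getParams helper added above loads a params.properties resource from the classpath when present and only falls back to command-line arguments otherwise. Going by the kafka.in. / kafka.out. prefixes and the topic key that createKafkaConsumer and createKafkaProducer pop off, such a file might look roughly like the sketch below; the concrete key names and endpoint are illustrative assumptions, not taken from this PR.

# Illustrative params.properties (assumed layout; only the kafka.in./kafka.out. prefixes and
# the "topic" key are implied by StreamingJobCommon, the rest are ordinary Kafka client settings)
kafka.in.topic=ingest
kafka.in.bootstrap.servers=<namespace>.servicebus.windows.net:9093
kafka.in.group.id=flink-consumer
kafka.out.topic=serve
kafka.out.bootstrap.servers=<namespace>.servicebus.windows.net:9093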

Next file (name not shown):
@@ -43,5 +43,6 @@ public static class ComplexData implements Serializable {
public double moreData20;
public double moreData21;
public double moreData22;
public double moreData23;
}
}
Next file (name not shown):
@@ -3,30 +3,30 @@
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.*;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Headers;

import java.io.IOException;
import java.io.Serializable;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Optional;


public class JsonMapperSchema<T> implements KafkaDeserializationSchema<ConsumerRecord<byte[], T>>, SerializationSchema<T> {

private static final DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX").withZone(ZoneOffset.UTC);
private final Class<T> type;
private final ObjectMapper mapper;

private static final DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ISO_INSTANT.withZone(ZoneOffset.UTC);

public JsonMapperSchema(Class<T> type) {
this.type = type;

@@ -57,7 +57,7 @@ public ConsumerRecord<byte[], T> deserialize(ConsumerRecord<byte[], byte[]> r) t
byte[] message = r.value();
T v = mapper.readValue(message, type);
return new
ConsumerRecord<byte[], T>(r.topic(), r.partition(), r. offset(), r. timestamp(), r. timestampType(), null, r. serializedKeySize(), r. serializedValueSize(), r. key(), v, r. headers());
ConsumerRecord<>(r.topic(), r.partition(), r.offset(), r.timestamp(), r.timestampType(), r.serializedKeySize(), r.serializedValueSize(), r.key(), v, r.headers(), Optional.empty());
}

public TypeInformation<ConsumerRecord<byte[], T>> getProducedType() {
@@ -76,7 +76,7 @@ private static class InstantDeserializer extends JsonDeserializer<Instant> imple

@Override
public Instant deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
return dateTimeFormatter.parse(jsonParser.getText(), Instant::from);
return Instant.parse(jsonParser.getText());
}
}
}
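
The timestamp change above replaces the fixed yyyy-MM-dd'T'HH:mm:ss.SSSX pattern with DateTimeFormatter.ISO_INSTANT for writing and Instant.parse for reading, so fractional seconds of any precision round-trip instead of exactly three digits. A small stand-alone check of that behavior, independent of the sample's classes:

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class InstantRoundTripCheck {
    public static void main(String[] args) {
        // Same formatter the updated schema uses for serialization.
        DateTimeFormatter iso = DateTimeFormatter.ISO_INSTANT.withZone(ZoneOffset.UTC);

        // Instant.parse accepts ISO-8601 instants with anywhere from 0 to 9 fractional digits.
        Instant millis = Instant.parse("2023-01-15T08:30:00.123Z");
        Instant nanos = Instant.parse("2023-01-15T08:30:00.123456789Z");

        // Formatting and re-parsing preserves the value exactly.
        System.out.println(iso.format(millis));                             // 2023-01-15T08:30:00.123Z
        System.out.println(Instant.parse(iso.format(nanos)).equals(nanos)); // true
    }
}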
Next file (name not shown):
@@ -29,7 +29,7 @@ public void setupTestHarness() throws Exception {
//instantiate user-defined function
ComplexEventProcessingLogic logic = new ComplexEventProcessingLogic();

// wrap user defined function into a the corresponding operator
// wrap user defined function into the corresponding operator
testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
new KeyedProcessOperator<>(logic),
(KeySelector<SampleRecord, String>) value -> value.deviceId,