-
Notifications
You must be signed in to change notification settings - Fork 62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Issue 191: Add at-least-once stream processing example #192
base: dev
Are you sure you want to change the base?
Changes from all commits
85ea475
666c86e
eded361
13c3931
178957e
90af788
c2c5fe6
666ea50
c76d165
18b8fb4
6905085
adb61b8
6bb05b1
9584ce0
4f11177
7eac911
9daa883
f17131b
d2f3b26
39a9f92
2af2fb7
c18bfe9
353b346
f31a483
753c7f6
faddc14
0d6ebd1
a1f3a5f
82a074c
13076f2
bf5a9b0
7836d27
5629ffc
ca59a61
36b18cd
b47ed44
ad26df4
f43a2cd
5f9aeb0
f2bdf1a
4601f08
5f46551
da84a01
d409f44
bb00ed4
52e5caa
b2b2873
0fc8783
a429894
e34bb90
265fd13
6aab6bb
0439f09
1b5d275
f4a0be8
dc9dfcf
f329c6b
c3eae91
c80ef0d
ca1d896
3a9978a
5c56356
6b4f1bb
6aebc88
f8155cf
2ed765d
913ef6e
ec39792
097fb8b
a06dbc6
5c09d55
bd6029c
9215290
33902d8
843d4c2
8c880cd
4dc1aae
1ad8163
afa5139
40675a4
f67ea43
55a8607
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
/* | ||
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
*/ | ||
package io.pravega.example.streamprocessing; | ||
|
||
import java.net.URI; | ||
import java.util.UUID; | ||
|
||
/** | ||
* All parameters will come from environment variables. | ||
* This makes it easy to configure on Docker, Kubernetes, etc. | ||
*/ | ||
class AppConfiguration {
    // Resolved once at construction. When INSTANCE_ID is not set, the fallback is a random UUID,
    // and re-evaluating it on every call would hand out a different "instance id" each time.
    private final String instanceId;

    /**
     * Creates the configuration.
     *
     * @param args command-line arguments; currently ignored because every setting is read
     *             from the environment.
     */
    AppConfiguration(String[] args) {
        this.instanceId = getEnvVar("INSTANCE_ID", UUID.randomUUID().toString());
    }

    /**
     * Returns the controller URI from PRAVEGA_CONTROLLER.
     * By default, we will connect to a standalone Pravega running on localhost.
     */
    public URI getControllerURI() {
        return URI.create(getEnvVar("PRAVEGA_CONTROLLER", "tcp://localhost:9090"));
    }

    /**
     * Returns the scope name from PRAVEGA_SCOPE (default "examples").
     */
    public String getScope() {
        return getEnvVar("PRAVEGA_SCOPE", "examples");
    }

    /**
     * Returns the identifier of this application instance.
     * The value comes from INSTANCE_ID, or is a random UUID generated once when this
     * object was constructed, so repeated calls on the same object return the same value.
     */
    public String getInstanceId() {
        return instanceId;
    }

    /**
     * The output of EventGenerator and the input of AtLeastOnceApp.
     */
    public String getStream1Name() {
        return getEnvVar("PRAVEGA_STREAM_1", "streamprocessing1c");
    }

    /**
     * The output of AtLeastOnceApp and the input of EventDebugSink.
     */
    public String getStream2Name() {
        return getEnvVar("PRAVEGA_STREAM_2", "streamprocessing2c");
    }

    /**
     * Returns the reader group name from PRAVEGA_READER_GROUP.
     */
    public String getReaderGroup() {
        return getEnvVar("PRAVEGA_READER_GROUP", "streamprocessing1c-rg1");
    }

    /**
     * Returns the name of the membership synchronizer stream, which is derived from
     * the reader group name by appending "-membership".
     */
    public String getMembershipSynchronizerStreamName() {
        return getReaderGroup() + "-membership";
    }

    /**
     * Returns PRAVEGA_TARGET_RATE_EVENTS_PER_SEC (default 10).
     *
     * @throws NumberFormatException if the variable is set to a non-integer value
     */
    public int getTargetRateEventsPerSec() {
        return getIntEnvVar("PRAVEGA_TARGET_RATE_EVENTS_PER_SEC", "10");
    }

    /**
     * Returns PRAVEGA_SCALE_FACTOR (default 2).
     *
     * @throws NumberFormatException if the variable is set to a non-integer value
     */
    public int getScaleFactor() {
        return getIntEnvVar("PRAVEGA_SCALE_FACTOR", "2");
    }

    /**
     * Returns PRAVEGA_MIN_NUM_SEGMENTS (default 6).
     *
     * @throws NumberFormatException if the variable is set to a non-integer value
     */
    public int getMinNumSegments() {
        return getIntEnvVar("PRAVEGA_MIN_NUM_SEGMENTS", "6");
    }

    /**
     * Returns CHECKPOINT_PERIOD_MS (default 3000).
     *
     * @throws NumberFormatException if the variable is set to a non-integer value
     */
    public long getCheckpointPeriodMs() {
        return getLongEnvVar("CHECKPOINT_PERIOD_MS", "3000");
    }

    /**
     * Returns CHECKPOINT_TIMEOUT_MS (default 120000).
     *
     * @throws NumberFormatException if the variable is set to a non-integer value
     */
    public long getCheckpointTimeoutMs() {
        return getLongEnvVar("CHECKPOINT_TIMEOUT_MS", "120000");
    }

    /**
     * Returns TRANSACTION_TIMEOUT_MS (default 120000).
     *
     * @throws NumberFormatException if the variable is set to a non-integer value
     */
    public long getTransactionTimeoutMs() {
        return getLongEnvVar("TRANSACTION_TIMEOUT_MS", "120000");
    }

    /**
     * Returns HEARTBEAT_INTERVAL_MS (default 500).
     *
     * @throws NumberFormatException if the variable is set to a non-integer value
     */
    public long getHeartbeatIntervalMillis() {
        return getLongEnvVar("HEARTBEAT_INTERVAL_MS", "500");
    }

    /**
     * Returns the value of the named environment variable, or {@code defaultValue}
     * when the variable is unset or empty.
     */
    private static String getEnvVar(String name, String defaultValue) {
        String value = System.getenv(name);
        if (value == null || value.isEmpty()) {
            return defaultValue;
        }
        return value;
    }

    /** Reads an environment variable as an {@code int}, naming the variable on failure. */
    private static int getIntEnvVar(String name, String defaultValue) {
        final String raw = getEnvVar(name, defaultValue);
        try {
            return Integer.parseInt(raw);
        } catch (NumberFormatException e) {
            throw invalidNumber(name, raw, e);
        }
    }

    /** Reads an environment variable as a {@code long}, naming the variable on failure. */
    private static long getLongEnvVar(String name, String defaultValue) {
        final String raw = getEnvVar(name, defaultValue);
        try {
            return Long.parseLong(raw);
        } catch (NumberFormatException e) {
            throw invalidNumber(name, raw, e);
        }
    }

    /**
     * Builds a NumberFormatException (the type callers already see) whose message says which
     * environment variable held the bad value; the original exception is kept as the cause.
     */
    private static NumberFormatException invalidNumber(String name, String raw, NumberFormatException cause) {
        final NumberFormatException detailed = new NumberFormatException(
                "Environment variable " + name + " must be an integer, but was \"" + raw + "\"");
        detailed.initCause(cause);
        return detailed;
    }
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
/* | ||
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
*/ | ||
package io.pravega.example.streamprocessing; | ||
|
||
import com.google.gson.reflect.TypeToken; | ||
import io.pravega.client.ClientConfig; | ||
import io.pravega.client.EventStreamClientFactory; | ||
import io.pravega.client.SynchronizerClientFactory; | ||
import io.pravega.client.admin.ReaderGroupManager; | ||
import io.pravega.client.admin.StreamManager; | ||
import io.pravega.client.stream.EventRead; | ||
import io.pravega.client.stream.EventStreamWriter; | ||
import io.pravega.client.stream.EventWriterConfig; | ||
import io.pravega.client.stream.ReaderConfig; | ||
import io.pravega.client.stream.ReaderGroup; | ||
import io.pravega.client.stream.ReaderGroupConfig; | ||
import io.pravega.client.stream.ScalingPolicy; | ||
import io.pravega.client.stream.Stream; | ||
import io.pravega.client.stream.StreamConfiguration; | ||
import lombok.Cleanup; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.concurrent.Executors; | ||
|
||
/** | ||
* This demonstrates reading events from a Pravega stream, processing each event, | ||
* and writing each output event to another Pravega stream. | ||
* It guarantees that each event is processed at least once. | ||
* If multiple instances of this application are executed using the same readerGroupName parameter, | ||
* each instance will get a distinct subset of events. | ||
* | ||
* Use {@link EventGenerator} to generate input events and {@link EventDebugSink} | ||
* to view the output events. | ||
*/ | ||
public class AtLeastOnceApp { | ||
private static final Logger log = LoggerFactory.getLogger(AtLeastOnceApp.class); | ||
|
||
private final AppConfiguration config; | ||
private final ClientConfig clientConfig; | ||
|
||
public static void main(String[] args) throws Exception { | ||
final AtLeastOnceApp app = new AtLeastOnceApp(new AppConfiguration(args)); | ||
app.run(); | ||
} | ||
|
||
public AtLeastOnceApp(AppConfiguration config) { | ||
this.config = config; | ||
this.clientConfig = ClientConfig.builder().controllerURI(getConfig().getControllerURI()).build(); | ||
} | ||
|
||
public AppConfiguration getConfig() { | ||
return config; | ||
} | ||
|
||
private void run() { | ||
// Get the provided instanceId that uniquely identifes this instances of AtLeastOnceApp. | ||
// It will be randomly generated if not provided by the user. | ||
final String instanceId = getConfig().getInstanceId(); | ||
log.info("instanceId={}", instanceId); | ||
|
||
createStreams(); | ||
|
||
final ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder() | ||
.stream(Stream.of(getConfig().getScope(), getConfig().getStream1Name())) | ||
.automaticCheckpointIntervalMillis(getConfig().getCheckpointPeriodMs()) | ||
.build(); | ||
@Cleanup | ||
ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(getConfig().getScope(), clientConfig); | ||
// Create the Reader Group (ignored if it already exists) | ||
readerGroupManager.createReaderGroup(getConfig().getReaderGroup(), readerGroupConfig); | ||
@Cleanup | ||
final ReaderGroup readerGroup = readerGroupManager.getReaderGroup(getConfig().getReaderGroup()); | ||
@Cleanup | ||
final EventStreamClientFactory eventStreamClientFactory = EventStreamClientFactory.withScope(getConfig().getScope(), clientConfig); | ||
@Cleanup | ||
final SynchronizerClientFactory synchronizerClientFactory = SynchronizerClientFactory.withScope(getConfig().getScope(), clientConfig); | ||
// Create a Pravega stream writer that we will send our processed output to. | ||
@Cleanup | ||
final EventStreamWriter<SampleEvent> writer = eventStreamClientFactory.createEventWriter( | ||
getConfig().getStream2Name(), | ||
new JSONSerializer<>(new TypeToken<SampleEvent>(){}.getType()), | ||
EventWriterConfig.builder().build()); | ||
|
||
final SampleEventProcessor processor = new SampleEventProcessor( | ||
() -> ReaderGroupPruner.create( | ||
readerGroup, | ||
getConfig().getMembershipSynchronizerStreamName(), | ||
instanceId, | ||
synchronizerClientFactory, | ||
Executors.newScheduledThreadPool(1), | ||
getConfig().getHeartbeatIntervalMillis()), | ||
() -> eventStreamClientFactory.<SampleEvent>createReader( | ||
instanceId, | ||
readerGroup.getGroupName(), | ||
new JSONSerializer<>(new TypeToken<SampleEvent>(){}.getType()), | ||
ReaderConfig.builder().build()), | ||
1000, | ||
instanceId, | ||
writer); | ||
|
||
processor.startAsync(); | ||
|
||
// Add shutdown hook for graceful shutdown. | ||
Runtime.getRuntime().addShutdownHook(new Thread(() -> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't the processor do this itself if this is needed.? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It gets messy to do this in the AtLeastOnceProcessor because then it would need to deal with removing the shutdown hook, but only doing that when the JVM is not actually shutting down. The point of this shutdown hook is to shutdown the whole application gracefully, which in this case happens to consist of only one AtLeastOnceProcessor service. But in general, an application may consist of many services that all need to be shutdown in a particular order. So it seems appropriate that AtLeastOnceApp should coordinate the shutdown. |
||
log.info("Running shutdown hook."); | ||
processor.stopAsync(); | ||
log.info("Waiting for processor to terminate."); | ||
processor.awaitTerminated(); | ||
log.info("Processor terminated."); | ||
})); | ||
|
||
processor.awaitTerminated(); | ||
} | ||
|
||
/** | ||
* Create the input and output streams (ignored if they already exist). | ||
*/ | ||
private void createStreams() { | ||
try (StreamManager streamManager = StreamManager.create(clientConfig)) { | ||
streamManager.createScope(getConfig().getScope()); | ||
final StreamConfiguration streamConfig = StreamConfiguration.builder() | ||
.scalingPolicy(ScalingPolicy.byEventRate( | ||
getConfig().getTargetRateEventsPerSec(), | ||
getConfig().getScaleFactor(), | ||
getConfig().getMinNumSegments())) | ||
.build(); | ||
streamManager.createStream(getConfig().getScope(), getConfig().getStream1Name(), streamConfig); | ||
streamManager.createStream(getConfig().getScope(), getConfig().getStream2Name(), streamConfig); | ||
// Create stream for the membership state synchronizer. | ||
streamManager.createStream( | ||
getConfig().getScope(), | ||
getConfig().getMembershipSynchronizerStreamName(), | ||
StreamConfiguration.builder().scalingPolicy(ScalingPolicy.fixed(1)).build()); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor comment, but could we use input stream and output stream rather than stream 1 and 2?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AppConfiguration is shared by EventGenerator, AtLeastOnceApp, and EventDebugSink. Stream 1 is the output for EventGenerator and the input for AtLeastOnceApp. Stream 2 is the output for AtLeastOnceApp and the input for EventDebugSink. To avoid the confusion from this point of view, I chose Stream 1 and Stream 2. It is a little odd, I admit. I added comments to the code to clarify this.