Skip to content

Commit

Permalink
SAMZA-1226: relax type parameters in MessageStream functions
Browse files Browse the repository at this point in the history
relax the type parameter in user supplied functions in fluent API

Author: Yi Pan (Data Infrastructure) <[email protected]>

Reviewers: Prateek Maheshwari <[email protected]>, Navina Ramesh <[email protected]>, Jacob Maes <[email protected]>

Closes apache#133 from nickpan47/SAMZA-1226 and squashes the following commits:

b8d3461 [Yi Pan (Data Infrastructure)] SAMZA-1226: cleanup code example in StreamApplication javadoc
93fa471 [Yi Pan (Data Infrastructure)] SAMZA-1226: added more unit tests for type-cast functions
18e1e9f [Yi Pan (Data Infrastructure)] SAMZA-1226: address review feedbacks
b5da53b [Yi Pan (Data Infrastructure)] Merge branch 'master' into SAMZA-1226
7981b83 [Yi Pan (Data Infrastructure)] SAMZA-1226: relax type parameters in MessageStream functions
  • Loading branch information
nickpan47 committed Apr 21, 2017
1 parent e753c8d commit c91da78
Show file tree
Hide file tree
Showing 23 changed files with 601 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,43 @@


/**
* This interface defines a template for stream application that user will implement to create operator DAG in {@link StreamGraph}.
* This interface defines a template for stream application that user will implement to initialize operator DAG in {@link StreamGraph}.
*
* <p>
* User program implements {@link StreamApplication#init(StreamGraph, Config)} method to initialize the transformation logic
* from all input streams to output streams. A simple user code example is shown below:
* </p>
*
* <pre>{@code
* public class PageViewCounterExample implements StreamApplication {
* // max timeout is 60 seconds
* private static final MAX_TIMEOUT = 60000;
*
* public void init(StreamGraph graph, Config config) {
* MessageStream<PageViewEvent> pageViewEvents = graph.getInputStream("pageViewEventStream", (k, m) -> (PageViewEvent) m);
* OutputStream<String, PageViewEvent, PageViewEvent> pageViewEventFilteredStream = graph
* .getOutputStream("pageViewEventFiltered", m -> m.memberId, m -> m);
*
* pageViewEvents
* .filter(m -> !(m.getMessage().getEventTime() < System.currentTimeMillis() - MAX_TIMEOUT))
* .sendTo(pageViewEventFilteredStream);
* }
*
* // local execution mode
* public static void main(String[] args) {
* CommandLine cmdLine = new CommandLine();
* Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
* PageViewCounterExample userApp = new PageViewCounterExample();
* ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
* localRunner.run(userApp);
* }
*
* }
* }</pre>
*
*/
@InterfaceStability.Unstable
public interface StreamApplication {
static final String APP_CLASS_CONFIG = "app.class";

/**
* Users are required to implement this abstract method to initialize the processing logic of the application, in terms
Expand All @@ -38,4 +70,5 @@ public interface StreamApplication {
* @param config the {@link Config} of the application
*/
void init(StreamGraph graph, Config config);

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public interface MessageStream<M> {
* @param <TM> the type of messages in the transformed {@link MessageStream}
* @return the transformed {@link MessageStream}
*/
<TM> MessageStream<TM> map(MapFunction<M, TM> mapFn);
<TM> MessageStream<TM> map(MapFunction<? super M, ? extends TM> mapFn);

/**
* Applies the provided 1:n function to transform a message in this {@link MessageStream}
Expand All @@ -60,7 +60,7 @@ public interface MessageStream<M> {
* @param <TM> the type of messages in the transformed {@link MessageStream}
* @return the transformed {@link MessageStream}
*/
<TM> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn);
<TM> MessageStream<TM> flatMap(FlatMapFunction<? super M, ? extends TM> flatMapFn);

/**
* Applies the provided function to messages in this {@link MessageStream} and returns the
Expand All @@ -72,7 +72,7 @@ public interface MessageStream<M> {
* @param filterFn the predicate to filter messages from this {@link MessageStream}
* @return the transformed {@link MessageStream}
*/
MessageStream<M> filter(FilterFunction<M> filterFn);
MessageStream<M> filter(FilterFunction<? super M> filterFn);

/**
* Allows sending messages in this {@link MessageStream} to an output system using the provided {@link SinkFunction}.
Expand All @@ -83,7 +83,7 @@ public interface MessageStream<M> {
*
* @param sinkFn the function to send messages in this stream to an external system
*/
void sink(SinkFunction<M> sinkFn);
void sink(SinkFunction<? super M> sinkFn);

/**
* Allows sending messages in this {@link MessageStream} to an output {@link MessageStream}.
Expand Down Expand Up @@ -120,10 +120,10 @@ public interface MessageStream<M> {
* @param ttl the ttl for messages in each stream
* @param <K> the type of join key
* @param <OM> the type of messages in the other stream
* @param <RM> the type of messages resulting from the {@code joinFn}
* @param <TM> the type of messages resulting from the {@code joinFn}
* @return the joined {@link MessageStream}
*/
<K, OM, RM> MessageStream<RM> join(MessageStream<OM> otherStream, JoinFunction<K, M, OM, RM> joinFn, Duration ttl);
<K, OM, TM> MessageStream<TM> join(MessageStream<OM> otherStream, JoinFunction<? extends K, ? super M, ? super OM, ? extends TM> joinFn, Duration ttl);

/**
* Merge all {@code otherStreams} with this {@link MessageStream}.
Expand All @@ -133,7 +133,7 @@ public interface MessageStream<M> {
* @param otherStreams other {@link MessageStream}s to be merged with this {@link MessageStream}
* @return the merged {@link MessageStream}
*/
MessageStream<M> merge(Collection<MessageStream<M>> otherStreams);
MessageStream<M> merge(Collection<MessageStream<? extends M>> otherStreams);

/**
* Sends the messages of type {@code M}in this {@link MessageStream} to a repartitioned output stream and consumes
Expand All @@ -144,6 +144,6 @@ public interface MessageStream<M> {
* @param <K> the type of output message key and partition key
* @return the repartitioned {@link MessageStream}
*/
<K> MessageStream<M> partitionBy(Function<M, K> keyExtractor);
<K> MessageStream<M> partitionBy(Function<? super M, ? extends K> keyExtractor);

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public interface StreamGraph {
* @param <M> the type of message in the input {@link MessageStream}
* @return the input {@link MessageStream}
*/
<K, V, M> MessageStream<M> getInputStream(String streamId, BiFunction<K, V, M> msgBuilder);
<K, V, M> MessageStream<M> getInputStream(String streamId, BiFunction<? super K, ? super V, ? extends M> msgBuilder);

/**
* Gets the {@link OutputStream} corresponding to the logical {@code streamId}.
Expand All @@ -54,7 +54,7 @@ public interface StreamGraph {
* @return the output {@link MessageStream}
*/
<K, V, M> OutputStream<K, V, M> getOutputStream(String streamId,
Function<M, K> keyExtractor, Function<M, V> msgExtractor);
Function<? super M, ? extends K> keyExtractor, Function<? super M, ? extends V> msgExtractor);

/**
* Sets the {@link ContextManager} for this {@link StreamGraph}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,12 @@ private Windows() { }
* @param <K> the type of the key in the {@link Window}
* @return the created {@link Window} function.
*/
public static <M, K, WV> Window<M, K, WV> keyedTumblingWindow(Function<M, K> keyFn, Duration interval,
Supplier<WV> initialValue, FoldLeftFunction<M, WV> foldFn) {
public static <M, K, WV> Window<M, K, WV> keyedTumblingWindow(Function<? super M, ? extends K> keyFn, Duration interval,
Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> foldFn) {

Trigger<M> defaultTrigger = new TimeTrigger<>(interval);
return new WindowInternal<M, K, WV>(defaultTrigger, initialValue, foldFn, keyFn, null, WindowType.TUMBLING);
return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) foldFn,
(Function<M, K>) keyFn, null, WindowType.TUMBLING);
}


Expand All @@ -147,10 +148,10 @@ public static <M, K, WV> Window<M, K, WV> keyedTumblingWindow(Function<M, K> key
* @param <K> the type of the key in the {@link Window}
* @return the created {@link Window} function
*/
public static <M, K> Window<M, K, Collection<M>> keyedTumblingWindow(Function<M, K> keyFn, Duration interval) {
public static <M, K> Window<M, K, Collection<M>> keyedTumblingWindow(Function<? super M, ? extends K> keyFn, Duration interval) {
FoldLeftFunction<M, Collection<M>> aggregator = createAggregator();

Supplier<Collection<M>> initialValue = () -> new ArrayList<>();
Supplier<Collection<M>> initialValue = ArrayList::new;
return keyedTumblingWindow(keyFn, interval, initialValue, aggregator);
}

Expand All @@ -175,10 +176,11 @@ public static <M, K> Window<M, K, Collection<M>> keyedTumblingWindow(Function<M,
* @param <WV> the type of the {@link WindowPane} output value
* @return the created {@link Window} function
*/
public static <M, WV> Window<M, Void, WV> tumblingWindow(Duration duration, Supplier<WV> initialValue,
FoldLeftFunction<M, WV> foldFn) {
public static <M, WV> Window<M, Void, WV> tumblingWindow(Duration duration, Supplier<? extends WV> initialValue,
FoldLeftFunction<? super M, WV> foldFn) {
Trigger<M> defaultTrigger = Triggers.repeat(new TimeTrigger<>(duration));
return new WindowInternal<>(defaultTrigger, initialValue, foldFn, null, null, WindowType.TUMBLING);
return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) foldFn,
null, null, WindowType.TUMBLING);
}

/**
Expand All @@ -203,7 +205,7 @@ public static <M, WV> Window<M, Void, WV> tumblingWindow(Duration duration, Supp
public static <M> Window<M, Void, Collection<M>> tumblingWindow(Duration duration) {
FoldLeftFunction<M, Collection<M>> aggregator = createAggregator();

Supplier<Collection<M>> initialValue = () -> new ArrayList<>();
Supplier<Collection<M>> initialValue = ArrayList::new;
return tumblingWindow(duration, initialValue, aggregator);
}

Expand Down Expand Up @@ -235,10 +237,11 @@ public static <M> Window<M, Void, Collection<M>> tumblingWindow(Duration duratio
* @param <WV> the type of the output value in the {@link WindowPane}
* @return the created {@link Window} function
*/
public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(Function<M, K> keyFn, Duration sessionGap,
Supplier<WV> initialValue, FoldLeftFunction<M, WV> foldFn) {
public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(Function<? super M, ? extends K> keyFn, Duration sessionGap,
Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> foldFn) {
Trigger<M> defaultTrigger = Triggers.timeSinceLastMessage(sessionGap);
return new WindowInternal<>(defaultTrigger, initialValue, foldFn, keyFn, null, WindowType.SESSION);
return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) foldFn, (Function<M, K>) keyFn,
null, WindowType.SESSION);
}

/**
Expand All @@ -265,11 +268,11 @@ public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(Function<M, K> keyF
* @param <K> the type of the key in the {@link Window}
* @return the created {@link Window} function
*/
public static <M, K> Window<M, K, Collection<M>> keyedSessionWindow(Function<M, K> keyFn, Duration sessionGap) {
public static <M, K> Window<M, K, Collection<M>> keyedSessionWindow(Function<? super M, ? extends K> keyFn, Duration sessionGap) {

FoldLeftFunction<M, Collection<M>> aggregator = createAggregator();

Supplier<Collection<M>> initialValue = () -> new ArrayList<>();
Supplier<Collection<M>> initialValue = ArrayList::new;
return keyedSessionWindow(keyFn, sessionGap, initialValue, aggregator);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,9 @@ public interface TaskContext {
/**
* Method to allow user to return customized context
*
* @param <T> the type of user-defined task context
* @return user-defined task context object
*/
default <T> T getUserDefinedContext() {
default Object getUserDefinedContext() {
return null;
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class ApplicationConfig extends MapConfig {
public static final String APP_COORDINATION_SERVICE_FACTORY_CLASS = "app.coordination.service.factory.class";
public static final String APP_NAME = "app.name";
public static final String APP_ID = "app.id";
public static final String APP_CLASS = "app.class";

public ApplicationConfig(Config config) {
super(config);
Expand All @@ -67,6 +68,10 @@ public String getAppId() {
return get(APP_ID, get(JobConfig.JOB_ID(), "1"));
}

public String getAppClass() {
return get(APP_CLASS, null);
}

@Deprecated
public String getProcessorId() {
return get(PROCESSOR_ID, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,31 +72,31 @@ public MessageStreamImpl(StreamGraphImpl graph) {
}

@Override
public <TM> MessageStream<TM> map(MapFunction<M, TM> mapFn) {
public <TM> MessageStream<TM> map(MapFunction<? super M, ? extends TM> mapFn) {
OperatorSpec<TM> op = OperatorSpecs.createMapOperatorSpec(
mapFn, new MessageStreamImpl<>(this.graph), this.graph.getNextOpId());
this.registeredOperatorSpecs.add(op);
return op.getNextStream();
}

@Override
public MessageStream<M> filter(FilterFunction<M> filterFn) {
public MessageStream<M> filter(FilterFunction<? super M> filterFn) {
OperatorSpec<M> op = OperatorSpecs.createFilterOperatorSpec(
filterFn, new MessageStreamImpl<>(this.graph), this.graph.getNextOpId());
this.registeredOperatorSpecs.add(op);
return op.getNextStream();
}

@Override
public <TM> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn) {
public <TM> MessageStream<TM> flatMap(FlatMapFunction<? super M, ? extends TM> flatMapFn) {
OperatorSpec<TM> op = OperatorSpecs.createStreamOperatorSpec(
flatMapFn, new MessageStreamImpl<>(this.graph), this.graph.getNextOpId());
this.registeredOperatorSpecs.add(op);
return op.getNextStream();
}

@Override
public void sink(SinkFunction<M> sinkFn) {
public void sink(SinkFunction<? super M> sinkFn) {
SinkOperatorSpec<M> op = OperatorSpecs.createSinkOperatorSpec(sinkFn, this.graph.getNextOpId());
this.registeredOperatorSpecs.add(op);
}
Expand All @@ -110,22 +110,22 @@ public <K, V> void sendTo(OutputStream<K, V, M> outputStream) {

@Override
public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window) {
OperatorSpec<WindowPane<K, WV>> wndOp = OperatorSpecs.createWindowOperatorSpec((WindowInternal<M, K, WV>) window,
new MessageStreamImpl<>(this.graph), this.graph.getNextOpId());
OperatorSpec<WindowPane<K, WV>> wndOp = OperatorSpecs.createWindowOperatorSpec(
(WindowInternal<M, K, WV>) window, new MessageStreamImpl<>(this.graph), this.graph.getNextOpId());
this.registeredOperatorSpecs.add(wndOp);
return wndOp.getNextStream();
}

@Override
public <K, JM, RM> MessageStream<RM> join(
MessageStream<JM> otherStream, JoinFunction<K, M, JM, RM> joinFn, Duration ttl) {
MessageStreamImpl<RM> nextStream = new MessageStreamImpl<>(this.graph);
public <K, OM, TM> MessageStream<TM> join(
MessageStream<OM> otherStream, JoinFunction<? extends K, ? super M, ? super OM, ? extends TM> joinFn, Duration ttl) {
MessageStreamImpl<TM> nextStream = new MessageStreamImpl<>(this.graph);

PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn = new PartialJoinFunction<K, M, JM, RM>() {
private KeyValueStore<K, PartialJoinMessage<M>> thisStreamState;
PartialJoinFunction<K, M, OM, TM> thisPartialJoinFn = new PartialJoinFunction<K, M, OM, TM>() {
private KeyValueStore<K, PartialJoinFunction.PartialJoinMessage<M>> thisStreamState;

@Override
public RM apply(M m, JM jm) {
public TM apply(M m, OM jm) {
return joinFn.apply(m, jm);
}

Expand All @@ -148,21 +148,21 @@ public void init(Config config, TaskContext context) {
}
};

PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn = new PartialJoinFunction<K, JM, M, RM>() {
private KeyValueStore<K, PartialJoinMessage<JM>> otherStreamState;
PartialJoinFunction<K, OM, M, TM> otherPartialJoinFn = new PartialJoinFunction<K, OM, M, TM>() {
private KeyValueStore<K, PartialJoinMessage<OM>> otherStreamState;

@Override
public RM apply(JM om, M m) {
public TM apply(OM om, M m) {
return joinFn.apply(m, om);
}

@Override
public K getKey(JM message) {
public K getKey(OM message) {
return joinFn.getSecondKey(message);
}

@Override
public KeyValueStore<K, PartialJoinMessage<JM>> getState() {
public KeyValueStore<K, PartialJoinMessage<OM>> getState() {
return otherStreamState;
}

Expand All @@ -175,15 +175,15 @@ public void init(Config config, TaskContext taskContext) {
this.registeredOperatorSpecs.add(OperatorSpecs.createPartialJoinOperatorSpec(
thisPartialJoinFn, otherPartialJoinFn, ttl.toMillis(), nextStream, this.graph.getNextOpId()));

((MessageStreamImpl<JM>) otherStream).registeredOperatorSpecs
((MessageStreamImpl<OM>) otherStream).registeredOperatorSpecs
.add(OperatorSpecs.createPartialJoinOperatorSpec(
otherPartialJoinFn, thisPartialJoinFn, ttl.toMillis(), nextStream, this.graph.getNextOpId()));

return nextStream;
}

@Override
public MessageStream<M> merge(Collection<MessageStream<M>> otherStreams) {
public MessageStream<M> merge(Collection<MessageStream<? extends M>> otherStreams) {
MessageStreamImpl<M> nextStream = new MessageStreamImpl<>(this.graph);

otherStreams.add(this);
Expand All @@ -193,7 +193,7 @@ public MessageStream<M> merge(Collection<MessageStream<M>> otherStreams) {
}

@Override
public <K> MessageStream<M> partitionBy(Function<M, K> keyExtractor) {
public <K> MessageStream<M> partitionBy(Function<? super M, ? extends K> keyExtractor) {
int opId = this.graph.getNextOpId();
String opName = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), opId);
MessageStreamImpl<M> intermediateStream =
Expand Down
Loading

0 comments on commit c91da78

Please sign in to comment.