From 36b2f23bb706dfa1cc75fcddf5866cd66e6de623 Mon Sep 17 00:00:00 2001 From: Prateek Maheshwari Date: Wed, 10 May 2017 15:10:02 -0700 Subject: [PATCH] SAMZA-1277: Add a static merge() operator that takes all streams to merge as input Also updated documentation for join and partitionBy. Author: Prateek Maheshwari Reviewers: Jacob Maes Closes #182 from prateekm/documentation-cleanup --- .../apache/samza/operators/MessageStream.java | 44 ++++++++++++-- .../apache/samza/example/MergeExample.java | 52 +++++++++++++++++ .../operators/TestMessageStreamImpl.java | 58 ++++++++++++++++++- 3 files changed, 147 insertions(+), 7 deletions(-) create mode 100644 samza-core/src/test/java/org/apache/samza/example/MergeExample.java diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java index 6d5b784f63f0b..b081869a03aa3 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java +++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java @@ -28,6 +28,7 @@ import org.apache.samza.operators.windows.WindowPane; import java.time.Duration; +import java.util.ArrayList; import java.util.Collection; import java.util.function.Function; @@ -104,7 +105,7 @@ public interface MessageStream { *

* Use the {@link org.apache.samza.operators.windows.Windows} helper methods to create the appropriate windows. *

- * Note: As of version 0.13.0, messages in windows are kept in memory and will be lost during restarts. + * Warning: As of version 0.13.0, messages in windows are kept in memory and will be lost during restarts. * * @param window the window to group and process messages from this {@link MessageStream} * @param the type of key in the message in this {@link MessageStream}. If a key is specified, @@ -121,7 +122,9 @@ public interface MessageStream { * Messages in each stream are retained for the provided {@code ttl} and join results are * emitted as matches are found. *

- * Note: As of version 0.13.0, messages in joins are kept in memory and will be lost during restarts. + * Both inputs being joined must have the same number of partitions, and should be partitioned by the join key. + *

+ * Warning: As of version 0.13.0, messages in joins are kept in memory and will be lost during restarts. * * @param otherStream the other {@link MessageStream} to be joined with * @param joinFn the function to join messages from this and the other {@link MessageStream} @@ -136,6 +139,8 @@ MessageStream join(MessageStream otherStream, /** * Merges all {@code otherStreams} with this {@link MessageStream}. + *

+ * The merged stream contains messages from all streams in the order they arrive. * * @param otherStreams other {@link MessageStream}s to be merged with this {@link MessageStream} * @return the merged {@link MessageStream} @@ -143,11 +148,38 @@ MessageStream join(MessageStream otherStream, MessageStream merge(Collection> otherStreams); /** - * Sends the messages of type {@code M}in this {@link MessageStream} to a repartitioned output stream and consumes - * them as an input {@link MessageStream} again. Uses keys returned by the {@code keyExtractor} as the partition key. + * Merges all {@code streams}. + *

+ * The merged {@link MessageStream} contains messages from all {@code streams} in the order they arrive. + * + * @param streams {@link MessageStream}s to be merged + * @return the merged {@link MessageStream} + * @throws IllegalArgumentException if {@code streams} is empty + */ + static MessageStream mergeAll(Collection> streams) { + if (streams.isEmpty()) { + throw new IllegalArgumentException("No streams to merge."); + } + ArrayList> messageStreams = new ArrayList<>((Collection>) streams); + MessageStream firstStream = messageStreams.remove(0); + return firstStream.merge(messageStreams); + } + + /** + * Re-partitions this {@link MessageStream} using keys from the {@code keyExtractor} by creating a new + * intermediate stream on the {@code job.default.system}. This intermediate stream is both an output and + * input to the job. + *

+ * The key and message Serdes configured for the default system must be able to serialize and deserialize + * types K and M respectively. *

- * Note: Repartitioned streams are created automatically in the default system. The key and message Serdes - * configured for the default system must be able to serialize and deserialize types K and M respectively. + * The number of partitions for this intermediate stream is determined as follows: + * If the stream is an eventual input to a {@link #join}, and the number of partitions for the other stream is known, + * then number of partitions for this stream is set to the number of partitions in the other input stream. + * Else, the number of partitions is set to the value of the {@code job.intermediate.stream.partitions} + * configuration, if present. + * Else, the number of partitions is set to to the max of number of partitions for all input and output streams + * (excluding intermediate streams). * * @param keyExtractor the {@link Function} to extract the output message key and partition key from * the input message diff --git a/samza-core/src/test/java/org/apache/samza/example/MergeExample.java b/samza-core/src/test/java/org/apache/samza/example/MergeExample.java new file mode 100644 index 0000000000000..9fbf6d1d8e2d7 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/example/MergeExample.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.example; + +import com.google.common.collect.ImmutableList; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.util.CommandLine; + +public class MergeExample implements StreamApplication { + + @Override + public void init(StreamGraph graph, Config config) { + MessageStream inputStream1 = graph.getInputStream("inputStream1", (k, m) -> m); + MessageStream inputStream2 = graph.getInputStream("inputStream2", (k, m) -> m); + MessageStream inputStream3 = graph.getInputStream("inputStream3", (k, m) -> m); + OutputStream outputStream = graph + .getOutputStream("outputStream", Object::hashCode, m -> m); + + MessageStream.mergeAll(ImmutableList.of(inputStream1, inputStream2, inputStream3)) + .sendTo(outputStream); + } + + // local execution mode + public static void main(String[] args) throws Exception { + CommandLine cmdLine = new CommandLine(); + Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + LocalApplicationRunner localRunner = new LocalApplicationRunner(config); + localRunner.run(new MergeExample()); + } +} \ No newline at end of file diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java index 7402b4a17cba2..b2a5e2af86b73 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java @@ -22,7 +22,11 @@ import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; -import org.apache.samza.operators.data.*; +import org.apache.samza.operators.data.MessageType; +import org.apache.samza.operators.data.TestExtOutputMessageEnvelope; +import org.apache.samza.operators.data.TestInputMessageEnvelope; +import org.apache.samza.operators.data.TestMessageEnvelope; +import org.apache.samza.operators.data.TestOutputMessageEnvelope; import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.JoinFunction; @@ -39,6 +43,7 @@ import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.junit.Test; +import org.mockito.ArgumentCaptor; import java.time.Duration; import java.util.ArrayList; @@ -54,6 +59,8 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -276,6 +283,7 @@ public void testMergeWithRelaxedTypes() { new MessageStreamImpl(mockGraph), new MessageStreamImpl(mockGraph)); + // should compile MessageStream mergeOutput = input1.merge(others); validateMergeOperator(input1, mergeOutput); @@ -289,6 +297,8 @@ class MessageEnvelope { } MessageStream> ms2 = new MessageStreamImpl<>(mock(StreamGraphImpl.class)); MessageStream> ms3 = new MessageStreamImpl<>(mock(StreamGraphImpl.class)); Collection>> otherStreams = ImmutableList.of(ms2, ms3); + + // should compile ms1.merge(otherStreams); } @@ -305,6 +315,52 @@ private void validateMergeOperator(MessageStream mergeSourc assertEquals(outputs.iterator().next(), mockMsg); } + @Test + public void testMergeAll() { + MessageStream input1 = mock(MessageStreamImpl.class); + MessageStream input2 = mock(MessageStreamImpl.class); + MessageStream input3 = mock(MessageStreamImpl.class); + + MessageStream.mergeAll(ImmutableList.of(input1, input2, input3)); + + ArgumentCaptor otherStreamsCaptor = ArgumentCaptor.forClass(Collection.class); + verify(input1, times(1)).merge(otherStreamsCaptor.capture()); + assertEquals(2, otherStreamsCaptor.getValue().size()); + assertTrue(otherStreamsCaptor.getValue().contains(input2)); + assertTrue(otherStreamsCaptor.getValue().contains(input3)); + } + + @Test + public void testMergeAllWithRelaxedTypes() { + MessageStreamImpl input1 = mock(MessageStreamImpl.class); + MessageStreamImpl input2 = mock(MessageStreamImpl.class); + Collection> streams = ImmutableList.of(input1, input2); + + // should compile + MessageStream.mergeAll(streams); + ArgumentCaptor otherStreamsCaptor = ArgumentCaptor.forClass(Collection.class); + verify(input1, times(1)).merge(otherStreamsCaptor.capture()); + assertEquals(1, otherStreamsCaptor.getValue().size()); + assertTrue(otherStreamsCaptor.getValue().contains(input2)); + } + + @Test + public void testMergeAllWithNestedTypes() { + class MessageEnvelope { } + MessageStream> input1 = mock(MessageStreamImpl.class); + MessageStream> input2 = mock(MessageStreamImpl.class); + MessageStream> input3 = mock(MessageStreamImpl.class); + + // should compile + MessageStream.mergeAll(ImmutableList.of(input1, input2, input3)); + + ArgumentCaptor otherStreamsCaptor = ArgumentCaptor.forClass(Collection.class); + verify(input1, times(1)).merge(otherStreamsCaptor.capture()); + assertEquals(2, otherStreamsCaptor.getValue().size()); + assertTrue(otherStreamsCaptor.getValue().contains(input2)); + assertTrue(otherStreamsCaptor.getValue().contains(input3)); + } + @Test public void testPartitionBy() { Map map = new HashMap<>();