Skip to content

Commit

Permalink
SAMZA-1277: Add a static merge() operator that takes all streams to m…
Browse files Browse the repository at this point in the history
…erge as input

Also updated documentation for join and partitionBy.

Author: Prateek Maheshwari <[email protected]>

Reviewers: Jacob Maes <[email protected]>

Closes apache#182 from prateekm/documentation-cleanup
  • Loading branch information
prateekm authored and Jacob Maes committed May 10, 2017
1 parent 601c11d commit 36b2f23
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.samza.operators.windows.WindowPane;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.function.Function;

Expand Down Expand Up @@ -104,7 +105,7 @@ public interface MessageStream<M> {
* <p>
* Use the {@link org.apache.samza.operators.windows.Windows} helper methods to create the appropriate windows.
* <p>
* <b>Note:</b> As of version 0.13.0, messages in windows are kept in memory and will be lost during restarts.
* <b>Warning:</b> As of version 0.13.0, messages in windows are kept in memory and will be lost during restarts.
*
* @param window the window to group and process messages from this {@link MessageStream}
* @param <K> the type of key in the message in this {@link MessageStream}. If a key is specified,
Expand All @@ -121,7 +122,9 @@ public interface MessageStream<M> {
* Messages in each stream are retained for the provided {@code ttl} and join results are
* emitted as matches are found.
* <p>
* <b>Note:</b> As of version 0.13.0, messages in joins are kept in memory and will be lost during restarts.
* Both inputs being joined must have the same number of partitions, and should be partitioned by the join key.
* <p>
* <b>Warning:</b> As of version 0.13.0, messages in joins are kept in memory and will be lost during restarts.
*
* @param otherStream the other {@link MessageStream} to be joined with
* @param joinFn the function to join messages from this and the other {@link MessageStream}
Expand All @@ -136,18 +139,47 @@ <K, JM, OM> MessageStream<OM> join(MessageStream<JM> otherStream,

/**
* Merges all {@code otherStreams} with this {@link MessageStream}.
* <p>
* The merged stream contains messages from all streams in the order they arrive.
*
* @param otherStreams other {@link MessageStream}s to be merged with this {@link MessageStream}
* @return the merged {@link MessageStream}
*/
MessageStream<M> merge(Collection<? extends MessageStream<? extends M>> otherStreams);

/**
* Sends the messages of type {@code M}in this {@link MessageStream} to a repartitioned output stream and consumes
* them as an input {@link MessageStream} again. Uses keys returned by the {@code keyExtractor} as the partition key.
* Merges all {@code streams}.
* <p>
* The merged {@link MessageStream} contains messages from all {@code streams} in the order they arrive.
*
* @param streams {@link MessageStream}s to be merged
* @return the merged {@link MessageStream}
* @throws IllegalArgumentException if {@code streams} is empty
*/
static <T> MessageStream<T> mergeAll(Collection<? extends MessageStream<? extends T>> streams) {
if (streams.isEmpty()) {
throw new IllegalArgumentException("No streams to merge.");
}
ArrayList<MessageStream<T>> messageStreams = new ArrayList<>((Collection<MessageStream<T>>) streams);
MessageStream<T> firstStream = messageStreams.remove(0);
return firstStream.merge(messageStreams);
}

/**
* Re-partitions this {@link MessageStream} using keys from the {@code keyExtractor} by creating a new
* intermediate stream on the {@code job.default.system}. This intermediate stream is both an output and
* input to the job.
* <p>
* The key and message Serdes configured for the default system must be able to serialize and deserialize
* types K and M respectively.
* <p>
* <b>Note</b>: Repartitioned streams are created automatically in the default system. The key and message Serdes
* configured for the default system must be able to serialize and deserialize types K and M respectively.
* The number of partitions for this intermediate stream is determined as follows:
* If the stream is an eventual input to a {@link #join}, and the number of partitions for the other stream is known,
* then number of partitions for this stream is set to the number of partitions in the other input stream.
* Else, the number of partitions is set to the value of the {@code job.intermediate.stream.partitions}
* configuration, if present.
* Else, the number of partitions is set to to the max of number of partitions for all input and output streams
* (excluding intermediate streams).
*
* @param keyExtractor the {@link Function} to extract the output message key and partition key from
* the input message
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.example;

import com.google.common.collect.ImmutableList;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.util.CommandLine;

public class MergeExample implements StreamApplication {

@Override
public void init(StreamGraph graph, Config config) {
MessageStream<Object> inputStream1 = graph.getInputStream("inputStream1", (k, m) -> m);
MessageStream<Object> inputStream2 = graph.getInputStream("inputStream2", (k, m) -> m);
MessageStream<Object> inputStream3 = graph.getInputStream("inputStream3", (k, m) -> m);
OutputStream<Integer, Object, Object> outputStream = graph
.getOutputStream("outputStream", Object::hashCode, m -> m);

MessageStream.mergeAll(ImmutableList.of(inputStream1, inputStream2, inputStream3))
.sendTo(outputStream);
}

// local execution mode
public static void main(String[] args) throws Exception {
CommandLine cmdLine = new CommandLine();
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
LocalApplicationRunner localRunner = new LocalApplicationRunner(config);
localRunner.run(new MergeExample());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.operators.data.*;
import org.apache.samza.operators.data.MessageType;
import org.apache.samza.operators.data.TestExtOutputMessageEnvelope;
import org.apache.samza.operators.data.TestInputMessageEnvelope;
import org.apache.samza.operators.data.TestMessageEnvelope;
import org.apache.samza.operators.data.TestOutputMessageEnvelope;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.JoinFunction;
Expand All @@ -39,6 +43,7 @@
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;
import org.mockito.ArgumentCaptor;

import java.time.Duration;
import java.util.ArrayList;
Expand All @@ -54,6 +59,8 @@
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;


Expand Down Expand Up @@ -276,6 +283,7 @@ public void testMergeWithRelaxedTypes() {
new MessageStreamImpl<TestInputMessageEnvelope>(mockGraph),
new MessageStreamImpl<TestMessageEnvelope>(mockGraph));

// should compile
MessageStream<TestMessageEnvelope> mergeOutput = input1.merge(others);
validateMergeOperator(input1, mergeOutput);

Expand All @@ -289,6 +297,8 @@ class MessageEnvelope<TM> { }
MessageStream<MessageEnvelope<T>> ms2 = new MessageStreamImpl<>(mock(StreamGraphImpl.class));
MessageStream<MessageEnvelope<T>> ms3 = new MessageStreamImpl<>(mock(StreamGraphImpl.class));
Collection<MessageStream<MessageEnvelope<T>>> otherStreams = ImmutableList.of(ms2, ms3);

// should compile
ms1.merge(otherStreams);
}

Expand All @@ -305,6 +315,52 @@ private void validateMergeOperator(MessageStream<TestMessageEnvelope> mergeSourc
assertEquals(outputs.iterator().next(), mockMsg);
}

@Test
public void testMergeAll() {
MessageStream<TestMessageEnvelope> input1 = mock(MessageStreamImpl.class);
MessageStream<TestMessageEnvelope> input2 = mock(MessageStreamImpl.class);
MessageStream<TestMessageEnvelope> input3 = mock(MessageStreamImpl.class);

MessageStream.mergeAll(ImmutableList.of(input1, input2, input3));

ArgumentCaptor<Collection> otherStreamsCaptor = ArgumentCaptor.forClass(Collection.class);
verify(input1, times(1)).merge(otherStreamsCaptor.capture());
assertEquals(2, otherStreamsCaptor.getValue().size());
assertTrue(otherStreamsCaptor.getValue().contains(input2));
assertTrue(otherStreamsCaptor.getValue().contains(input3));
}

@Test
public void testMergeAllWithRelaxedTypes() {
MessageStreamImpl<TestInputMessageEnvelope> input1 = mock(MessageStreamImpl.class);
MessageStreamImpl<TestMessageEnvelope> input2 = mock(MessageStreamImpl.class);
Collection<MessageStream<? extends TestMessageEnvelope>> streams = ImmutableList.of(input1, input2);

// should compile
MessageStream.mergeAll(streams);
ArgumentCaptor<Collection> otherStreamsCaptor = ArgumentCaptor.forClass(Collection.class);
verify(input1, times(1)).merge(otherStreamsCaptor.capture());
assertEquals(1, otherStreamsCaptor.getValue().size());
assertTrue(otherStreamsCaptor.getValue().contains(input2));
}

@Test
public <T> void testMergeAllWithNestedTypes() {
class MessageEnvelope<TM> { }
MessageStream<MessageEnvelope<T>> input1 = mock(MessageStreamImpl.class);
MessageStream<MessageEnvelope<T>> input2 = mock(MessageStreamImpl.class);
MessageStream<MessageEnvelope<T>> input3 = mock(MessageStreamImpl.class);

// should compile
MessageStream.mergeAll(ImmutableList.of(input1, input2, input3));

ArgumentCaptor<Collection> otherStreamsCaptor = ArgumentCaptor.forClass(Collection.class);
verify(input1, times(1)).merge(otherStreamsCaptor.capture());
assertEquals(2, otherStreamsCaptor.getValue().size());
assertTrue(otherStreamsCaptor.getValue().contains(input2));
assertTrue(otherStreamsCaptor.getValue().contains(input3));
}

@Test
public void testPartitionBy() {
Map<String, String> map = new HashMap<>();
Expand Down

0 comments on commit 36b2f23

Please sign in to comment.