Skip to content

Commit

Permalink
Merge pull request #14879: [BEAM-12402] Optimize PCollectionConsumerR…
Browse files Browse the repository at this point in the history
…egistry$MultiplexingMetricTrackingFnDataReceiver by making immutable list.
  • Loading branch information
kennknowles authored May 25, 2021
2 parents ee89faf + bb80c34 commit e52e9e4
Showing 1 changed file with 11 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ListMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;

Expand Down Expand Up @@ -174,13 +175,18 @@ public FnDataReceiver<WindowedValue<?>> getMultiplexingConsumer(String pCollecti
throw new IllegalArgumentException(
String.format("Unknown PCollectionId %s", pCollectionId));
} else if (consumerAndMetadatas.size() == 1) {
if (consumerAndMetadatas.get(0).getConsumer() instanceof HandlesSplits) {
return new SplittingMetricTrackingFnDataReceiver(pcId, consumerAndMetadatas.get(0));
ConsumerAndMetadata consumerAndMetadata = consumerAndMetadatas.get(0);
if (consumerAndMetadata.getConsumer() instanceof HandlesSplits) {
return new SplittingMetricTrackingFnDataReceiver(pcId, consumerAndMetadata);
}
return new MetricTrackingFnDataReceiver(pcId, consumerAndMetadatas.get(0));
return new MetricTrackingFnDataReceiver(pcId, consumerAndMetadata);
} else {
/* TODO(SDF), Consider supporting splitting each consumer individually. This would never come up in the existing SDF expansion, but might be useful to support fused SDF nodes. This would require dedicated delivery of the split results to each of the consumers separately. */
return new MultiplexingMetricTrackingFnDataReceiver(pcId, consumerAndMetadatas);
/* TODO(SDF), Consider supporting splitting each consumer individually. This would never
come up in the existing SDF expansion, but might be useful to support fused SDF nodes.
This would require dedicated delivery of the split results to each of the consumers
separately. */
return new MultiplexingMetricTrackingFnDataReceiver(
pcId, ImmutableList.copyOf(consumerAndMetadatas));
}
});
}
Expand Down

0 comments on commit e52e9e4

Please sign in to comment.