Skip to content

Commit

Permalink
issue with full list copies in pekko-stream javadsl (#1491)
Browse files Browse the repository at this point in the history
* commit

CollectionUtil

scalafmt

* build issue

* inline

* scala 3 compile issue

* continue use if Util.immutableSeq in Scala 2.12

* Update CollectionUtil.scala
  • Loading branch information
pjfanning authored Sep 30, 2024
1 parent 6c9f1a6 commit 090390a
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream
package javadsl

import scala.collection.immutable

import org.apache.pekko
import pekko.japi.Util.immutableSeq

/**
* INTERNAL API
*
* Utility methods for converting Java collections to Scala collections.
*/
private[javadsl] object CollectionUtil {
@inline def toSeq[T](jlist: java.util.List[T]): immutable.Seq[T] =
immutableSeq(jlist)

@inline def toSeq[T](jiterable: java.lang.Iterable[T]): immutable.Seq[T] =
immutableSeq(jiterable)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream
package javadsl

import scala.collection.immutable

import org.apache.pekko
import pekko.util.ccompat.JavaConverters._

/**
* INTERNAL API
*
* Utility methods for converting Java collections to Scala collections.
*/
private[javadsl] object CollectionUtil {
@inline def toSeq[T](jlist: java.util.List[T]): immutable.Seq[T] =
jlist.asScala.toSeq

@inline def toSeq[T](jiterable: java.lang.Iterable[T]): immutable.Seq[T] =
jiterable.asScala.toSeq
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import pekko.annotation.ApiMayChange
import pekko.dispatch.ExecutionContexts
import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import pekko.japi.Pair
import pekko.japi.Util
import pekko.japi.function
import pekko.japi.function.Creator
import pekko.stream.{ javadsl, _ }
Expand Down Expand Up @@ -3245,7 +3244,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
segmentSize: Int,
eagerClose: Boolean): javadsl.Flow[In, Out, Mat] = {
val seq = if (those != null) Util.immutableSeq(those).collect {
val seq = if (those != null) CollectionUtil.toSeq(those).collect {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
}
Expand Down Expand Up @@ -3327,7 +3326,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
def mergeAll(
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
eagerComplete: Boolean): javadsl.Flow[In, Out, Mat] = {
val seq = if (those != null) Util.immutableSeq(those).collect {
val seq = if (those != null) CollectionUtil.toSeq(those).collect {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import scala.annotation.unchecked.uncheckedVariance
import org.apache.pekko
import pekko.annotation.ApiMayChange
import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import pekko.japi.{ function, Pair, Util }
import pekko.japi.{ function, Pair }
import pekko.stream._
import pekko.util.ConstantFun
import pekko.util.FutureConverters._
Expand Down Expand Up @@ -273,7 +273,7 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
*/
def mapConcat[Out2](
f: function.Function[Out, _ <: java.lang.Iterable[Out2]]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] =
viaScala(_.mapConcat(elem => Util.immutableSeq(f.apply(elem))))
viaScala(_.mapConcat(elem => f.apply(elem).asScala))

/**
* Apply the given function to each context element (leaving the data elements unchanged).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.pekko
import pekko._
import pekko.actor.{ ActorRef, ClassicActorSystemProvider, Status }
import pekko.dispatch.ExecutionContexts
import pekko.japi.{ function, Util }
import pekko.japi.function
import pekko.japi.function.Creator
import pekko.stream._
import pekko.stream.impl.LinearTraversalBuilder
Expand Down Expand Up @@ -461,7 +461,7 @@ object Sink {
sinks: java.util.List[_ <: Graph[SinkShape[U], M]],
fanOutStrategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], NotUsed]])
: Sink[T, java.util.List[M]] = {
val seq = if (sinks != null) Util.immutableSeq(sinks).collect {
val seq = if (sinks != null) CollectionUtil.toSeq(sinks).collect {
case sink: Sink[U @unchecked, M @unchecked] => sink.asScala
case other => other
}
Expand Down
23 changes: 11 additions & 12 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag

import org.apache.pekko
import org.apache.pekko.stream.impl.fusing.ArraySource
import pekko.{ Done, NotUsed }
import pekko.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider }
import pekko.annotation.ApiMayChange
import pekko.dispatch.ExecutionContexts
import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import pekko.japi.{ function, JavaPartialFunction, Pair, Util }
import pekko.japi.{ function, JavaPartialFunction, Pair }
import pekko.japi.function.Creator
import pekko.stream._
import pekko.stream.impl.{ LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava }
import pekko.stream.impl.fusing.ArraySource
import pekko.util.{ unused, _ }
import pekko.util.FutureConverters._
import pekko.util.JavaDurationConverters._
Expand Down Expand Up @@ -718,7 +718,7 @@ object Source {
@deprecatedName(Symbol("strategy"))
fanInStrategy: function.Function[java.lang.Integer, _ <: Graph[UniformFanInShape[T, U], NotUsed]])
: Source[U, NotUsed] = {
val seq = if (rest != null) Util.immutableSeq(rest).map(_.asScala) else immutable.Seq()
val seq = if (rest != null) CollectionUtil.toSeq(rest).map(_.asScala) else immutable.Seq()
new Source(scaladsl.Source.combine(first.asScala, second.asScala, seq: _*)(num => fanInStrategy.apply(num)))
}

Expand All @@ -745,20 +745,19 @@ object Source {
sources: java.util.List[_ <: Graph[SourceShape[T], M]],
fanInStrategy: function.Function[java.lang.Integer, Graph[UniformFanInShape[T, U], NotUsed]])
: Source[U, java.util.List[M]] = {
val seq = if (sources != null) Util.immutableSeq(sources).collect {
val seq = if (sources != null) CollectionUtil.toSeq(sources).collect {
case source: Source[T @unchecked, M @unchecked] => source.asScala
case other => other
}
else immutable.Seq()
import org.apache.pekko.util.ccompat.JavaConverters._
new Source(scaladsl.Source.combine(seq)(size => fanInStrategy(size)).mapMaterializedValue(_.asJava))
}

/**
* Combine the elements of multiple streams into a stream of lists.
*/
def zipN[T](sources: java.util.List[Source[T, _ <: Any]]): Source[java.util.List[T], NotUsed] = {
val seq = if (sources != null) Util.immutableSeq(sources).map(_.asScala) else immutable.Seq()
val seq = if (sources != null) CollectionUtil.toSeq(sources).map(_.asScala) else immutable.Seq()
new Source(scaladsl.Source.zipN(seq).map(_.asJava))
}

Expand All @@ -768,7 +767,7 @@ object Source {
def zipWithN[T, O](
zipper: function.Function[java.util.List[T], O],
sources: java.util.List[Source[T, _ <: Any]]): Source[O, NotUsed] = {
val seq = if (sources != null) Util.immutableSeq(sources).map(_.asScala) else immutable.Seq()
val seq = if (sources != null) CollectionUtil.toSeq(sources).map(_.asScala) else immutable.Seq()
new Source(scaladsl.Source.zipWithN[T, O](seq => zipper.apply(seq.asJava))(seq))
}

Expand Down Expand Up @@ -980,7 +979,7 @@ object Source {
eagerComplete: Boolean): javadsl.Source[T, NotUsed] = {
val seq =
if (sourcesAndPriorities != null)
Util.immutableSeq(sourcesAndPriorities).map(pair => (pair.first.asScala, pair.second.intValue()))
CollectionUtil.toSeq(sourcesAndPriorities).map(pair => (pair.first.asScala, pair.second.intValue()))
else
immutable.Seq()
new Source(scaladsl.Source.mergePrioritizedN(seq, eagerComplete))
Expand Down Expand Up @@ -1743,7 +1742,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
segmentSize: Int,
eagerClose: Boolean): javadsl.Source[Out, Mat] = {
val seq = if (those != null) Util.immutableSeq(those).collect {
val seq = if (those != null) CollectionUtil.toSeq(those).collect {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
}
Expand Down Expand Up @@ -1823,7 +1822,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def mergeAll(
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
eagerComplete: Boolean): javadsl.Source[Out, Mat] = {
val seq = if (those != null) Util.immutableSeq(those).collect {
val seq = if (those != null) CollectionUtil.toSeq(those).collect {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
}
Expand Down Expand Up @@ -2517,7 +2516,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* '''Cancels when''' downstream cancels
*/
def mapConcat[T](f: function.Function[Out, _ <: java.lang.Iterable[T]]): javadsl.Source[T, Mat] =
new Source(delegate.mapConcat(elem => Util.immutableSeq(f.apply(elem))))
new Source(delegate.mapConcat(elem => f.apply(elem).asScala))

/**
* Transform each stream element with the help of a state.
Expand Down Expand Up @@ -2670,7 +2669,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): javadsl.Source[T, Mat] =
new Source(delegate.statefulMapConcat { () =>
val fun = f.create()
elem => Util.immutableSeq(fun(elem))
elem => fun(elem).asScala
})

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import pekko.actor.ClassicActorSystemProvider
import pekko.annotation.ApiMayChange
import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import pekko.japi.Pair
import pekko.japi.Util
import pekko.japi.function
import pekko.stream._
import pekko.util.ConstantFun
Expand Down Expand Up @@ -267,7 +266,7 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon
* @see [[pekko.stream.javadsl.Source.mapConcat]]
*/
def mapConcat[Out2](f: function.Function[Out, _ <: java.lang.Iterable[Out2]]): SourceWithContext[Out2, Ctx, Mat] =
viaScala(_.mapConcat(elem => Util.immutableSeq(f.apply(elem))))
viaScala(_.mapConcat(elem => f.apply(elem).asScala))

/**
* Apply the given function to each context element (leaving the data elements unchanged).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.pekko
import pekko.NotUsed
import pekko.annotation.ApiMayChange
import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import pekko.japi.{ function, Pair, Util }
import pekko.japi.{ function, Pair }
import pekko.stream._
import pekko.util.ConstantFun
import pekko.util.FutureConverters._
Expand Down Expand Up @@ -202,7 +202,7 @@ class SubFlow[In, Out, Mat](
*/
def mapConcat[T](f: function.Function[Out, java.lang.Iterable[T]]): SubFlow[In, T, Mat] =
new SubFlow(delegate.mapConcat { elem =>
Util.immutableSeq(f(elem))
f(elem).asScala
})

/**
Expand Down Expand Up @@ -356,7 +356,7 @@ class SubFlow[In, Out, Mat](
def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): SubFlow[In, T, Mat] =
new SubFlow(delegate.statefulMapConcat { () =>
val fun = f.create()
elem => Util.immutableSeq(fun(elem))
elem => fun(elem).asScala
})

/**
Expand Down Expand Up @@ -2048,7 +2048,7 @@ class SubFlow[In, Out, Mat](
def mergeAll(
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
eagerComplete: Boolean): SubFlow[In, Out, Mat] = {
val seq = if (those != null) Util.immutableSeq(those).collect {
val seq = if (those != null) CollectionUtil.toSeq(those).collect {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
}
Expand Down Expand Up @@ -2106,7 +2106,7 @@ class SubFlow[In, Out, Mat](
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
segmentSize: Int,
eagerClose: Boolean): SubFlow[In, Out, Mat] = {
val seq = if (those != null) Util.immutableSeq(those).collect {
val seq = if (those != null) CollectionUtil.toSeq(those).collect {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.pekko
import pekko.NotUsed
import pekko.annotation.ApiMayChange
import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import pekko.japi.{ function, Pair, Util }
import pekko.japi.{ function, Pair }
import pekko.stream._
import pekko.util.ConstantFun
import pekko.util.FutureConverters._
Expand Down Expand Up @@ -193,7 +193,7 @@ class SubSource[Out, Mat](
*/
def mapConcat[T](f: function.Function[Out, java.lang.Iterable[T]]): SubSource[T, Mat] =
new SubSource(delegate.mapConcat { elem =>
Util.immutableSeq(f(elem))
f(elem).asScala
})

/**
Expand Down Expand Up @@ -347,7 +347,7 @@ class SubSource[Out, Mat](
def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): SubSource[T, Mat] =
new SubSource(delegate.statefulMapConcat { () =>
val fun = f.create()
elem => Util.immutableSeq(fun(elem))
elem => fun(elem).asScala
})

/**
Expand Down Expand Up @@ -2022,7 +2022,7 @@ class SubSource[Out, Mat](
def mergeAll(
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
eagerComplete: Boolean): SubSource[Out, Mat] = {
val seq = if (those != null) Util.immutableSeq(those).collect {
val seq = if (those != null) CollectionUtil.toSeq(those).collect {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
}
Expand Down Expand Up @@ -2081,7 +2081,7 @@ class SubSource[Out, Mat](
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
segmentSize: Int,
eagerClose: Boolean): SubSource[Out, Mat] = {
val seq = if (those != null) Util.immutableSeq(those).collect {
val seq = if (those != null) CollectionUtil.toSeq(those).collect {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
}
Expand Down
Loading

0 comments on commit 090390a

Please sign in to comment.