drop scala 2.12 support, add deprecations, remove returns usage #176

Merged
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -15,7 +15,7 @@ jobs:
strategy:
matrix:
java: [11]
scala: [2.12.20, 2.13.15, 3.3.4]
scala: [2.13.15, 3.3.4]
flink: [1.18.1, 1.19.1]
include:
- scala: 3.3.4
26 changes: 21 additions & 5 deletions README.md
@@ -1,12 +1,12 @@
# Scala 2.12/2.13/3.x API for Apache Flink
# Scala 2.13/3.x API for Apache Flink

[![CI Status](https://github.com/flink-extended/flink-scala-api/workflows/CI/badge.svg)](https://github.com/flink-extended/flink-scala-api/actions)
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.flinkextended/flink-scala-api_2.12/badge.svg?style=plastic)](https://maven-badges.herokuapp.com/maven-central/org.flinkextended/flink-scala-api_2.12)
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.flinkextended/flink-scala-api_2.13/badge.svg?style=plastic)](https://maven-badges.herokuapp.com/maven-central/org.flinkextended/flink-scala-api_2.13)
[![License: Apache 2](https://img.shields.io/badge/License-Apache2-green.svg)](https://opensource.org/licenses/Apache-2.0)
![Last commit](https://img.shields.io/github/last-commit/flink-extended/flink-scala-api)
![Last release](https://img.shields.io/github/release/flink-extended/flink-scala-api)

This project is a community-maintained fork of official Apache Flink Scala API, cross-built for scala 2.12, 2.13 and 3.x.
This project is a community-maintained fork of the official Apache Flink Scala API, cross-built for Scala 2.13 and 3.x.

## Migration

@@ -26,19 +26,35 @@ import org.apache.flinkx.api.serializers._

## Usage

`flink-scala-api` is released to Maven-central for 2.12, 2.13 and 3. For SBT, add this snippet to `build.sbt`:
`flink-scala-api` is released to Maven Central for Scala 2.13 and 3. For SBT, add this snippet to `build.sbt`:
```scala
libraryDependencies += "org.flinkextended" %% "flink-scala-api" % "1.18.1_1.1.6"
```

For Ammonite:
## For Ammonite

```scala
import $ivy.`org.flinkextended::flink-scala-api:1.18.1_1.1.6`
// you might need flink-client too in order to run in the REPL
import $ivy.`org.apache.flink:flink-clients:1.18.1`
```

## For Scala 2.12

Collaborator Author

A note on the Scala 2.12 release has been added.

If you first want to migrate to org.flinkextended:flink-scala-api while staying on Scala 2.12, you can use the last build published for Scala 2.12:

```scala
libraryDependencies += "org.flinkextended" %% "flink-scala-api" % "1.18.1_1.2.0"
// or
"org.flinkextended" %% "flink-scala-api" % "1.19.1_1.2.0"
// or
"org.flinkextended" %% "flink-scala-api" % "1.20.0_1.2.0"
```

Builds for Scala 2.12 are no longer published.

## SBT Project Template

If you want to create a new project easily, check out this __Giter8 template__: [novakov-alexey/flink-scala-api.g8](https://github.com/novakov-alexey/flink-scala-api.g8)

## Supported Flink versions
3 changes: 2 additions & 1 deletion build.sbt
@@ -9,6 +9,7 @@ lazy val flinkVersion = System.getProperty("flinkVersion", "1.18.1")
lazy val root = (project in file("."))
.aggregate(`scala-api`, `examples`)
.settings(
scalaVersion := rootScalaVersion,
publish / skip := true
)

@@ -17,7 +18,7 @@ lazy val `scala-api` = (project in file("modules/scala-api"))
.settings(
name := "flink-scala-api",
scalaVersion := rootScalaVersion,
crossScalaVersions := Seq("2.12.20", "2.13.15", rootScalaVersion),
crossScalaVersions := Seq("2.13.15", rootScalaVersion),
libraryDependencies ++= Seq(
"org.apache.flink" % "flink-streaming-java" % flinkVersion,
"org.apache.flink" % "flink-java" % flinkVersion,
@@ -30,6 +30,7 @@ trait CommonTaggedDerivation[TypeClass[_]]:
)*
)

@annotation.nowarn
val caseClass = new CaseClass[Typeclass, A](
typeInfo[A],
isObject[A],
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory

import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.annotation.tailrec

/** A cleaner that renders closures serializable if they can be done so safely.
*/
@@ -63,20 +64,27 @@ object ClosureCleaner {
// not a good idea (whereas we can clone closure objects just fine since we
// understand how all their fields are used).
private def getOuterClassesAndObjects(obj: AnyRef): (List[Class[_]], List[AnyRef]) = {
for (f <- obj.getClass.getDeclaredFields if f.getName == "$outer") {
f.setAccessible(true)
val outer = f.get(obj)
// The outer pointer may be null if we have cleaned this closure before
if (outer != null) {
if (isClosure(f.getType)) {
val recurRet = getOuterClassesAndObjects(outer)
return (f.getType :: recurRet._1, outer :: recurRet._2)
} else {
return (f.getType :: Nil, outer :: Nil) // Stop at the first $outer that is not a closure
}

@tailrec
def loop(fields: List[Field]): (List[Class[_]], List[AnyRef]) =
fields match {
case f :: tail =>
f.setAccessible(true)
val outer = f.get(obj)
// The outer pointer may be null if we have cleaned this closure before
if (outer != null) {
if (isClosure(f.getType)) {
val recurRet = getOuterClassesAndObjects(outer)
(f.getType :: recurRet._1, outer :: recurRet._2)
} else {
(f.getType :: Nil, outer :: Nil) // Stop at the first $outer that is not a closure
}
} else loop(tail)
case Nil => (Nil, Nil)
}
}
(Nil, Nil)

val fields = obj.getClass.getDeclaredFields.filter(_.getName == "$outer").toList
loop(fields)
}
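The rewrite above is the general recipe this PR uses for dropping `return` from library code: lift the loop body into a `@tailrec` helper with explicit exit cases instead of relying on non-local returns. A minimal standalone sketch of the same pattern, using a hypothetical `firstNegative` helper that is not part of this PR:

```scala
import scala.annotation.tailrec

// Before: `return` inside a foreach/for body throws NonLocalReturnControl under the hood.
// After: an explicit tail-recursive scan with the same early-exit semantics.
def firstNegative(xs: List[Int]): Option[Int] = {
  @tailrec
  def loop(rest: List[Int]): Option[Int] = rest match {
    case head :: tail => if (head < 0) Some(head) else loop(tail)
    case Nil          => None
  }
  loop(xs)
}
```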

/** Return a list of classes that represent closures enclosed in the given closure object.
@@ -89,17 +97,20 @@ object ClosureCleaner {
if (cr != null) {
val set = mutable.Set.empty[Class[_]]
cr.accept(new InnerClosureFinder(set), 0)
for (cls <- set -- seen) {
for (cls <- set.toSet -- seen) {
seen += cls
stack.push(cls)
}
}
}
(seen - obj.getClass).toList
(seen.toSet - obj.getClass).toList
}

/** Initializes the accessed fields for outer classes and their super classes. */
private def initAccessedFields(accessedFields: mutable.Map[Class[_], mutable.Set[String]], outerClasses: Seq[Class[_]]): Unit = {
private def initAccessedFields(
accessedFields: mutable.Map[Class[_], mutable.Set[String]],
outerClasses: Seq[Class[_]]
): Unit = {
for (cls <- outerClasses) {
var currentClass = cls
assert(currentClass != null, "The outer class can't be null.")
@@ -189,7 +189,7 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
* @return
* The resulting data stream.
*/
def flatMap[R: TypeInformation](fun1: IN1 => TraversableOnce[R], fun2: IN2 => TraversableOnce[R]): DataStream[R] = {
def flatMap[R: TypeInformation](fun1: IN1 => IterableOnce[R], fun2: IN2 => IterableOnce[R]): DataStream[R] = {

if (fun1 == null || fun2 == null) {
throw new NullPointerException("FlatMap functions must not be null.")
@@ -198,8 +198,8 @@ val cleanFun2 = clean(fun2)
val cleanFun2 = clean(fun2)

val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
def flatMap1(value: IN1, out: Collector[R]): Unit = { cleanFun1(value).foreach(out.collect) }
def flatMap2(value: IN2, out: Collector[R]): Unit = { cleanFun2(value).foreach(out.collect) }
def flatMap1(value: IN1, out: Collector[R]): Unit = { cleanFun1(value).iterator.foreach(out.collect) }
def flatMap2(value: IN2, out: Collector[R]): Unit = { cleanFun2(value).iterator.foreach(out.collect) }
}

flatMap(flatMapper)
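`IterableOnce` is the 2.13/3.x replacement for the deprecated `TraversableOnce`, so existing call sites keep compiling; collections and iterators both qualify. A rough usage sketch against the new signature (element types are illustrative, and the `TypeInformation` implicits are assumed to come from `org.apache.flinkx.api.serializers._`):

```scala
import org.apache.flinkx.api._
import org.apache.flinkx.api.serializers._

val env = StreamExecutionEnvironment.getExecutionEnvironment

val numbers: DataStream[Int]  = env.fromElements(1, 2, 3)
val words: DataStream[String] = env.fromElements("flink scala api")

// Both functions return an IterableOnce[String]: a List and an Iterator both work.
val merged: DataStream[String] = numbers.connect(words).flatMap(
  (n: Int)    => List(n.toString),
  (s: String) => s.split(" ").iterator
)
```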
@@ -560,13 +560,13 @@ class DataStream[T](stream: JavaStream[T]) {

/** Creates a new DataStream by applying the given function to every element and flattening the results.
*/
def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R] = {
def flatMap[R: TypeInformation](fun: T => IterableOnce[R]): DataStream[R] = {
if (fun == null) {
throw new NullPointerException("FlatMap function must not be null.")
}
val cleanFun = clean(fun)
val flatMapper = new FlatMapFunction[T, R] {
def flatMap(in: T, out: Collector[R]): Unit = { cleanFun(in).foreach(out.collect) }
def flatMap(in: T, out: Collector[R]): Unit = { cleanFun(in).iterator.foreach(out.collect) }
}
flatMap(flatMapper)
}
@@ -690,8 +690,10 @@ class DataStream[T](stream: JavaStream[T]) {
* For cases where the timestamps are not monotonously increasing, use the more general methods
* [[assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)]] and
* [[assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks)]].
*
* @deprecated Please use {@link #assignTimestampsAndWatermarks(WatermarkStrategy)} instead.
*/
@PublicEvolving
@Deprecated
def assignAscendingTimestamps(extractor: T => Long): DataStream[T] = {
val cleanExtractor = clean(extractor)
val extractorFunction = new AscendingTimestampExtractor[T] {
@@ -754,8 +756,12 @@ class DataStream[T](stream: JavaStream[T]) {
def printToErr(sinkIdentifier: String): DataStreamSink[T] = stream.printToErr(sinkIdentifier)

/** Writes a DataStream using the given [[OutputFormat]].
*/
@PublicEvolving
*
* @deprecated Please use the {@link
* org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink} explicitly
* using the {@link #addSink(SinkFunction)} method.
*/
@Deprecated
def writeUsingOutputFormat(format: OutputFormat[T]): DataStreamSink[T] = {
stream.writeUsingOutputFormat(format)
}
@@ -790,7 +796,10 @@ class DataStream[T](stream: JavaStream[T]) {

/** Adds the given sink to this DataStream. Only streams with sinks added will be executed once the
* StreamExecutionEnvironment.execute(...) method is called.
*
* @deprecated Please use the sinkTo(sink: org.apache.flink.api.connector.sink2.Sink[T])
*/
@Deprecated
def sinkTo(sink: org.apache.flink.api.connector.sink.Sink[T, _, _, _]): DataStreamSink[T] =
stream.sinkTo(sink)

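A hedged sketch of the `WatermarkStrategy`-based replacement that the `assignAscendingTimestamps` deprecation points to (the `Event` type is illustrative, and the derived `TypeInformation` is assumed to come from `serializers._`):

```scala
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flinkx.api._
import org.apache.flinkx.api.serializers._

case class Event(id: String, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment
val events: DataStream[Event] = env.fromElements(Event("a", 1L), Event("b", 2L))

// forMonotonousTimestamps covers the ascending-timestamp case the old method handled.
val withTimestamps: DataStream[Event] = events.assignTimestampsAndWatermarks(
  WatermarkStrategy
    .forMonotonousTimestamps[Event]()
    .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
      override def extractTimestamp(e: Event, recordTs: Long): Long = e.timestamp
    })
)
```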
@@ -460,7 +460,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
* Note that the user state object needs to be serializable.
*/
def flatMapWithState[R: TypeInformation, S: TypeInformation](
fun: (T, Option[S]) => (TraversableOnce[R], Option[S])
fun: (T, Option[S]) => (IterableOnce[R], Option[S])
): DataStream[R] = {
if (fun == null) {
throw new NullPointerException("Flatmap function must not be null.")
@@ -470,12 +470,12 @@
val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]]
val serializer: TypeSerializer[S] = stateTypeInfo.createSerializer(javaStream.getExecutionConfig)

val flatMapper = new RichFlatMapFunction[T, R] with StatefulFunction[T, TraversableOnce[R], S] {
val flatMapper = new RichFlatMapFunction[T, R] with StatefulFunction[T, IterableOnce[R], S] {

override val stateSerializer: TypeSerializer[S] = serializer

override def flatMap(in: T, out: Collector[R]): Unit = {
applyWithState(in, cleanFun).foreach(out.collect)
applyWithState(in, cleanFun).iterator.foreach(out.collect)
}
}

@@ -488,8 +488,10 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
* Name under which to publish the queryable state instance
* @return
* Queryable state instance
* @deprecated The Queryable State feature is deprecated since Flink 1.18, and will be removed in a
* future Flink major version.
*/
@PublicEvolving
@Deprecated
def asQueryableState(queryableStateName: String): QueryableStateStream[K, T] = {
val stateDescriptor = new ValueStateDescriptor(queryableStateName, dataType.createSerializer(executionConfig))

@@ -504,8 +506,11 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
* State descriptor to create state instance from
* @return
* Queryable state instance
*
* @deprecated The Queryable State feature is deprecated since Flink 1.18, and will be removed in a
* future Flink major version.
*/
@PublicEvolving
@Deprecated
def asQueryableState(
queryableStateName: String,
stateDescriptor: ValueStateDescriptor[T]
@@ -529,8 +534,10 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
* State descriptor to create state instance from
* @return
* Queryable state instance
* @deprecated The Queryable State feature is deprecated since Flink 1.18, and will be removed in a
* future Flink major version.
*/
@PublicEvolving
@Deprecated
def asQueryableState(
queryableStateName: String,
stateDescriptor: ReducingStateDescriptor[T]
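Since `flatMapWithState` now accepts an `IterableOnce`-returning function, Scala 2.13 call sites need no changes. A small sketch of the intended usage (a hypothetical per-key running sum, assuming the primitive `TypeInformation` implicits from `serializers._`):

```scala
import org.apache.flinkx.api._
import org.apache.flinkx.api.serializers._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Emits the running total for each key; Seq(...) satisfies IterableOnce[Long].
val totals: DataStream[Long] = env
  .fromElements(1L, 2L, 3L, 4L)
  .keyBy(v => (v % 2).toString)
  .flatMapWithState[Long, Long] { (value, state) =>
    val sum = state.getOrElse(0L) + value
    (Seq(sum), Some(sum))
  }
```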
@@ -487,19 +487,29 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {

/** Creates a DataStream that represents the Strings produced by reading the given file line wise. The file will be
* read with the system's default character set.
*
* @deprecated Use {@code
* FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead}.
*/
@Deprecated
def readTextFile(filePath: String): DataStream[String] =
asScalaStream(javaEnv.readTextFile(filePath))

/** Creates a data stream that represents the Strings produced by reading the given file line wise. The character set
* with the given name will be used to read the files.
* @deprecated Use {@code
* FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead}.
*/
@Deprecated
def readTextFile(filePath: String, charsetName: String): DataStream[String] =
asScalaStream(javaEnv.readTextFile(filePath, charsetName))

/** Reads the given file with the given input format. The file path should be passed as a URI (e.g.,
* "file:///some/local/file" or "hdfs://host:port/file/path").
* @deprecated Use {@code
* FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead}.
*/
@Deprecated
def readFile[T: TypeInformation](inputFormat: FileInputFormat[T], filePath: String): DataStream[T] =
asScalaStream(javaEnv.readFile(inputFormat, filePath))

@@ -526,8 +536,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
* @return
* The data stream that represents the data read from the given file
*
* @deprecated Use {@code
* FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead}.
*/
@PublicEvolving
@Deprecated
def readFile[T: TypeInformation](
inputFormat: FileInputFormat[T],
filePath: String,
@@ -563,7 +576,13 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* have a parallelism of 1. To enable parallel execution, the user defined source should implement
* ParallelSourceFunction or extend RichParallelSourceFunction. In these cases the resulting source will have the
* parallelism of the environment. To change this afterwards call DataStreamSource.setParallelism(int)
*
* @deprecated This method relies on the {@link
* org.apache.flink.streaming.api.functions.source.SourceFunction} API, which is due to be
* removed. Use the {@link #fromSource[TypeInformation](Source, WatermarkStrategy, String)}
* method based on the new {@link org.apache.flink.api.connector.source.Source} API instead.
*/
@Deprecated
def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
require(function != null, "Function must not be null.")

@@ -573,7 +592,13 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
}

/** Create a DataStream using a user defined source function for arbitrary source functionality.
*/
*
* @deprecated This method relies on the {@link
* org.apache.flink.streaming.api.functions.source.SourceFunction} API, which is due to be
* removed. Use the {@link #fromSource[TypeInformation](Source, WatermarkStrategy, String)}
* method based on the new {@link org.apache.flink.api.connector.source.Source} API instead.
*/
@Deprecated
def addSource[T: TypeInformation](function: SourceContext[T] => Unit): DataStream[T] = {
require(function != null, "Function must not be null.")
val sourceFunction = new SourceFunction[T] {
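For reference next to the `readTextFile` and `addSource` deprecations above, a hedged sketch of the `FileSource` + `fromSource` replacement (assumes the `flink-connector-files` artifact is on the classpath; the path and source name are illustrative):

```scala
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.file.src.FileSource
import org.apache.flink.connector.file.src.reader.TextLineInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flinkx.api._
import org.apache.flinkx.api.serializers._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Bounded, line-by-line read of a text file through the new Source API.
val source = FileSource
  .forRecordStreamFormat(new TextLineInputFormat(), new Path("file:///some/local/file"))
  .build()

val lines: DataStream[String] =
  env.fromSource(source, WatermarkStrategy.noWatermarks[String](), "text-file-source")
```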
@@ -63,8 +63,8 @@ class OnConnectedStream[IN1, IN2](stream: ConnectedStreams[IN1, IN2]) {
*/
@PublicEvolving
def flatMapWith[R: TypeInformation](
flatMap1: IN1 => TraversableOnce[R],
flatMap2: IN2 => TraversableOnce[R]
flatMap1: IN1 => IterableOnce[R],
flatMap2: IN2 => IterableOnce[R]
): DataStream[R] =
stream.flatMap(flatMap1, flatMap2)

@@ -55,7 +55,7 @@ class OnDataStream[T](stream: DataStream[T]) {
* A dataset of R
*/
@PublicEvolving
def flatMapWith[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R] =
def flatMapWith[R: TypeInformation](fun: T => IterableOnce[R]): DataStream[R] =
stream.flatMap(fun)

/** Applies a predicate `fun` to each item of the stream, keeping only those for which the predicate holds
@@ -22,7 +22,7 @@ import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction =>
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

/** A wrapper function that exposes a Scala Function3 as a Java AllWindowFunction.
*/
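`scala.jdk.CollectionConverters` is the 2.13/3.x home of the old `JavaConverters` syntax; the `asScala`/`asJava` extension methods behave the same. A tiny standalone example:

```scala
import scala.jdk.CollectionConverters._

// Java list -> Scala Seq and back; standard library only, no Flink types involved.
val javaList: java.util.List[String]   = java.util.List.of("a", "b", "c")
val scalaSeq: Seq[String]              = javaList.asScala.toSeq
val backToJava: java.util.List[String] = scalaSeq.asJava
```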