diff --git a/examples/kotlin/build.gradle b/examples/kotlin/build.gradle index 3abea64b03cb..0aa3dc257b09 100644 --- a/examples/kotlin/build.gradle +++ b/examples/kotlin/build.gradle @@ -19,7 +19,7 @@ import groovy.json.JsonOutput plugins { id 'org.apache.beam.module' - id 'org.jetbrains.kotlin.jvm' version '1.3.72' + id 'org.jetbrains.kotlin.jvm' version '1.6.10' } applyJavaNature(exportJavadoc: false, automaticModuleName: 'org.apache.beam.examples.kotlin') @@ -46,7 +46,7 @@ configurations.sparkRunnerPreCommit { exclude group: "org.slf4j", module: "slf4j-jdk14" } -def kotlin_version = "1.4.32" +def kotlin_version = "1.6.10" dependencies { implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/WriteOneFilePerWindow.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/WriteOneFilePerWindow.kt index f06b5c82d521..601fe2191d0b 100644 --- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/WriteOneFilePerWindow.kt +++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/WriteOneFilePerWindow.kt @@ -45,13 +45,12 @@ class WriteOneFilePerWindow(private val filenamePrefix: String, private val numS override fun expand(input: PCollection): PDone { val resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix) - var write = TextIO.write() + val write = TextIO.write() .to(PerWindowFiles(resource)) .withTempDirectory(resource.currentDirectory) .withWindowedWrites() - write = numShards?.let { write.withNumShards(it) } ?: write - return input.apply(write) + return input.apply(numShards?.let { write.withNumShards(it) } ?: write) } /** diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/DistinctExample.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/DistinctExample.kt index 65dbbd867bb4..f743dd1a7f70 100644 --- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/DistinctExample.kt +++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/DistinctExample.kt @@ -71,8 +71,6 @@ object DistinctExample { override fun create(options: PipelineOptions): String { options.tempLocation.let { return GcsPath.fromUri(it).resolve("deduped.txt").toString() - } ?: run { - throw IllegalArgumentException("Must specify --output or --tempLocation") } } } diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt index 62ef705f569b..3fdc71d0b58a 100644 --- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt +++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt @@ -111,7 +111,7 @@ object Snippets { .apply>( MapElements.into(TypeDescriptors.doubles()) .via(SerializableFunction { - it["max_temperature"] as Double + it["max_temperature"] as Double? }) ) // [END BigQueryReadTable] @@ -121,7 +121,7 @@ object Snippets { val tableSpec = "clouddataflow-readonly:samples.weather_stations" // [START BigQueryReadFunction] val maxTemperatures = pipeline.apply( - BigQueryIO.read { it.record["max_temperature"] as Double } + BigQueryIO.read { it.record["max_temperature"] as Double? } .from(tableSpec) .withCoder(DoubleCoder.of())) // [END BigQueryReadFunction] @@ -130,7 +130,7 @@ object Snippets { run { // [START BigQueryReadQuery] val maxTemperatures = pipeline.apply( - BigQueryIO.read { it.record["max_temperature"] as Double } + BigQueryIO.read { it.record["max_temperature"] as Double? } .fromQuery( "SELECT max_temperature FROM [clouddataflow-readonly:samples.weather_stations]") .withCoder(DoubleCoder.of())) @@ -140,7 +140,7 @@ object Snippets { run { // [START BigQueryReadQueryStdSQL] val maxTemperatures = pipeline.apply( - BigQueryIO.read { it.record["max_temperature"] as Double } + BigQueryIO.read { it.record["max_temperature"] as Double? } .fromQuery( "SELECT max_temperature FROM `clouddataflow-readonly.samples.weather_stations`") .usingStandardSql()