From 20c02089102255304658f8d1859d540a1d1a41e1 Mon Sep 17 00:00:00 2001 From: Hector Miuler Malpica Gallegos Date: Thu, 26 Jan 2023 22:22:21 -0500 Subject: [PATCH 1/4] feat(scio-cosmosdb): feat(azure-cosmosdb): Add support for cosmosdb with Core (SQL) API Refs: #4675 --- README.md | 1 + build.sbt | 189 +++++++----------- .../spotify/scio/cosmosdb/CosmosDbIOIT.scala | 93 +++++++++ .../com/spotify/scio/cosmosdb/Utils.scala | 72 +++++++ .../spotify/scio/cosmosdb/CosmosDbIO.scala | 35 ++++ .../com/spotify/scio/cosmosdb/package.scala | 5 + .../cosmosdb/read/CosmosDbBoundedReader.scala | 57 ++++++ .../cosmosdb/read/CosmosDbBoundedSource.scala | 40 ++++ .../scio/cosmosdb/read/CosmosDbRead.scala | 49 +++++ .../scio/cosmosdb/syntax/AllSyntax.scala | 3 + .../cosmosdb/syntax/ScioContextSyntax.scala | 22 ++ 11 files changed, 448 insertions(+), 118 deletions(-) create mode 100644 scio-cosmosdb/src/it/scala/com/spotify/scio/cosmosdb/CosmosDbIOIT.scala create mode 100644 scio-cosmosdb/src/it/scala/com/spotify/scio/cosmosdb/Utils.scala create mode 100644 scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/CosmosDbIO.scala create mode 100644 scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/package.scala create mode 100644 scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbBoundedReader.scala create mode 100644 scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbBoundedSource.scala create mode 100644 scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbRead.scala create mode 100644 scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/syntax/AllSyntax.scala create mode 100644 scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/syntax/ScioContextSyntax.scala diff --git a/README.md b/README.md index fefb119a41..2f999e5636 100644 --- a/README.md +++ b/README.md @@ -78,6 +78,7 @@ Scio includes the following artifacts: - `scio-avro`: add-on for Avro, can also be used standalone - `scio-google-cloud-platform`: add-on for Google Cloud IO's: BigQuery, Bigtable, Pub/Sub, Datastore, Spanner - `scio-cassandra*`: add-ons for Cassandra +- `scio-cosmosdb`: add-ons for Azure CosmosDB Core (SQL) API - `scio-elasticsearch*`: add-ons for Elasticsearch - `scio-extra`: extra utilities for working with collections, Breeze, etc., best effort support - `scio-jdbc`: add-on for JDBC IO diff --git a/build.sbt b/build.sbt index 364a8b3720..50ae445774 100644 --- a/build.sbt +++ b/build.sbt @@ -86,6 +86,10 @@ val algebraVersion = "2.9.0" val annoy4sVersion = "0.10.0" val annoyVersion = "0.2.6" val breezeVersion = "2.1.0" +val bsonVersion = "4.8.1" +val cosmosVersion = "4.37.1" +val cosmosContainerVersion = "1.17.5" +val scribeVersion = "3.10.7" val caffeineVersion = "2.9.3" val cassandraDriverVersion = "3.11.3" val cassandraVersion = "3.11.13" @@ -156,9 +160,7 @@ ThisBuild / tpolecatDevModeOptions ~= { opts => opts.filterNot(excludes).union(extras) } -ThisBuild / doc / tpolecatDevModeOptions ++= Set( - Scalac.docNoJavaCommentOption -) +ThisBuild / doc / tpolecatDevModeOptions ++= Set(Scalac.docNoJavaCommentOption) ThisBuild / scalafixScalaBinaryVersion := CrossVersion.binaryScalaVersion(scalaVersion.value) val excludeLint = SettingKey[Set[Def.KeyedInitialize[_]]]("excludeLintKeys") @@ -199,9 +201,7 @@ lazy val keepExistingHeader = HeaderCommentStyle.cStyleBlockComment.copy(commentCreator = new CommentCreator() { override def apply(text: String, existingText: Option[String]): String = existingText - .getOrElse( - HeaderCommentStyle.cStyleBlockComment.commentCreator(text) - ) + .getOrElse(HeaderCommentStyle.cStyleBlockComment.commentCreator(text)) .trim() }) @@ -217,9 +217,7 @@ val commonSettings = Def inTask(doc)(TpolecatPlugin.projectSettings), javacOptions ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint:unchecked"), Compile / doc / javacOptions := Seq("-source", "1.8"), - excludeDependencies ++= Seq( - "org.apache.beam" % "beam-sdks-java-io-kafka" - ), + excludeDependencies ++= Seq("org.apache.beam" % "beam-sdks-java-io-kafka"), resolvers ++= Resolver.sonatypeOssRepos("public"), Test / javaOptions += "-Dscio.ignoreVersionWarning=true", Test / testOptions += Tests.Argument("-oD"), @@ -240,9 +238,7 @@ val commonSettings = Def coverageHighlighting := true, licenses := Seq("Apache 2" -> url("http://www.apache.org/licenses/LICENSE-2.0.txt")), homepage := Some(url("https://github.com/spotify/scio")), - scmInfo := Some( - ScmInfo(url("https://github.com/spotify/scio"), "scm:git:git@github.com:spotify/scio.git") - ), + scmInfo := Some(ScmInfo(url("https://github.com/spotify/scio"), "scm:git:git@github.com:spotify/scio.git")), developers := List( Developer( id = "sinisa_lyh", @@ -367,11 +363,7 @@ lazy val macroSettings = Def.settings( libraryDependencies ++= { VersionNumber(scalaVersion.value) match { case v if v.matchesSemVer(SemanticSelector("2.12.x")) => - Seq( - compilerPlugin( - ("org.scalamacros" % "paradise" % scalaMacrosVersion).cross(CrossVersion.full) - ) - ) + Seq(compilerPlugin(("org.scalamacros" % "paradise" % scalaMacrosVersion).cross(CrossVersion.full))) case _ => Nil } }, @@ -379,9 +371,7 @@ lazy val macroSettings = Def.settings( scalacOptions += "-Xmacro-settings:cache-implicit-schemas=true" ) -lazy val directRunnerDependencies = Seq( - "org.apache.beam" % "beam-runners-direct-java" % beamVersion % Runtime -) +lazy val directRunnerDependencies = Seq("org.apache.beam" % "beam-runners-direct-java" % beamVersion % Runtime) lazy val dataflowRunnerDependencies = Seq( "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion % Runtime ) @@ -402,27 +392,28 @@ lazy val flinkRunnerDependencies = Seq( lazy val beamRunners = settingKey[String]("beam runners") lazy val beamRunnersEval = settingKey[Seq[ModuleID]]("beam runners") -def beamRunnerSettings: Seq[Setting[_]] = Seq( - beamRunners := "", - beamRunnersEval := { - Option(beamRunners.value) - .filter(_.nonEmpty) - .orElse(sys.props.get("beamRunners")) - .orElse(sys.env.get("BEAM_RUNNERS")) - .map(_.split(",")) - .map { - _.flatMap { - case "DirectRunner" => directRunnerDependencies - case "DataflowRunner" => dataflowRunnerDependencies - case "SparkRunner" => sparkRunnerDependencies - case "FlinkRunner" => flinkRunnerDependencies - case _ => Nil - }.toSeq - } - .getOrElse(directRunnerDependencies) - }, - libraryDependencies ++= beamRunnersEval.value -) +def beamRunnerSettings: Seq[Setting[_]] = + Seq( + beamRunners := "", + beamRunnersEval := { + Option(beamRunners.value) + .filter(_.nonEmpty) + .orElse(sys.props.get("beamRunners")) + .orElse(sys.env.get("BEAM_RUNNERS")) + .map(_.split(",")) + .map { + _.flatMap { + case "DirectRunner" => directRunnerDependencies + case "DataflowRunner" => dataflowRunnerDependencies + case "SparkRunner" => sparkRunnerDependencies + case "FlinkRunner" => flinkRunnerDependencies + case _ => Nil + }.toSeq + } + .getOrElse(directRunnerDependencies) + }, + libraryDependencies ++= beamRunnersEval.value + ) ThisBuild / PB.protocVersion := protobufVersion lazy val scopedProtobufSettings = Def.settings( @@ -454,15 +445,12 @@ def splitTests(tests: Seq[TestDefinition], filter: Seq[String], forkOptions: For lazy val root: Project = Project("scio", file(".")) .settings(commonSettings) - .settings( - publish / skip := true, - mimaPreviousArtifacts := Set.empty, - assembly / aggregate := false - ) + .settings(publish / skip := true, mimaPreviousArtifacts := Set.empty, assembly / aggregate := false) .aggregate( `scio-avro`, `scio-cassandra3`, `scio-core`, + `scio-cosmosdb`, `scio-elasticsearch6`, `scio-elasticsearch7`, `scio-elasticsearch8`, @@ -548,6 +536,25 @@ lazy val `scio-core`: Project = project buildInfoPackage := "com.spotify.scio" ) +lazy val `scio-cosmosdb`: Project = project + .in(file("scio-cosmosdb")) + .configs(IntegrationTest) + .settings(itSettings) + .settings(commonSettings) + .settings(publishSettings) + .dependsOn(`scio-core`, `scio-test` % "test;it") + .settings( + //scalacOptions ++= Seq("-deprecation", "-feature", "-unchecked", "-Xsource:3"), // , "-Ymacro-annotations" + scalacOptions ++= Seq("-Xsource:3"), + libraryDependencies ++= Seq( + "org.mongodb" % "bson" % bsonVersion, + "com.azure" % "azure-cosmos" % cosmosVersion, + "org.testcontainers" % "azure" % cosmosContainerVersion % IntegrationTest, + "com.outr" %% "scribe" % scribeVersion % IntegrationTest, + "com.outr" %% "scribe-slf4j" % scribeVersion % IntegrationTest + ) + ) + lazy val `scio-test`: Project = project .in(file("scio-test")) .settings(commonSettings) @@ -597,10 +604,7 @@ lazy val `scio-test`: Project = project ) ) .configs(IntegrationTest) - .dependsOn( - `scio-core` % "test->test;compile->compile;it->it", - `scio-avro` % "compile->test;it->it" - ) + .dependsOn(`scio-core` % "test->test;compile->compile;it->it", `scio-avro` % "compile->test;it->it") lazy val `scio-macros`: Project = project .in(file("scio-macros")) @@ -621,9 +625,7 @@ lazy val `scio-macros`: Project = project lazy val `scio-avro`: Project = project .in(file("scio-avro")) - .dependsOn( - `scio-core` % "compile;it->it" - ) + .dependsOn(`scio-core` % "compile;it->it") .configs(IntegrationTest) .settings(commonSettings) .settings(publishSettings) @@ -658,11 +660,7 @@ lazy val `scio-avro`: Project = project lazy val `scio-google-cloud-platform`: Project = project .in(file("scio-google-cloud-platform")) - .dependsOn( - `scio-core` % "compile;it->it", - `scio-avro` % "test", - `scio-test` % "test;it" - ) + .dependsOn(`scio-core` % "compile;it->it", `scio-avro` % "test", `scio-test` % "test;it") .configs(IntegrationTest) .settings(commonSettings) .settings(publishSettings) @@ -721,10 +719,7 @@ lazy val `scio-google-cloud-platform`: Project = project lazy val `scio-cassandra3`: Project = project .in(file("scio-cassandra/cassandra3")) - .dependsOn( - `scio-core`, - `scio-test` % "test;it" - ) + .dependsOn(`scio-core`, `scio-test` % "test;it") .configs(IntegrationTest) .settings(commonSettings) .settings(publishSettings) @@ -755,10 +750,7 @@ lazy val `scio-cassandra3`: Project = project lazy val `scio-elasticsearch6`: Project = project .in(file("scio-elasticsearch/es6")) - .dependsOn( - `scio-core`, - `scio-test` % "test" - ) + .dependsOn(`scio-core`, `scio-test` % "test") .settings(commonSettings) .settings(publishSettings) .settings( @@ -800,17 +792,11 @@ lazy val `scio-elasticsearch7`: Project = project "org.scalatest" %% "scalatest" % scalatestVersion % Test ) ) - .dependsOn( - `scio-core`, - `scio-test` % "test" - ) + .dependsOn(`scio-core`, `scio-test` % "test") lazy val `scio-elasticsearch8`: Project = project .in(file("scio-elasticsearch/es8")) - .dependsOn( - `scio-core`, - `scio-test` % "test,it" - ) + .dependsOn(`scio-core`, `scio-test` % "test,it") .configs(IntegrationTest) .settings(commonSettings) .settings(publishSettings) @@ -893,10 +879,7 @@ lazy val `scio-extra`: Project = project lazy val `scio-grpc`: Project = project .in(file("scio-grpc")) - .dependsOn( - `scio-core`, - `scio-test` % "test" - ) + .dependsOn(`scio-core`, `scio-test` % "test") .settings(commonSettings) .settings(publishSettings) .settings(protobufSettings) @@ -913,10 +896,7 @@ lazy val `scio-grpc`: Project = project lazy val `scio-jdbc`: Project = project .in(file("scio-jdbc")) - .dependsOn( - `scio-core`, - `scio-test` % "test" - ) + .dependsOn(`scio-core`, `scio-test` % "test") .settings(commonSettings) .settings(publishSettings) .settings( @@ -930,10 +910,7 @@ lazy val `scio-jdbc`: Project = project lazy val `scio-neo4j`: Project = project .in(file("scio-neo4j")) - .dependsOn( - `scio-core`, - `scio-test` % "test,it" - ) + .dependsOn(`scio-core`, `scio-test` % "test,it") .configs(IntegrationTest) .settings(commonSettings) .settings(itSettings) @@ -956,11 +933,7 @@ val ensureSourceManaged = taskKey[Unit]("ensureSourceManaged") lazy val `scio-parquet`: Project = project .in(file("scio-parquet")) - .dependsOn( - `scio-core`, - `scio-avro`, - `scio-test` % "test->test" - ) + .dependsOn(`scio-core`, `scio-avro`, `scio-test` % "test->test") .settings(commonSettings) .settings(publishSettings) .settings( @@ -1005,11 +978,7 @@ lazy val `scio-parquet`: Project = project lazy val `scio-tensorflow`: Project = project .in(file("scio-tensorflow")) - .dependsOn( - `scio-avro`, - `scio-core`, - `scio-test` % "test->test" - ) + .dependsOn(`scio-avro`, `scio-core`, `scio-test` % "test->test") .settings(commonSettings) .settings(publishSettings) .settings(itSettings) @@ -1064,10 +1033,7 @@ lazy val `scio-examples`: Project = project .settings( publish / skip := true, mimaPreviousArtifacts := Set.empty, - tpolecatExcludeOptions ++= Set( - ScalacOptions.warnUnusedLocals, - ScalacOptions.privateWarnUnusedLocals - ), + tpolecatExcludeOptions ++= Set(ScalacOptions.warnUnusedLocals, ScalacOptions.privateWarnUnusedLocals), libraryDependencies ++= Seq( // compile "com.chuusai" %% "shapeless" % shapelessVersion, @@ -1131,11 +1097,7 @@ lazy val `scio-examples`: Project = project lazy val `scio-repl`: Project = project .in(file("scio-repl")) - .dependsOn( - `scio-core`, - `scio-google-cloud-platform`, - `scio-extra` - ) + .dependsOn(`scio-core`, `scio-google-cloud-platform`, `scio-extra`) .settings(commonSettings) .settings(publishSettings) .settings(assemblySettings) @@ -1185,10 +1147,7 @@ lazy val `scio-repl`: Project = project lazy val `scio-jmh`: Project = project .in(file("scio-jmh")) .enablePlugins(JmhPlugin) - .dependsOn( - `scio-core`, - `scio-avro` - ) + .dependsOn(`scio-core`, `scio-avro`) .settings(commonSettings) .settings(macroSettings) .settings( @@ -1209,11 +1168,7 @@ lazy val `scio-jmh`: Project = project lazy val `scio-smb`: Project = project .in(file("scio-smb")) - .dependsOn( - `scio-core`, - `scio-test` % "test;it", - `scio-avro` % IntegrationTest - ) + .dependsOn(`scio-core`, `scio-test` % "test;it", `scio-avro` % IntegrationTest) .configs(IntegrationTest) .settings(commonSettings) .settings(publishSettings) @@ -1273,10 +1228,7 @@ lazy val `scio-smb`: Project = project lazy val `scio-redis`: Project = project .in(file("scio-redis")) - .dependsOn( - `scio-core`, - `scio-test` % "test" - ) + .dependsOn(`scio-core`, `scio-test` % "test") .settings(commonSettings) .settings(publishSettings) .settings(itSettings) @@ -1359,6 +1311,7 @@ lazy val siteSettings = Def.settings( `scio-avro`, `scio-google-cloud-platform`, `scio-cassandra3`, + `scio-cosmosdb`, `scio-elasticsearch8`, `scio-extra`, `scio-jdbc`, diff --git a/scio-cosmosdb/src/it/scala/com/spotify/scio/cosmosdb/CosmosDbIOIT.scala b/scio-cosmosdb/src/it/scala/com/spotify/scio/cosmosdb/CosmosDbIOIT.scala new file mode 100644 index 0000000000..04ce682eb8 --- /dev/null +++ b/scio-cosmosdb/src/it/scala/com/spotify/scio/cosmosdb/CosmosDbIOIT.scala @@ -0,0 +1,93 @@ +package com.spotify.scio.cosmosdb + +import com.azure.cosmos.CosmosClientBuilder +import com.spotify.scio.ContextAndArgs +import com.spotify.scio.cosmosdb.Utils.initLog +import org.bson.Document +import org.junit.rules.TemporaryFolder +import org.scalatest.BeforeAndAfterAll +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import org.testcontainers.containers.CosmosDBEmulatorContainer +import org.testcontainers.utility.DockerImageName + +import java.nio.file.Files +import scala.util.Using + +/** sbt scio-cosmosdb/IntegrationTest/test */ +class CosmosDbIOIT extends AnyFlatSpec with Matchers with BeforeAndAfterAll { + private val DOCKER_NAME = "mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator:latest" + private val DATABASE = "test" + private val CONTAINER = "test" + private val cosmosDBEmulatorContainer = new CosmosDBEmulatorContainer( + DockerImageName.parse(DOCKER_NAME) + ) + private val tempFolder = new TemporaryFolder + tempFolder.create() + initLog() + + override def beforeAll(): Unit = { + scribe.info("Star CosmosDB emulator") + cosmosDBEmulatorContainer.start() + + val keyStoreFile = tempFolder.newFile("azure-cosmos-emulator.keystore").toPath + val keyStore = cosmosDBEmulatorContainer.buildNewKeyStore + keyStore.store( + Files.newOutputStream(keyStoreFile.toFile.toPath), + cosmosDBEmulatorContainer.getEmulatorKey.toCharArray + ) + System.setProperty("javax.net.ssl.trustStore", keyStoreFile.toString) + System.setProperty("javax.net.ssl.trustStorePassword", cosmosDBEmulatorContainer.getEmulatorKey) + System.setProperty("javax.net.ssl.trustStoreType", "PKCS12") + + scribe.info("Creando la data -------------------------------------------------------->") + val triedCreateData = Using( + new CosmosClientBuilder().gatewayMode + .endpointDiscoveryEnabled(false) + .endpoint(cosmosDBEmulatorContainer.getEmulatorEndpoint) + .key(cosmosDBEmulatorContainer.getEmulatorKey) + .buildClient + ) { client => + client.createDatabase(DATABASE) + val db = client.getDatabase(DATABASE) + db.createContainer(CONTAINER, "/id") + val container = db.getContainer(CONTAINER) + for (i <- 1 to 10) { + container.createItem(new Document("id", i.toString)) + } + } + if (triedCreateData.isFailure) { + val throwable = triedCreateData.failed.get + scribe.error("Error creando la data", throwable) + throw throwable + } + scribe.info("Data creada ------------------------------------------------------------<") + } + + override protected def afterAll(): Unit = { + scribe.info("Stop CosmosDB emulator") + cosmosDBEmulatorContainer.stop() + } + + behavior of "CosmosDb with Core (SQL) API" + + it should "be " in { + val output = tempFolder.newFolder("output.txt") + scribe.info(s"output path: ${output.getPath}") + + val (sc, args) = ContextAndArgs(Array()) + val a = sc + .readCosmosDbCoreApi( + cosmosDBEmulatorContainer.getEmulatorEndpoint, + cosmosDBEmulatorContainer.getEmulatorKey, + DATABASE, + CONTAINER, + s"SELECT * FROM c" + ) + .map(_.toJson) + .saveAsTextFile(output.getPath) + + sc.run() + } + +} diff --git a/scio-cosmosdb/src/it/scala/com/spotify/scio/cosmosdb/Utils.scala b/scio-cosmosdb/src/it/scala/com/spotify/scio/cosmosdb/Utils.scala new file mode 100644 index 0000000000..062b3fef5c --- /dev/null +++ b/scio-cosmosdb/src/it/scala/com/spotify/scio/cosmosdb/Utils.scala @@ -0,0 +1,72 @@ +package com.spotify.scio.cosmosdb + +import scribe.filter.{level, packageName, select} +import scribe.format.{ + bold, + closeBracket, + cyan, + fileName, + green, + groupBySecond, + italic, + levelColoredPaddedRight, + line, + mdcMultiLine, + messages, + multiLine, + newLine, + openBracket, + position, + space, + string, + threadName, + time, + Formatter +} +import scribe.{Level, Logger} + +object Utils { + + def initLog(): Unit = { + + val formatter1 = Formatter.fromBlocks( + groupBySecond( + newLine, + openBracket, + bold(levelColoredPaddedRight), + space, + cyan(bold(time)), + closeBracket, + space, + string("("), + italic(threadName), + string(")"), + space, + green(position), + space, + string("("), + fileName, + string(":"), + line, + string(")"), + newLine + ), + // openBracket, + // bold(levelColoredPaddedRight), + multiLine(messages), + mdcMultiLine + ) + + Logger.root + .clearHandlers() + .clearModifiers() + .withHandler( + formatter = formatter1, + minimumLevel = Some(Level.Info), + modifiers = List( + select(packageName.startsWith("com.azure.cosmos")).exclude(level < Level.Warn) + ) + ) + .replace() + } +} diff --git a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/CosmosDbIO.scala b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/CosmosDbIO.scala new file mode 100644 index 0000000000..cf6e0833a0 --- /dev/null +++ b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/CosmosDbIO.scala @@ -0,0 +1,35 @@ +package com.spotify.scio.cosmosdb + +import com.spotify.scio.ScioContext +import com.spotify.scio.cosmosdb.read.CosmosDbRead +import com.spotify.scio.io.* +import com.spotify.scio.values.SCollection +import org.bson.Document + +trait CosmosDbIO[T] extends ScioIO[T] {} + +case class ReadCosmosDdIO( + endpoint: String = null, + key: String = null, + database: String = null, + container: String = null, + query: String = null +) extends CosmosDbIO[Document] { + override type ReadP = Unit + override type WriteP = Nothing + override val tapT: TapT.Aux[Document, Nothing] = EmptyTapOf[Document] + + override protected def read(sc: ScioContext, params: ReadP): SCollection[Document] = + sc.applyTransform(CosmosDbRead(endpoint, key, database, container, query)) + + override protected def write(data: SCollection[Document], params: WriteP): Tap[Nothing] = + throw new UnsupportedOperationException("cosmosDbCoreApi is read-only") + + /** + * Write options also return a `ClosedTap`. Once the job completes you can open the `Tap`. Tap + * abstracts away the logic of reading the dataset directly as an Iterator[T] or re-opening it in + * another ScioContext. The Future is complete once the job finishes. This can be used to do light + * weight pipeline orchestration e.g. WordCountOrchestration.scala. + */ + override def tap(read: ReadP): Tap[Nothing] = EmptyTap +} diff --git a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/package.scala b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/package.scala new file mode 100644 index 0000000000..6668fe375c --- /dev/null +++ b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/package.scala @@ -0,0 +1,5 @@ +package com.spotify.scio + +import com.spotify.scio.cosmosdb.syntax.AllSyntax + +package object cosmosdb extends AllSyntax diff --git a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbBoundedReader.scala b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbBoundedReader.scala new file mode 100644 index 0000000000..e46cf99cb8 --- /dev/null +++ b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbBoundedReader.scala @@ -0,0 +1,57 @@ +package com.spotify.scio.cosmosdb.read + +import com.azure.cosmos.models.CosmosQueryRequestOptions +import com.azure.cosmos.{CosmosClient, CosmosClientBuilder} +import org.apache.beam.sdk.annotations.Experimental +import org.apache.beam.sdk.annotations.Experimental.Kind +import org.apache.beam.sdk.io.BoundedSource +import org.bson.Document +import org.slf4j.LoggerFactory + +@Experimental(Kind.SOURCE_SINK) +private[read] class CosmosDbBoundedReader(cosmosSource: CosmosDbBoundedSource) + extends BoundedSource.BoundedReader[Document] { + private val log = LoggerFactory.getLogger(getClass) + private var maybeClient: Option[CosmosClient] = None + private var maybeIterator: Option[java.util.Iterator[Document]] = None + + override def start(): Boolean = { + maybeClient = Some( + new CosmosClientBuilder().gatewayMode + .endpointDiscoveryEnabled(false) + .endpoint(cosmosSource.readCosmos.endpoint) + .key(cosmosSource.readCosmos.key) + .buildClient + ) + + maybeIterator = maybeClient.map { client => + log.info("Get the container name") + + log.info(s"Get the iterator of the query in container ${cosmosSource.readCosmos.container}") + client + .getDatabase(cosmosSource.readCosmos.database) + .getContainer(cosmosSource.readCosmos.container) + .queryItems( + cosmosSource.readCosmos.query, + new CosmosQueryRequestOptions(), + classOf[Document] + ) + .iterator() + } + + true + } + + override def advance(): Boolean = maybeIterator.exists(_.hasNext) + + override def getCurrent: Document = + maybeIterator + .filter(_.hasNext) + // .map(iterator => new Document(iterator.next())) + .map(_.next()) + .orNull + + override def getCurrentSource: CosmosDbBoundedSource = cosmosSource + + override def close(): Unit = maybeClient.foreach(_.close()) +} diff --git a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbBoundedSource.scala b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbBoundedSource.scala new file mode 100644 index 0000000000..236d88a48a --- /dev/null +++ b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbBoundedSource.scala @@ -0,0 +1,40 @@ +package com.spotify.scio.cosmosdb.read + +import org.apache.beam.sdk.annotations.Experimental +import org.apache.beam.sdk.annotations.Experimental.Kind +import org.apache.beam.sdk.coders.{Coder, SerializableCoder} +import org.apache.beam.sdk.io.BoundedSource +import org.apache.beam.sdk.options.PipelineOptions +import org.bson.Document + +import java.util +import java.util.Collections + +/** + * A CosmosDB Core (SQL) API {@link BoundedSource} reading {@link Document} from a given instance. + */ +@Experimental(Kind.SOURCE_SINK) +private[read] class CosmosDbBoundedSource(private[read] val readCosmos: CosmosDbRead) + extends BoundedSource[Document] { + + /** + * @inheritDoc + * TODO: You have to find a better way, maybe by partition key + */ + override def split( + desiredBundleSizeBytes: Long, + options: PipelineOptions + ): util.List[CosmosDbBoundedSource] = + Collections.singletonList(this) + + /** + * @inheritDoc + * The Cosmos DB Coro (SQL) API not support this metrics by the querys + */ + override def getEstimatedSizeBytes(options: PipelineOptions) = 0L + + override def getOutputCoder: Coder[Document] = SerializableCoder.of(classOf[Document]) + + override def createReader(options: PipelineOptions) = + new CosmosDbBoundedReader(this) +} diff --git a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbRead.scala b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbRead.scala new file mode 100644 index 0000000000..f609ba427c --- /dev/null +++ b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbRead.scala @@ -0,0 +1,49 @@ +package com.spotify.scio.cosmosdb.read + +import org.apache.beam.sdk.annotations.Experimental +import org.apache.beam.sdk.annotations.Experimental.Kind +import org.apache.beam.sdk.io.Read +import org.apache.beam.sdk.transforms.PTransform +import org.apache.beam.sdk.values.{PBegin, PCollection} +import org.bson.Document +import org.slf4j.LoggerFactory + +/** A {@link PTransform} to read data from CosmosDB Core (SQL) API. */ +@Experimental(Kind.SOURCE_SINK) +private[cosmosdb] case class CosmosDbRead( + endpoint: String = null, + key: String = null, + database: String = null, + container: String = null, + query: String = null +) extends PTransform[PBegin, PCollection[Document]] { + + private val log = LoggerFactory.getLogger(classOf[CosmosDbRead]) + + /** Create new ReadCosmos based into previous ReadCosmos, modifying the endpoint */ + def withCosmosEndpoint(endpoint: String): CosmosDbRead = this.copy(endpoint = endpoint) + + def withCosmosKey(key: String): CosmosDbRead = this.copy(key = key) + + def withDatabase(database: String): CosmosDbRead = this.copy(database = database) + + def withQuery(query: String): CosmosDbRead = this.copy(query = query) + + def withContainer(container: String): CosmosDbRead = this.copy(container = container) + + override def expand(input: PBegin): PCollection[Document] = { + log.debug(s"Read CosmosDB with endpoint: $endpoint and query: $query") + validate() + + // input.getPipeline.apply(Read.from(new CosmosSource(this))) + input.apply(Read.from(new CosmosDbBoundedSource(this))) + } + + private def validate(): Unit = { + require(endpoint != null, "CosmosDB endpoint is required") + require(key != null, "CosmosDB key is required") + require(database != null, "CosmosDB database is required") + require(container != null, "CosmosDB container is required") + require(query != null, "CosmosDB query is required") + } +} diff --git a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/syntax/AllSyntax.scala b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/syntax/AllSyntax.scala new file mode 100644 index 0000000000..8d7ef82093 --- /dev/null +++ b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/syntax/AllSyntax.scala @@ -0,0 +1,3 @@ +package com.spotify.scio.cosmosdb.syntax + +trait AllSyntax extends ScioContextSyntax diff --git a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/syntax/ScioContextSyntax.scala b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/syntax/ScioContextSyntax.scala new file mode 100644 index 0000000000..cfc7bfcfff --- /dev/null +++ b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/syntax/ScioContextSyntax.scala @@ -0,0 +1,22 @@ +package com.spotify.scio.cosmosdb.syntax + +import com.spotify.scio.ScioContext +import com.spotify.scio.cosmosdb.ReadCosmosDdIO +import com.spotify.scio.values.SCollection +import org.bson.Document + +trait ScioContextSyntax { + implicit def cosmosdbScioContextOps(sc: ScioContext): CosmosDbScioContextOps = + new CosmosDbScioContextOps(sc) +} + +final class CosmosDbScioContextOps(private val sc: ScioContext) extends AnyVal { + def readCosmosDbCoreApi( + endpoint: String = null, + key: String = null, + database: String = null, + container: String = null, + query: String = null + ): SCollection[Document] = + sc.read(ReadCosmosDdIO(endpoint, key, database, container, query)) +} From e07ab100a1e7b1db8c3c99f6949024a6be337934 Mon Sep 17 00:00:00 2001 From: Hector Miuler Malpica Gallegos Date: Wed, 1 Feb 2023 12:07:24 -0500 Subject: [PATCH 2/4] docs(scio-cosmosdb): Add Header license, add scaladoc in CosmosDbScioContextOps#readCosmosDbCoreApi and add slf4j-api dependency --- build.sbt | 6 ++- .../spotify/scio/cosmosdb/CosmosDbIO.scala | 16 ++++++++ .../com/spotify/scio/cosmosdb/package.scala | 16 ++++++++ .../cosmosdb/read/CosmosDbBoundedReader.scala | 16 ++++++++ .../cosmosdb/read/CosmosDbBoundedSource.scala | 18 ++++++++- .../scio/cosmosdb/read/CosmosDbRead.scala | 18 ++++++++- .../scio/cosmosdb/syntax/AllSyntax.scala | 16 ++++++++ .../cosmosdb/syntax/ScioContextSyntax.scala | 37 +++++++++++++++++++ 8 files changed, 139 insertions(+), 4 deletions(-) diff --git a/build.sbt b/build.sbt index 50ae445774..5fb2b732cc 100644 --- a/build.sbt +++ b/build.sbt @@ -89,7 +89,6 @@ val breezeVersion = "2.1.0" val bsonVersion = "4.8.1" val cosmosVersion = "4.37.1" val cosmosContainerVersion = "1.17.5" -val scribeVersion = "3.10.7" val caffeineVersion = "2.9.3" val cassandraDriverVersion = "3.11.3" val cassandraVersion = "3.11.13" @@ -125,6 +124,7 @@ val scalaCollectionCompatVersion = "2.9.0" val scalacticVersion = "3.2.15" val scalaMacrosVersion = "2.1.1" val scalatestVersion = "3.2.15" +val scribeVersion = "3.10.7" val shapelessVersion = "2.3.10" val slf4jVersion = "1.7.36" val sparkeyVersion = "3.2.5" @@ -547,8 +547,10 @@ lazy val `scio-cosmosdb`: Project = project //scalacOptions ++= Seq("-deprecation", "-feature", "-unchecked", "-Xsource:3"), // , "-Ymacro-annotations" scalacOptions ++= Seq("-Xsource:3"), libraryDependencies ++= Seq( - "org.mongodb" % "bson" % bsonVersion, "com.azure" % "azure-cosmos" % cosmosVersion, + "org.mongodb" % "bson" % bsonVersion, + "org.slf4j" % "slf4j-api" % slf4jVersion, + // TEST "org.testcontainers" % "azure" % cosmosContainerVersion % IntegrationTest, "com.outr" %% "scribe" % scribeVersion % IntegrationTest, "com.outr" %% "scribe-slf4j" % scribeVersion % IntegrationTest diff --git a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/CosmosDbIO.scala b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/CosmosDbIO.scala index cf6e0833a0..83b5121bde 100644 --- a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/CosmosDbIO.scala +++ b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/CosmosDbIO.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2023 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.spotify.scio.cosmosdb import com.spotify.scio.ScioContext diff --git a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/package.scala b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/package.scala index 6668fe375c..2f2fb13d53 100644 --- a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/package.scala +++ b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/package.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2023 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.spotify.scio import com.spotify.scio.cosmosdb.syntax.AllSyntax diff --git a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbBoundedReader.scala b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbBoundedReader.scala index e46cf99cb8..b1e2031a60 100644 --- a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbBoundedReader.scala +++ b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbBoundedReader.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2023 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.spotify.scio.cosmosdb.read import com.azure.cosmos.models.CosmosQueryRequestOptions diff --git a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbBoundedSource.scala b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbBoundedSource.scala index 236d88a48a..2e195a5ab5 100644 --- a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbBoundedSource.scala +++ b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbBoundedSource.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2023 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.spotify.scio.cosmosdb.read import org.apache.beam.sdk.annotations.Experimental @@ -11,7 +27,7 @@ import java.util import java.util.Collections /** - * A CosmosDB Core (SQL) API {@link BoundedSource} reading {@link Document} from a given instance. + * A CosmosDB Core (SQL) API [[BoundedSource]] reading [[Document]] from a given instance. */ @Experimental(Kind.SOURCE_SINK) private[read] class CosmosDbBoundedSource(private[read] val readCosmos: CosmosDbRead) diff --git a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbRead.scala b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbRead.scala index f609ba427c..73b454f549 100644 --- a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbRead.scala +++ b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbRead.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2023 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.spotify.scio.cosmosdb.read import org.apache.beam.sdk.annotations.Experimental @@ -8,7 +24,7 @@ import org.apache.beam.sdk.values.{PBegin, PCollection} import org.bson.Document import org.slf4j.LoggerFactory -/** A {@link PTransform} to read data from CosmosDB Core (SQL) API. */ +/** A [[PTransform]] to read data from CosmosDB Core (SQL) API. */ @Experimental(Kind.SOURCE_SINK) private[cosmosdb] case class CosmosDbRead( endpoint: String = null, diff --git a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/syntax/AllSyntax.scala b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/syntax/AllSyntax.scala index 8d7ef82093..21ce0d15db 100644 --- a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/syntax/AllSyntax.scala +++ b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/syntax/AllSyntax.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2023 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.spotify.scio.cosmosdb.syntax trait AllSyntax extends ScioContextSyntax diff --git a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/syntax/ScioContextSyntax.scala b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/syntax/ScioContextSyntax.scala index cfc7bfcfff..c27aaf3064 100644 --- a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/syntax/ScioContextSyntax.scala +++ b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/syntax/ScioContextSyntax.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2023 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.spotify.scio.cosmosdb.syntax import com.spotify.scio.ScioContext @@ -11,6 +27,27 @@ trait ScioContextSyntax { } final class CosmosDbScioContextOps(private val sc: ScioContext) extends AnyVal { + + /** + * Read data from CosmosDB CORE (SQL) API + * + * Example: + * + * url: + * AccountEndpoint=https://[cosmosdbname].documents.azure.com:443/;AccountKey=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxyyyyyyyyyyyyyyyyyy==; + * + * @param endpoint + * The endpoint, example: AccountEndpoint=https://[cosmosdbname].documents.azure.com:443 + * @param key + * The key, example: ;AccountKey=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxyyyyyyyyyyyyyyyyyy==; + * @param database + * The name of the database + * @param container + * The name of the container + * @param query + * The query for cosmosdb, example: "SELECT * FROM c" + * @return + */ def readCosmosDbCoreApi( endpoint: String = null, key: String = null, From a79c089b3105905e1d2b0c9fbeb4a02955b671b5 Mon Sep 17 00:00:00 2001 From: Hector Miuler Malpica Gallegos Date: Wed, 1 Feb 2023 12:10:33 -0500 Subject: [PATCH 3/4] test(scio-cosmosdb): Refactor for add the testcontainers-scala-scalatest --- build.sbt | 1 + .../spotify/scio/cosmosdb/CosmosDbIOIT.scala | 54 +++++++++---------- .../ScalaCosmosDBEmulatorContainer.scala | 25 +++++++++ 3 files changed, 52 insertions(+), 28 deletions(-) create mode 100644 scio-cosmosdb/src/it/scala/com/spotify/scio/cosmosdb/ScalaCosmosDBEmulatorContainer.scala diff --git a/build.sbt b/build.sbt index 5fb2b732cc..d1fa7efc41 100644 --- a/build.sbt +++ b/build.sbt @@ -551,6 +551,7 @@ lazy val `scio-cosmosdb`: Project = project "org.mongodb" % "bson" % bsonVersion, "org.slf4j" % "slf4j-api" % slf4jVersion, // TEST + "com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersVersion % "it", "org.testcontainers" % "azure" % cosmosContainerVersion % IntegrationTest, "com.outr" %% "scribe" % scribeVersion % IntegrationTest, "com.outr" %% "scribe-slf4j" % scribeVersion % IntegrationTest diff --git a/scio-cosmosdb/src/it/scala/com/spotify/scio/cosmosdb/CosmosDbIOIT.scala b/scio-cosmosdb/src/it/scala/com/spotify/scio/cosmosdb/CosmosDbIOIT.scala index 04ce682eb8..7a8de547a4 100644 --- a/scio-cosmosdb/src/it/scala/com/spotify/scio/cosmosdb/CosmosDbIOIT.scala +++ b/scio-cosmosdb/src/it/scala/com/spotify/scio/cosmosdb/CosmosDbIOIT.scala @@ -1,51 +1,45 @@ package com.spotify.scio.cosmosdb import com.azure.cosmos.CosmosClientBuilder -import com.spotify.scio.ContextAndArgs +import com.dimafeng.testcontainers.ForAllTestContainer import com.spotify.scio.cosmosdb.Utils.initLog +import com.spotify.scio.{ ContextAndArgs, ScioMetrics } import org.bson.Document import org.junit.rules.TemporaryFolder import org.scalatest.BeforeAndAfterAll import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers -import org.testcontainers.containers.CosmosDBEmulatorContainer -import org.testcontainers.utility.DockerImageName import java.nio.file.Files import scala.util.Using /** sbt scio-cosmosdb/IntegrationTest/test */ -class CosmosDbIOIT extends AnyFlatSpec with Matchers with BeforeAndAfterAll { - private val DOCKER_NAME = "mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator:latest" +class CosmosDbIOIT extends AnyFlatSpec with Matchers with BeforeAndAfterAll with ForAllTestContainer { private val DATABASE = "test" private val CONTAINER = "test" - private val cosmosDBEmulatorContainer = new CosmosDBEmulatorContainer( - DockerImageName.parse(DOCKER_NAME) - ) private val tempFolder = new TemporaryFolder tempFolder.create() initLog() - override def beforeAll(): Unit = { - scribe.info("Star CosmosDB emulator") - cosmosDBEmulatorContainer.start() + override val container: ScalaCosmosDBEmulatorContainer = ScalaCosmosDBEmulatorContainer() + override def beforeAll(): Unit = { val keyStoreFile = tempFolder.newFile("azure-cosmos-emulator.keystore").toPath - val keyStore = cosmosDBEmulatorContainer.buildNewKeyStore + val keyStore = container.buildNewKeyStore keyStore.store( Files.newOutputStream(keyStoreFile.toFile.toPath), - cosmosDBEmulatorContainer.getEmulatorKey.toCharArray + container.emulatorKey.toCharArray ) System.setProperty("javax.net.ssl.trustStore", keyStoreFile.toString) - System.setProperty("javax.net.ssl.trustStorePassword", cosmosDBEmulatorContainer.getEmulatorKey) + System.setProperty("javax.net.ssl.trustStorePassword", container.emulatorKey) System.setProperty("javax.net.ssl.trustStoreType", "PKCS12") - scribe.info("Creando la data -------------------------------------------------------->") + scribe.info("Create data -------------------------------------------------------->") val triedCreateData = Using( new CosmosClientBuilder().gatewayMode .endpointDiscoveryEnabled(false) - .endpoint(cosmosDBEmulatorContainer.getEmulatorEndpoint) - .key(cosmosDBEmulatorContainer.getEmulatorKey) + .endpoint(container.emulatorEndpoint) + .key(container.emulatorKey) .buildClient ) { client => client.createDatabase(DATABASE) @@ -61,12 +55,7 @@ class CosmosDbIOIT extends AnyFlatSpec with Matchers with BeforeAndAfterAll { scribe.error("Error creando la data", throwable) throw throwable } - scribe.info("Data creada ------------------------------------------------------------<") - } - - override protected def afterAll(): Unit = { - scribe.info("Stop CosmosDB emulator") - cosmosDBEmulatorContainer.stop() + scribe.info("Data created ------------------------------------------------------------<") } behavior of "CosmosDb with Core (SQL) API" @@ -75,19 +64,28 @@ class CosmosDbIOIT extends AnyFlatSpec with Matchers with BeforeAndAfterAll { val output = tempFolder.newFolder("output.txt") scribe.info(s"output path: ${output.getPath}") - val (sc, args) = ContextAndArgs(Array()) - val a = sc + val (sc, _) = ContextAndArgs(Array()) + + val counter = ScioMetrics.counter("counter") + sc .readCosmosDbCoreApi( - cosmosDBEmulatorContainer.getEmulatorEndpoint, - cosmosDBEmulatorContainer.getEmulatorKey, + container.emulatorEndpoint, + container.emulatorKey, DATABASE, CONTAINER, s"SELECT * FROM c" ) + .tap(_ => counter.inc()) .map(_.toJson) .saveAsTextFile(output.getPath) - sc.run() + val result = sc.run().waitUntilFinish() + + result.counter(counter).committed.get should equal(10) } } + + + + diff --git a/scio-cosmosdb/src/it/scala/com/spotify/scio/cosmosdb/ScalaCosmosDBEmulatorContainer.scala b/scio-cosmosdb/src/it/scala/com/spotify/scio/cosmosdb/ScalaCosmosDBEmulatorContainer.scala new file mode 100644 index 0000000000..e6e86be64d --- /dev/null +++ b/scio-cosmosdb/src/it/scala/com/spotify/scio/cosmosdb/ScalaCosmosDBEmulatorContainer.scala @@ -0,0 +1,25 @@ +package com.spotify.scio.cosmosdb + +import com.dimafeng.testcontainers.SingleContainer +import com.spotify.scio.cosmosdb.ScalaCosmosDBEmulatorContainer.defaultDockerImageName +import org.testcontainers.containers.CosmosDBEmulatorContainer +import org.testcontainers.utility.DockerImageName + +import java.security.KeyStore + +case class ScalaCosmosDBEmulatorContainer( + dockerImageName: DockerImageName = DockerImageName.parse(defaultDockerImageName) +) extends SingleContainer[CosmosDBEmulatorContainer] { + + override val container: CosmosDBEmulatorContainer = new CosmosDBEmulatorContainer(dockerImageName) + + def buildNewKeyStore: KeyStore = container.buildNewKeyStore + def emulatorEndpoint: String = container.getEmulatorEndpoint + def emulatorKey: String = container.getEmulatorKey +} + +object ScalaCosmosDBEmulatorContainer { + val defaultImage = "mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator" + val defaultTag = "latest" + val defaultDockerImageName = s"$defaultImage:$defaultTag" +} From 091e441a0b5164219457ae3ed12a19e6fd1dbcb2 Mon Sep 17 00:00:00 2001 From: Hector Miuler Malpica Gallegos Date: Wed, 1 Feb 2023 16:24:24 -0500 Subject: [PATCH 4/4] fix(scio-cosmosdb): Fix the CosmosDbBoundedReader#getCurrent is now idempotent, add @experimental annotations and simplifying the creation of the CosmosDbBoundedSource --- .../spotify/scio/cosmosdb/CosmosDbIO.scala | 10 ++-- .../cosmosdb/read/CosmosDbBoundedReader.scala | 46 +++++++++++-------- .../cosmosdb/read/CosmosDbBoundedSource.scala | 24 ++++++---- .../scio/cosmosdb/read/CosmosDbRead.scala | 39 ++++------------ .../cosmosdb/syntax/ScioContextSyntax.scala | 5 +- 5 files changed, 59 insertions(+), 65 deletions(-) diff --git a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/CosmosDbIO.scala b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/CosmosDbIO.scala index 83b5121bde..288477a4b8 100644 --- a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/CosmosDbIO.scala +++ b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/CosmosDbIO.scala @@ -25,11 +25,11 @@ import org.bson.Document trait CosmosDbIO[T] extends ScioIO[T] {} case class ReadCosmosDdIO( - endpoint: String = null, - key: String = null, - database: String = null, - container: String = null, - query: String = null + endpoint: String, + key: String, + database: String, + container: String, + query: String ) extends CosmosDbIO[Document] { override type ReadP = Unit override type WriteP = Nothing diff --git a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbBoundedReader.scala b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbBoundedReader.scala index b1e2031a60..e6546fcad7 100644 --- a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbBoundedReader.scala +++ b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbBoundedReader.scala @@ -17,57 +17,65 @@ package com.spotify.scio.cosmosdb.read import com.azure.cosmos.models.CosmosQueryRequestOptions -import com.azure.cosmos.{CosmosClient, CosmosClientBuilder} -import org.apache.beam.sdk.annotations.Experimental -import org.apache.beam.sdk.annotations.Experimental.Kind +import com.azure.cosmos.{ CosmosClient, CosmosClientBuilder } +import com.spotify.scio.annotations.experimental import org.apache.beam.sdk.io.BoundedSource import org.bson.Document import org.slf4j.LoggerFactory -@Experimental(Kind.SOURCE_SINK) +@experimental private[read] class CosmosDbBoundedReader(cosmosSource: CosmosDbBoundedSource) extends BoundedSource.BoundedReader[Document] { private val log = LoggerFactory.getLogger(getClass) private var maybeClient: Option[CosmosClient] = None private var maybeIterator: Option[java.util.Iterator[Document]] = None + @volatile private var current: Option[Document] = None + @volatile private var recordsReturned = 0L override def start(): Boolean = { maybeClient = Some( new CosmosClientBuilder().gatewayMode .endpointDiscoveryEnabled(false) - .endpoint(cosmosSource.readCosmos.endpoint) - .key(cosmosSource.readCosmos.key) + .endpoint(cosmosSource.endpoint) + .key(cosmosSource.key) .buildClient ) maybeIterator = maybeClient.map { client => log.info("Get the container name") - log.info(s"Get the iterator of the query in container ${cosmosSource.readCosmos.container}") + log.info(s"Get the iterator of the query in container ${cosmosSource.container}") client - .getDatabase(cosmosSource.readCosmos.database) - .getContainer(cosmosSource.readCosmos.container) + .getDatabase(cosmosSource.database) + .getContainer(cosmosSource.container) .queryItems( - cosmosSource.readCosmos.query, + cosmosSource.query, new CosmosQueryRequestOptions(), classOf[Document] ) .iterator() } - true + advance() } - override def advance(): Boolean = maybeIterator.exists(_.hasNext) + override def advance(): Boolean = maybeIterator match { + case Some(iterator) if iterator.hasNext => + current = Some(iterator.next()) + recordsReturned += 1 + true + case _ => + false + } - override def getCurrent: Document = - maybeIterator - .filter(_.hasNext) - // .map(iterator => new Document(iterator.next())) - .map(_.next()) - .orNull + override def getCurrent: Document = current.orNull override def getCurrentSource: CosmosDbBoundedSource = cosmosSource - override def close(): Unit = maybeClient.foreach(_.close()) + override def close(): Unit = { + log.info("Closing reader after reading {} records.", recordsReturned) + maybeClient.foreach(_.close()) + maybeClient = None + maybeIterator = None + } } diff --git a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbBoundedSource.scala b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbBoundedSource.scala index 2e195a5ab5..828fe7133e 100644 --- a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbBoundedSource.scala +++ b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbBoundedSource.scala @@ -16,8 +16,7 @@ package com.spotify.scio.cosmosdb.read -import org.apache.beam.sdk.annotations.Experimental -import org.apache.beam.sdk.annotations.Experimental.Kind +import com.spotify.scio.annotations.experimental import org.apache.beam.sdk.coders.{Coder, SerializableCoder} import org.apache.beam.sdk.io.BoundedSource import org.apache.beam.sdk.options.PipelineOptions @@ -26,12 +25,21 @@ import org.bson.Document import java.util import java.util.Collections -/** - * A CosmosDB Core (SQL) API [[BoundedSource]] reading [[Document]] from a given instance. - */ -@Experimental(Kind.SOURCE_SINK) -private[read] class CosmosDbBoundedSource(private[read] val readCosmos: CosmosDbRead) - extends BoundedSource[Document] { +/** A CosmosDB Core (SQL) API [[BoundedSource]] reading [[Document]] from a given instance. */ +@experimental +private[read] class CosmosDbBoundedSource( + val endpoint: String, + val key: String, + val database: String, + val container: String, + val query: String +) extends BoundedSource[Document] { + + require(endpoint != null, "CosmosDB endpoint is required") + require(key != null, "CosmosDB key is required") + require(database != null, "CosmosDB database is required") + require(container != null, "CosmosDB container is required") + require(query != null, "CosmosDB query is required") /** * @inheritDoc diff --git a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbRead.scala b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbRead.scala index 73b454f549..aab0a38f59 100644 --- a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbRead.scala +++ b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/read/CosmosDbRead.scala @@ -16,50 +16,27 @@ package com.spotify.scio.cosmosdb.read -import org.apache.beam.sdk.annotations.Experimental -import org.apache.beam.sdk.annotations.Experimental.Kind +import com.spotify.scio.annotations.experimental import org.apache.beam.sdk.io.Read import org.apache.beam.sdk.transforms.PTransform -import org.apache.beam.sdk.values.{PBegin, PCollection} +import org.apache.beam.sdk.values.{ PBegin, PCollection } import org.bson.Document import org.slf4j.LoggerFactory /** A [[PTransform]] to read data from CosmosDB Core (SQL) API. */ -@Experimental(Kind.SOURCE_SINK) +@experimental private[cosmosdb] case class CosmosDbRead( - endpoint: String = null, - key: String = null, - database: String = null, - container: String = null, + endpoint: String, + key: String, + database: String, + container: String, query: String = null ) extends PTransform[PBegin, PCollection[Document]] { - private val log = LoggerFactory.getLogger(classOf[CosmosDbRead]) - /** Create new ReadCosmos based into previous ReadCosmos, modifying the endpoint */ - def withCosmosEndpoint(endpoint: String): CosmosDbRead = this.copy(endpoint = endpoint) - - def withCosmosKey(key: String): CosmosDbRead = this.copy(key = key) - - def withDatabase(database: String): CosmosDbRead = this.copy(database = database) - - def withQuery(query: String): CosmosDbRead = this.copy(query = query) - - def withContainer(container: String): CosmosDbRead = this.copy(container = container) - override def expand(input: PBegin): PCollection[Document] = { log.debug(s"Read CosmosDB with endpoint: $endpoint and query: $query") - validate() - - // input.getPipeline.apply(Read.from(new CosmosSource(this))) - input.apply(Read.from(new CosmosDbBoundedSource(this))) + input.apply(Read.from(new CosmosDbBoundedSource(endpoint, key, database, container, query))) } - private def validate(): Unit = { - require(endpoint != null, "CosmosDB endpoint is required") - require(key != null, "CosmosDB key is required") - require(database != null, "CosmosDB database is required") - require(container != null, "CosmosDB container is required") - require(query != null, "CosmosDB query is required") - } } diff --git a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/syntax/ScioContextSyntax.scala b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/syntax/ScioContextSyntax.scala index c27aaf3064..b79ae577de 100644 --- a/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/syntax/ScioContextSyntax.scala +++ b/scio-cosmosdb/src/main/scala/com/spotify/scio/cosmosdb/syntax/ScioContextSyntax.scala @@ -17,6 +17,7 @@ package com.spotify.scio.cosmosdb.syntax import com.spotify.scio.ScioContext +import com.spotify.scio.annotations.experimental import com.spotify.scio.cosmosdb.ReadCosmosDdIO import com.spotify.scio.values.SCollection import org.bson.Document @@ -28,6 +29,7 @@ trait ScioContextSyntax { final class CosmosDbScioContextOps(private val sc: ScioContext) extends AnyVal { + @experimental /** * Read data from CosmosDB CORE (SQL) API * @@ -54,6 +56,5 @@ final class CosmosDbScioContextOps(private val sc: ScioContext) extends AnyVal { database: String = null, container: String = null, query: String = null - ): SCollection[Document] = - sc.read(ReadCosmosDdIO(endpoint, key, database, container, query)) + ): SCollection[Document] = sc.read(ReadCosmosDdIO(endpoint, key, database, container, query)) }