Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(azure-cosmosdb): Add support for cosmosdb with Core (SQL) API #23610

Closed
wants to merge 3 commits into from

Conversation

Miuler
Copy link
Contributor

@Miuler Miuler commented Oct 12, 2022

Fixes: #23604

Support the CosmosDB Core (SQL) API for read

I add unittest using the testcontainers.

@lukecwik can you help me with this pull request?


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI.

@codecov
Copy link

codecov bot commented Oct 12, 2022

Codecov Report

Merging #23610 (0792ad9) into master (f5020e7) will increase coverage by 0.00%.
The diff coverage is n/a.

@@           Coverage Diff           @@
##           master   #23610   +/-   ##
=======================================
  Coverage   73.11%   73.12%           
=======================================
  Files         735      735           
  Lines       98161    98161           
=======================================
+ Hits        71771    71780    +9     
+ Misses      25026    25017    -9     
  Partials     1364     1364           
Flag Coverage Δ
python 82.67% <ø> (+0.01%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
sdks/python/apache_beam/transforms/combiners.py 93.05% <0.00%> (-0.39%) ⬇️
...hon/apache_beam/runners/worker/bundle_processor.py 93.30% <0.00%> (+0.12%) ⬆️
...s/python/apache_beam/io/gcp/bigquery_file_loads.py 87.38% <0.00%> (+0.22%) ⬆️
sdks/python/apache_beam/pipeline.py 92.42% <0.00%> (+0.29%) ⬆️
sdks/python/apache_beam/internal/dill_pickler.py 84.89% <0.00%> (+0.71%) ⬆️
sdks/python/apache_beam/internal/metrics/metric.py 94.00% <0.00%> (+1.00%) ⬆️
...che_beam/runners/interactive/interactive_runner.py 91.77% <0.00%> (+1.26%) ⬆️
sdks/python/apache_beam/utils/interactive_utils.py 97.56% <0.00%> (+2.43%) ⬆️
.../python/apache_beam/testing/test_stream_service.py 92.85% <0.00%> (+4.76%) ⬆️

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

@Miuler Miuler force-pushed the master branch 3 times, most recently from 513c841 to 887992a Compare October 31, 2022 04:21
@github-actions github-actions bot removed the mongodb label Oct 31, 2022
@Miuler Miuler marked this pull request as ready for review October 31, 2022 16:50
@github-actions
Copy link
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @kennknowles for label java.
R: @Abacn for label build.
R: @Abacn for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@damccorm
Copy link
Contributor

damccorm commented Nov 8, 2022

@Abacn could you take a look?

@Abacn
Copy link
Contributor

Abacn commented Nov 8, 2022

Will do; however this is the first API in scala I saw. R: @kennknowles may know more about how good our current build infrastructure support Scala build.

@github-actions
Copy link
Contributor

github-actions bot commented Nov 8, 2022

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

Copy link
Member

@kennknowles kennknowles left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Looks pretty good. I had a couple questions.

.gitignore Outdated Show resolved Hide resolved
sdks/java/io/azure-cosmosdb/build.gradle Outdated Show resolved Hide resolved
sdks/java/io/azure-cosmosdb/build.gradle Outdated Show resolved Hide resolved
sdks/java/io/azure-cosmosdb/build.gradle Show resolved Hide resolved
@Miuler Miuler force-pushed the master branch 3 times, most recently from 50712ea to 5c70c19 Compare November 14, 2022 23:00
@Abacn
Copy link
Contributor

Abacn commented Nov 14, 2022

Thanks for the revision. Please avoid overwrite reviewed commits if doing force push; and I think this API may better fit into https://github.com/apache/beam/tree/master/sdks/java/io/azure subproject as thisi is other azure IOs lies.

@Miuler
Copy link
Contributor Author

Miuler commented Nov 14, 2022

Thanks for the revision. Please avoid overwrite reviewed commits if doing force push; and I think this API may better fit into https://github.com/apache/beam/tree/master/sdks/java/io/azure subproject as thisi is other azure IOs lies.

The io.azure is only azure blob storage, this io.azure.cosmos is for all apis for comosdb (mongo, cassandra, core sql api, Gremlin, table, etc)

@Miuler Miuler force-pushed the master branch 2 times, most recently from 56fcbeb to f1dd226 Compare November 14, 2022 23:40
@Abacn
Copy link
Contributor

Abacn commented Nov 14, 2022

Thanks for the revision. Please avoid overwrite reviewed commits if doing force push; and I think this API may better fit into https://github.com/apache/beam/tree/master/sdks/java/io/azure subproject as thisi is other azure IOs lies.

The io.azure is only azure blob storage, this io.azure.cosmos is for all apis for comosdb (mongo, cassandra, core sql api, Gremlin, table, etc)

I see, got it, never minds.

@Miuler Miuler force-pushed the master branch 2 times, most recently from c393dd0 to 283726f Compare November 22, 2022 15:06
@Miuler
Copy link
Contributor Author

Miuler commented Nov 22, 2022

what would be missing to continue? @Abacn @kennknowles

@Miuler
Copy link
Contributor Author

Miuler commented Dec 13, 2022

@kennknowles @Abacn What is missing to approve the pull?

@Abacn
Copy link
Contributor

Abacn commented Dec 13, 2022

Sorry for delay. R: @kennknowles

@Miuler
Copy link
Contributor Author

Miuler commented Dec 21, 2022

Sorry for delay. R: @kennknowles

what was it?

Copy link
Contributor

@Abacn Abacn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for delay. I was meant to pin @kennknowles who actually reviewed the PR. Here I only have some non-technical comments/tips:

Please resolve the test failures. The whitespace failure message is

./sdks/java/io/azure-cosmosdb/README.md:20:19: W201 Trailing whitespace

RAT failure means missing Apache License at new file header (see other source files in the code base for example)

Generally we also mark new IO connector with experimental annotation (org.apache.beam.sdk.annotations.Experimental); also I would appreciate some doc string for public class.

CHANGES.md Outdated Show resolved Hide resolved
@Abacn
Copy link
Contributor

Abacn commented Dec 24, 2022

And after rebase, could you please make changes on new commits instead of amend to reviewed commits. This helps us to review only diff

@Miuler Miuler force-pushed the master branch 2 times, most recently from feec6b9 to b37833c Compare December 28, 2022 18:50
Miuler added a commit to Miuler/beam that referenced this pull request Dec 28, 2022
Refs: apache#23604

diff --git a/.gitignore b/.gitignore
index f5fc63f9de..7dd6ee80d6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -135,4 +135,3 @@ playground/frontend/playground_components/pubspec.lock
 **/*.tfvars

 # Ignore Katas auto-generated files
-**/*-remote-info.yaml
\ No newline at end of file
diff --git a/CHANGES.md b/CHANGES.md
index db9f37eeec..a5a1cfac94 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -62,6 +62,7 @@
 * S3 implementation of the Beam filesystem (Go) ([apache#23991](apache#23991)).
 * Support for SingleStoreDB source and sink added (Java) ([apache#22617](apache#22617)).
 * Added support for DefaultAzureCredential authentication in Azure Filesystem (Python) ([apache#24210](apache#24210)).
+* Support for read from Cosmos DB Core SQL API [apache#23610](apache#23610)

 ## New Features / Improvements

diff --git a/sdks/java/io/azure-cosmosdb/README.md b/sdks/java/io/azure-cosmosdb/README.md
new file mode 100644
index 0000000000..f7c61c73ab
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/README.md
@@ -0,0 +1,40 @@
+# Cosmos DB Core SQL API
+
+Compile all module azure-cosmosdb
+
+```shell
+gradle sdks:java:io:azure-cosmosdb:build
+```
+
+## Test
+
+Run TEST for this module (Cosmos DB Core SQL API):
+
+```shell
+gradle sdks:java:io:azure-cosmosdb:test
+```
+
+
+## Publish in Maven Local
+
+Publish this module
+
+```shell
+# apache beam core
+gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p sdks/java/core/  publishToMavenLocal
+
+# apache beam azure-cosmosdb
+gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p sdks/java/io/azure-cosmosdb/  publishToMavenLocal
+```
+
+Publish all modules of apache beam
+
+```shell
+gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p sdks/java/  publishToMavenLocal
+
+gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p runners/  publishToMavenLocal
+
+gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p model/ publishToMavenLocal
+```
+
+
diff --git a/sdks/java/io/azure-cosmosdb/build.gradle b/sdks/java/io/azure-cosmosdb/build.gradle
new file mode 100644
index 0000000000..3875532652
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/build.gradle
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins {
+  id("org.apache.beam.module")
+  id("scala")
+}
+
+ext {
+  junitVersion = "5.9.1"
+  cosmosVersion = "4.37.1"
+  cosmosContainerVersion = "1.17.5"
+  bsonMongoVersion = "4.7.2"
+  log4jVersion = "2.19.0"
+}
+
+applyJavaNature(automaticModuleName: "org.apache.beam.sdk.io.azure.cosmosdb")
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Azure Cosmos DB"
+ext.summary = "IO library to read and write Azure Cosmos DB services from Beam."
+
+dependencies {
+  implementation("org.scala-lang:scala-library:2.12.17")
+  implementation("com.azure:azure-cosmos:${cosmosVersion}")
+  implementation library.java.commons_io
+  permitUnusedDeclared library.java.commons_io // BEAM-11761
+  implementation library.java.slf4j_api
+  implementation project(path: ":sdks:java:core", configuration: "shadow")
+  implementation("org.mongodb:bson:${bsonMongoVersion}")
+}
+
+// TEST
+dependencies {
+  testImplementation("org.testcontainers:azure:${cosmosContainerVersion}")
+  testImplementation("com.outr:scribe_2.12:3.10.4")
+  testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
+  testImplementation library.java.mockito_core
+  testImplementation("org.junit.vintage:junit-vintage-engine:${junitVersion}")
+  testRuntimeOnly("org.apache.logging.log4j:log4j-api:$log4jVersion")
+  testRuntimeOnly("org.apache.logging.log4j:log4j-core:$log4jVersion")
+  testRuntimeOnly library.java.slf4j_jdk14
+  testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
+}
diff --git a/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedReader.scala b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedReader.scala
new file mode 100644
index 0000000000..f25b903068
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedReader.scala
@@ -0,0 +1,50 @@
+package org.apache.beam.sdk.io.azure.cosmos
+
+import com.azure.cosmos.models.CosmosQueryRequestOptions
+import com.azure.cosmos.{CosmosClient, CosmosClientBuilder}
+import org.apache.beam.sdk.io.BoundedSource
+import org.bson.Document
+import org.slf4j.LoggerFactory
+
+
+private class CosmosBoundedReader(cosmosSource: CosmosBoundedSource) extends BoundedSource.BoundedReader[Document] {
+  private val log = LoggerFactory.getLogger(getClass)
+  private var maybeClient: Option[CosmosClient] = None
+  private var maybeIterator: Option[java.util.Iterator[Document]] = None
+
+  override def start(): Boolean = {
+    maybeClient = Some(
+      new CosmosClientBuilder()
+        .gatewayMode
+        .endpointDiscoveryEnabled(false)
+        .endpoint(cosmosSource.readCosmos.endpoint)
+        .key(cosmosSource.readCosmos.key)
+        .buildClient
+    )
+
+    maybeIterator = maybeClient.map { client =>
+      log.info("Get the container name")
+
+      log.info(s"Get the iterator of the query in container ${cosmosSource.readCosmos.container}")
+      client
+        .getDatabase(cosmosSource.readCosmos.database)
+        .getContainer(cosmosSource.readCosmos.container)
+        .queryItems(cosmosSource.readCosmos.query, new CosmosQueryRequestOptions(), classOf[Document])
+        .iterator()
+    }
+
+    true
+  }
+
+  override def advance(): Boolean = maybeIterator.exists(_.hasNext)
+
+  override def getCurrent: Document = maybeIterator
+    .filter(_.hasNext)
+    //.map(iterator => new Document(iterator.next()))
+    .map(_.next())
+    .orNull
+
+  override def getCurrentSource: CosmosBoundedSource = cosmosSource
+
+  override def close(): Unit = maybeClient.foreach(_.close())
+}
diff --git a/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedSource.scala b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedSource.scala
new file mode 100644
index 0000000000..39fafc9039
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedSource.scala
@@ -0,0 +1,25 @@
+package org.apache.beam.sdk.io.azure.cosmos
+
+import org.apache.beam.sdk.coders.{Coder, SerializableCoder}
+import org.apache.beam.sdk.io.BoundedSource
+import org.apache.beam.sdk.options.PipelineOptions
+import org.bson.Document
+
+import java.util
+import java.util.Collections
+
+class CosmosBoundedSource(val readCosmos: CosmosRead) extends BoundedSource[Document] {
+
+  /** @inheritdoc
+   * TODO: You have to find a better way, maybe by partition key */
+  override def split(desiredBundleSizeBytes: Long, options: PipelineOptions): util.List[CosmosBoundedSource] = Collections.singletonList(this)
+
+  /** @inheritdoc
+   * The Cosmos DB Coro (SQL) API not support this metrics by the querys */
+  override def getEstimatedSizeBytes(options: PipelineOptions): Long = 0L
+
+  override def getOutputCoder: Coder[Document] = SerializableCoder.of(classOf[Document])
+
+  override def createReader(options: PipelineOptions): BoundedSource.BoundedReader[Document] =
+    new CosmosBoundedReader(this)
+}
diff --git a/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIO.scala b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIO.scala
new file mode 100644
index 0000000000..fcd3a7abfc
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIO.scala
@@ -0,0 +1,8 @@
+package org.apache.beam.sdk.io.azure.cosmos;;
+
+object CosmosIO {
+  def read(): CosmosRead = {
+    CosmosRead()
+  }
+}
+
diff --git a/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosRead.scala b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosRead.scala
new file mode 100644
index 0000000000..48a0ece0d4
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosRead.scala
@@ -0,0 +1,45 @@
+package org.apache.beam.sdk.io.azure.cosmos
+
+import org.apache.beam.sdk.io.Read
+import org.apache.beam.sdk.transforms.PTransform
+import org.apache.beam.sdk.values.{PBegin, PCollection}
+import org.bson.Document
+import org.slf4j.LoggerFactory
+
+case class CosmosRead(private[cosmos] val endpoint: String = null,
+                      private[cosmos] val key: String = null,
+                      private[cosmos] val database: String = null,
+                      private[cosmos] val container: String = null,
+                      private[cosmos] val query: String = null)
+  extends PTransform[PBegin, PCollection[Document]] {
+
+
+  private val log = LoggerFactory.getLogger(classOf[CosmosRead])
+
+  /** Create new ReadCosmos based into previous ReadCosmos, modifying the endpoint */
+  def withCosmosEndpoint(endpoint: String): CosmosRead = this.copy(endpoint = endpoint)
+
+  def withCosmosKey(key: String): CosmosRead = this.copy(key = key)
+
+  def withDatabase(database: String): CosmosRead = this.copy(database = database)
+
+  def withQuery(query: String): CosmosRead = this.copy(query = query)
+
+  def withContainer(container: String): CosmosRead = this.copy(container = container)
+
+  override def expand(input: PBegin): PCollection[Document] = {
+    log.debug(s"Read CosmosDB with endpoint: $endpoint and query: $query")
+    validate()
+
+    // input.getPipeline.apply(Read.from(new CosmosSource(this)))
+    input.apply(Read.from(new CosmosBoundedSource(this)))
+  }
+
+  private def validate(): Unit = {
+    require(endpoint != null, "CosmosDB endpoint is required")
+    require(key != null, "CosmosDB key is required")
+    require(database != null, "CosmosDB database is required")
+    require(container != null, "CosmosDB container is required")
+    require(query != null, "CosmosDB query is required")
+  }
+}
diff --git a/sdks/java/io/azure-cosmosdb/src/test/resources/log4j.properties b/sdks/java/io/azure-cosmosdb/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..28f1bb63f7
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/src/test/resources/log4j.properties
@@ -0,0 +1,8 @@
+# Root logger option
+log4j.rootLogger=DEBUG, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+#log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
diff --git a/sdks/java/io/azure-cosmosdb/src/test/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIOTest.scala b/sdks/java/io/azure-cosmosdb/src/test/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIOTest.scala
new file mode 100644
index 0000000000..7de8dda145
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/src/test/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIOTest.scala
@@ -0,0 +1,103 @@
+package org.apache.beam.sdk.io.azure.cosmos
+
+import com.azure.cosmos.CosmosClientBuilder
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.io.azure.cosmos.CosmosIOTest.{CONTAINER, DATABASE, cosmosDBEmulatorContainer}
+import org.apache.beam.sdk.testing.PAssert
+import org.apache.beam.sdk.transforms.Count
+import org.apache.beam.sdk.values.PCollection
+import org.bson.Document
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.slf4j.LoggerFactory
+import org.testcontainers.containers.CosmosDBEmulatorContainer
+import org.testcontainers.utility.DockerImageName
+
+import java.nio.file.Files
+import scala.util.Using
+
+@RunWith(classOf[JUnit4])
+class CosmosIOTest {
+  private val log = LoggerFactory.getLogger("CosmosIOTest")
+  //  @(Rule @Getter)
+  //  val pipelineWrite: TestPipeline = TestPipeline.create
+  //  @(Rule @Getter)
+  //  val pipelineRead: TestPipeline = TestPipeline.create
+
+  @test
+  def readFromCosmosCoreSqlApi(): Unit = {
+    val read = CosmosIO
+      .read()
+      .withCosmosEndpoint(cosmosDBEmulatorContainer.getEmulatorEndpoint)
+      .withCosmosKey(cosmosDBEmulatorContainer.getEmulatorKey)
+      .withQuery(s"SELECT * FROM c")
+      .withContainer(CONTAINER)
+      .withDatabase(DATABASE)
+
+    val pipeline = Pipeline.create()
+    val count: PCollection[java.lang.Long] = pipeline
+      .apply(read)
+      .apply(Count.globally())
+
+    PAssert.thatSingleton(count).isEqualTo(10)
+
+    pipeline.run().waitUntilFinish()
+  }
+}
+
+/** Initialization of static fields and methods */
+@RunWith(classOf[JUnit4])
+object CosmosIOTest {
+  private val log = LoggerFactory.getLogger("CosmosIOTest[Obj]")
+  private val DOCKER_NAME = "mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator:latest"
+  private val cosmosDBEmulatorContainer = new CosmosDBEmulatorContainer(DockerImageName.parse(DOCKER_NAME))
+  private val DATABASE = "test"
+  private val CONTAINER = "test"
+
+  @BeforeClass
+  def setup(): Unit = {
+    log.info("Starting CosmosDB emulator")
+    cosmosDBEmulatorContainer.start()
+
+    val tempFolder = new TemporaryFolder
+    tempFolder.create()
+    val keyStoreFile = tempFolder.newFile("azure-cosmos-emulator.keystore").toPath
+    val keyStore = cosmosDBEmulatorContainer.buildNewKeyStore
+    keyStore.store(Files.newOutputStream(keyStoreFile.toFile.toPath), cosmosDBEmulatorContainer.getEmulatorKey.toCharArray)
+    System.setProperty("javax.net.ssl.trustStore", keyStoreFile.toString)
+    System.setProperty("javax.net.ssl.trustStorePassword", cosmosDBEmulatorContainer.getEmulatorKey)
+    System.setProperty("javax.net.ssl.trustStoreType", "PKCS12")
+
+
+    log.info("Creando la data -------------------------------------------------------->")
+    val triedCreateData = Using(new CosmosClientBuilder()
+      .gatewayMode
+      .endpointDiscoveryEnabled(false)
+      .endpoint(cosmosDBEmulatorContainer.getEmulatorEndpoint)
+      .key(cosmosDBEmulatorContainer.getEmulatorKey)
+      .buildClient) { client =>
+
+      client.createDatabase(DATABASE)
+      val db = client.getDatabase(DATABASE)
+      db.createContainer(CONTAINER, "/id")
+      val container = db.getContainer(CONTAINER)
+      for (i <- 1 to 10) {
+        container.createItem(new Document("id", i.toString))
+      }
+    }
+    if (triedCreateData.isFailure) {
+      val throwable = triedCreateData.failed.get
+      log.error("Error creando la data", throwable)
+      throw throwable
+    }
+    log.info("Data creada ------------------------------------------------------------<")
+  }
+
+  @afterclass
+  def close(): Unit = {
+    log.info("Stop CosmosDB emulator")
+    cosmosDBEmulatorContainer.stop()
+  }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 8527d17d3c..033c9dc24b 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -157,6 +157,7 @@ include(":sdks:java:io:amazon-web-services")
 include(":sdks:java:io:amazon-web-services2")
 include(":sdks:java:io:amqp")
 include(":sdks:java:io:azure")
+include(":sdks:java:io:azure-cosmosdb")
 include(":sdks:java:io:cassandra")
 include(":sdks:java:io:clickhouse")
 include(":sdks:java:io:common")
@Miuler
Copy link
Contributor Author

Miuler commented Dec 29, 2022

@kennknowles What would I need to continue?

@Miuler Miuler force-pushed the master branch 2 times, most recently from 31b93d7 to ed2dfb7 Compare December 30, 2022 10:11
@Abacn
Copy link
Contributor

Abacn commented Jan 4, 2023

@kennknowles is dealing with 2.44.0 release and if not available, let me try to find other reviewers R: @pabloem

@kennknowles
Copy link
Member

Oh hmm when I last commented I am not sure what I was thinking actually, and I don't remember noticing the scala dependency. What does this mean for Java users? The built artifact will have a dependency on a very specific version of the scala runtime and libraries, right? I don't think Beam has really done much work to manage different scala versions effectively. Though our runners do also use scala...

@Miuler
Copy link
Contributor Author

Miuler commented Jan 6, 2023

Oh hmm when I last commented I am not sure what I was thinking actually, and I don't remember noticing the scala dependency. What does this mean for Java users? The built artifact will have a dependency on a very specific version of the scala runtime and libraries, right? I don't think Beam has really done much work to manage different scala versions effectively. Though our runners do also use scala...

@kennknowles

The version used in most cases is 2.12, although there are still some 2.11, which should be updated,

./sdks/java/io/azure-cosmosdb/build.gradle:38:     implementation("org.scala-lang:scala-library:2.12.17")
./sdks/java/io/sparkreceiver/2/build.gradle:50:       compileOnly "org.scala-lang:scala-library:2.11.12"
./runners/spark/spark_runner.gradle:179:              compileOnly "org.scala-lang:scala-library:2.11.12"
./runners/spark/spark_runner.gradle:183:              compileOnly "org.scala-lang:scala-library:2.12.15"
./runners/spark/3/job-server/build.gradle:38:               force "org.scala-lang:scala-library:2.12.10"
./runners/samza/build.gradle:59:                      runtimeOnly "org.scala-lang:scala-library:2.11.8"

But it would be nice to work on gradle, so you can cross-compile for 2.12, 2.13, and it would be transparent for the current, 3.x version not yet, because neither spark nor flink support it yet.

@Abacn
Copy link
Contributor

Abacn commented Jan 17, 2023

If I understood correctly what @kennknowles means is that our build system has not considered mixed java/scala project (e.g. our sub project is organized as some_project/src/main/ instead of some_project/src/main/java and some_project/src/main/scala as in https://docs.gradle.org/current/userguide/scala_plugin.html)

The current ones listed above are all spark runner (which is written in Scala) related, either runner artifacts (runner/) or sparkreceiverIO. The latter was declared compileOnly. We still do not impose scala dependencies to end users.

Besides built-in IOs Beam also has more IO connectors in other repositories, some of them listed in https://beam.apache.org/documentation/io/connectors/#other-io-connectors-for-apache-beam . That said I would suggest put this library into an independent repo and would be happy to include it in Beam's documentation.

@Miuler
Copy link
Contributor Author

Miuler commented Jan 18, 2023

If I understood correctly what @kennknowles means is that our build system has not considered mixed java/scala project (e.g. our sub project is organized as some_project/src/main/ instead of some_project/src/main/java and some_project/src/main/scala as in https://docs.gradle.org/current/userguide/scala_plugin.html)

The current ones listed above are all spark runner (which is written in Scala) related, either runner artifacts (runner/) or sparkreceiverIO. The latter was declared compileOnly. We still do not impose scala dependencies to end users.

Besides built-in IOs Beam also has more IO connectors in other repositories, some of them listed in https://beam.apache.org/documentation/io/connectors/#other-io-connectors-for-apache-beam . That said I would suggest put this library into an independent repo and would be happy to include it in Beam's documentation.

Let's see, I don't understand, if there is scala at the IO level (sparkreceiver), those who do not use the azure-cosmosdb dependency will not include scala as a library, and by default, in all runners use scala 2.12 as version, neither flink nor spark support scala 3.x yet (at some point you have to modify gradle to support cross-compilation, 2.12, 2.13, 3.x, etc, at the moment it only supports one version)

@Miuler
Copy link
Contributor Author

Miuler commented Jan 19, 2023

@Abacn @kennknowles

what is needed?

The tests work like this:
gradle sdks:java:io:azure-cosmosdb:test

the validation of the code also:

gradle rat
gradle sdks:java:io:azure-cosmosdb:spotbugsMain

the publising too:
gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p sdks/java/io/azure-cosmosdb/ publishToMavenLocal

the publication of everything also:

gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p sdks/java/ publishToMavenLocal

gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p runners/ publishToMavenLocal

gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p model/ publishToMavenLocal

what else is needed? If as is the gradle compiles, tests, publishes?

https://github.com/apache/beam/blob/ed2dfb7886a694589fc2c4ab1f79364c11e2ba7c/sdks/java/io/azure-cosmosdb/README.md

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

CosmosDB: Add support for Azure database CosmosDB with Core (SQL) Api
4 participants