Skip to content

Commit

Permalink
feat(azure-cosmosdb): Add support for cosmosdb with Core (SQL) API
Browse files Browse the repository at this point in the history
Refs: #23604
  • Loading branch information
Miuler committed Oct 13, 2022
1 parent 98cebe7 commit da8151f
Show file tree
Hide file tree
Showing 10 changed files with 213 additions and 1 deletion.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,5 @@ website/www/yarn-error.log
**/*.tfvars

# Ignore Katas auto-generated files
**/*-remote-info.yaml
**/*-remote-info.yaml
**/*.backup
7 changes: 7 additions & 0 deletions sdks/java/io/azure-cosmosdb/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@


Run test for this module (cosmosdb)

```shell
gradle ":sdks:java:io:azure-cosmosdb:test"
```
71 changes: 71 additions & 0 deletions sdks/java/io/azure-cosmosdb/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins {
id("org.apache.beam.module")
id("scala")
}

ext {
junitVersion = "5.9.1"
cosmosVersion = "4.37.1"
cosmosContainerVersion = "1.17.5"
lombokVersion = "1.18.24"
}

applyJavaNature(automaticModuleName: "org.apache.beam.sdk.io.azure.cosmosdb")

description = "Apache Beam :: SDKs :: Java :: IO :: Azure Cosmos DB"
ext.summary = "IO library to read and write Azure Cosmos DB services from Beam."

dependencies {
implementation("org.scala-lang:scala-library:2.13.10")
implementation("com.azure:azure-cosmos:${cosmosVersion}")
implementation library.java.commons_io
permitUnusedDeclared library.java.commons_io // BEAM-11761
implementation library.java.slf4j_api
implementation library.java.vendored_guava_26_0_jre
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation library.java.jackson_annotations
implementation library.java.jackson_core
implementation library.java.jackson_databind
}
//implementation("com.azure:azure-identity:1.0.8")
//implementation("com.azure:azure-core:1.9.0")
//implementation "com.microsoft.azure:azure-storage:8.6.5"
//implementation "com.azure:azure-storage-blob:12.10.0"
//implementation "com.azure:azure-storage-common:12.10.0"

// LAMBOK
dependencies {
compileOnly("org.projectlombok:lombok:${lombokVersion}")
annotationProcessor("org.projectlombok:lombok:${lombokVersion}")
testCompileOnly("org.projectlombok:lombok:${lombokVersion}")
testAnnotationProcessor("org.projectlombok:lombok:${lombokVersion}")
}

// TEST
dependencies {
testImplementation("org.testcontainers:azure:${cosmosContainerVersion}")
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation library.java.mockito_core
//testImplementation library.java.junit
testImplementation("org.junit.vintage:junit-vintage-engine:${junitVersion}")
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
testRuntimeOnly library.java.slf4j_jdk14
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.apache.beam.sdk.io.azure.cosmos

import org.apache.beam.sdk.io.BoundedSource

class CosmosBoundedReader extends BoundedSource.BoundedReader[String] {
override def start(): Boolean = ???

override def advance(): Boolean = ???

override def getCurrent: String = ???

override def getCurrentSource: String = ???

override def close(): Unit = ???
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.apache.beam.sdk.io.azure.cosmos;

import org.apache.beam.sdk.io.Read
import org.apache.beam.sdk.transforms.PTransform
import org.apache.beam.sdk.values.{ PBegin, PCollection };

object CosmosIO {
def read(): ReadCosmos = {
ReadCosmos();
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.apache.beam.sdk.io.azure.cosmos

import org.apache.beam.sdk.io.BoundedSource
import org.apache.beam.sdk.options.PipelineOptions

import java.util

class CosmosSource(readCosmos: ReadCosmos) extends BoundedSource {
override def split(desiredBundleSizeBytes: Long, options: PipelineOptions): util.List[_ <: BoundedSource[Nothing]] =
???

override def getEstimatedSizeBytes(options: PipelineOptions): Long = ???

override def createReader(options: PipelineOptions): BoundedSource.BoundedReader[Nothing] = new CosmosBoundedReader()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.apache.beam.sdk.io.azure.cosmos

import org.apache.beam.sdk.io.Read
import org.apache.beam.sdk.transforms.PTransform
import org.apache.beam.sdk.values.{ PBegin, PCollection }

case class ReadCosmos(endpoint: String = null) extends PTransform[PBegin, PCollection[String]] {

/** Create new ReadCosmos based into previous ReadCosmos, modifying the endpoint */
def withCosmosEndpoint(endpoint: String): ReadCosmos =
this.copy(endpoint = endpoint)

override def expand(input: PBegin): PCollection[String] = {
// input.getPipeline.apply(Read.from(new CosmosSource(this)))
input.apply(Read.from(new CosmosSource(this)))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.apache.beam.sdk.io.azure.cosmos;

import org.junit.Test;

public class CosmosIOJTest {

@Test
public void test() {

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package org.apache.beam.sdk.io.azure.cosmos

import com.azure.cosmos.CosmosClientBuilder
import org.apache.beam.sdk.testing.TestPipeline
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.slf4j.LoggerFactory
import org.testcontainers.containers.CosmosDBEmulatorContainer
import org.testcontainers.utility.DockerImageName

import java.nio.file.Files
import scala.annotation.meta.getter

@RunWith(classOf[JUnit4])
class CosmosIOTest {
private val log = LoggerFactory.getLogger("CosmosIOTest")
@(Rule @getter)
val pipelineWrite: TestPipeline = TestPipeline.create
@(Rule @getter)
val pipelineRead: TestPipeline = TestPipeline.create

@Test def test(): Unit = {
ReadCosmosBuilder()
val client = new CosmosClientBuilder().gatewayMode
.endpointDiscoveryEnabled(false)
.endpoint(CosmosIOTest.emulator.getEmulatorEndpoint)
.key(CosmosIOTest.emulator.getEmulatorKey)
.buildClient
log.info("CosmosDB client created {}", client)
}
}

@RunWith(classOf[JUnit4])
object CosmosIOTest {
private val log = LoggerFactory.getLogger("CosmosIOTest[Obj]")
private var emulator: CosmosDBEmulatorContainer = null

@BeforeClass
def setup(): Unit = {
log.info("Starting CosmosDB emulator")
emulator = new CosmosDBEmulatorContainer(
DockerImageName.parse("mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator:latest")
)
emulator.start()
val tempFolder = new TemporaryFolder
tempFolder.create()
val keyStoreFile = tempFolder.newFile("azure-cosmos-emulator.keystore").toPath
val keyStore = emulator.buildNewKeyStore
keyStore.store(Files.newOutputStream(keyStoreFile.toFile.toPath), emulator.getEmulatorKey.toCharArray)
System.setProperty("javax.net.ssl.trustStore", keyStoreFile.toString)
System.setProperty("javax.net.ssl.trustStorePassword", emulator.getEmulatorKey)
System.setProperty("javax.net.ssl.trustStoreType", "PKCS12")
}

@AfterClass def close(): Unit = {
log.info("Stop CosmosDB emulator")
emulator.close()
}
}
1 change: 1 addition & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ include(":sdks:java:io:amazon-web-services")
include(":sdks:java:io:amazon-web-services2")
include(":sdks:java:io:amqp")
include(":sdks:java:io:azure")
include(":sdks:java:io:azure-cosmosdb")
include(":sdks:java:io:cassandra")
include(":sdks:java:io:clickhouse")
include(":sdks:java:io:common")
Expand Down

0 comments on commit da8151f

Please sign in to comment.