Skip to content

Commit

Permalink
Chrwhit/master update two (#22123)
Browse files Browse the repository at this point in the history
* Exposes management node in azure-core-amqp (#22095)

* Update AmqpConnection to have a getManagementNode.

* Adding AmqpManagementNode.

* Update AmqpConnection, AmqpManagementNode, AmqpSession to use AsyncCloseable.

* Adding AsyncCloseable to AmqpLink.

* ClaimsBasedSecurityNode.java uses AsyncCloseable.

* Implements CbsNode's closeAsync() and adds tests.

* ReactorSession implements closeAsync()

* ReactorConnection uses closeAsync(). Renames dispose() to closeAsync(). Fixes errors where some close operations were not subscribed to.

* RequestResponseChannel. Remove close operation with message.

* Adding DeliveryOutcome models and DeliveryState enum.

* Add authorization scope to connection options.

* Add MessageUtils to serialize and deserialize AmqpAnnotatedMessage

* Update AmqpManagementNode to expose delivery outcomes because they can be associated with messages.

* Adding MessageUtil support for converting DeliveryOutcome and Outcomes.

* Fixing build breaks from ConnectionOptions.

* Adding management channel class.

* Adding management channel into ReactorConnection.

* Update ExceptionUtil to return instead of throwing on unknown amqp error codes.

* Moving ManagementChannel formatting.

* Add javadocs to ReceivedDeliveryOutcome.

* Add tests for ManagementChannel

* Adding tests for message utils.

* Fix javadoc on ModifiedDeliveryOutcome

* ReactorConnection: Hook up dispose method.

* EventHubs: Fixing instances of ConnectionOptions.

* ServiceBus: Fix build errors using ConnectionOptions.

* Adding MessageUtilsTests.

* Updating CHANGELOG.

* Annotate HttpRange with Immutable (#22119)

* Cosmos Spark: Changing inferSchema.forceNullableProperties default to true (#22049)

* Changing default

* Docs

* Tests

* new test

* doc update

* Change log

* Make getScopes in the ARM Authentication Policy Public (#22120)

Make getScopes in the ARM Authentication Policy Public

Co-authored-by: Connie Yau <[email protected]>
Co-authored-by: Alan Zimmer <[email protected]>
Co-authored-by: Matias Quaranta <[email protected]>
  • Loading branch information
4 people committed Jun 9, 2021
1 parent e1640c6 commit e065375
Showing 1 changed file with 51 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,57 @@ class SparkE2EQueryITest
}
}

"spark query" can "when forceNullableProperties is false and rows have different schema" in {
val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY
val samplingSize = 100
val expectedResults = samplingSize * 2
val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainer)

// Inserting documents with slightly different schema
for( _ <- 1 to expectedResults) {
val objectNode = Utils.getSimpleObjectMapper.createObjectNode()
val arr = objectNode.putArray("object_array")
val nested = Utils.getSimpleObjectMapper.createObjectNode()
nested.put("A", "test")
nested.put("B", "test")
arr.add(nested)
objectNode.put("id", UUID.randomUUID().toString)
container.createItem(objectNode).block()
}

for( _ <- 1 to samplingSize) {
val objectNode2 = Utils.getSimpleObjectMapper.createObjectNode()
val arr = objectNode2.putArray("object_array")
val nested = Utils.getSimpleObjectMapper.createObjectNode()
nested.put("A", "test")
arr.add(nested)
objectNode2.put("id", UUID.randomUUID().toString)
container.createItem(objectNode2).block()
}

val cfgWithInference = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> cosmosContainer,
"spark.cosmos.read.inferSchema.enabled" -> "true",
"spark.cosmos.read.inferSchema.forceNullableProperties" -> "false",
"spark.cosmos.read.inferSchema.samplingSize" -> samplingSize.toString,
"spark.cosmos.read.inferSchema.query" -> "SELECT * FROM c ORDER BY c._ts",
"spark.cosmos.read.partitioning.strategy" -> "Restrictive"
)

val dfWithInference = spark.read.format("cosmos.oltp").options(cfgWithInference).load()
try {
dfWithInference.collect()
fail("Should have thrown an exception")
}
catch {
case inner: Exception =>
inner.toString.contains("The 1th field 'B' of input row cannot be null") shouldBe true
}
}

"spark query" can "use custom sampling size" in {
val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY
Expand Down

0 comments on commit e065375

Please sign in to comment.