Structured Stream demos
Below are the results of some Structured Streaming test runs in different scenarios using the CosmosDB Spark Connector.
Partitioned Collection: 100,000 RUs, 50 partitions
Azure HDI Cluster: 14 Standard_D12_v2 workers (4 cores, 28 GB memory each); 56 cores and 392 GB memory in total.
- Set up the CosmosDB collection to receive insert loads ranging from 10 to 1,500 documents per second.
- Start a streaming source reading data from the CosmosDB change feed of the collection and use the console as the streaming sink.
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.codehaus.jackson.map.ObjectMapper
import com.microsoft.azure.cosmosdb.spark.streaming._
val configMap = Map(
"Endpoint" -> "COSMOSDB ENDPOINT",
"Masterkey" -> "COSMOSDB MASTER KEY",
"Database" -> "DATABASE NAME",
"collection" -> "COLLECTION NAME",
"ChangeFeedCheckpointLocation" -> "checkpointlocation",
"changefeedqueryname" -> "Structured Stream interval count")
// Start reading change feed as a stream
var streamData = spark.readStream.format(classOf[CosmosDBSourceProvider].getName).options(configMap).load()
// Start a streaming query to the console sink. The empty substring gives every document the
// same group key, so the running count reflects the total number of change feed documents received.
val query = streamData.withColumn("countcol", streamData.col("id").substr(0, 0)).groupBy("countcol").count().writeStream.outputMode("complete").format("console").start()
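While the query above runs, the StreamingQuery handle returned by start() can be used to block on it, inspect progress, and stop it. A minimal sketch using only built-in Structured Streaming APIs (the 60-second timeout is an arbitrary value for a demo run):
// Wait up to 60 seconds for the query to make progress, then inspect and stop it
query.awaitTermination(60000)
println(query.status)          // whether the trigger is active or the query is waiting for data
println(query.lastProgress)    // metrics for the most recent micro-batch (null before the first batch)
query.stop()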
We observed that the change feed documents were received correctly for all insert-load configurations. In case of node failures, the connector was able to resume the change feed from the last checkpoint. Below are highlights from the test run logs.
# Writing 100k documents
-------------------------------------------
Batch: 0
-------------------------------------------
+--------+-----+
|countcol|count|
+--------+-----+
+--------+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+--------+-----+
|countcol|count|
+--------+-----+
+--------+-----+
...
-------------------------------------------
Batch: 5
-------------------------------------------
+--------+-----+
|countcol|count|
+--------+-----+
| | 8108|
+--------+-----+
-------------------------------------------
Batch: 6
-------------------------------------------
+--------+-----+
|countcol|count|
+--------+-----+
| |21040|
+--------+-----+
...
-------------------------------------------
Batch: 13
-------------------------------------------
+--------+-----+
|countcol|count|
+--------+-----+
| |99907|
+--------+-----+
-------------------------------------------
Batch: 14
-------------------------------------------
+--------+------+
|countcol| count|
+--------+------+
| |100000|
+--------+------+
...
# Writing 200k documents
-------------------------------------------
Batch: 39
-------------------------------------------
+--------+------+
|countcol| count|
+--------+------+
| |108095|
+--------+------+
...
-------------------------------------------
Batch: 56
-------------------------------------------
+--------+------+
|countcol| count|
+--------+------+
| |292406|
+--------+------+
-------------------------------------------
Batch: 57
-------------------------------------------
+--------+------+
|countcol| count|
+--------+------+
| |300000|
+--------+------+
...
# Writing 200k documents, with node failures occurring during the run
-------------------------------------------
Batch: 67
-------------------------------------------
+--------+------+
|countcol| count|
+--------+------+
| |300647|
+--------+------+
...
-------------------------------------------
Batch: 76
-------------------------------------------
+--------+------+
|countcol| count|
+--------+------+
| |394534|
+--------+------+
# The streaming source was terminated and restarted
-------------------------------------------
Batch: 0
-------------------------------------------
+--------+------+
|countcol| count|
+--------+------+
| |105466|
+--------+------+
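The terminated-and-restarted run at the end of the log is simply a new query started with the same configMap: because ChangeFeedCheckpointLocation is unchanged, the source resumes the change feed from the last checkpoint, while the console aggregation itself starts over at Batch 0 and only counts the newly received documents. A minimal sketch of that restart (the variable names here are illustrative):
// Restart after termination: reusing the same ChangeFeedCheckpointLocation resumes the change feed
var restartedStream = spark.readStream.format(classOf[CosmosDBSourceProvider].getName).options(configMap).load()
val restartedQuery = restartedStream.withColumn("countcol", restartedStream.col("id").substr(0, 0)).groupBy("countcol").count().writeStream.outputMode("complete").format("console").start()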
- Set up the CosmosDB collection to receive insert loads ranging from 10 to 1,500 documents per second.
- Start a streaming source reading data from the CosmosDB change feed of the collection and create a sink to another CosmosDB collection.
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.codehaus.jackson.map.ObjectMapper
import com.microsoft.azure.cosmosdb.spark.streaming._
val configMap = Map(
"Endpoint" -> "COSMOSDB ENDPOINT",
"Masterkey" -> "COSMOSDB MASTER KEY",
"Database" -> "DATABASE NAME",
"collection" -> "COLLECTION NAME",
"ChangeFeedCheckpointLocation" -> "changefeedcheckpointlocation")
val sourceConfigMap = configMap.+(("changefeedqueryname", "Structured Stream replication streaming test"))
// Start to read the stream
var streamData = spark.readStream.format(classOf[CosmosDBSourceProvider].getName).options(sourceConfigMap).load()
val sinkCollection = "SINK COLLECTION NAME"
val sinkConfigMap = configMap.-("collection").+(("collection", sinkCollection))
// Start the stream writer to the CosmosDB sink; checkpointLocation tracks the sink-side progress
val streamingQueryWriter = streamData.writeStream.format(classOf[CosmosDBSinkProvider].getName).outputMode("append").options(sinkConfigMap).option("checkpointLocation", "streamingcheckpointlocation")
var streamingQuery = streamingQueryWriter.start()
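Once the writer is started, the session can block on it just like the console query, and the replicated documents can be spot-checked on the sink side. A minimal sketch: the batch read via spark.read.cosmosDB is assumed from the connector's batch API, and the resulting counts are not part of the recorded test run.
// Keep the replication query alive for the duration of the test, then stop it
streamingQuery.awaitTermination(60000)
streamingQuery.stop()
// Sanity check: count the documents that reached the sink collection (assumes the connector's batch read)
val sinkDF = spark.read.cosmosDB(Config(sinkConfigMap))
println(s"Documents in sink collection: ${sinkDF.count()}")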