From 61da1dd135e40ccfdef0dbe79e60062a47f8acd3 Mon Sep 17 00:00:00 2001 From: Edward Cheng Date: Mon, 22 Jan 2024 20:43:03 +0000 Subject: [PATCH] add remaining metrics --- .../groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +- .../runners/dataflow/worker/StreamingDataflowWorker.java | 4 +++- .../dataflow/worker/DataflowWorkUnitClientTest.java | 8 +++++++- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 41bcc70fe532..51b3cbbf178f 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -734,7 +734,7 @@ class BeamModulePlugin implements Plugin { google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20230812-$google_clients_version", // Keep version consistent with the version in google_cloud_resourcemanager, managed by google_cloud_platform_libraries_bom google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20230806-$google_clients_version", - google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20231203-$google_clients_version", + google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20240113-$google_clients_version", google_api_services_healthcare : "com.google.apis:google-api-services-healthcare:v1-rev20240110-$google_clients_version", google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version", // Keep version consistent with the version in google_cloud_nio, managed by google_cloud_platform_libraries_bom diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index f2d7c02729c5..60e41526b49d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -1762,8 +1762,10 @@ private void sendWorkerMessage() throws IOException { new StreamingScalingReport() .setActiveThreadCount(workUnitExecutor.activeCount()) .setActiveBundleCount(workUnitExecutor.elementsOutstanding()) + .setOutstandingBytes(workUnitExecutor.bytesOutstanding()) .setMaximumThreadCount(chooseMaximumNumberOfThreads()) - .setMaximumBundleCount(workUnitExecutor.maximumElementsOutstanding()); + .setMaximumBundleCount(workUnitExecutor.maximumElementsOutstanding()) + .setMaximumBytes(workUnitExecutor.maximumBytesOutstanding()); workUnitClient.reportWorkerMessage( workUnitClient.createWorkerMessageFromStreamingScalingReport(activeThreadsReport)); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java index 7720de3563b8..853c282cf740 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java @@ -240,7 +240,13 @@ public void testReportWorkerMessage() throws Exception { response.setContent(workerMessage.toPrettyString()); when(request.execute()).thenReturn(response); StreamingScalingReport activeThreadsReport = - new StreamingScalingReport().setActiveThreadCount(1); + new StreamingScalingReport() + .setActiveThreadCount(1) + .setActiveBundleCount(2) + .setOutstandingBytes(3) + .setMaximumThreadCount(4) + .setMaximumBundleCount(5) + .setMaximumBytes(6); WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG); WorkerMessage msg = client.createWorkerMessageFromStreamingScalingReport(activeThreadsReport); client.reportWorkerMessage(msg);