Skip to content

Commit

Permalink
Add configurable commitAllMetricUpdates (#124)
Browse files Browse the repository at this point in the history
  • Loading branch information
FuRyanf authored Sep 4, 2024
1 parent 8dc92bc commit fa75dd3
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ class BeamModulePlugin implements Plugin<Project> {

// Automatically use the official release version if we are performing a release
// otherwise append '-SNAPSHOT'
project.version = '2.45.25'
project.version = '2.45.26'
if (isLinkedin(project)) {
project.ext.mavenGroupId = 'com.linkedin.beam'
}
Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ signing.gnupg.useLegacyGpg=true
# buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy.
# To build a custom Beam version make sure you change it in both places, see
# https://github.com/apache/beam/issues/21302.
version=2.45.25
sdk_version=2.45.25
version=2.45.26
sdk_version=2.45.26

javaVersion=1.8

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,18 @@ public class SamzaMetricsContainer {
public static final String GLOBAL_CONTAINER_STEP_NAME = "GLOBAL_METRICS";
public static final String USE_SHORT_METRIC_NAMES_CONFIG =
"beam.samza.metrics.useShortMetricNames";
public static final String COMMIT_ALL_METRIC_UPDATES =
"beam.samza.metrics.commitAllMetricUpdates";

private final MetricsContainerStepMap metricsContainers = new MetricsContainerStepMap();
private final MetricsRegistryMap metricsRegistry;
private final boolean useShortMetricNames;
private final boolean commitAllMetricUpdates;

public SamzaMetricsContainer(MetricsRegistryMap metricsRegistry, Config config) {
this.metricsRegistry = metricsRegistry;
this.useShortMetricNames = config.getBoolean(USE_SHORT_METRIC_NAMES_CONFIG, false);
this.commitAllMetricUpdates = config.getBoolean(COMMIT_ALL_METRIC_UPDATES, false);
this.metricsRegistry.metrics().put(BEAM_METRICS_GROUP, new ConcurrentHashMap<>());
LOG.info("Creating Samza metrics container with userShortMetricName = {}", useShortMetricNames);
}
Expand All @@ -81,11 +85,10 @@ public MetricsContainerStepMap getContainers() {
public void updateMetrics(String stepName) {

assert metricsRegistry != null;

final List<String> stepNameList = Arrays.asList(stepName, GLOBAL_CONTAINER_STEP_NAME);
// Since global metrics do not belong to any step, we need to update it in every step.
final MetricResults metricResults =
asAttemptedOnlyMetricResultsForSteps(
metricsContainers, Arrays.asList(stepName, GLOBAL_CONTAINER_STEP_NAME));
asAttemptedOnlyMetricResultsForSteps(metricsContainers, stepNameList);
final MetricQueryResults results = metricResults.allMetrics();

final CounterUpdater updateCounter = new CounterUpdater();
Expand All @@ -96,6 +99,11 @@ public void updateMetrics(String stepName) {

final DistributionUpdater updateDistribution = new DistributionUpdater();
results.getDistributions().forEach(updateDistribution);
if (commitAllMetricUpdates) {
stepNameList.stream()
.map(metricsContainers::getContainer)
.forEach(MetricsContainerImpl::commitUpdates);
}
}

public void updateExecutableStageBundleMetric(String metricName, long time) {
Expand Down

0 comments on commit fa75dd3

Please sign in to comment.