DO NOT SUBMIT SPLIT INTO OTHER PRsAdd HarnessMonitoringInfosRequest/Response InstructionRequest and MonitoringInfosMetadataRequest/Response support to Java #14490

Closed
wants to merge 4 commits into from

@ajamato ajamato commented Apr 9, 2021

[BEAM-11994] Add HarnessMonitoringInfosRequest/Response InstructionRequest and MonitoringInfosMetadataRequest/Response support to Java SDK.

  • Adds InstructionRequest handler for HarnessMonitoringInfosRequest/Response (ported from Python SDK)
  • Adds start times to each MonitoringInfo recorded (ported from Python SDK)
  • Modified ShortIdMap to look up entries based on the MonitoringInfoMetricName. We can use this in a follow-up PR to further optimize the Java SDK Harness, so that each MonitoringInfo (URN+labels) pair is constructed only once.
  • Enables the InstructionRequest in the Environment
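
For illustration, the ShortIdMap idea in the third bullet might look roughly like the sketch below. This is not the code in this PR: `MetricNameKey` and `ShortIdMapSketch` are hypothetical names, and the key is assumed to carry only a URN and labels, standing in for MonitoringInfoMetricName.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for MonitoringInfoMetricName: identity is URN plus labels only. */
final class MetricNameKey {
  final String urn;
  final Map<String, String> labels;

  MetricNameKey(String urn, Map<String, String> labels) {
    this.urn = Objects.requireNonNull(urn, "urn");
    // Defensive copy so later changes to the caller's map cannot alter the key.
    this.labels = new HashMap<>(labels);
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof MetricNameKey)) {
      return false;
    }
    MetricNameKey other = (MetricNameKey) o;
    return urn.equals(other.urn) && labels.equals(other.labels);
  }

  @Override
  public int hashCode() {
    return Objects.hash(urn, labels);
  }
}

/** Hypothetical short-id map: hands out one stable id per distinct key. */
final class ShortIdMapSketch {
  private final Map<MetricNameKey, String> keyToShortId = new HashMap<>();
  private final Map<String, MetricNameKey> shortIdToKey = new HashMap<>();
  private int nextId = 0;

  /** Returns the existing short id for the key, or assigns a new one. */
  synchronized String getOrCreateShortId(MetricNameKey key) {
    String existing = keyToShortId.get(key);
    if (existing != null) {
      return existing;
    }
    String shortId = "metric" + nextId++;
    keyToShortId.put(key, shortId);
    shortIdToKey.put(shortId, key);
    return shortId;
  }

  /** Returns the key registered under the short id, or null if it is unknown. */
  synchronized MetricNameKey lookup(String shortId) {
    return shortIdToKey.get(shortId);
  }
}
```

Because the lookup key excludes the payload, two updates to the same metric resolve to the same short id without rebuilding the full MonitoringInfo.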

Author

ajamato commented Apr 9, 2021

R: @pabloem

@ajamato ajamato force-pushed the bq_metrics_process_wide branch 7 times, most recently from 32bd70a to 2bbc236 Compare April 10, 2021 05:00
Member

pabloem commented Apr 14, 2021

sorry about the delay. taking a look..

Member

pabloem commented Apr 14, 2021

Run Java PostCommit

@@ -36,4 +38,9 @@

/** Reset this metric. */
void reset();

/** Return the cumulative values for any metrics in this container as MonitoringInfos. */
Member

is this docstring right?

Author

Updated the comment.


/** Return the cumulative values for any metrics in this container as MonitoringInfos. */
default @Nullable DateTime getStartTime() {
return null;
Member

Does it make sense to not add a default, and make this not-nullable? (i.e. force all implementers to define a non-null start time?)

Author

I didn't want to add this to all the metrics, because I didn't want to add unnecessary calls to the system to get clock times.

Let's add it to just this place first, and consider adding more if they are needed for another project.
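
The trade-off discussed in this thread (an optional start time exposed through a default method, so most cells never read the clock) can be sketched as follows. This is a hedged illustration, not the PR's code: `MetricCellSketch`, `PlainCounter`, and `TimedCounter` are hypothetical names, and `java.time.Instant` is swapped in for the Joda `DateTime` used in the diff.

```java
import java.time.Instant;

/** Hypothetical cell interface with an optional start time. */
interface MetricCellSketch {
  /** Returns when this cell started recording, or null if it does not track a start time. */
  default Instant getStartTime() {
    return null;
  }
}

/** A cell that keeps the default and therefore never reads the clock. */
final class PlainCounter implements MetricCellSketch {}

/** A cell that opts in and reads the clock once, at construction. */
final class TimedCounter implements MetricCellSketch {
  private final Instant startTime = Instant.now();

  @Override
  public Instant getStartTime() {
    return startTime;
  }
}
```

Callers have to handle the null case, which is the cost of not forcing every implementer to define a start time.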

@@ -302,7 +325,7 @@ public void commitUpdates() {
ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder();
for (Map.Entry<MetricName, CellT> cell : cells.entries()) {
UpdateT update = checkNotNull(cell.getValue().getCumulative());
- updates.add(MetricUpdate.create(MetricKey.create(stepName, cell.getKey()), update));
+ updates.add(MetricUpdate.create(MetricKey.create(stepName, cell.getKey()), update, null));
Member

maybe the start time should be the current time? or maybe not?

Author

Done

@@ -98,7 +98,8 @@ public void flush(boolean async) {
UpdateT value = cell.getValue();
if (value != null) {
MetricKey key = MetricKey.create(stepName, cell.getName());
- MetricUpdates.MetricUpdate<UpdateT> update = MetricUpdates.MetricUpdate.create(key, value);
+ MetricUpdates.MetricUpdate<UpdateT> update =
+ MetricUpdates.MetricUpdate.create(key, value, null);
Member

does it make sense to get the time from the cell?

Author

No, this is actually not the same MetricCell class, so I can't pull a startTime off it.

It's actually a CellT extends AbstractMetric.

return false;
}

default void setIsProcessWide(boolean procesWide) {}
Member

if setting does not do anything, maybe we should throw an error instead of ignoring the instruction silently?

Author

Done
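
A minimal sketch of the reviewer's suggestion, where the default setter fails loudly instead of being a silent no-op. The names `ContainerSketch` and `ProcessWideCapable` are hypothetical, and the thread does not show the exception type that was actually adopted; `UnsupportedOperationException` is an assumption.

```java
/** Hypothetical container interface: the default setter refuses the call. */
interface ContainerSketch {
  default boolean isProcessWide() {
    return false;
  }

  default void setIsProcessWide(boolean processWide) {
    // Fail loudly so callers learn that this implementation ignores the flag.
    throw new UnsupportedOperationException(
        getClass().getName() + " does not support process-wide metrics");
  }
}

/** An implementation that opts in by overriding both methods. */
final class ProcessWideCapable implements ContainerSketch {
  private boolean processWide = false;

  @Override
  public boolean isProcessWide() {
    return processWide;
  }

  @Override
  public void setIsProcessWide(boolean processWide) {
    this.processWide = processWide;
  }
}
```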

Comment on lines 30 to 33
private HashMap<MonitoringInfoMetricName, String> infoKeyToShortId = new HashMap<>();

private HashMap<String, MetricsApi.MonitoringInfo> shortIdToInfo = new HashMap<>();

Member

I wonder if these should use a Guava cache with automatic invalidation and so on?

Author

This file was deleted, as @robertwb checked in a different implementation in the last few days.

* MonitoringInfos payloads and MonitoringInfo metadata for "process-wide" metrics, which return
* metric values calculated over the life of the process.
*/
public class ProcessWideInstructionHandler {
Member

maybe rename this to reference MonitoringInfoMetadataInstructionHandler or something like that? Since it has process-wide and non-process-wide MIs

Author

Renamed to HarnessMonitoringInfosInstructionHandler, which is all this does now.

BeamFnApi.MonitoringInfosMetadataResponse.newBuilder();
response.putAllMonitoringInfo(
MonitoringInfoShortIdCache.getShortIdCache()
.getInfos(request.getMonitoringInfos().getMonitoringInfoIdList()));
Member

It seems that the only moment when a MInfo gets added to the ShortIdCache is when we call getShortId on it, right? I don't see getShortId getting called for the monitoringInfoMetadata, only for the harnessMonitoringInfos - so I wonder how do we get the monitoring infos from the other containers in this call?

Author

That wasn't implemented when I sent you this PR, though Robert's change did introduce it.
I have updated the PR to merge with his implementation.

Author

ajamato commented Apr 16, 2021

R: @robertwb
Looks like you had added the ShortID code in this PR
4dd45e0

I will take a look at your change, and try to merge things together.

@ajamato ajamato force-pushed the bq_metrics_process_wide branch 5 times, most recently from 813774d to e10e165 Compare April 22, 2021 01:52
@ajamato ajamato requested a review from pabloem April 22, 2021 01:52
Author

ajamato commented Apr 26, 2021

Run Java_Examples_Dataflow PreCommit

@ajamato ajamato force-pushed the bq_metrics_process_wide branch from e10e165 to 7f18273 Compare April 27, 2021 20:37
Member

pabloem commented Apr 28, 2021

Run Java PreCommit

Member

@pabloem pabloem left a comment

LGTM except for the one comment.

MonitoringInfo.Builder cleaner = MonitoringInfo.newBuilder(info);
cleaner.clearPayload();
cleaner.clearStartTime();
cleaner.clearType();
Member

Isn't the type also necessary to identify a monitoring info? What identifies it, and what goes into the hash? Is the hash consistent?

Contributor

@robertwb robertwb left a comment

This PR seems to do three separate things.

(1) Change the way ShortIdMap works,
(2) Add process-wide metrics, and
(3) add the notion of startTime to a subset of metrics.

It may be preferable to split them up.

@@ -45,6 +48,8 @@
*/
public CounterCell(MetricName name) {
this.name = name;
// The start time of when this cell was first instantiated by the container.
this.startTime = new DateTime(DateTimeZone.UTC);
Contributor

Is it a concern that this might be slow?

Author

Moving this to #14805

Yes, revised to only make this call once per process in the new PR

@@ -94,6 +99,11 @@ public MetricName getName() {
return name;
}

@Override
public @Nullable DateTime getStartTime() {
Contributor

If we always initialize it, when would it be null?

Author

Moving this to #14805

I don't want to make the system call to get the time for every metric, as you suggested, due to performance concerns. So I have only the process-wide MetricContainer make this call, which means it should be called once for the process.

@@ -74,6 +74,8 @@

protected final @Nullable String stepName;

private boolean isProcessWide = false;
Contributor

Let's make this final. We should know at instantiation time whether it's process wide (given that the step name above is final).

Author

Moving this to #14805

Done in the new PR

- /** Create a new {@link MetricsContainerImpl} associated with the given {@code stepName}. */
+ /**
+ * Create a new {@link MetricsContainerImpl} associated with the given {@code stepName}. If
+ * stepName is null, this MetricsContainer is not bound to a step.
Contributor

Alternatively, require this be non-null, and add a no-arg constructor for the no-step-name (presumably is-process-wide) case?

Author

Moving this to #14805

Done in the new PR
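
The two constructor-related suggestions above (a final process-wide flag, and a no-arg constructor for the no-step case) could be combined roughly as in this sketch. `ContainerCtorSketch` is a hypothetical class, not MetricsContainerImpl, and this thread does not show what the follow-up PR actually did.

```java
import java.util.Objects;

/** Hypothetical container whose process-wide flag is fixed at construction time. */
final class ContainerCtorSketch {
  private final String stepName; // null only for the process-wide container
  private final boolean isProcessWide;

  /** Creates a container bound to a step; the step name must not be null. */
  ContainerCtorSketch(String stepName) {
    this.stepName = Objects.requireNonNull(stepName, "stepName");
    this.isProcessWide = false;
  }

  /** Creates the process-wide container, which is not bound to any step. */
  ContainerCtorSketch() {
    this.stepName = null;
    this.isProcessWide = true;
  }

  String getStepName() {
    return stepName;
  }

  boolean isProcessWide() {
    return isProcessWide;
  }
}
```

With both fields final, a container cannot switch between step-bound and process-wide after it has been created.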

@@ -262,6 +265,8 @@ public static void main(
}
});

MetricsEnvironment.setProcessWideContainer(new MetricsContainerImpl(null));
Contributor

Given that we don't hold onto a reference for this, why don't we have MetricsEnvironment create (possibly lazily) the process-wide container?

Author

Moving this to #14805

Unfortunately, I could not make MetricsEnvironment instantiate a MetricsContainerImpl inside itself, as this would create a dependency cycle between their packages. So I'm keeping this one as is.

@robertwb
Contributor

I'm wondering about the value of attaching startTime to each metric. It seems to add a lot of complexity and is a bit ill-defined in the Beam model. Are these tied somehow to process-wide metrics? If so, could we just use the start time of the process itself when publishing these rather than adding the overhead of tracking it per-metric?
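
One way to read this suggestion is to record a single timestamp when the process starts and reuse it whenever process-wide metrics are published. The sketch below assumes that reading: `ProcessStartTime` is a hypothetical helper, and `java.time.Instant` is used in place of the Joda `DateTime` from the diff.

```java
import java.time.Instant;

/** Hypothetical holder that reads the clock once, when the class is first loaded. */
final class ProcessStartTime {
  // Initialized during class initialization, so the clock is read at most once per process.
  private static final Instant START = Instant.now();

  private ProcessStartTime() {}

  /** Returns the same instant on every call. */
  static Instant get() {
    return START;
  }
}
```

Every process-wide metric can then report `ProcessStartTime.get()` as its start time without any per-metric bookkeeping.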

@ajamato ajamato force-pushed the bq_metrics_process_wide branch from 7f18273 to 25aae15 Compare May 13, 2021 00:40
@ajamato ajamato changed the title [BEAM-11994] Add HarnessMonitoringInfosRequest/Response InstructionRequest and MonitoringInfosMetadataRequest/Response support to Java DO NOT SUBMIT SPLIT INTO OTHER PRsAdd HarnessMonitoringInfosRequest/Response InstructionRequest and MonitoringInfosMetadataRequest/Response support to Java May 13, 2021
Author

ajamato commented May 13, 2021

I have split this up into two PRs rather than three, as the startTime logic is much simpler now and is only done for the process-wide MetricContainer. Overall the whole thing is much smaller. PTAL

(1) Change the way ShortIdMap works,
#14804
(2) Add process-wide metrics, and add the notion of startTime to a subset of metrics.
#14805

Contributor

robertwb commented May 13, 2021 via email

@ajamato ajamato closed this May 13, 2021
3 participants