ingest: processor stats #34202
This change introduces per-processor stats. Total, time, failed, and current are currently supported. Each pipeline now shows all top-level processors that belong to it. Failure processors are not displayed; however, the time taken to execute the failure chain is included in the stats of the top-level processor. The processor name is the type of the processor, and processors are listed in the order defined in the pipeline. If the processor has a tag, the tag is appended to the type. Pipeline processors have the pipeline name appended to the processor name (before the tag, if one exists). If more than one pipeline is used to process a document, each pipeline carries its own stats. The outermost pipeline also includes the stats of the innermost pipeline. Conditional processors are only included in the stats if the condition evaluates to true.
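The naming rule described above can be sketched roughly as follows. This is an illustration only, not the PR's code: the class `ProcessorStatNames` and the method `statName` are hypothetical names, and the colon separator is taken from the commit message for this change.

```java
/** Hypothetical helper illustrating the stat naming rule; not the PR's actual code. */
final class ProcessorStatNames {
    private ProcessorStatNames() {}

    /**
     * Builds "type[:pipelineName][:tag]".
     * pipelineName is only expected for pipeline processors; null or empty parts are skipped.
     */
    static String statName(String type, String pipelineName, String tag) {
        StringBuilder name = new StringBuilder(type);
        if (pipelineName != null && !pipelineName.isEmpty()) {
            // The pipeline name comes before the tag.
            name.append(':').append(pipelineName);
        }
        if (tag != null && !tag.isEmpty()) {
            name.append(':').append(tag);
        }
        return name.toString();
    }

    public static void main(String[] args) {
        System.out.println(statName("set", null, null));
        System.out.println(statName("set", null, "my-tag"));
        System.out.println(statName("pipeline", "inner", "my-tag"));
    }
}
```

Under these assumptions, an untagged processor is reported under its bare type, while a tagged pipeline processor is reported as type, pipeline name, and tag, in that order.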
Pinging @elastic/es-core-infra |
A few comments
super();
this.ignoreFailure = ignoreFailure;
this.processors = processors;
this.onFailureProcessors = onFailureProcessors;
this.clock = clock;
processorsWithMetrics = new ArrayList<>(processors.size());
Please use consistent style, setting with this.
fixed
for (Tuple<Processor, IngestMetric> processorWithMetric : processorsWithMetrics) {
Processor processor = processorWithMetric.v1();
IngestMetric metric = processorWithMetric.v2();
long startTimeInMillis = clock.millis();
This is backed by System.currentTimeMillis(). In other areas of the system, we avoid this and use a wrapper on ThreadPool which avoids excessive calls to that method, caching the result across threads. Either we should be using a LongSupplier like we do in other areas, or have a Clock implementation backed by ThreadPool. Additionally, I'm not sure we need absolute time, so probably the other method for relative time would be better.
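A minimal sketch of the LongSupplier approach suggested here, assuming the supplier returns a monotonic relative time in nanoseconds. The class `RelativeTimeMetric` and its methods are hypothetical and are not the PR's IngestMetric; `System::nanoTime` is only one possible supplier, and a ThreadPool-backed cached supplier could be passed in the same way.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical per-processor timer; names are illustrative, not the PR's code. */
final class RelativeTimeMetric {
    // Assumed to be monotonic relative time in nanoseconds, e.g. System::nanoTime.
    private final LongSupplier relativeNanos;
    private long count;
    private long failed;
    private long totalNanos;

    RelativeTimeMetric(LongSupplier relativeNanos) {
        this.relativeNanos = relativeNanos;
    }

    /** Runs one step, counting it and adding its elapsed relative time, even on failure. */
    void time(Runnable step) {
        long start = relativeNanos.getAsLong();
        count++;
        try {
            step.run();
        } catch (RuntimeException e) {
            failed++;
            throw e;
        } finally {
            // Guard against a supplier that is not strictly monotonic.
            totalNanos += Math.max(0L, relativeNanos.getAsLong() - start);
        }
    }

    long count() { return count; }
    long failed() { return failed; }
    long totalMillis() { return TimeUnit.NANOSECONDS.toMillis(totalNanos); }
}
```

Injecting the time source also makes the timing testable, since a test can pass a supplier that it advances by hand instead of depending on the wall clock.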
ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor) {
nit: spacing is off here
fixed
}
return ingestDocument;
}

Processor getProcessor() {
If this is just going to be used by tests, why not make the member package protected instead of a getter?
This is used here: https://github.com/elastic/elasticsearch/pull/34202/files#diff-579dffc1e22e3db13c41f685046b2891R439 to get the actual processor name
//Best attempt to populate new processor metrics using a parallel array of the old metrics. This is not ideal since
//the per processor metrics may get reset when the arrays don't match. However, to get to an ideal model, unique and
//consistent id's per processor and/or semantic equals for each processor will be needed.
if(newPerProcessMetrics.size() == oldPerProcessMetrics.size()) {
nit: space after if
fixed
List<Tuple<Processor, IngestMetric>> newPerProcessMetrics = new ArrayList<>();
getProcessorMetrics(originalPipeline.getCompoundProcessor(), oldPerProcessMetrics);
getProcessorMetrics(pipeline.getCompoundProcessor(), newPerProcessMetrics);
//Best attempt to populate new processor metrics using a parallel array of the old metrics. This is not ideal since
Why try to transfer metrics at all? Could we just say when a pipeline's configuration is updated, the metrics are reset?
When one pipeline changes, all pipelines are rebuilt, and without this code we would lose all metrics on any pipeline change. For example, if we have two pipelines and one of them is deleted, we don't want to lose the metrics for the pipeline that had no changes. We don't have easy access to exactly which pipeline changed, so the heuristic for carrying the metrics forward is: if the pipeline still exists, and the count of processors and the types of the processors (in order) don't change, then carry forward the metrics.
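The heuristic described in this reply could look roughly like the sketch below. It is a simplified stand-in, not the PR's implementation: `MetricCarryForward` and `Entry` are hypothetical names, and a single counter stands in for the full per-processor metric.

```java
import java.util.List;

/** Hypothetical sketch of the carry-forward heuristic; not the PR's actual code. */
final class MetricCarryForward {
    private MetricCarryForward() {}

    /** Stand-in for a processor (identified by its type) paired with one counter. */
    static final class Entry {
        final String type;
        long count;

        Entry(String type, long count) {
            this.type = type;
            this.count = count;
        }
    }

    /** True when both lists have the same length and the same processor types in the same order. */
    static boolean sameShape(List<Entry> oldEntries, List<Entry> newEntries) {
        if (oldEntries.size() != newEntries.size()) {
            return false;
        }
        for (int i = 0; i < oldEntries.size(); i++) {
            if (!oldEntries.get(i).type.equals(newEntries.get(i).type)) {
                return false;
            }
        }
        return true;
    }

    /** Adds the old counts to the rebuilt entries if the shape matches; otherwise leaves them untouched. */
    static boolean carryForward(List<Entry> oldEntries, List<Entry> newEntries) {
        if (!sameShape(oldEntries, newEntries)) {
            return false; // the rebuilt pipeline starts from its fresh (zero) metrics
        }
        for (int i = 0; i < oldEntries.size(); i++) {
            newEntries.get(i).count += oldEntries.get(i).count;
        }
        return true;
    }
}
```

Because the comparison is positional and by type only, two pipelines with the same processor types in the same order are treated as unchanged even if their configuration differs, which is the limitation the code comment in the diff acknowledges.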
pipelineStats.writeTo(out);
List<Tuple<String, Stats>> processorStats = entry.getValue().v2();
out.writeVInt(processorStats.size());
for(Tuple<String, Stats> processorTuple : processorStats){
nit: space after for
fixed
*/
public Map<String, Stats> getStatsPerPipeline() {
public Map<String, Tuple<IngestStats.Stats, List<Tuple<String, IngestStats.Stats>>>> getStatsPerPipeline() {
I think the type here would be much easier to understand as a dedicated class, rather than a very nested set of Tuple/List/Map
Fixed in 8067758. IngestStats now accepts totalStats, List, and Map<String, ProcessorStat> (keyed by pipelineId), and a builder to help build from the Metrics representation. I hope this makes the code more readable.
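To illustrate the kind of refactoring discussed here, the sketch below replaces a nested Map/Tuple/List type with small named classes. It is not the code from the fixing commit: `PipelineStatsSketch`, its methods, and the field layout are assumptions made for the example, with the four counters taken from the PR description (total, time, current, failed).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical container showing named types in place of nested tuples; not the PR's IngestStats. */
final class PipelineStatsSketch {
    /** The four counters named in the PR description. */
    static final class Stats {
        final long count;
        final long timeInMillis;
        final long current;
        final long failed;

        Stats(long count, long timeInMillis, long current, long failed) {
            this.count = count;
            this.timeInMillis = timeInMillis;
            this.current = current;
            this.failed = failed;
        }
    }

    /** A processor's display name paired with its stats. */
    static final class ProcessorStat {
        final String name;
        final Stats stats;

        ProcessorStat(String name, Stats stats) {
            this.name = name;
            this.stats = stats;
        }
    }

    // Both maps are keyed by pipeline id; insertion order is kept for stable output.
    private final Map<String, Stats> pipelineTotals = new LinkedHashMap<>();
    private final Map<String, List<ProcessorStat>> processorStats = new LinkedHashMap<>();

    void addPipeline(String pipelineId, Stats total) {
        pipelineTotals.put(pipelineId, total);
        processorStats.computeIfAbsent(pipelineId, id -> new ArrayList<>());
    }

    void addProcessor(String pipelineId, String processorName, Stats stats) {
        processorStats.computeIfAbsent(pipelineId, id -> new ArrayList<>())
            .add(new ProcessorStat(processorName, stats));
    }

    Stats pipelineTotal(String pipelineId) {
        return pipelineTotals.get(pipelineId);
    }

    /** Processors in the order they were added; empty for an unknown pipeline. */
    List<ProcessorStat> processorsFor(String pipelineId) {
        return Collections.unmodifiableList(
            processorStats.getOrDefault(pipelineId, Collections.emptyList()));
    }
}
```

With named accessors, a caller reads `processorsFor("my-pipeline").get(0).stats.failed` rather than chaining `v1()` and `v2()` calls on tuples, which is the readability gain the reviewer asked for.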
List<Tuple<String, IngestStats.Stats>> deserializedProcessorStats =
deserializedIngestStats.getProcessorStatsForPipeline(pipelineName);
Iterator<Tuple<String, IngestStats.Stats>> it = deserializedProcessorStats.iterator();
for(Tuple<String, IngestStats.Stats> processorTuple : processorStats){
nit: space after for
fixed
@rjernst - All initial comments have been addressed. Would you mind taking another look?
LGTM
* master: Use trial license in docs tests (elastic#34673) Scripting: Convert script fields to use script context (elastic#34164) TEST: Mute testDedupByPrimaryTerm ingest: processor stats (elastic#34202)
This reverts commit 6567729.
I reverted this from master in 0577703 due to failing tests in the mixed cluster tests.
* master: Revert "ingest: processor stats (elastic#34202)"
Re-introduced this change with a test fix in #34724.
@yaronp68 FYI
This change introduces per-processor stats. Total, time, failed, and
current are currently supported. Each pipeline now shows all
top-level processors that belong to it. Failure processors are not
displayed; however, the time taken to execute the failure chain is part
of the stats for the top-level processor.
The processor name is the type of the processor, ordered as defined in
the pipeline. If a tag for the processor is found, then the tag is
appended to the type (colon separated).
Pipeline processors will have the pipeline name appended (colon separated)
to the processor type (before the tag if one exists).
If more than one pipeline is used to process the document, then each pipeline
will carry its own stats. The outermost pipeline will also include the
innermost pipeline stats.
Conditional processors will only be included in the stats if the condition
evaluates to true.
Best attempts are made to carry forward processor metrics between
cluster state changes. If no changes are made to the pipeline, metrics
will be carried forward. However, if the pipeline changes and the
number of processors or the order of the processors (determined by type)
changes, the processor metrics will be reset to zero.
Closes #33387
An example pipeline:
^^ The "processors": [ section is new.
An example with a tag and a pipeline processor:
Rally with the http_logs track and grok was run against the code before and after this change. No performance impact was observed.
Also, note that this PR does not change the output of simulate or simulate?verbose; that will be a separate PR.