Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[apache_spark][driver] Add Apache Spark package with Driver data stream #2945

Merged
merged 10 commits into from
Apr 22, 2022
4 changes: 4 additions & 0 deletions packages/apache_spark/_dev/build/docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ The Apache Spark integration collects and parses data using the Jolokia Metricbe

## Compatibility

<<<<<<< HEAD
yug-rajani marked this conversation as resolved.
Show resolved Hide resolved
This module has been tested against `Apache Spark version 3.2.0`
yug-rajani marked this conversation as resolved.
Show resolved Hide resolved
=======
This integration has been tested against `Apache Spark version 3.2.0`
>>>>>>> acfb43c556154ae398e89ab340c67999079cd425

## Requirements

Expand Down
289 changes: 289 additions & 0 deletions packages/apache_spark/data_stream/driver/agent/stream/stream.yml.hbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
metricsets: ["jmx"]
namespace: "metrics"
hosts:
{{#each hosts}}
- {{this}}
{{/each}}
path: {{path}}
period: {{period}}
jmx.mappings:
- mbean: 'metrics:name=*.driver.appStatus.jobDuration,type=gauges'
attributes:
- attr: Value
field: driver.job_duration
- mbean: 'metrics:name=*.driver.appStatus.jobs.failedJobs,type=counters'
attributes:
- attr: Count
field: driver.jobs.failed
- mbean: 'metrics:name=*.driver.appStatus.jobs.succeededJobs,type=counters'
attributes:
- attr: Count
field: driver.jobs.succeeded
- mbean: 'metrics:name=*.driver.appStatus.stages.completedStages,type=counters'
attributes:
- attr: Count
field: driver.stages.completed_count
- mbean: 'metrics:name=*.driver.appStatus.stages.failedStages,type=counters'
attributes:
- attr: Count
field: driver.stages.failed_count
- mbean: 'metrics:name=*.driver.appStatus.stages.skippedStages,type=counters'
attributes:
- attr: Count
field: driver.stages.skipped_count
- mbean: 'metrics:name=*.driver.appStatus.tasks.blackListedExecutors,type=counters'
attributes:
- attr: Count
field: driver.tasks.executors.black_listed
- mbean: 'metrics:name=*.driver.appStatus.tasks.completedTasks,type=counters'
attributes:
- attr: Count
field: driver.tasks.completed
- mbean: 'metrics:name=*.driver.appStatus.tasks.excludedExecutors,type=counters'
attributes:
- attr: Count
field: driver.tasks.executors.excluded
- mbean: 'metrics:name=*.driver.appStatus.tasks.failedTasks,type=counters'
attributes:
- attr: Count
field: driver.tasks.failed
- mbean: 'metrics:name=*.driver.appStatus.tasks.killedTasks,type=counters'
attributes:
- attr: Count
field: driver.tasks.killed
- mbean: 'metrics:name=*.driver.appStatus.tasks.skippedTasks,type=counters'
attributes:
- attr: Count
field: driver.tasks.skipped
- mbean: 'metrics:name=*.driver.appStatus.tasks.unblackListedExecutors,type=counters'
attributes:
- attr: Count
field: driver.tasks.executors.unblack_listed
- mbean: 'metrics:name=*.driver.appStatus.tasks.unexcludedExecutors,type=counters'
attributes:
- attr: Count
field: driver.tasks.executors.unexcluded
- mbean: 'metrics:name=*.driver.BlockManager.disk.diskSpaceUsed_MB,type=gauges'
attributes:
- attr: Value
field: driver.disk.space_used
- mbean: 'metrics:name=*.driver.BlockManager.memory.maxMem_MB,type=gauges'
attributes:
- attr: Value
field: driver.memory.max_mem
- mbean: 'metrics:name=*.driver.BlockManager.memory.maxOffHeapMem_MB,type=gauges'
attributes:
- attr: Value
field: driver.memory.off_heap.max
- mbean: 'metrics:name=*.driver.BlockManager.memory.maxOnHeapMem_MB,type=gauges'
attributes:
- attr: Value
field: driver.memory.on_heap.max
- mbean: 'metrics:name=*.driver.BlockManager.memory.memUsed_MB,type=gauges'
attributes:
- attr: Value
field: driver.memory.used
- mbean: 'metrics:name=*.driver.BlockManager.memory.offHeapMemUsed_MB,type=gauges'
attributes:
- attr: Value
field: driver.memory.off_heap.used
- mbean: 'metrics:name=*.driver.BlockManager.memory.onHeapMemUsed_MB,type=gauges'
attributes:
- attr: Value
field: driver.memory.on_heap.used
- mbean: 'metrics:name=*.driver.BlockManager.memory.remainingMem_MB,type=gauges'
attributes:
- attr: Value
field: driver.memory.remaining
- mbean: 'metrics:name=*.driver.BlockManager.memory.remainingOffHeapMem_MB,type=gauges'
attributes:
- attr: Value
field: driver.memory.off_heap.remaining
- mbean: 'metrics:name=*.driver.BlockManager.memory.remainingOnHeapMem_MB,type=gauges'
attributes:
- attr: Value
field: driver.memory.on_heap.remaining
- mbean: 'metrics:name=*.driver.DAGScheduler.job.activeJobs,type=gauges'
attributes:
- attr: Value
field: driver.dag_scheduler.job.active
- mbean: 'metrics:name=*.driver.DAGScheduler.job.allJobs,type=gauges'
attributes:
- attr: Value
field: driver.dag_scheduler.job.all
- mbean: 'metrics:name=*.driver.DAGScheduler.stage.failedStages,type=gauges'
attributes:
- attr: Value
field: driver.dag_scheduler.stages.failed
- mbean: 'metrics:name=*.driver.DAGScheduler.stage.runningStages,type=gauges'
attributes:
- attr: Value
field: driver.dag_scheduler.stages.running
- mbean: 'metrics:name=*.driver.DAGScheduler.stage.waitingStages,type=gauges'
attributes:
- attr: Value
field: driver.dag_scheduler.stages.waiting
- mbean: 'metrics:name=*.driver.ExecutorAllocationManager.executors.numberAllExecutors,type=gauges'
attributes:
- attr: Value
field: driver.executors.all
- mbean: 'metrics:name=*.driver.ExecutorAllocationManager.executors.numberExecutorsDecommissionUnfinished,type=counters'
attributes:
- attr: Count
field: driver.executors.decommission_unfinished
- mbean: 'metrics:name=*.driver.ExecutorAllocationManager.executors.numberExecutorsExitedUnexpectedly,type=counters'
attributes:
- attr: Count
field: driver.executors.exited_unexpectedly
- mbean: 'metrics:name=*.driver.ExecutorAllocationManager.executors.numberExecutorsGracefullyDecommissioned,type=counters'
attributes:
- attr: Count
field: driver.executors.gracefully_decommissioned
- mbean: 'metrics:name=*.driver.ExecutorAllocationManager.executors.numberExecutorsKilledByDriver,type=counters'
attributes:
- attr: Count
field: driver.executors.killed_by_driver
- mbean: 'metrics:name=*.driver.ExecutorAllocationManager.executors.numberExecutorsPendingToRemove,type=gauges'
attributes:
- attr: Value
field: driver.executors.pending_to_remove
- mbean: 'metrics:name=*.driver.ExecutorAllocationManager.executors.numberExecutorsToAdd,type=gauges'
attributes:
- attr: Value
field: driver.executors.to_add
- mbean: 'metrics:name=*.driver.ExecutorAllocationManager.executors.numberMaxNeededExecutors,type=gauges'
attributes:
- attr: Value
field: driver.executors.max_needed
- mbean: 'metrics:name=*.driver.ExecutorAllocationManager.executors.numberTargetExecutors,type=gauges'
attributes:
- attr: Value
field: driver.executors.target
- mbean: 'metrics:name=*.driver.ExecutorMetrics.DirectPoolMemory,type=gauges'
attributes:
- attr: Value
field: driver.executor_metrics.memory.direct_pool
- mbean: 'metrics:name=*.driver.ExecutorMetrics.JVMHeapMemory,type=gauges'
attributes:
- attr: Value
field: driver.executor_metrics.memory.jvm.heap
- mbean: 'metrics:name=*.driver.ExecutorMetrics.JVMOffHeapMemory,type=gauges'
attributes:
- attr: Value
field: driver.executor_metrics.memory.jvm.off_heap
- mbean: 'metrics:name=*.driver.ExecutorMetrics.MappedPoolMemory,type=gauges'
attributes:
- attr: Value
field: driver.executor_metrics.memory.mapped_pool
- mbean: 'metrics:name=*.driver.ExecutorMetrics.MajorGCCount,type=gauges'
attributes:
- attr: Value
field: driver.executor_metrics.gc.major.count
- mbean: 'metrics:name=*.driver.ExecutorMetrics.MajorGCTime,type=gauges'
attributes:
- attr: Value
field: driver.executor_metrics.gc.major.time
- mbean: 'metrics:name=*.driver.ExecutorMetrics.MinorGCCount,type=gauges'
attributes:
- attr: Value
field: driver.executor_metrics.gc.minor.count
- mbean: 'metrics:name=*.driver.ExecutorMetrics.MinorGCTime,type=gauges'
attributes:
- attr: Value
field: driver.executor_metrics.gc.minor.time
- mbean: 'metrics:name=*.driver.ExecutorMetrics.OffHeapExecutionMemory,type=gauges'
attributes:
- attr: Value
field: driver.executor_metrics.heap_memory.off.execution
- mbean: 'metrics:name=*.driver.ExecutorMetrics.OffHeapStorageMemory,type=gauges'
attributes:
- attr: Value
field: driver.executor_metrics.heap_memory.off.storage
- mbean: 'metrics:name=*.driver.ExecutorMetrics.OffHeapUnifiedMemory,type=gauges'
attributes:
- attr: Value
field: driver.executor_metrics.heap_memory.off.unified
- mbean: 'metrics:name=*.driver.ExecutorMetrics.OnHeapExecutionMemory,type=gauges'
attributes:
- attr: Value
field: driver.executor_metrics.heap_memory.on.execution
- mbean: 'metrics:name=*.driver.ExecutorMetrics.OnHeapStorageMemory,type=gauges'
attributes:
- attr: Value
field: driver.executor_metrics.heap_memory.on.storage
- mbean: 'metrics:name=*.driver.ExecutorMetrics.OnHeapUnifiedMemory,type=gauges'
attributes:
- attr: Value
field: driver.executor_metrics.heap_memory.on.unified
- mbean: 'metrics:name=*.driver.ExecutorMetrics.ProcessTreeJVMRSSMemory,type=gauges'
attributes:
- attr: Value
field: driver.executor_metrics.process_tree.jvm.rss_memory
- mbean: 'metrics:name=*.driver.ExecutorMetrics.ProcessTreeJVMVMemory,type=gauges'
attributes:
- attr: Value
field: driver.executor_metrics.process_tree.jvm.v_memory
- mbean: 'metrics:name=*.driver.ExecutorMetrics.ProcessTreeOtherRSSMemory,type=gauges'
attributes:
- attr: Value
field: driver.executor_metrics.process_tree.other.rss_memory
- mbean: 'metrics:name=*.driver.ExecutorMetrics.ProcessTreeOtherVMemory,type=gauges'
attributes:
- attr: Value
field: driver.executor_metrics.process_tree.other.v_memory
- mbean: 'metrics:name=*.driver.ExecutorMetrics.ProcessTreePythonRSSMemory,type=gauges'
attributes:
- attr: Value
field: driver.executor_metrics.process_tree.python.rss_memory
- mbean: 'metrics:name=*.driver.ExecutorMetrics.ProcessTreePythonVMemory,type=gauges'
attributes:
- attr: Value
field: driver.executor_metrics.process_tree.python.v_memory
- mbean: 'metrics:name=*.driver.HiveExternalCatalog.fileCacheHits,type=counters'
attributes:
- attr: Count
field: driver.hive_external_catalog.file_cache_hits
- mbean: 'metrics:name=*.driver.HiveExternalCatalog.filesDiscovered,type=counters'
attributes:
- attr: Count
field: driver.hive_external_catalog.files_discovered
- mbean: 'metrics:name=*.driver.HiveExternalCatalog.hiveClientCalls,type=counters'
attributes:
- attr: Count
field: driver.hive_external_catalog.hive_client_calls
- mbean: 'metrics:name=*.driver.HiveExternalCatalog.parallelListingJobCount,type=counters'
attributes:
- attr: Count
field: driver.hive_external_catalog.parallel_listing_job.count
- mbean: 'metrics:name=*.driver.HiveExternalCatalog.partitionsFetched,type=counters'
attributes:
- attr: Count
field: driver.hive_external_catalog.partitions_fetched
- mbean: 'metrics:name=*.driver.JVMCPU.jvmCpuTime,type=gauges'
attributes:
- attr: Value
field: driver.jvm.cpu.time
- mbean: 'metrics:name=*.driver.spark.streaming.*.states-rowsTotal,type=gauges'
attributes:
- attr: Value
field: driver.spark.streaming.states.rows.total
- mbean: 'metrics:name=*.driver.spark.streaming.*.processingRate-total,type=gauges'
attributes:
- attr: Value
field: driver.spark.streaming.processing_rate.total
- mbean: 'metrics:name=*.driver.spark.streaming.*.latency,type=gauges'
attributes:
- attr: Value
field: driver.spark.streaming.latency
- mbean: 'metrics:name=*.driver.spark.streaming.*.states-usedBytes,type=gauges'
attributes:
- attr: Value
field: driver.spark.streaming.states.used_bytes
- mbean: 'metrics:name=*.driver.spark.streaming.*.eventTime-watermark,type=gauges'
attributes:
- attr: Value
field: driver.spark.streaming.event_time.watermark
- mbean: 'metrics:name=*.driver.spark.streaming.*.inputRate-total,type=gauge'
attributes:
- attr: Value
field: driver.spark.streaming.input_rate.total
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
---
description: Pipeline for parsing Apache Spark driver metrics.
processors:
- set:
field: ecs.version
value: '8.0.0'
- rename:
field: jolokia
target_field: apache_spark
ignore_missing: true
- set:
field: event.type
value: info
- set:
field: event.kind
value: metric
- set:
field: event.module
value: apache_spark
- script:
lang: painless
description: This script will add the name of application under key 'driver/executor.application_name' and executor id under 'apache_spark.metrics.executor.id'
if: ctx?.apache_spark?.metrics?.mbean?.contains("name=worker.") == false &&
ctx?.apache_spark?.metrics?.mbean?.contains("name=worker.") == false &&
ctx?.apache_spark?.metrics?.mbean?.contains("name=application.") == false
source: >-
def bean_name = ctx.apache_spark.metrics.mbean.toString().splitOnToken("=")[1];
def app_name = bean_name.splitOnToken(".")[0];
def executor_id = bean_name.splitOnToken(".")[1];
if (executor_id == "driver") {
ctx.apache_spark.metrics.driver.application = new HashMap();
ctx.apache_spark.metrics.driver.application_name = app_name;
} else {
ctx.apache_spark.metrics.executors.application = new HashMap();
ctx.apache_spark.metrics.executors.application_name = app_name;
ctx.apache_spark.metrics.executors.id = executor_id;
}
- remove:
field: apache_spark.metrics.mbean
ignore_failure: true
on_failure:
- set:
field: error.message
value: '{{ _ingest.on_failure_message }}'
12 changes: 12 additions & 0 deletions packages/apache_spark/data_stream/driver/fields/base-fields.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
- name: data_stream.dataset
type: constant_keyword
description: Data stream dataset.
- name: data_stream.namespace
type: constant_keyword
description: Data stream namespace.
- name: data_stream.type
type: constant_keyword
description: Data stream type.
- name: '@timestamp'
type: date
description: Event timestamp.
12 changes: 12 additions & 0 deletions packages/apache_spark/data_stream/driver/fields/ecs.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
- external: ecs
name: event.kind
- external: ecs
name: event.type
- external: ecs
name: ecs.version
- external: ecs
name: tags
- external: ecs
name: service.address
- external: ecs
name: service.type
Loading