Skip to content

Commit

Permalink
[apache_spark][driver] Add Apache Spark package with Driver data stre…
Browse files Browse the repository at this point in the history
…am (#2945)

* Add driver data stream for Apache Spark

* Add entry to CODEOWNERS

* Drop/group required fields as per review comments

* Resolve review comments and add system tests

* Modify system tests as per review comments

* Update docker compose version

* Update the Docker image reference to a specific SHA
  • Loading branch information
yug-rajani authored Apr 22, 2022
1 parent e01918f commit a2e1a3a
Show file tree
Hide file tree
Showing 12 changed files with 948 additions and 13 deletions.
8 changes: 8 additions & 0 deletions packages/apache_spark/_dev/build/docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,11 @@ This is the `nodes` data stream.
{{event "nodes"}}

{{fields "nodes"}}

### Driver

This is the `driver` data stream.

{{event "driver"}}

{{fields "driver"}}
20 changes: 7 additions & 13 deletions packages/apache_spark/_dev/deploy/docker/application/wordcount.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,16 @@
print("Usage: wordcount <file>", file=sys.stderr)
sys.exit(-1)

spark = SparkSession\
.builder\
.master(sys.argv[2])\
.appName("PythonWordCount")\
.getOrCreate()

spark = SparkSession.builder.master(sys.argv[2]).appName("PythonWordCount").getOrCreate()

t_end = time.time() + 60 * 15

# Run loop for 15 mins
while time.time() < t_end:
lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (x, 1)) \
.reduceByKey(add)
output = counts.collect()
for (word, count) in output:
print("%s: %i" % (word, count))
lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1)).reduceByKey(add)
output = counts.collect()
for (word, count) in output:
print("%s: %i" % (word, count))

spark.stop()
3 changes: 3 additions & 0 deletions packages/apache_spark/changelog.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

- version: "0.1.0"
changes:
- description: Implement "driver" data stream
type: enhancement
link: https://github.com/elastic/integrations/pull/2945
- description: Implement "application" data stream
type: enhancement
link: https://github.com/elastic/integrations/pull/2941
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Test configuration for the driver data stream (presumably the system test
# config added by this commit; confirm against the file path).
# No package-level variables are set.
vars: null
data_stream:
  vars:
    # Host uses the second exposed port of the apache-spark-main service.
    hosts:
    - 'http://apache-spark-main:{{Ports.[1]}}'
    path:
    - '/jolokia/?ignoreErrors=true&canonicalNaming=false'
289 changes: 289 additions & 0 deletions packages/apache_spark/data_stream/driver/agent/stream/stream.yml.hbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
# Stream settings rendered from this Handlebars template.
# Only the "jmx" metricset is enabled; collected values are reported under the "metrics" namespace.
metricsets: ["jmx"]
namespace: "metrics"
# One list entry is emitted for every configured host.
hosts:
{{#each hosts}}
- {{this}}
{{/each}}
# Request path and collection period come from the template variables of the same name.
path: {{path}}
period: {{period}}
# Each entry below pairs an MBean name pattern with the attribute to read and the field that receives it.
jmx.mappings:
# --- appStatus: job, stage and task metrics of the driver ---
# Gauge MBeans are read through their Value attribute, counter MBeans through Count.
- mbean: 'metrics:name=*.driver.appStatus.jobDuration,type=gauges'
  attributes:
  - attr: Value
    field: driver.job_duration
- mbean: 'metrics:name=*.driver.appStatus.jobs.failedJobs,type=counters'
  attributes:
  - attr: Count
    field: driver.jobs.failed
- mbean: 'metrics:name=*.driver.appStatus.jobs.succeededJobs,type=counters'
  attributes:
  - attr: Count
    field: driver.jobs.succeeded
- mbean: 'metrics:name=*.driver.appStatus.stages.completedStages,type=counters'
  attributes:
  - attr: Count
    field: driver.stages.completed_count
- mbean: 'metrics:name=*.driver.appStatus.stages.failedStages,type=counters'
  attributes:
  - attr: Count
    field: driver.stages.failed_count
- mbean: 'metrics:name=*.driver.appStatus.stages.skippedStages,type=counters'
  attributes:
  - attr: Count
    field: driver.stages.skipped_count
- mbean: 'metrics:name=*.driver.appStatus.tasks.blackListedExecutors,type=counters'
  attributes:
  - attr: Count
    field: driver.tasks.executors.black_listed
- mbean: 'metrics:name=*.driver.appStatus.tasks.completedTasks,type=counters'
  attributes:
  - attr: Count
    field: driver.tasks.completed
- mbean: 'metrics:name=*.driver.appStatus.tasks.excludedExecutors,type=counters'
  attributes:
  - attr: Count
    field: driver.tasks.executors.excluded
- mbean: 'metrics:name=*.driver.appStatus.tasks.failedTasks,type=counters'
  attributes:
  - attr: Count
    field: driver.tasks.failed
- mbean: 'metrics:name=*.driver.appStatus.tasks.killedTasks,type=counters'
  attributes:
  - attr: Count
    field: driver.tasks.killed
- mbean: 'metrics:name=*.driver.appStatus.tasks.skippedTasks,type=counters'
  attributes:
  - attr: Count
    field: driver.tasks.skipped
- mbean: 'metrics:name=*.driver.appStatus.tasks.unblackListedExecutors,type=counters'
  attributes:
  - attr: Count
    field: driver.tasks.executors.unblack_listed
- mbean: 'metrics:name=*.driver.appStatus.tasks.unexcludedExecutors,type=counters'
  attributes:
  - attr: Count
    field: driver.tasks.executors.unexcluded
# --- BlockManager: disk and memory gauges of the driver ---
# The source MBean names carry an _MB suffix (presumably megabytes; confirm before labelling units).
- mbean: 'metrics:name=*.driver.BlockManager.disk.diskSpaceUsed_MB,type=gauges'
  attributes:
  - attr: Value
    field: driver.disk.space_used
- mbean: 'metrics:name=*.driver.BlockManager.memory.maxMem_MB,type=gauges'
  attributes:
  - attr: Value
    field: driver.memory.max_mem
- mbean: 'metrics:name=*.driver.BlockManager.memory.maxOffHeapMem_MB,type=gauges'
  attributes:
  - attr: Value
    field: driver.memory.off_heap.max
- mbean: 'metrics:name=*.driver.BlockManager.memory.maxOnHeapMem_MB,type=gauges'
  attributes:
  - attr: Value
    field: driver.memory.on_heap.max
- mbean: 'metrics:name=*.driver.BlockManager.memory.memUsed_MB,type=gauges'
  attributes:
  - attr: Value
    field: driver.memory.used
- mbean: 'metrics:name=*.driver.BlockManager.memory.offHeapMemUsed_MB,type=gauges'
  attributes:
  - attr: Value
    field: driver.memory.off_heap.used
- mbean: 'metrics:name=*.driver.BlockManager.memory.onHeapMemUsed_MB,type=gauges'
  attributes:
  - attr: Value
    field: driver.memory.on_heap.used
- mbean: 'metrics:name=*.driver.BlockManager.memory.remainingMem_MB,type=gauges'
  attributes:
  - attr: Value
    field: driver.memory.remaining
- mbean: 'metrics:name=*.driver.BlockManager.memory.remainingOffHeapMem_MB,type=gauges'
  attributes:
  - attr: Value
    field: driver.memory.off_heap.remaining
- mbean: 'metrics:name=*.driver.BlockManager.memory.remainingOnHeapMem_MB,type=gauges'
  attributes:
  - attr: Value
    field: driver.memory.on_heap.remaining
# --- DAGScheduler: job and stage gauges ---
- mbean: 'metrics:name=*.driver.DAGScheduler.job.activeJobs,type=gauges'
  attributes:
  - attr: Value
    field: driver.dag_scheduler.job.active
- mbean: 'metrics:name=*.driver.DAGScheduler.job.allJobs,type=gauges'
  attributes:
  - attr: Value
    field: driver.dag_scheduler.job.all
- mbean: 'metrics:name=*.driver.DAGScheduler.stage.failedStages,type=gauges'
  attributes:
  - attr: Value
    field: driver.dag_scheduler.stages.failed
- mbean: 'metrics:name=*.driver.DAGScheduler.stage.runningStages,type=gauges'
  attributes:
  - attr: Value
    field: driver.dag_scheduler.stages.running
- mbean: 'metrics:name=*.driver.DAGScheduler.stage.waitingStages,type=gauges'
  attributes:
  - attr: Value
    field: driver.dag_scheduler.stages.waiting
# --- ExecutorAllocationManager: executor counts ---
# This group mixes gauges (Value) and counters (Count); the attribute must match the MBean type.
- mbean: 'metrics:name=*.driver.ExecutorAllocationManager.executors.numberAllExecutors,type=gauges'
  attributes:
  - attr: Value
    field: driver.executors.all
- mbean: 'metrics:name=*.driver.ExecutorAllocationManager.executors.numberExecutorsDecommissionUnfinished,type=counters'
  attributes:
  - attr: Count
    field: driver.executors.decommission_unfinished
- mbean: 'metrics:name=*.driver.ExecutorAllocationManager.executors.numberExecutorsExitedUnexpectedly,type=counters'
  attributes:
  - attr: Count
    field: driver.executors.exited_unexpectedly
- mbean: 'metrics:name=*.driver.ExecutorAllocationManager.executors.numberExecutorsGracefullyDecommissioned,type=counters'
  attributes:
  - attr: Count
    field: driver.executors.gracefully_decommissioned
- mbean: 'metrics:name=*.driver.ExecutorAllocationManager.executors.numberExecutorsKilledByDriver,type=counters'
  attributes:
  - attr: Count
    field: driver.executors.killed_by_driver
- mbean: 'metrics:name=*.driver.ExecutorAllocationManager.executors.numberExecutorsPendingToRemove,type=gauges'
  attributes:
  - attr: Value
    field: driver.executors.pending_to_remove
- mbean: 'metrics:name=*.driver.ExecutorAllocationManager.executors.numberExecutorsToAdd,type=gauges'
  attributes:
  - attr: Value
    field: driver.executors.to_add
- mbean: 'metrics:name=*.driver.ExecutorAllocationManager.executors.numberMaxNeededExecutors,type=gauges'
  attributes:
  - attr: Value
    field: driver.executors.max_needed
- mbean: 'metrics:name=*.driver.ExecutorAllocationManager.executors.numberTargetExecutors,type=gauges'
  attributes:
  - attr: Value
    field: driver.executors.target
# --- ExecutorMetrics: memory, GC and process-tree gauges reported for the driver ---
# Memory pools
- mbean: 'metrics:name=*.driver.ExecutorMetrics.DirectPoolMemory,type=gauges'
  attributes:
  - attr: Value
    field: driver.executor_metrics.memory.direct_pool
- mbean: 'metrics:name=*.driver.ExecutorMetrics.JVMHeapMemory,type=gauges'
  attributes:
  - attr: Value
    field: driver.executor_metrics.memory.jvm.heap
- mbean: 'metrics:name=*.driver.ExecutorMetrics.JVMOffHeapMemory,type=gauges'
  attributes:
  - attr: Value
    field: driver.executor_metrics.memory.jvm.off_heap
- mbean: 'metrics:name=*.driver.ExecutorMetrics.MappedPoolMemory,type=gauges'
  attributes:
  - attr: Value
    field: driver.executor_metrics.memory.mapped_pool
# Garbage collection
- mbean: 'metrics:name=*.driver.ExecutorMetrics.MajorGCCount,type=gauges'
  attributes:
  - attr: Value
    field: driver.executor_metrics.gc.major.count
- mbean: 'metrics:name=*.driver.ExecutorMetrics.MajorGCTime,type=gauges'
  attributes:
  - attr: Value
    field: driver.executor_metrics.gc.major.time
- mbean: 'metrics:name=*.driver.ExecutorMetrics.MinorGCCount,type=gauges'
  attributes:
  - attr: Value
    field: driver.executor_metrics.gc.minor.count
- mbean: 'metrics:name=*.driver.ExecutorMetrics.MinorGCTime,type=gauges'
  attributes:
  - attr: Value
    field: driver.executor_metrics.gc.minor.time
# Off-heap / on-heap execution, storage and unified memory
- mbean: 'metrics:name=*.driver.ExecutorMetrics.OffHeapExecutionMemory,type=gauges'
  attributes:
  - attr: Value
    field: driver.executor_metrics.heap_memory.off.execution
- mbean: 'metrics:name=*.driver.ExecutorMetrics.OffHeapStorageMemory,type=gauges'
  attributes:
  - attr: Value
    field: driver.executor_metrics.heap_memory.off.storage
- mbean: 'metrics:name=*.driver.ExecutorMetrics.OffHeapUnifiedMemory,type=gauges'
  attributes:
  - attr: Value
    field: driver.executor_metrics.heap_memory.off.unified
- mbean: 'metrics:name=*.driver.ExecutorMetrics.OnHeapExecutionMemory,type=gauges'
  attributes:
  - attr: Value
    field: driver.executor_metrics.heap_memory.on.execution
- mbean: 'metrics:name=*.driver.ExecutorMetrics.OnHeapStorageMemory,type=gauges'
  attributes:
  - attr: Value
    field: driver.executor_metrics.heap_memory.on.storage
- mbean: 'metrics:name=*.driver.ExecutorMetrics.OnHeapUnifiedMemory,type=gauges'
  attributes:
  - attr: Value
    field: driver.executor_metrics.heap_memory.on.unified
# Process tree (JVM, Python, other)
- mbean: 'metrics:name=*.driver.ExecutorMetrics.ProcessTreeJVMRSSMemory,type=gauges'
  attributes:
  - attr: Value
    field: driver.executor_metrics.process_tree.jvm.rss_memory
- mbean: 'metrics:name=*.driver.ExecutorMetrics.ProcessTreeJVMVMemory,type=gauges'
  attributes:
  - attr: Value
    field: driver.executor_metrics.process_tree.jvm.v_memory
- mbean: 'metrics:name=*.driver.ExecutorMetrics.ProcessTreeOtherRSSMemory,type=gauges'
  attributes:
  - attr: Value
    field: driver.executor_metrics.process_tree.other.rss_memory
- mbean: 'metrics:name=*.driver.ExecutorMetrics.ProcessTreeOtherVMemory,type=gauges'
  attributes:
  - attr: Value
    field: driver.executor_metrics.process_tree.other.v_memory
- mbean: 'metrics:name=*.driver.ExecutorMetrics.ProcessTreePythonRSSMemory,type=gauges'
  attributes:
  - attr: Value
    field: driver.executor_metrics.process_tree.python.rss_memory
- mbean: 'metrics:name=*.driver.ExecutorMetrics.ProcessTreePythonVMemory,type=gauges'
  attributes:
  - attr: Value
    field: driver.executor_metrics.process_tree.python.v_memory
# --- HiveExternalCatalog: counters, all read through Count ---
- mbean: 'metrics:name=*.driver.HiveExternalCatalog.fileCacheHits,type=counters'
  attributes:
  - attr: Count
    field: driver.hive_external_catalog.file_cache_hits
- mbean: 'metrics:name=*.driver.HiveExternalCatalog.filesDiscovered,type=counters'
  attributes:
  - attr: Count
    field: driver.hive_external_catalog.files_discovered
- mbean: 'metrics:name=*.driver.HiveExternalCatalog.hiveClientCalls,type=counters'
  attributes:
  - attr: Count
    field: driver.hive_external_catalog.hive_client_calls
- mbean: 'metrics:name=*.driver.HiveExternalCatalog.parallelListingJobCount,type=counters'
  attributes:
  - attr: Count
    field: driver.hive_external_catalog.parallel_listing_job.count
- mbean: 'metrics:name=*.driver.HiveExternalCatalog.partitionsFetched,type=counters'
  attributes:
  - attr: Count
    field: driver.hive_external_catalog.partitions_fetched
# --- JVMCPU: CPU time gauge of the driver JVM ---
- mbean: 'metrics:name=*.driver.JVMCPU.jvmCpuTime,type=gauges'
  attributes:
  - attr: Value
    field: driver.jvm.cpu.time
# --- spark.streaming: per-query streaming gauges ---
# The second wildcard in each pattern stands for the name segment between
# "spark.streaming." and the metric name.
- mbean: 'metrics:name=*.driver.spark.streaming.*.states-rowsTotal,type=gauges'
  attributes:
  - attr: Value
    field: driver.spark.streaming.states.rows.total
- mbean: 'metrics:name=*.driver.spark.streaming.*.processingRate-total,type=gauges'
  attributes:
  - attr: Value
    field: driver.spark.streaming.processing_rate.total
- mbean: 'metrics:name=*.driver.spark.streaming.*.latency,type=gauges'
  attributes:
  - attr: Value
    field: driver.spark.streaming.latency
- mbean: 'metrics:name=*.driver.spark.streaming.*.states-usedBytes,type=gauges'
  attributes:
  - attr: Value
    field: driver.spark.streaming.states.used_bytes
- mbean: 'metrics:name=*.driver.spark.streaming.*.eventTime-watermark,type=gauges'
  attributes:
  - attr: Value
    field: driver.spark.streaming.event_time.watermark
# The type property must be "gauges" (plural), exactly as in every other gauge
# pattern of this file; a singular "gauge" would not match the registered MBean.
- mbean: 'metrics:name=*.driver.spark.streaming.*.inputRate-total,type=gauges'
  attributes:
  - attr: Value
    field: driver.spark.streaming.input_rate.total
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
---
# Ingest pipeline for the Apache Spark driver data stream.
# It stamps ECS/event metadata, moves the collected metrics from
# `jolokia.metrics` to `apache_spark`, derives the application name (and the
# executor id for non-driver beans) from the MBean name, then drops the
# intermediate fields.
description: Pipeline for parsing Apache Spark driver metrics.
processors:
  - set:
      field: ecs.version
      value: '8.1.0'
  # Everything collected under jolokia.metrics becomes the apache_spark object.
  - rename:
      field: jolokia.metrics
      target_field: apache_spark
      ignore_missing: true
  - set:
      field: event.type
      value: info
  - set:
      field: event.kind
      value: metric
  - set:
      field: event.module
      value: apache_spark
  - script:
      lang: painless
      description: >-
        This script will add the name of application under key
        'apache_spark.driver.application_name' or
        'apache_spark.executors.application_name' and executor id under
        'apache_spark.executors.id'
      # Run only for beans that are not master, worker or application beans.
      # When the mbean field is missing the null-safe chain yields null, the
      # comparison with false is false, and the script is skipped.
      # NOTE(review): the guard previously tested "name=worker." twice;
      # "name=master." is presumably the intended third prefix - confirm.
      if: >-
        ctx?.apache_spark?.mbean?.contains("name=master.") == false &&
        ctx?.apache_spark?.mbean?.contains("name=worker.") == false &&
        ctx?.apache_spark?.mbean?.contains("name=application.") == false
      # The bean name is the text after the first "=" of the MBean string; its
      # first dot-separated segment is used as the application name and the
      # second as the executor id ("driver" for driver beans).
      source: >-
        def bean_name = ctx.apache_spark.mbean.toString().splitOnToken("=")[1];
        def name_parts = bean_name.splitOnToken(".");
        def app_name = name_parts[0];
        def executor_id = name_parts[1];
        if (executor_id == "driver") {
          ctx.apache_spark.driver.application_name = app_name;
        } else {
          ctx.apache_spark.executors.application_name = app_name;
          ctx.apache_spark.executors.id = executor_id;
        }
  # Best-effort cleanup of fields that were only needed during processing.
  - remove:
      field:
        - apache_spark.mbean
        - jolokia
      ignore_failure: true
# Any processor failure is recorded on the document instead of being lost.
on_failure:
  - set:
      field: error.message
      value: '{{ _ingest.on_failure_message }}'
Loading

0 comments on commit a2e1a3a

Please sign in to comment.