Merge pull request #18 from DavidKatz-il/refactor_processing
refactor spark-processor service
dereeno authored Feb 14, 2023
2 parents dc12159 + 8f13869 commit 1416d2f
Showing 17 changed files with 568 additions and 262 deletions.
File renamed without changes.
16 changes: 16 additions & 0 deletions common/utils.py
@@ -0,0 +1,16 @@
import json
from pathlib import Path


def read_json(file_path: str | Path) -> dict:
with open(file_path, "r", encoding="utf-8") as json_file:
return json.load(json_file)


def string_to_bytes(size: str) -> int:
    """Convert a size string such as '512m' or '2g' to a number of bytes."""
    units = {"b": 1, "k": 1024, "m": 1024**2, "g": 1024**3, "t": 1024**4}
size = size.lower().strip()
num = float(size[:-1])
unit = size[-1]
value_in_bytes = int(num * units[unit])
return value_in_bytes
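For reference, a minimal usage sketch of the two helpers above (the sizes are arbitrary example values; the config path is the one referenced later in events_resolver.py):

from common.utils import read_json, string_to_bytes

config = read_json("configs/events_config.json")   # -> dict parsed from the JSON file
print(string_to_bytes("512m"))                      # 536870912  (512 * 1024**2)
print(string_to_bytes("2g"))                        # 2147483648 (2 * 1024**3)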
Empty file added configs/__init__.py
Empty file.
@@ -1,4 +1,4 @@
-events_config = {
+{
"SparkListenerApplicationStart": {
"application_start_time": ["Timestamp"]
},
@@ -12,13 +12,13 @@
},
"SparkListenerTaskEnd": {
"executor_id": ["Task Info", "Executor ID"],
"executor_cpu_time": ["Task Metrics", "Executor CPU Time"],
"cpu_time": ["Task Metrics", "Executor CPU Time"],
"bytes_read": ["Task Metrics", "Input Metrics", "Bytes Read"],
"records_read": ["Task Metrics", "Input Metrics", "Records Read"],
"bytes_written": ["Task Metrics", "Output Metrics", "Bytes Written"],
"records_written": ["Task Metrics", "Output Metrics", "Records Written"],
"local_bytes_read": ["Task Metrics", "Shuffle Read Metrics", "Local Bytes Read"],
"remote_bytes_read": ["Task Metrics", "Shuffle Read Metrics", "Remote Bytes Read"],
"shuffle_local_bytes_read": ["Task Metrics", "Shuffle Read Metrics", "Local Bytes Read"],
"shuffle_remote_bytes_read": ["Task Metrics", "Shuffle Read Metrics", "Remote Bytes Read"],
"shuffle_bytes_written": ["Task Metrics", "Shuffle Write Metrics", "Shuffle Bytes Written"],
"jvm_memory": ["Task Executor Metrics", "ProcessTreeJVMRSSMemory"],
"python_memory": ["Task Executor Metrics", "ProcessTreePythonRSSMemory"],
2 changes: 1 addition & 1 deletion spark_endpoint/app.py
@@ -7,7 +7,7 @@

from common.config import app_config
from common.logger import get_logger
-from common.models import RawEvent, db_session
+from common.db_models import RawEvent, db_session

logger = get_logger()

35 changes: 26 additions & 9 deletions spark_job_processor/app.py
@@ -1,33 +1,50 @@
import json
import os

-from common.config import app_config
from kafka import KafkaConsumer
+from sqlalchemy import select

-from spark_job_processor.processor import process_message
+from common.config import app_config
from common.logger import get_logger
+from common.db_models import RawEvent, SparkJobRun, db_session
+from spark_job_processor.events_processor import EventsProcessor

TOPIC_NAME = "JOB_RUN_EVENT"

-ENVIRONMENT = os.getenv('ENVIRONMENT', 'development')
+ENVIRONMENT = os.getenv("ENVIRONMENT", "development")
CONFIG = app_config[ENVIRONMENT]
logger = get_logger()


+def get_events_from_db(job_run_id: str):
+    stmt = select(RawEvent).where(RawEvent.job_run_id == job_run_id)
+    return db_session.scalars(stmt)


+def insert_spark_job_run(data: dict):
+    spark_job_run = SparkJobRun(**data)
+    db_session.add(spark_job_run)
+    db_session.commit()


def run():
consumer = KafkaConsumer(
CONFIG.KAFKA_TOPIC_NAME,
bootstrap_servers=[CONFIG.KAFKA_BOOTSTRAP_SERVERS],
-        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
+        value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)
-    logger.info('Starting to consume messages')
+    logger.info("Starting to consume messages")
+    events_processor = EventsProcessor()

for msg in consumer:
try:
-            logger.info('Received message: {}'.format(msg.value))
-            process_message(**msg.value)
+            logger.info(f"Received message: {msg.value}")
+            events = [dict(raw_event.event) for raw_event in get_events_from_db(msg.value['job_run_id'])]
+            application_data = events_processor.process_events(events, **msg.value)
+            insert_spark_job_run(application_data)
except Exception as e:
-            logger.error('Processor error: {}'.format(e), exc_info=True)
+            logger.error("Processor error: {}".format(e), exc_info=True)


-if __name__ == '__main__':
+if __name__ == "__main__":
run()
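For context, each consumed Kafka message value is expected to be a JSON object whose keys line up with the keyword arguments of EventsProcessor.process_events; a plausible payload (the identifiers below are invented) would be:

{
    "job_run_id": "c0ffee00-0000-0000-0000-000000000000",
    "job_id": "f00dbabe-0000-0000-0000-000000000000",
    "pipeline_id": null,
    "pipeline_run_id": null
}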
60 changes: 60 additions & 0 deletions spark_job_processor/events_processor.py
@@ -0,0 +1,60 @@
from spark_job_processor.events_resolver import EventsResolver


class EventsProcessor:
def __init__(self) -> None:
self.resolver: EventsResolver = EventsResolver()

def process_events(
self,
events: list[dict],
job_run_id,
job_id,
pipeline_id=None,
pipeline_run_id=None,
) -> dict:
application = self.resolver.events_resolver(events)
application.set_totals()
total_cpu_time_used = round(
application.executor_total.task_total.cpu_time / 1e9, 4
)

peak_memory_usage = 0
if application.memory_per_executor:
peak_memory_usage = round(
(
application.executor_total.task_total.total_memory
/ application.memory_per_executor
)
* 100,
4,
)

cpu_utilization = 0
if application.executor_total.cpu_uptime:
cpu_utilization = round(
(total_cpu_time_used / application.executor_total.cpu_uptime)
* 100,
4,
)

processed_data = {
"id": job_run_id,
"job_id": job_id,
"pipeline_id": pipeline_id,
"pipeline_run_id": pipeline_run_id,
"cpu_utilization": cpu_utilization,
"total_cpu_time_used": total_cpu_time_used,
"num_of_executors": len(application.executors),
"total_memory_per_executor": application.memory_per_executor,
"total_bytes_read": application.executor_total.task_total.bytes_read,
"total_shuffle_bytes_read": application.executor_total.task_total.total_shuffle_bytes_read,
"total_bytes_written": application.executor_total.task_total.bytes_written,
"total_shuffle_bytes_written": application.executor_total.task_total.shuffle_bytes_written,
"total_cpu_uptime": application.executor_total.cpu_uptime,
"peak_memory_usage": peak_memory_usage,
"total_cores_num": application.executor_total.num_cores,
"start_time": application.start_time,
"end_time": application.end_time,
}
return processed_data
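To make the two derived metrics concrete, here is the same arithmetic with invented numbers (Spark reports task CPU time in nanoseconds, hence the 1e9 divisor; the memory figures follow my reading of the executor/task totals):

# All figures below are invented for illustration
task_cpu_time_ns = 7_200_000_000_000      # summed task CPU time, in nanoseconds
cpu_uptime_s = 14_400                     # summed executor uptime, in seconds
memory_per_executor = 4 * 1024**3         # executor memory incl. overhead, in bytes
peak_task_memory = 3 * 1024**3            # summed task memory peak, in bytes

total_cpu_time_used = round(task_cpu_time_ns / 1e9, 4)                      # 7200.0 s
cpu_utilization = round(total_cpu_time_used / cpu_uptime_s * 100, 4)        # 50.0 %
peak_memory_usage = round(peak_task_memory / memory_per_executor * 100, 4)  # 75.0 %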
96 changes: 96 additions & 0 deletions spark_job_processor/events_resolver.py
@@ -0,0 +1,96 @@
from datetime import datetime
from pathlib import Path

from common.logger import get_logger
from common.utils import string_to_bytes
from spark_job_processor.events_resolver_base import EventsResolverBase
from spark_job_processor.models import Application, Task

logger = get_logger()


CONFIG_FILE_PATH = Path("configs/events_config.json")


class EventsResolver(EventsResolverBase):
def __init__(self) -> None:
super().__init__(path_events_config=CONFIG_FILE_PATH, event_field_name="Event")

def events_resolver(self, events: list[dict]) -> Application:
application = Application()
for event in events:
match event["Event"]:
case "SparkListenerApplicationStart":
app_start_timestamp = self.find_value_in_event(
event, "application_start_time"
)
application.start_time = datetime.utcfromtimestamp(
app_start_timestamp / 1000.0
)

case "SparkListenerApplicationEnd":
app_end_timestamp = self.find_value_in_event(
event, "application_end_time"
)
application.end_time = datetime.utcfromtimestamp(
app_end_timestamp / 1000.0
)

case "SparkListenerExecutorAdded":
executor_start_timestamp = self.find_value_in_event(
event, "executor_start_time"
)
executor_id = self.find_value_in_event(event, "executor_id")
_executor = application.executors[executor_id]
_executor.start_time = datetime.utcfromtimestamp(
executor_start_timestamp / 1000.0
)
_executor.num_cores = self.find_value_in_event(
event, "cores_num"
)

case "SparkListenerTaskEnd":
_task_data = {
field: self.find_value_in_event(event, field)
for field in self.events_config["SparkListenerTaskEnd"]
}
executor_id = _task_data.pop("executor_id")
_task = Task(**_task_data)
application.executors[executor_id].tasks.append(_task)

case "SparkListenerExecutorRemoved" | "SparkListenerExecutorCompleted":
executor_id = self.find_value_in_event(event, "executor_id")
executor_end_timestamp = self.find_value_in_event(
event, "executor_end_time"
)
                    application.executors[executor_id].end_time = (
                        datetime.utcfromtimestamp(executor_end_timestamp / 1000.0)
                    )

case "SparkListenerEnvironmentUpdate":
try:
executor_memory = string_to_bytes(
self.find_value_in_event(event, "executor_memory")
)
memory_overhead_factor = float(
self.find_value_in_event(event, "memory_overhead_factor")
)
application.memory_per_executor = executor_memory * (
1 + memory_overhead_factor
)
except Exception as exc:
logger.error(
"Failed to parse executor memory from event: %s",
exc,
exc_info=True,
)

self.post_processing(application)
return application

@staticmethod
def post_processing(application: Application):
if application.end_time:
for executor in application.executors.values():
if executor.end_time is None:
executor.end_time = application.end_time
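For orientation, a SparkListenerTaskEnd event trimmed down to the fields the resolver reads (the key paths follow configs/events_config.json shown above; all numbers are invented):

{
    "Event": "SparkListenerTaskEnd",
    "Task Info": {"Executor ID": "1"},
    "Task Metrics": {
        "Executor CPU Time": 1200000000,
        "Input Metrics": {"Bytes Read": 1048576, "Records Read": 1000},
        "Output Metrics": {"Bytes Written": 524288, "Records Written": 500},
        "Shuffle Read Metrics": {"Local Bytes Read": 0, "Remote Bytes Read": 0},
        "Shuffle Write Metrics": {"Shuffle Bytes Written": 0}
    },
    "Task Executor Metrics": {"ProcessTreeJVMRSSMemory": 268435456, "ProcessTreePythonRSSMemory": 0}
}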
19 changes: 19 additions & 0 deletions spark_job_processor/events_resolver_base.py
@@ -0,0 +1,19 @@
from abc import ABC, abstractmethod
from pathlib import Path

from common.utils import read_json


class EventsResolverBase(ABC):
def __init__(self, path_events_config: str | Path, event_field_name: str) -> None:
self.events_config = read_json(path_events_config)
self.event_field_name = event_field_name

@abstractmethod
def events_resolver(self, events: list[dict]):
"""Resolving the events"""

    def find_value_in_event(self, event: dict, field_name: str):
        # Walk the key path configured for this field (e.g. ["Task Info", "Executor ID"]) into the event
        for key in self.events_config[event[self.event_field_name]][field_name]:
            event = event[key]
        return event
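Put differently, find_value_in_event resolves a field by following its configured key path. With the config entry "executor_id": ["Task Info", "Executor ID"] and an invented event payload, the lookup reduces to:

event = {"Event": "SparkListenerTaskEnd", "Task Info": {"Executor ID": "3"}}
# path ["Task Info", "Executor ID"] -> event["Task Info"]["Executor ID"]
executor_id = event["Task Info"]["Executor ID"]   # "3"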
