Skip to content

Commit

Permalink
Merge pull request #986 from TeachMeTW/feature/stats_timing
Browse files Browse the repository at this point in the history
Stats Timing
  • Loading branch information
shankari authored Oct 18, 2024
2 parents 593e61c + 0b39803 commit 1551a30
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 3 deletions.
4 changes: 4 additions & 0 deletions emission/core/wrapper/entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,12 @@ def _getData2Wrapper():
"stats/server_api_error": "statsevent",
# pipeline stage time, measured on the server
"stats/pipeline_time": "statsevent",
# dashboard time, measured on the server
"stats/dashboard_time": "statsevent",
# intended to log the occurrence of errors in the pipeline
"stats/pipeline_error": "statsevent",
# intended to log the occurrence of errors in the dashboard
"stats/dashboard_error": "statsevent",
# time for various client operations, measured on the client
# comparison with the server_api_time can help debug networking issues
"stats/client_time": "statsevent",
Expand Down
2 changes: 1 addition & 1 deletion emission/pipeline/intake_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data):
eaum.match_incoming_user_inputs(uuid)

esds.store_pipeline_time(uuid, ecwp.PipelineStages.USER_INPUT_MATCH_INCOMING.name,
time.time(), uct.elapsed)
time.time(), uit.elapsed)

# Hack until we delete these spurious entries
# https://github.com/e-mission/e-mission-server/issues/407#issuecomment-2484868
Expand Down
39 changes: 37 additions & 2 deletions emission/storage/decorations/stats_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import time
# Standard imports
from future import standard_library
standard_library.install_aliases()
from builtins import *
import logging
import time

# Our imports
import emission.storage.timeseries.abstract_timeseries as esta
import emission.core.wrapper.entry as ecwe
import emission.core.timer as ec_timer


# metadata format is
Expand Down Expand Up @@ -46,3 +46,38 @@ def store_stats_entry(user_id, metadata_key, name, ts, reading):
new_entry = ecwe.Entry.create_entry(user_id, metadata_key, data)
return esta.TimeSeries.get_time_series(user_id).insert(new_entry)

def store_dashboard_time(code_fragment_name: str, timer: ec_timer.Timer):
"""
Stores statistics about execution times in dashboard code using a Timer object.
Both of our current dashboards generate _aggregate_ metrics. We do not work at a per-user level
in the Python dashboards, so we pass in only the name of the step being instrumented and the timing information.
:param code_fragment_name (str): The name of the function or code fragment being timed.
:param timer (ec_timer.Timer): The Timer object that records the execution duration.
"""
# Get the current timestamp in seconds since epoch
timestamp = time.time()

# Call the existing store_stats_entry function
store_stats_entry(
user_id=None, # No user ID as per current dashboard design
metadata_key="stats/dashboard_time",
name=code_fragment_name,
ts=timestamp,
reading=timer.elapsed_ms
)


def store_dashboard_error(code_fragment_name: str, timer: ec_timer.Timer):
# Get the current timestamp in seconds since epoch
timestamp = time.time()

# Call the existing store_stats_entry function
store_stats_entry(
user_id=None, # No user ID as per current dashboard design
metadata_key="stats/dashboard_error",
name=code_fragment_name,
ts=timestamp,
reading=timer.elapsed_ms
)

2 changes: 2 additions & 0 deletions emission/storage/timeseries/builtin_timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ def __init__(self, user_id):
"stats/server_api_time": self.timeseries_db,
"stats/server_api_error": self.timeseries_db,
"stats/pipeline_time": self.timeseries_db,
"stats/dashboard_time": self.timeseries_db,
"stats/dashboard_error": self.timeseries_db,
"stats/pipeline_error": self.timeseries_db,
"stats/client_time": self.timeseries_db,
"stats/client_nav_event": self.timeseries_db,
Expand Down
157 changes: 157 additions & 0 deletions emission/tests/storageTests/TestStatsQueries.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import unittest
import logging
import time

import emission.core.get_database as edb
import emission.core.timer as ect
import emission.storage.decorations.stats_queries as esdsq
import emission.storage.timeseries.abstract_timeseries as esta
import emission.tests.common as etc


class TestFunctionTiming(unittest.TestCase):
@classmethod
def setUpClass(self):
"""
Set up resources before any tests are run.
"""
self.timeseries_db = esta.TimeSeries.get_time_series(None)

def tearDown(self):
"""
Clean up relevant database entries after each test to maintain isolation.
"""
keys_to_clean = ["stats/dashboard_time", "stats/dashboard_error"]
edb.get_timeseries_db().delete_many(
{"metadata.key": {"$in": keys_to_clean}}
)
logging.debug(f"After test, cleared DB entries for {keys_to_clean}")

def verify_stats_entries(self, expected_entries: list[tuple[str, str, float]]):
"""
Verifies that each of the expected entries, in the form of (key, name, elapsed_ms),
are stored correctly in the database.
:param expected_entries: A list of tuples containing (key, name, expected_elapsed_ms).
"""
logging.debug(f"Ensuring {len(expected_entries)} entries exist in DB.")
# Prepare keys for database query based on expected entries.
key_list = [key for (key, _, _) in expected_entries]
# Fetch matching entries from the timeseries database.
stored_entrys = list(self.timeseries_db.find_entries(key_list))
# Check if the number of retrieved entries matches expectations.
self.assertEqual(len(stored_entrys), len(expected_entries))

# Validate each stored entry against the expected data.
for i in range(len(expected_entries)):
stored_entry = stored_entrys[i]
expected_key, expected_name, expected_reading = expected_entries[i]
logging.debug(f"Comparing expected {expected_entries[i]} " +
f"with stored {stored_entry['metadata']['key']} {stored_entry['data']}")
# Verify the key matches.
self.assertEqual(stored_entry['metadata']['key'], expected_key)
# Verify the name matches.
self.assertEqual(stored_entry['data']['name'], expected_name)
# Verify the elapsed time is as expected.
self.assertEqual(stored_entry['data']['reading'], expected_reading)

def test_single_function_timing(self):
"""
Test the execution and timing of a single function.
This test measures how long 'sample_function' takes to execute and verifies
that the timing information is correctly stored in the database.
"""
def sample_function():
logging.debug("Executing sample_function")
time.sleep(2) # Simulate processing time by sleeping for 2 seconds.
return True

# Measure the execution time of 'sample_function'.
with ect.Timer() as timer:
sample_function()

# Record the timing data in the database.
esdsq.store_dashboard_time("sample_function", timer)

# Confirm the timing was recorded correctly.
self.verify_stats_entries([
("stats/dashboard_time", "sample_function", timer.elapsed_ms)
])

def test_multiple_functions_timing(self):
"""
Test the execution and timing of two functions.
This test records and validates the time taken for:
(i) function_one,
(ii) function_two, and
(iii) both functions together.
"""
def function_one():
# Simulate processing time by sleeping for 1 second.
return time.sleep(1)

def function_two():
# Simulate processing time by sleeping for 1.5 seconds.
return time.sleep(1.5)

# Track the total time for both functions.
with ect.Timer() as timer_both:
# Time 'function_one' execution.
with ect.Timer() as timer_one:
function_one()
# Record 'function_one' timing.
esdsq.store_dashboard_time("function_one", timer_one)

# Time 'function_two' execution.
with ect.Timer() as timer_two:
function_two()
# Record 'function_two' timing.
esdsq.store_dashboard_time("function_two", timer_two)

# Record the combined timing for both functions.
esdsq.store_dashboard_time("functions_one_and_two", timer_both)

# Validate individual and combined timings.
self.assertAlmostEqual(timer_one.elapsed_ms, 1000, delta=100)
self.assertAlmostEqual(timer_two.elapsed_ms, 1500, delta=100)
self.assertAlmostEqual(timer_both.elapsed_ms, 2500, delta=100)

# Ensure all timing entries are correctly stored.
self.verify_stats_entries([
("stats/dashboard_time", "function_one", timer_one.elapsed_ms),
("stats/dashboard_time", "function_two", timer_two.elapsed_ms),
("stats/dashboard_time", "functions_one_and_two", timer_both.elapsed_ms)
])

def test_faulty_function_timing(self):
"""
Test the execution and timing of a faulty function that is expected to raise an exception.
This test ensures that even when a function fails, the timing information is correctly
recorded as an error in the database.
"""
def faulty_function():
logging.debug("Executing faulty_function")
time.sleep(1) # Simulate processing time before failure.
raise ValueError("Simulated error in faulty_function")

try:
# Attempt to execute and time the faulty function.
with ect.Timer() as timer:
faulty_function()
except ValueError as e:
# Handle the expected exception and record the timing as an error.
logging.info(f"Caught expected error: {e}")
esdsq.store_dashboard_error('faulty_function', timer)
# Continue after handling the exception.
pass

# Verify that the error timing was recorded.
self.verify_stats_entries([
("stats/dashboard_error", "faulty_function", timer.elapsed_ms)
])



if __name__ == '__main__':
etc.configLogging()
unittest.main()

0 comments on commit 1551a30

Please sign in to comment.