Skip to content

Commit

Permalink
Merge pull request #16 from jmfiola/main
Browse files Browse the repository at this point in the history
Adding tagging based on request context
  • Loading branch information
pjcalvo authored Aug 4, 2022
2 parents 555c9e8 + 012450f commit eaf6681
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 21 deletions.
2 changes: 1 addition & 1 deletion example/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# locust-influxdb-boilerplate

LocustIO base project with a custom influxDB listener.
LocustIO base project with a custom influxDB listener. This package requires Locust v1.5.0 or greater.

## Instructions

Expand Down
2 changes: 1 addition & 1 deletion example/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
locust_influxdb_listener == 0.0.5
influxdb==5.3.1
locust==1.4.1
locust==1.5.0
34 changes: 16 additions & 18 deletions locust_influxdb_listener/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ def __init__(
events = env.events

# requests
events.request_success.add_listener(self.request_success)
events.request_failure.add_listener(self.request_failure)
events.request.add_listener(self.request)
# events
events.test_stop.add_listener(self.test_stop)
events.user_error.add_listener(self.user_error)
Expand All @@ -98,11 +97,11 @@ def __init__(
# complete
atexit.register(self.quitting)

def request_success(self, request_type, name, response_time, response_length, **_kwargs) -> None:
self.__listen_for_requests_events(self.node_id, 'locust_requests', request_type, name, response_time, response_length, True, None)

def request_failure(self, request_type, name, response_time, response_length, exception, **_kwargs) -> None:
self.__listen_for_requests_events(self.node_id, 'locust_requests', request_type, name, response_time, response_length, False, exception)
def request(self, request_type, name, response_time, response_length, response,
context, exception, start_time=None, url=None) -> None:
self.__listen_for_requests_events(
self.node_id, 'locust_requests', request_type, name, response_time,
response_length, response, context, exception, start_time, url)

def spawning_complete(self, user_count) -> None:
self.__register_event(self.node_id, user_count, 'spawning_complete')
Expand All @@ -125,7 +124,6 @@ def __register_event(self, node_id: str, user_count: int, event: str, **_kwargs)
"""
Persist locust event such as hatching started or stopped to influxdb.
Append user_count in case that it exists
:param node_id: The id of the node reporting the event.
:param event: The event name or description.
"""
Expand All @@ -143,23 +141,28 @@ def __register_event(self, node_id: str, user_count: int, event: str, **_kwargs)
self.cache.append(point)

def __listen_for_requests_events(self, node_id, measurement, request_type, name,
response_time, response_length, success, exception) -> None:
response_time, response_length, response,
context, exception, start_time, url) -> None:
"""
Persist request information to influxdb.
:param node_id: The id of the node reporting the event.
:param measurement: The measurement where to save this point.
:param success: Flag the info to as successful request or not
"""

time = datetime.utcnow()
was_successful = True
if response:
# override with response code
was_successful = 199 < response.status_code < 400
tags = {
'node_id': node_id,
'request_type': request_type,
'name': name,
'success': success,
'success': was_successful,
'exception': repr(exception),
}
if context and type(context) == dict:
tags.update(context)

if isinstance(exception, HTTPError):
tags['code'] = exception.response.status_code
Expand All @@ -175,7 +178,6 @@ def __listen_for_requests_events(self, node_id, measurement, request_type, name,
def __listen_for_locust_errors(self, node_id, user_instance, exception: Exception = None, tb=None) -> None:
"""
Persist locust errors to InfluxDB.
:param node_id: The id of the node reporting the error.
:return: None
"""
Expand All @@ -197,7 +199,6 @@ def __listen_for_locust_errors(self, node_id, user_instance, exception: Exceptio
def __flush_cached_points_worker(self) -> None:
"""
Background job that puts the points into the cache to be flushed according tot he interval defined.
:param influxdb_client:
:param interval:
:return: None
Expand All @@ -210,7 +211,6 @@ def __flush_cached_points_worker(self) -> None:
def __make_data_point(self, measurement: str, tags: dict, fields: dict, time: datetime) -> dict:
"""
Create a list with a single point to be saved to influxdb.
:param measurement: The measurement where to save this point.
:param tags: Dictionary of tags to be saved in the measurement.
:param fields: Dictionary of field to be saved to measurement.
Expand All @@ -228,7 +228,6 @@ def last_flush_on_quitting(self):
def __flush_points(self, influxdb_client: InfluxDBClient) -> None:
"""
Write the cached data points to influxdb
:param influxdb_client: An instance of InfluxDBClient
:return: None
"""
Expand All @@ -239,5 +238,4 @@ def __flush_points(self, influxdb_client: InfluxDBClient) -> None:
if not success:
log.error('Failed to write points to influxdb.')
# If failed for any reason put back into the beginning of cache
self.cache.insert(0, to_be_flushed)

self.cache.insert(0, to_be_flushed)
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"Operating System :: OS Independent",
],
install_requires=[
'locust>=1.1.1',
'locust>=1.5.0',
'influxdb>=5.2.2',
],
python_requires='>=3.6',
Expand Down

0 comments on commit eaf6681

Please sign in to comment.