From cc1ebcce1e8ed5e18cf025b909a31dbf88240b5c Mon Sep 17 00:00:00 2001 From: "jacob.fiola" Date: Mon, 18 Jul 2022 15:44:29 -0600 Subject: [PATCH 1/6] adding tagging based on request context --- locust_influxdb_listener/__init__.py | 87 +++++++++++++++------------- 1 file changed, 47 insertions(+), 40 deletions(-) diff --git a/locust_influxdb_listener/__init__.py b/locust_influxdb_listener/__init__.py index 357a4d4..9f868aa 100644 --- a/locust_influxdb_listener/__init__.py +++ b/locust_influxdb_listener/__init__.py @@ -17,17 +17,18 @@ class InfluxDBSettings: """ Store influxdb settings """ + def __init__( - self, - influx_host: str = 'localhost', - influx_port: int = 8086, - user: str = 'admin', - pwd: str = 'pass', - database: str = 'default', - interval_ms: int = 1000, - ssl: bool = False, - verify_ssl: bool = False, - create_database: bool = False + self, + influx_host: str = 'localhost', + influx_port: int = 8086, + user: str = 'admin', + pwd: str = 'pass', + database: str = 'default', + interval_ms: int = 1000, + ssl: bool = False, + verify_ssl: bool = False, + create_database: bool = False ): self.influx_host = influx_host self.influx_port = influx_port @@ -38,17 +39,17 @@ def __init__( self.ssl = ssl self.verify_ssl = verify_ssl self.create_database = create_database - -class InfluxDBListener: + +class InfluxDBListener: """ Events listener that writes locust events to the given influxdb connection """ - + def __init__( - self, - env: locust.env.Environment, - influxDbSettings: InfluxDBSettings + self, + env: locust.env.Environment, + influxDbSettings: InfluxDBSettings ): # flush related attributes @@ -56,7 +57,7 @@ def __init__( self.cache = [] self.stop_flag = False self.interval_ms = influxDbSettings.interval_ms - # influxdb settings + # influxdb settings try: self.influxdb_client = InfluxDBClient( host=influxDbSettings.influx_host, @@ -84,13 +85,12 @@ def __init__( # start background event to push data to influx self.flush_worker = gevent.spawn(self.__flush_cached_points_worker) self.test_start(0) - + events = env.events - + # requests - events.request_success.add_listener(self.request_success) - events.request_failure.add_listener(self.request_failure) - # events + events.request.add_listener(self.request) + # events events.test_stop.add_listener(self.test_stop) events.user_error.add_listener(self.user_error) events.spawning_complete.add_listener(self.spawning_complete) @@ -98,11 +98,11 @@ def __init__( # complete atexit.register(self.quitting) - def request_success(self, request_type, name, response_time, response_length, **_kwargs) -> None: - self.__listen_for_requests_events(self.node_id, 'locust_requests', request_type, name, response_time, response_length, True, None) - - def request_failure(self, request_type, name, response_time, response_length, exception, **_kwargs) -> None: - self.__listen_for_requests_events(self.node_id, 'locust_requests', request_type, name, response_time, response_length, False, exception) + def request(self, request_type, name, response_time, response_length, response, + context, exception, start_time=None, url=None) -> None: + self.__listen_for_requests_events( + self.node_id, 'locust_requests', request_type, name, response_time, + response_length, response, context, exception, start_time, url) def spawning_complete(self, user_count) -> None: self.__register_event(self.node_id, user_count, 'spawning_complete') @@ -113,7 +113,7 @@ def test_start(self, user_count) -> None: def test_stop(self, user_count=None, environment=None) -> None: self.__register_event(self.node_id, 0, 'test_stopped') - + def user_error(self, user_instance, exception, tb, **_kwargs) -> None: self.__listen_for_locust_errors(self.node_id, user_instance, exception, tb) @@ -121,7 +121,8 @@ def quitting(self, **_kwargs) -> None: self.__register_event(self.node_id, 0, 'quitting') self.last_flush_on_quitting() - def __register_event(self, node_id: str, user_count: int, event: str, **_kwargs) -> None: + def __register_event(self, node_id: str, user_count: int, event: str, + **_kwargs) -> None: """ Persist locust event such as hatching started or stopped to influxdb. Append user_count in case that it exists @@ -143,23 +144,29 @@ def __register_event(self, node_id: str, user_count: int, event: str, **_kwargs) self.cache.append(point) def __listen_for_requests_events(self, node_id, measurement, request_type, name, - response_time, response_length, success, exception) -> None: + response_time, response_length, response, + context, exception, start_time, url) -> None: """ Persist request information to influxdb. :param node_id: The id of the node reporting the event. :param measurement: The measurement where to save this point. - :param success: Flag the info to as successful request or not """ time = datetime.utcnow() + if response: + was_successful = 199 < response.status_code < 400 + else: + was_successful = True tags = { 'node_id': node_id, 'request_type': request_type, 'name': name, - 'success': success, + 'success': was_successful, 'exception': repr(exception), } + if context and type(context) == dict: + tags.update(context) if isinstance(exception, HTTPError): tags['code'] = exception.response.status_code @@ -167,12 +174,14 @@ def __listen_for_requests_events(self, node_id, measurement, request_type, name, fields = { 'response_time': response_time, 'response_length': response_length, - 'counter': self.env.stats.num_requests, # TODO: Review the need of this field + 'counter': self.env.stats.num_requests, + # TODO: Review the need of this field } point = self.__make_data_point(measurement, tags, fields, time) self.cache.append(point) - def __listen_for_locust_errors(self, node_id, user_instance, exception: Exception = None, tb=None) -> None: + def __listen_for_locust_errors(self, node_id, user_instance, + exception: Exception = None, tb=None) -> None: """ Persist locust errors to InfluxDB. @@ -193,7 +202,6 @@ def __listen_for_locust_errors(self, node_id, user_instance, exception: Exceptio point = self.__make_data_point('locust_exceptions', tags, fields, time) self.cache.append(point) - def __flush_cached_points_worker(self) -> None: """ Background job that puts the points into the cache to be flushed according tot he interval defined. @@ -207,7 +215,8 @@ def __flush_cached_points_worker(self) -> None: self.__flush_points(self.influxdb_client) gevent.sleep(self.interval_ms / 1000) - def __make_data_point(self, measurement: str, tags: dict, fields: dict, time: datetime) -> dict: + def __make_data_point(self, measurement: str, tags: dict, fields: dict, + time: datetime) -> dict: """ Create a list with a single point to be saved to influxdb. @@ -216,15 +225,14 @@ def __make_data_point(self, measurement: str, tags: dict, fields: dict, time: da :param fields: Dictionary of field to be saved to measurement. :param time: The time os this point. """ - return {"measurement": measurement, "tags": tags, "time": time, "fields": fields} - + return {"measurement": measurement, "tags": tags, "time": time, + "fields": fields} def last_flush_on_quitting(self): self.stop_flag = True self.flush_worker.join() self.__flush_points(self.influxdb_client) - def __flush_points(self, influxdb_client: InfluxDBClient) -> None: """ Write the cached data points to influxdb @@ -240,4 +248,3 @@ def __flush_points(self, influxdb_client: InfluxDBClient) -> None: log.error('Failed to write points to influxdb.') # If failed for any reason put back into the beginning of cache self.cache.insert(0, to_be_flushed) - From 8b3e8195178049b1f1b71654b565699c199faedd Mon Sep 17 00:00:00 2001 From: "jacob.fiola" Date: Mon, 18 Jul 2022 16:10:32 -0600 Subject: [PATCH 2/6] update versions --- example/requirements.txt | 4 ++-- setup.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/example/requirements.txt b/example/requirements.txt index 908f0d0..bf313c9 100644 --- a/example/requirements.txt +++ b/example/requirements.txt @@ -1,3 +1,3 @@ -locust_influxdb_listener == 0.0.5 +locust_influxdb_listener == 0.0.6 influxdb==5.3.1 -locust==1.4.1 \ No newline at end of file +locust==2.8.6 \ No newline at end of file diff --git a/setup.py b/setup.py index 561f767..bdc47b8 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setuptools.setup( name="locust_influxdb_listener", # Replace with your own username - version="0.0.5", + version="0.0.6", author="Pablo Calvo", author_email="pjcalvov@gmail.com", description="Locust.io 1.X influxdb listener", @@ -19,7 +19,7 @@ "Operating System :: OS Independent", ], install_requires=[ - 'locust>=1.1.1', + 'locust>=2.8.0', 'influxdb>=5.2.2', ], python_requires='>=3.6', From 6949ff2d2bcf799e1d6c8165470dc934de6d2e96 Mon Sep 17 00:00:00 2001 From: "jacob.fiola" Date: Wed, 20 Jul 2022 09:14:45 -0600 Subject: [PATCH 3/6] address comments --- example/requirements.txt | 2 +- locust_influxdb_listener/__init__.py | 12 +++--------- setup.py | 2 +- 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/example/requirements.txt b/example/requirements.txt index bf313c9..24c504d 100644 --- a/example/requirements.txt +++ b/example/requirements.txt @@ -1,3 +1,3 @@ -locust_influxdb_listener == 0.0.6 +locust_influxdb_listener == 0.0.5 influxdb==5.3.1 locust==2.8.6 \ No newline at end of file diff --git a/locust_influxdb_listener/__init__.py b/locust_influxdb_listener/__init__.py index 9f868aa..6a6204e 100644 --- a/locust_influxdb_listener/__init__.py +++ b/locust_influxdb_listener/__init__.py @@ -126,7 +126,6 @@ def __register_event(self, node_id: str, user_count: int, event: str, """ Persist locust event such as hatching started or stopped to influxdb. Append user_count in case that it exists - :param node_id: The id of the node reporting the event. :param event: The event name or description. """ @@ -148,16 +147,15 @@ def __listen_for_requests_events(self, node_id, measurement, request_type, name, context, exception, start_time, url) -> None: """ Persist request information to influxdb. - :param node_id: The id of the node reporting the event. :param measurement: The measurement where to save this point. """ time = datetime.utcnow() + was_successful = True if response: + # override with response code was_successful = 199 < response.status_code < 400 - else: - was_successful = True tags = { 'node_id': node_id, 'request_type': request_type, @@ -184,7 +182,6 @@ def __listen_for_locust_errors(self, node_id, user_instance, exception: Exception = None, tb=None) -> None: """ Persist locust errors to InfluxDB. - :param node_id: The id of the node reporting the error. :return: None """ @@ -205,7 +202,6 @@ def __listen_for_locust_errors(self, node_id, user_instance, def __flush_cached_points_worker(self) -> None: """ Background job that puts the points into the cache to be flushed according tot he interval defined. - :param influxdb_client: :param interval: :return: None @@ -219,7 +215,6 @@ def __make_data_point(self, measurement: str, tags: dict, fields: dict, time: datetime) -> dict: """ Create a list with a single point to be saved to influxdb. - :param measurement: The measurement where to save this point. :param tags: Dictionary of tags to be saved in the measurement. :param fields: Dictionary of field to be saved to measurement. @@ -236,7 +231,6 @@ def last_flush_on_quitting(self): def __flush_points(self, influxdb_client: InfluxDBClient) -> None: """ Write the cached data points to influxdb - :param influxdb_client: An instance of InfluxDBClient :return: None """ @@ -247,4 +241,4 @@ def __flush_points(self, influxdb_client: InfluxDBClient) -> None: if not success: log.error('Failed to write points to influxdb.') # If failed for any reason put back into the beginning of cache - self.cache.insert(0, to_be_flushed) + self.cache.insert(0, to_be_flushed) \ No newline at end of file diff --git a/setup.py b/setup.py index bdc47b8..de46957 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setuptools.setup( name="locust_influxdb_listener", # Replace with your own username - version="0.0.6", + version="0.0.5", author="Pablo Calvo", author_email="pjcalvov@gmail.com", description="Locust.io 1.X influxdb listener", From 53be6e0cb1c768b093c7ea35ebe212460307c51d Mon Sep 17 00:00:00 2001 From: "jacob.fiola" Date: Wed, 20 Jul 2022 09:18:39 -0600 Subject: [PATCH 4/6] remove confusing formatting --- locust_influxdb_listener/__init__.py | 61 +++++++++++++--------------- 1 file changed, 29 insertions(+), 32 deletions(-) diff --git a/locust_influxdb_listener/__init__.py b/locust_influxdb_listener/__init__.py index 6a6204e..0086bbe 100644 --- a/locust_influxdb_listener/__init__.py +++ b/locust_influxdb_listener/__init__.py @@ -17,18 +17,17 @@ class InfluxDBSettings: """ Store influxdb settings """ - def __init__( - self, - influx_host: str = 'localhost', - influx_port: int = 8086, - user: str = 'admin', - pwd: str = 'pass', - database: str = 'default', - interval_ms: int = 1000, - ssl: bool = False, - verify_ssl: bool = False, - create_database: bool = False + self, + influx_host: str = 'localhost', + influx_port: int = 8086, + user: str = 'admin', + pwd: str = 'pass', + database: str = 'default', + interval_ms: int = 1000, + ssl: bool = False, + verify_ssl: bool = False, + create_database: bool = False ): self.influx_host = influx_host self.influx_port = influx_port @@ -39,17 +38,17 @@ def __init__( self.ssl = ssl self.verify_ssl = verify_ssl self.create_database = create_database + - -class InfluxDBListener: +class InfluxDBListener: """ Events listener that writes locust events to the given influxdb connection """ - + def __init__( - self, - env: locust.env.Environment, - influxDbSettings: InfluxDBSettings + self, + env: locust.env.Environment, + influxDbSettings: InfluxDBSettings ): # flush related attributes @@ -57,7 +56,7 @@ def __init__( self.cache = [] self.stop_flag = False self.interval_ms = influxDbSettings.interval_ms - # influxdb settings + # influxdb settings try: self.influxdb_client = InfluxDBClient( host=influxDbSettings.influx_host, @@ -85,12 +84,12 @@ def __init__( # start background event to push data to influx self.flush_worker = gevent.spawn(self.__flush_cached_points_worker) self.test_start(0) - + events = env.events - + # requests events.request.add_listener(self.request) - # events + # events events.test_stop.add_listener(self.test_stop) events.user_error.add_listener(self.user_error) events.spawning_complete.add_listener(self.spawning_complete) @@ -113,7 +112,7 @@ def test_start(self, user_count) -> None: def test_stop(self, user_count=None, environment=None) -> None: self.__register_event(self.node_id, 0, 'test_stopped') - + def user_error(self, user_instance, exception, tb, **_kwargs) -> None: self.__listen_for_locust_errors(self.node_id, user_instance, exception, tb) @@ -121,8 +120,7 @@ def quitting(self, **_kwargs) -> None: self.__register_event(self.node_id, 0, 'quitting') self.last_flush_on_quitting() - def __register_event(self, node_id: str, user_count: int, event: str, - **_kwargs) -> None: + def __register_event(self, node_id: str, user_count: int, event: str, **_kwargs) -> None: """ Persist locust event such as hatching started or stopped to influxdb. Append user_count in case that it exists @@ -172,14 +170,12 @@ def __listen_for_requests_events(self, node_id, measurement, request_type, name, fields = { 'response_time': response_time, 'response_length': response_length, - 'counter': self.env.stats.num_requests, - # TODO: Review the need of this field + 'counter': self.env.stats.num_requests, # TODO: Review the need of this field } point = self.__make_data_point(measurement, tags, fields, time) self.cache.append(point) - def __listen_for_locust_errors(self, node_id, user_instance, - exception: Exception = None, tb=None) -> None: + def __listen_for_locust_errors(self, node_id, user_instance, exception: Exception = None, tb=None) -> None: """ Persist locust errors to InfluxDB. :param node_id: The id of the node reporting the error. @@ -199,6 +195,7 @@ def __listen_for_locust_errors(self, node_id, user_instance, point = self.__make_data_point('locust_exceptions', tags, fields, time) self.cache.append(point) + def __flush_cached_points_worker(self) -> None: """ Background job that puts the points into the cache to be flushed according tot he interval defined. @@ -211,8 +208,7 @@ def __flush_cached_points_worker(self) -> None: self.__flush_points(self.influxdb_client) gevent.sleep(self.interval_ms / 1000) - def __make_data_point(self, measurement: str, tags: dict, fields: dict, - time: datetime) -> dict: + def __make_data_point(self, measurement: str, tags: dict, fields: dict, time: datetime) -> dict: """ Create a list with a single point to be saved to influxdb. :param measurement: The measurement where to save this point. @@ -220,14 +216,15 @@ def __make_data_point(self, measurement: str, tags: dict, fields: dict, :param fields: Dictionary of field to be saved to measurement. :param time: The time os this point. """ - return {"measurement": measurement, "tags": tags, "time": time, - "fields": fields} + return {"measurement": measurement, "tags": tags, "time": time, "fields": fields} + def last_flush_on_quitting(self): self.stop_flag = True self.flush_worker.join() self.__flush_points(self.influxdb_client) + def __flush_points(self, influxdb_client: InfluxDBClient) -> None: """ Write the cached data points to influxdb From 880cfa9ff3cea971529d3b66344baa9dd5637a8e Mon Sep 17 00:00:00 2001 From: "jacob.fiola" Date: Mon, 1 Aug 2022 08:32:00 -0600 Subject: [PATCH 5/6] addressing comments --- example/README.md | 2 +- example/requirements.txt | 2 +- setup.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/example/README.md b/example/README.md index 93ec302..c428acd 100644 --- a/example/README.md +++ b/example/README.md @@ -1,6 +1,6 @@ # locust-influxdb-boilerplate -LocustIO base project with a custom influxDB listener. +LocustIO base project with a custom influxDB listener. This package requires that Locust v1.5.0 or greater. ## Instructions diff --git a/example/requirements.txt b/example/requirements.txt index 24c504d..a297120 100644 --- a/example/requirements.txt +++ b/example/requirements.txt @@ -1,3 +1,3 @@ locust_influxdb_listener == 0.0.5 influxdb==5.3.1 -locust==2.8.6 \ No newline at end of file +locust==1.5.0 \ No newline at end of file diff --git a/setup.py b/setup.py index de46957..20c6b3d 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,7 @@ "Operating System :: OS Independent", ], install_requires=[ - 'locust>=2.8.0', + 'locust>=1.5.0', 'influxdb>=5.2.2', ], python_requires='>=3.6', From 012450f51e465385a1d22d68f7b0076740355d4d Mon Sep 17 00:00:00 2001 From: Jacob Fiola Date: Thu, 4 Aug 2022 07:43:15 -0600 Subject: [PATCH 6/6] Update example/README.md Co-authored-by: Pablillo Calvo --- example/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example/README.md b/example/README.md index c428acd..4faa3a0 100644 --- a/example/README.md +++ b/example/README.md @@ -1,6 +1,6 @@ # locust-influxdb-boilerplate -LocustIO base project with a custom influxDB listener. This package requires that Locust v1.5.0 or greater. +LocustIO base project with a custom influxDB listener. This package requires Locust v1.5.0 or greater. ## Instructions