-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce event aggregator feature #87
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work, we will indeed need to find a way to integrate it in the connectors code so we could benefit from those aggregation capabilities.
xxhash.xxh64( | ||
f"{event_dialect_uuid};{fingerprint.build_hash_str_func(event)}" | ||
).hexdigest(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
xxhash.xxh64( | |
f"{event_dialect_uuid};{fingerprint.build_hash_str_func(event)}" | |
).hexdigest(), | |
xxhash.xxh3_64_hexdigest( | |
f"{event_dialect_uuid};{fingerprint.build_hash_str_func(event)}" | |
), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
import time | ||
from typing import Callable, Tuple | ||
|
||
import xxhash as xxhash |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import xxhash as xxhash | |
import xxhash |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pycharm you are drunk. thank you @Darkheir
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
""" | ||
Returns the hash to fingerprint the specified event and its ttl | ||
""" | ||
event_dialect_uuid = event["sekoiaio"]["intake"]["dialect_uuid"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure this attribute is set in the events. The workflow sets it, but in the connectors we usually only have the intake key.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed the need to explicit the dialect as an aggregation engine will be configured for each connector with their specific dialects
condition_func: Callable[[dict], bool] | ||
build_hash_str_func: Callable[[dict], str] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we could have a single function that, if it returns None
, will omit aggregation ?
The idea behind it is to avoid to have to iterate the events twice, once to check if it can be aggregated and a second time to calculate the fingerprint.
condition_func: Callable[[dict], bool] | |
build_hash_str_func: Callable[[dict], str] | |
build_hash_str_func: Callable[[dict], str | None] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
applied
|
||
class EventAggregatorTTLThread(threading.Thread): | ||
f_must_stop: bool # thread will stop if this flag is active | ||
ttl: int # ttl value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ttl: int # ttl value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
aggregated_event = copy.deepcopy(self.event) | ||
aggregated_event["sekoiaio"]["repeat"] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
aggregated_event = copy.deepcopy(self.event) | |
aggregated_event["sekoiaio"]["repeat"] = { | |
aggregated_event = copy.deepcopy(self.event) | |
aggregated_event.setdefault("sekoiaio", {}) | |
aggregated_event["sekoiaio"]["repeat"] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
def __init__(self, aggregation_definitions: dict[str, list[Fingerprint]]): | ||
self.aggregation_definitions = aggregation_definitions | ||
self.aggregations: dict[str, Aggregation] = dict() | ||
self.lock = threading.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Improvement that could be added later:
If the lock becomes a bottleneck a common solution is to use shards to avoid lock contention.
Based on the fingerprint the Aggregations would be stored in 1 of the n shards. The lock to access or write a key would then need to lock only 1 shard, allowing other threads to read/write in the other shards at the same time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This approach looks very interesting.
The main issue that prevented me from implementing on this initial version is the complexity that comes with the following expectation that every shard must be handled by dedicated threads to prevent one locked shard to block the others.
But obviously, if you believe this current version will have a major impact on the performances of a collector I will implement this sharding mecanism in this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with you, let's not add complexity that may not even bee needed.
It was just a thought I got while reviewing your code :)
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## main #87 +/- ##
==========================================
- Coverage 95.83% 95.74% -0.09%
==========================================
Files 26 27 +1
Lines 1897 1998 +101
==========================================
+ Hits 1818 1913 +95
- Misses 79 85 +6
☔ View full report in Codecov by Sentry. |
Unfortunately, the connector class doesn't expose a method that gives access to a parsed version of the event. Instead it only have access to the str version of the event. |
e513150
to
dde0103
Compare
So maybe we could expose an |
This PR introduces the event aggregator feature in the SDK.
Some work must still be done to find a way to transparently integrate it in connectors logic without major performance impact.