Event property counter #7500
Conversation
Code and data > discussion. In favour of running this on cloud behind an env var and seeing what happens. Would be good to define the metrics that we are worried about beforehand.
            null=True,
        ),
    ),
    ("property_type_format", models.CharField(max_length=100, null=True)),
should we constrain the allowed formats too? Each format will need custom query code...
Some comments inline but in general looking pretty good to me, very similar approach to last_seen_at which we already tested successfully
posthog/models/event_property.py
Outdated
property_type: models.CharField = models.CharField(
    max_length=20, choices=PropertyType.choices, default=PropertyType.STRING, null=True
)
property_type_format: models.CharField = models.CharField(max_length=100, null=True)
Should we really store this? I think we could intelligently parse as either a timestamp or ISO-8601 and discard everything else?
posthog/models/event_property.py
Outdated
event: models.CharField = models.CharField(max_length=400, null=False)
property: models.CharField = models.CharField(max_length=400, null=False)

# contains e.g. the date format
Nit: I think the comment is more confusing than helpful; the attribute seems pretty clear to me.
posthog/models/event_property.py
Outdated
# "event_properties" is a field on Team, so have to call this "event_properties_model"
Team,
on_delete=models.CASCADE,
related_name="event_properties_model",
I think we could drop this; it doesn't seem to add more clarity/simplicity over Django's default.
posthog/models/event_property.py
Outdated
# things we keep track of
total_volume: models.BigIntegerField = models.BigIntegerField(default=None, null=True)
created_at: models.DateTimeField = models.DateTimeField(default=timezone.now, null=True)
Should this be null? It's a new model.
posthog/settings.py
Outdated
@@ -450,7 +450,7 @@ def get_list(text: str) -> List[str]:
# https://docs.djangoproject.com/en/2.2/ref/settings/#databases

if TEST or DEBUG:
-    DATABASE_URL = os.getenv("DATABASE_URL", "postgres://localhost:5432/posthog")
+    DATABASE_URL = os.getenv("DATABASE_URL", "postgres://posthog:posthog@localhost:5432/posthog")
Not sure about this change, by default some local installations have an empty password and the default OS username (e.g. Postgres.app)
This got updated in master anyway...
@@ -15,9 +15,9 @@ export function getDefaultConfig(): PluginsServerConfig {
return {
    CELERY_DEFAULT_QUEUE: 'celery',
    DATABASE_URL: isTestEnv
-        ? 'postgres://localhost:5432/test_posthog'
+        ? 'postgres://posthog:posthog@localhost:5432/test_posthog'
Same comment as for settings.py
For test/dev environments, this is good and aligns with our new "developing locally" guidelines.
plugin-server/package.json
Outdated
@@ -33,7 +33,7 @@
"prepublishOnly": "yarn build",
"setup:dev:clickhouse": "cd .. && export DEBUG=1 PRIMARY_DB=clickhouse && python manage.py migrate_clickhouse",
"setup:test:ee": "yarn setup:test:postgres && yarn setup:test:clickhouse",
-"setup:test:postgres": "cd .. && (dropdb test_posthog || echo 'no db to drop') && createdb test_posthog && DATABASE_URL=postgres://localhost:5432/test_posthog DEBUG=1 python manage.py migrate",
+"setup:test:postgres": "cd .. && (PGPASSWORD=posthog dropdb -h localhost -U posthog test_posthog || echo 'no db to drop') && PGPASSWORD=posthog createdb -h localhost -U posthog test_posthog && DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog DEBUG=1 python manage.py migrate",
Should we update deployment docs then? Seems like we need to create this username with a specific password, also to detail what permissions are needed.
flushLastSeenAtCacheJob = schedule.scheduleJob('10 * * * * *', async () => {
    await piscina!.broadcastTask({ task: 'flushLastSeenAtCache' })
})
You know better here, don't have all the context to make the right call
export function detectDateFormat(value: string): string | void {
    if (value.match(/^\d{4}-\d{2}-\d{2}$/)) {
        return 'YYYY-MM-DD'
    }

    if (value.match(/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(.\d{1,3}?Z?|)$/)) {
        return 'ISO8601 UTC'
    }

    if (value.match(/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(.\d{1,3}|)\+\d{2}:\d{2}$/)) {
        return 'ISO8601 TZ'
    }
}
I think timestamps are pretty common too. I added a comment elsewhere, but I do wonder if this is required, or if ClickHouse is smart enough to parse them correctly when used.
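A hypothetical extension covering unix timestamps, as the comment suggests, might look like this (the function name, return labels, and digit/range checks are all assumptions, not code from the PR):

```typescript
// Sketch: recognize unix timestamps in seconds or milliseconds.
// The 10/13-digit heuristic and the lower bound are assumptions to
// avoid classifying arbitrary integers as timestamps.
export function detectTimestampFormat(value: string): string | void {
    if (/^\d{10}$/.test(value)) {
        const seconds = parseInt(value, 10)
        if (seconds > 1_000_000_000) {
            // 10-digit values above ~2001-09-09 are likely unix seconds
            return 'unix_timestamp'
        }
    }
    if (/^\d{13}$/.test(value)) {
        return 'unix_timestamp_milliseconds'
    }
}
```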
}
this.eventPropertiesBuffer.totalVolume += 1

propertyBuffer.propertyType =
What happens when we have outlier property values? E.g. I have a bug where every once in a while I send an invalid value (e.g. for a timestamp). Can we base this on the format of the majority of values instead of the latest one?
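A majority-based approach like the one suggested here could be sketched as follows (the names and data structure are illustrative, not from the PR): keep a per-type counter per property and report whichever type has been observed most often, so occasional malformed values don't flip the stored type.

```typescript
// Map from type name (e.g. 'DateTime', 'String') to observation count.
type PropertyTypeCounts = Map<string, number>

// Record one observation of a detected type.
export function recordType(counts: PropertyTypeCounts, observedType: string): void {
    counts.set(observedType, (counts.get(observedType) ?? 0) + 1)
}

// Return the most frequently observed type, or null if nothing was seen.
export function majorityType(counts: PropertyTypeCounts): string | null {
    let best: string | null = null
    let bestCount = 0
    for (const [type, count] of counts) {
        if (count > bestCount) {
            best = type
            bestCount = count
        }
    }
    return best
}
```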
const elapsedTime = DateTime.now().diff(startTime).as('milliseconds')
this.statsd?.set('flushEventPropertyCounter.Size', cacheSize)
this.statsd?.set('flushEventPropertyCounter.QuerySize', columns[0].length)
this.statsd?.timing('flushEventPropertyCounter', elapsedTime)
Note: postgresQuery automatically has this, so this is kind of redundant.
Added a few fly-by comments since a review was requested.
All good to ship from my end, but I also didn't dig too deeply into the correctness side of things.
I still think this task lends itself well to having really loose constraints (e.g. do you really need exact volume counts or exact timestamps?) and to being done independently from pg, but as agreed before, this can be revisited when it blows up.
await this.db.postgresQuery(
    `INSERT INTO posthog_eventproperty(team_id, event, property, property_type, property_type_format, total_volume, created_at, last_seen_at)
    SELECT * FROM UNNEST ($1::int[], $2::text[], $3::text[], $4::text[], $5::text[], $6::bigint[], $7::timestamp with time zone[], $8::timestamp with time zone[])
    ON CONFLICT ON CONSTRAINT posthog_eventproperty_team_id_event_property_10910b3b_uniq DO UPDATE SET
Worth giving the index a name in the migration instead?
So after more testing, I've gotten deadlocks locally. Yikes.
In the existing implementation we flush all event property buffers, on all threads, on all instances, at exactly 20 seconds past each minute. This means we upload a buffer of up to 50k event properties with an insert-on-conflict-update statement to postgres on every plugin server thread at precisely the same moment.
When I got the deadlock it was uploading two batches of 1800 items. One inserted in a second, the other deadlocked and died. You don't need to do much math to assume that even with a few more orders of magnitude of performance, and with added measures to spread out the flushes, we'll be in trouble... unless we retry and are in luck. 🍀
This turns out to be a common problem with concurrent upserts. The recommended approaches are to 1) sort the keys (which apparently doesn't help) or 2) just retry.
Obviously this won't scale.
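The "just retry" option could be sketched like this (the wrapper name and backoff policy are assumptions; Postgres reports deadlocks with SQLSTATE 40P01, which node-postgres surfaces as the error's `code`):

```typescript
// Retry an operation when Postgres reports deadlock_detected (40P01).
// A jittered backoff spreads out colliding flushes so they don't retry
// in lockstep. Purely a sketch, not the PR's implementation.
async function withDeadlockRetry<T>(
    operation: () => Promise<T>,
    maxAttempts = 3
): Promise<T> {
    for (let attempt = 1; ; attempt++) {
        try {
            return await operation()
        } catch (error: any) {
            if (error?.code !== '40P01' || attempt >= maxAttempts) {
                throw error // not a deadlock, or out of attempts
            }
            await new Promise((resolve) => setTimeout(resolve, Math.random() * 100 * attempt))
        }
    }
}
```

Note that retrying only hides the contention; every retried batch still competes for the same row locks, which is why the PR ultimately moved away from batched upserts.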
The main point of doing this task was to match events with the properties used by these events. The secondary goal was to also sort the properties by usage and e.g. lower the property in the list if it hasn't been seen in a while. Detecting the type and counting the events was an added bonus.
I have now removed everything but the code to track event & property pairs. It works similarly to how the existing event and property definition tracker worked: it keeps track of all team & event & property pairs it has seen, and if there are new ones, it inserts them in a loop. This is run synchronously during ingestion, just like it has been for the existing trackers.
This should eliminate the possibility of deadlocks. I think it'll perform fine as well. However the only thing I'm now concerned about is memory usage.
Assuming 100k unique team/event combos (select count(*) from posthog_eventdefinition), a back-of-the-envelope calculation with ~20 properties per event and ~20 bytes per cached entry gives an added 100k × 20 × 20 = 40MB of memory per thread. It should be less than that in reality, as we only cache these properties for active teams, but this might nevertheless become the next pain point. 🤔
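The estimate above can be spelled out as arithmetic; all three inputs (combo count, properties per event, bytes per key) are the assumptions from the comment, not measured values:

```typescript
// Back-of-the-envelope memory estimate for the per-thread cache.
const combos = 100_000 // unique team/event pairs (from posthog_eventdefinition)
const propertiesPerEvent = 20 // assumed average
const bytesPerKey = 20 // assumed average size of a cached property key
const bytesPerThread = combos * propertiesPerEvent * bytesPerKey
// 40,000,000 bytes = 40 MB decimal, ≈ 38 MiB
```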
plugin-server/package.json
Outdated
@@ -33,7 +33,7 @@
"prepublishOnly": "yarn build",
"setup:dev:clickhouse": "cd .. && export DEBUG=1 PRIMARY_DB=clickhouse && python manage.py migrate_clickhouse",
"setup:test:ee": "yarn setup:test:postgres && yarn setup:test:clickhouse",
-"setup:test:postgres": "cd .. && (dropdb test_posthog || echo 'no db to drop') && createdb test_posthog && DATABASE_URL=postgres://localhost:5432/test_posthog DEBUG=1 python manage.py migrate",
+"setup:test:postgres": "cd .. && (PGPASSWORD=posthog dropdb -h localhost -U posthog test_posthog || echo 'no db to drop') && PGPASSWORD=posthog createdb -h localhost -U posthog test_posthog && DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog DEBUG=1 python manage.py migrate",
However, I solved this now by replacing
"setup:test:postgres": "cd .. && (PGPASSWORD=posthog dropdb -h localhost -U posthog test_posthog || echo 'no db to drop') && PGPASSWORD=posthog createdb -h localhost -U posthog test_posthog && DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog DEBUG=1 python manage.py migrate",
with
"setup:test:postgres": "cd .. && python manage.py setup_test_environment",
@@ -15,9 +15,9 @@ export function getDefaultConfig(): PluginsServerConfig {
return {
    CELERY_DEFAULT_QUEUE: 'celery',
    DATABASE_URL: isTestEnv
-        ? 'postgres://localhost:5432/test_posthog'
+        ? 'postgres://posthog:posthog@localhost:5432/test_posthog'
For test/dev environments, this is good and aligns with our new "developing locally" guidelines.
posthog/settings.py
Outdated
@@ -450,7 +450,7 @@ def get_list(text: str) -> List[str]:
# https://docs.djangoproject.com/en/2.2/ref/settings/#databases

if TEST or DEBUG:
-    DATABASE_URL = os.getenv("DATABASE_URL", "postgres://localhost:5432/posthog")
+    DATABASE_URL = os.getenv("DATABASE_URL", "postgres://posthog:posthog@localhost:5432/posthog")
This got updated in master anyway...
Had some comments, but looks operationally just as sane as before. Didn't dig too deep for correctness though.
this.eventDefinitionsCache = new Map()
this.eventPropertiesCache = new LRU({
    max: serverConfig.EVENT_PROPERTY_LRU_SIZE, // keep in memory the last 10k team+event combos we have seen
    maxAge: 60 * 60 * 1000, // and each for up to 1 hour
Worth extracting to constants.ts given it's used in another file as well?
Hmm... I guess we can. Moved ONE_HOUR to a new file called constants.ts, and imported it in the 4 places it's used.
While all 4 places do reference the time diff of 60 * 60 * 1000 milliseconds, they're not connected, and having them all use the same const feels funny. Ideally the constants could be called something like EVENT_PROPERTY_LRU_MAX_AGE... and then moved out of constants.ts into config.ts with the other envs... but I'm going to draw a line here.
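A minimal sketch of the extracted constant and its use in the LRU options (the cache wiring below is paraphrased from the snippet above, not new behaviour):

```typescript
// constants.ts — shared time constant, as discussed.
export const ONE_HOUR = 60 * 60 * 1000 // one hour in milliseconds

// Example use at a call site (LRU options mirror the snippet above):
// this.eventPropertiesCache = new LRU({
//     max: serverConfig.EVENT_PROPERTY_LRU_SIZE,
//     maxAge: ONE_HOUR,
// })
```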
@@ -106,18 +118,31 @@ export class TeamManager {
    ingested: team.ingested_event,
})

-await this.cacheEventNamesAndProperties(team.id)
+await this.cacheEventNamesAndProperties(team.id, event)
🤔 Should we use await Promise.all([ ... ]) here?
I'm afraid this can substantially increase the number of open postgres connections in short bursts. Thus safer, for now, to keep this sequential.
The first function here should anyway run before all the others, but the sync-s could be done in parallel in the future, though, again, I'm a bit afraid of excess postgres parallelisation.
Also, since this should only be run the first time a unique event or property is ever seen, we might just be prematurely optimising here.
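The trade-off being weighed here, sequential awaits versus Promise.all, can be illustrated with a hypothetical helper (the function and its parameters are not from the PR):

```typescript
// Run cache-refresh tasks either one at a time (bounding the number of
// concurrent Postgres queries per event) or all at once (faster, but
// opens connections in short bursts).
async function refreshCaches(
    tasks: Array<() => Promise<void>>,
    parallel: boolean
): Promise<void> {
    if (parallel) {
        await Promise.all(tasks.map((task) => task()))
    } else {
        for (const task of tasks) {
            await task() // at most one query in flight at a time
        }
    }
}
```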
for (const property of propertyKeys) {
    if (!properties.has(property)) {
        properties.add(property)
        await this.db.postgresQuery(
We could do this insert in a batch instead of per-property 🤔
That's pretty much what I was doing in the original implementation, leading to the deadlocks mentioned above. Even if we go from ON CONFLICT DO UPDATE to ON CONFLICT DO NOTHING, I'm going to have to implement code to retry the transactions if the block deadlocks, and all the other complexities that come with it.
It would be much safer since we're no longer updating rows that are already in the database, and not running all flush operations at precisely the same moment on every thread, but these deadlocks can't be avoided: the same new event may come in 10 times in the same millisecond, be handled by 4 threads, which all insert the same properties at precisely the same moment.
Thus, while a bit slower, I think ~20 simple inserts for each new event the first time it's seen will scale much better.
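The per-pair insert being described could be sketched as follows, assuming a postgresQuery helper shaped like the one used elsewhere in the PR (the exact columns and signature are assumptions):

```typescript
// Insert one (team, event, property) pair, ignoring duplicates.
// Single-row inserts with ON CONFLICT DO NOTHING cannot deadlock against
// each other the way multi-row upsert batches can.
async function insertEventProperty(
    db: { postgresQuery: (sql: string, params: any[], tag: string) => Promise<unknown> },
    teamId: number,
    event: string,
    property: string
): Promise<void> {
    await db.postgresQuery(
        `INSERT INTO posthog_eventproperty (team_id, event, property)
         VALUES ($1, $2, $3)
         ON CONFLICT DO NOTHING`,
        [teamId, event, property],
        'insertEventProperty'
    )
}
```

Since this only runs the first time a given pair is seen (the in-memory cache filters known pairs), the per-row overhead is paid rarely.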
Changes
Adds an EventProperty model and populates it during ingestion.
How did you test this code?