Skip to content

Commit

Permalink
Event property counter (#7500)
Browse files Browse the repository at this point in the history
* create event property model

* add null

* rename cache vars

* update event properties table on ingestion

* match date formats

* match date formats

* better string handling

* property type can be null too

* pass event timestamp

* update property type later

* perform all updates through a buffer object

* move to EventPropertyCounter

* fix migration

* improve flush last seen at job

* flush job periodically + env

* upsert all event properties in 1 query

* log to statsd

* enable property counter only if experimental mode enabled

* use now() instead of event timestamp

* fix seconds

* add user/pass for default postgres

* add tests

* use big integers

* make query work with 50k props

* processing events saves event properties

* fix script

* test date format detection

* default enabled

* only enable event property counter for specific teams

* eslint fixes

* fix logs double-sync noise in tests

* fix bigint test

* don't do tasks that make no sense

* remove dead code

* simpler test setup

* different contraint name

* refactor team manager

* greatly simplify the system

* fetch cached event properties

* fix team manager and timestamps

* add cached entry

* also don't cache event properties for teams that have it disabled

* remove indexes that are not going to be used

* remove unused imports

* blacked

* remember event properties with a LRU cache

* fix eslint

* clean up the last bits

* move ONE_HOUR to constants

* use sane_repr

* fix typo
  • Loading branch information
mariusandra authored Dec 17, 2021
1 parent 9a2d5c0 commit 151f759
Show file tree
Hide file tree
Showing 19 changed files with 286 additions and 70 deletions.
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ axes: 0006_remove_accesslog_trusted
contenttypes: 0002_remove_content_type_name
database: 0002_auto_20190129_2304
ee: 0005_project_based_permissioning
posthog: 0191_rename_specialmigration_asyncmigration
posthog: 0192_event_properties
rest_hooks: 0002_swappable_hook_model
sessions: 0001_initial
social_django: 0010_uid_db_index
2 changes: 1 addition & 1 deletion plugin-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"prepublishOnly": "yarn build",
"setup:dev:clickhouse": "cd .. && export DEBUG=1 PRIMARY_DB=clickhouse && python manage.py migrate_clickhouse",
"setup:test:ee": "yarn setup:test:postgres && yarn setup:test:clickhouse",
"setup:test:postgres": "cd .. && (dropdb test_posthog || echo 'no db to drop') && createdb test_posthog && DATABASE_URL=postgres://localhost:5432/test_posthog DEBUG=1 python manage.py migrate",
"setup:test:postgres": "cd .. && python manage.py setup_test_environment",
"setup:test:clickhouse": "cd .. && unset KAFKA_URL && export TEST=1 PRIMARY_DB=clickhouse CLICKHOUSE_DATABASE=posthog_test && python manage.py migrate_clickhouse",
"services:start": "cd .. && docker-compose -f ee/docker-compose.ch.yml up zookeeper kafka clickhouse",
"services:stop": "cd .. && docker-compose -f ee/docker-compose.ch.yml down",
Expand Down
11 changes: 8 additions & 3 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ export function getDefaultConfig(): PluginsServerConfig {
return {
CELERY_DEFAULT_QUEUE: 'celery',
DATABASE_URL: isTestEnv
? 'postgres://localhost:5432/test_posthog'
? 'postgres://posthog:posthog@localhost:5432/test_posthog'
: isDevEnv
? 'postgres://localhost:5432/posthog'
? 'postgres://posthog:posthog@localhost:5432/posthog'
: null,
POSTHOG_DB_NAME: null,
POSTHOG_DB_USER: 'postgres',
Expand Down Expand Up @@ -59,6 +59,7 @@ export function getDefaultConfig(): PluginsServerConfig {
REDIS_POOL_MAX_SIZE: 3,
DISABLE_MMDB: isTestEnv,
DISTINCT_ID_LRU_SIZE: 10000,
EVENT_PROPERTY_LRU_SIZE: 10000,
INTERNAL_MMDB_SERVER_PORT: 0,
PLUGIN_SERVER_IDLE: false,
JOB_QUEUES: 'graphile',
Expand All @@ -78,6 +79,7 @@ export function getDefaultConfig(): PluginsServerConfig {
SITE_URL: null,
NEW_PERSON_PROPERTIES_UPDATE_ENABLED_TEAMS: '',
EXPERIMENTAL_EVENTS_LAST_SEEN_ENABLED: true,
EXPERIMENTAL_EVENT_PROPERTY_TRACKER_ENABLED_TEAMS: '',
}
}

Expand Down Expand Up @@ -114,6 +116,7 @@ export function getConfigHelp(): Record<keyof PluginsServerConfig, string> {
REDIS_POOL_MAX_SIZE: 'maximum number of Redis connections to use per thread',
DISABLE_MMDB: 'whether to disable fetching MaxMind database for IP location',
DISTINCT_ID_LRU_SIZE: 'size of persons distinct ID LRU cache',
EVENT_PROPERTY_LRU_SIZE: "size of the event property tracker's LRU cache (keyed by [team.id, event])",
INTERNAL_MMDB_SERVER_PORT: 'port of the internal server used for IP location (0 means random)',
PLUGIN_SERVER_IDLE: 'whether to disengage the plugin server, e.g. for development',
JOB_QUEUES: 'retry queue engine and fallback queues',
Expand All @@ -135,7 +138,9 @@ export function getConfigHelp(): Record<keyof PluginsServerConfig, string> {
'(advanced) corresponds to the length of time a piscina worker should block for when looking for tasks',
NEW_PERSON_PROPERTIES_UPDATE_ENABLED_TEAMS:
'(advanced) teams for which to run the new person properties update flow on',
EXPERIMENTAL_EVENTS_LAST_SEEN_ENABLED: 'enable experimental feature to track lastSeenAt',
EXPERIMENTAL_EVENTS_LAST_SEEN_ENABLED: '(advanced) enable experimental feature to track lastSeenAt',
EXPERIMENTAL_EVENT_PROPERTY_TRACKER_ENABLED_TEAMS:
'(advanced) teams for which to enable experimental feature to track event properties',
}
}

Expand Down
1 change: 1 addition & 0 deletions plugin-server/src/config/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const ONE_HOUR = 60 * 60 * 1000
7 changes: 4 additions & 3 deletions plugin-server/src/main/pluginsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ export async function startPluginsServer(
statusReport.stopStatusReportSchedule()
piscinaStatsJob && schedule.cancelJob(piscinaStatsJob)
internalMetricsStatsJob && schedule.cancelJob(internalMetricsStatsJob)
flushLastSeenAtCacheJob && schedule.cancelJob(flushLastSeenAtCacheJob)
await jobQueueConsumer?.stop()
await scheduleControl?.stopSchedule()
await new Promise<void>((resolve, reject) =>
Expand Down Expand Up @@ -198,10 +199,10 @@ export async function startPluginsServer(
})
}

// every minute flush lastSeenAt cache
// every 10 seconds past the minute flush lastSeenAt cache
if (serverConfig.EXPERIMENTAL_EVENTS_LAST_SEEN_ENABLED) {
flushLastSeenAtCacheJob = schedule.scheduleJob('0 * * * * *', async () => {
await hub!.teamManager.flushLastSeenAtCache()
flushLastSeenAtCacheJob = schedule.scheduleJob('10 * * * * *', async () => {
await piscina!.broadcastTask({ task: 'flushLastSeenAtCache' })
})
}

Expand Down
9 changes: 9 additions & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ export interface PluginsServerConfig extends Record<string, any> {
REDIS_POOL_MAX_SIZE: number
DISABLE_MMDB: boolean
DISTINCT_ID_LRU_SIZE: number
EVENT_PROPERTY_LRU_SIZE: number
INTERNAL_MMDB_SERVER_PORT: number
PLUGIN_SERVER_IDLE: boolean
JOB_QUEUES: string
Expand All @@ -104,6 +105,7 @@ export interface PluginsServerConfig extends Record<string, any> {
SITE_URL: string | null
NEW_PERSON_PROPERTIES_UPDATE_ENABLED_TEAMS: string
EXPERIMENTAL_EVENTS_LAST_SEEN_ENABLED: boolean
EXPERIMENTAL_EVENT_PROPERTY_TRACKER_ENABLED_TEAMS: string
}

export interface Hub extends PluginsServerConfig {
Expand Down Expand Up @@ -761,6 +763,13 @@ export interface PropertyDefinitionType {
team_id: number
}

export interface EventPropertyType {
id: string
event: string
property: string
team_id: number
}

export type PluginFunction = 'onEvent' | 'onAction' | 'processEvent' | 'onSnapshot' | 'pluginTask'

export enum CeleryTriggeredJobOperation {
Expand Down
8 changes: 8 additions & 0 deletions plugin-server/src/utils/db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
ElementGroup,
Event,
EventDefinitionType,
EventPropertyType,
Group,
GroupTypeIndex,
GroupTypeToColumnIndex,
Expand Down Expand Up @@ -1119,6 +1120,13 @@ export class DB {
).rows as PropertyDefinitionType[]
}

// EventProperty

public async fetchEventProperties(): Promise<EventPropertyType[]> {
return (await this.postgresQuery('SELECT * FROM posthog_eventproperty', undefined, 'fetchEventProperties'))
.rows as EventPropertyType[]
}

// Action & ActionStep & Action<>Event

public async fetchAllActionsGroupedByTeam(): Promise<Record<Team['id'], Record<Action['id'], Action>>> {
Expand Down
7 changes: 1 addition & 6 deletions plugin-server/src/utils/db/hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,7 @@ export async function createHub(
status.info('👍', `Redis`)

const db = new DB(postgres, redisPool, kafkaProducer, clickhouse, statsd)
const teamManager = new TeamManager(
db,
statsd,
serverConfig.SITE_URL,
serverConfig.EXPERIMENTAL_EVENTS_LAST_SEEN_ENABLED
)
const teamManager = new TeamManager(db, serverConfig, statsd)
const organizationManager = new OrganizationManager(db)
const pluginsApiKeyManager = new PluginsApiKeyManager(db)
const actionManager = new ActionManager(db)
Expand Down
3 changes: 2 additions & 1 deletion plugin-server/src/utils/db/postgres-logs-wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ export class PostgresLogsWrapper {
this.flushTimeout = null
}
if (this.logs.length > 0) {
await this.db.batchInsertPostgresLogs(this.logs)
const logs = this.logs
this.logs = []
await this.db.batchInsertPostgresLogs(logs)
}
}
}
3 changes: 1 addition & 2 deletions plugin-server/src/worker/ingestion/person-manager.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import LRU from 'lru-cache'

import { ONE_HOUR } from '../../config/constants'
import { PluginsServerConfig } from '../../types'
import { DB } from '../../utils/db/db'

const ONE_HOUR = 60 * 60 * 1000

export class PersonManager {
personSeen: LRU<string, boolean>

Expand Down
Loading

0 comments on commit 151f759

Please sign in to comment.