diff --git a/latest_migrations.manifest b/latest_migrations.manifest
index 558398498dc07..cf1e77892af3b 100644
--- a/latest_migrations.manifest
+++ b/latest_migrations.manifest
@@ -4,7 +4,7 @@ axes: 0006_remove_accesslog_trusted
 contenttypes: 0002_remove_content_type_name
 database: 0002_auto_20190129_2304
 ee: 0005_project_based_permissioning
-posthog: 0191_rename_specialmigration_asyncmigration
+posthog: 0192_event_properties
 rest_hooks: 0002_swappable_hook_model
 sessions: 0001_initial
 social_django: 0010_uid_db_index
diff --git a/plugin-server/package.json b/plugin-server/package.json
index 67e10d95e0453..45669d1f2e3f0 100644
--- a/plugin-server/package.json
+++ b/plugin-server/package.json
@@ -33,7 +33,7 @@
         "prepublishOnly": "yarn build",
         "setup:dev:clickhouse": "cd .. && export DEBUG=1 PRIMARY_DB=clickhouse && python manage.py migrate_clickhouse",
         "setup:test:ee": "yarn setup:test:postgres && yarn setup:test:clickhouse",
-        "setup:test:postgres": "cd .. && (dropdb test_posthog || echo 'no db to drop') && createdb test_posthog && DATABASE_URL=postgres://localhost:5432/test_posthog DEBUG=1 python manage.py migrate",
+        "setup:test:postgres": "cd .. && python manage.py setup_test_environment",
         "setup:test:clickhouse": "cd .. && unset KAFKA_URL && export TEST=1 PRIMARY_DB=clickhouse CLICKHOUSE_DATABASE=posthog_test && python manage.py migrate_clickhouse",
         "services:start": "cd .. && docker-compose -f ee/docker-compose.ch.yml up zookeeper kafka clickhouse",
         "services:stop": "cd .. && docker-compose -f ee/docker-compose.ch.yml down",
diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts
index 39c9c999cc749..b877d88ccac38 100644
--- a/plugin-server/src/config/config.ts
+++ b/plugin-server/src/config/config.ts
@@ -15,9 +15,9 @@ export function getDefaultConfig(): PluginsServerConfig {
     return {
         CELERY_DEFAULT_QUEUE: 'celery',
         DATABASE_URL: isTestEnv
-            ? 'postgres://localhost:5432/test_posthog'
+            ? 'postgres://posthog:posthog@localhost:5432/test_posthog'
             : isDevEnv
-            ? 'postgres://localhost:5432/posthog'
+            ? 'postgres://posthog:posthog@localhost:5432/posthog'
             : null,
         POSTHOG_DB_NAME: null,
         POSTHOG_DB_USER: 'postgres',
@@ -59,6 +59,7 @@ export function getDefaultConfig(): PluginsServerConfig {
         REDIS_POOL_MAX_SIZE: 3,
         DISABLE_MMDB: isTestEnv,
         DISTINCT_ID_LRU_SIZE: 10000,
+        EVENT_PROPERTY_LRU_SIZE: 10000,
         INTERNAL_MMDB_SERVER_PORT: 0,
         PLUGIN_SERVER_IDLE: false,
         JOB_QUEUES: 'graphile',
@@ -78,6 +79,7 @@ export function getDefaultConfig(): PluginsServerConfig {
         SITE_URL: null,
         NEW_PERSON_PROPERTIES_UPDATE_ENABLED_TEAMS: '',
         EXPERIMENTAL_EVENTS_LAST_SEEN_ENABLED: true,
+        EXPERIMENTAL_EVENT_PROPERTY_TRACKER_ENABLED_TEAMS: '',
     }
 }
 
@@ -114,6 +116,7 @@ export function getConfigHelp(): Record<keyof PluginsServerConfig, string> {
         REDIS_POOL_MAX_SIZE: 'maximum number of Redis connections to use per thread',
         DISABLE_MMDB: 'whether to disable fetching MaxMind database for IP location',
         DISTINCT_ID_LRU_SIZE: 'size of persons distinct ID LRU cache',
+        EVENT_PROPERTY_LRU_SIZE: "size of the event property tracker's LRU cache (keyed by [team.id, event])",
         INTERNAL_MMDB_SERVER_PORT: 'port of the internal server used for IP location (0 means random)',
         PLUGIN_SERVER_IDLE: 'whether to disengage the plugin server, e.g. for development',
         JOB_QUEUES: 'retry queue engine and fallback queues',
@@ -135,7 +138,9 @@ export function getConfigHelp(): Record<keyof PluginsServerConfig, string> {
             '(advanced) corresponds to the length of time a piscina worker should block for when looking for tasks',
         NEW_PERSON_PROPERTIES_UPDATE_ENABLED_TEAMS:
            '(advanced) teams for which to run the new person properties update flow on',
-        EXPERIMENTAL_EVENTS_LAST_SEEN_ENABLED: 'enable experimental feature to track lastSeenAt',
+        EXPERIMENTAL_EVENTS_LAST_SEEN_ENABLED: '(advanced) enable experimental feature to track lastSeenAt',
+        EXPERIMENTAL_EVENT_PROPERTY_TRACKER_ENABLED_TEAMS:
+            '(advanced) teams for which to enable experimental feature to track event properties',
     }
 }
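A note on the new EXPERIMENTAL_EVENT_PROPERTY_TRACKER_ENABLED_TEAMS setting: it is a comma-separated list of team IDs, and splitting such a list must not map over a bare parseInt, because Array.prototype.map also passes the element index, which parseInt interprets as a radix. A minimal standalone TypeScript sketch of the pitfall (illustrative values, not part of the diff); the TeamManager constructor further down parses the setting this way:

    // '2,3,4'.split(',').map(parseInt) yields [2, NaN, NaN]:
    // parseInt('3', 1) and parseInt('4', 2) treat the element index as the radix.
    const raw = '2,3,4'
    const teamIds = new Set(raw.split(',').map((id) => parseInt(id, 10))) // Set { 2, 3, 4 }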
diff --git a/plugin-server/src/config/constants.ts b/plugin-server/src/config/constants.ts
new file mode 100644
index 0000000000000..f170b25f7ed6c
--- /dev/null
+++ b/plugin-server/src/config/constants.ts
@@ -0,0 +1 @@
+export const ONE_HOUR = 60 * 60 * 1000
diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts
index 418051a8d6b91..8be1b3f222969 100644
--- a/plugin-server/src/main/pluginsServer.ts
+++ b/plugin-server/src/main/pluginsServer.ts
@@ -81,6 +81,7 @@ export async function startPluginsServer(
             statusReport.stopStatusReportSchedule()
             piscinaStatsJob && schedule.cancelJob(piscinaStatsJob)
             internalMetricsStatsJob && schedule.cancelJob(internalMetricsStatsJob)
+            flushLastSeenAtCacheJob && schedule.cancelJob(flushLastSeenAtCacheJob)
             await jobQueueConsumer?.stop()
             await scheduleControl?.stopSchedule()
             await new Promise<void>((resolve, reject) =>
@@ -198,10 +199,10 @@ export async function startPluginsServer(
             })
         }
 
-        // every minute flush lastSeenAt cache
+        // at 10 seconds past each minute, flush the lastSeenAt cache
         if (serverConfig.EXPERIMENTAL_EVENTS_LAST_SEEN_ENABLED) {
-            flushLastSeenAtCacheJob = schedule.scheduleJob('0 * * * * *', async () => {
-                await hub!.teamManager.flushLastSeenAtCache()
+            flushLastSeenAtCacheJob = schedule.scheduleJob('10 * * * * *', async () => {
+                await piscina!.broadcastTask({ task: 'flushLastSeenAtCache' })
             })
         }
diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts
index 168f500028a8d..6685d8a4cb3b7 100644
--- a/plugin-server/src/types.ts
+++ b/plugin-server/src/types.ts
@@ -85,6 +85,7 @@ export interface PluginsServerConfig extends Record<string, any> {
     REDIS_POOL_MAX_SIZE: number
     DISABLE_MMDB: boolean
     DISTINCT_ID_LRU_SIZE: number
+    EVENT_PROPERTY_LRU_SIZE: number
     INTERNAL_MMDB_SERVER_PORT: number
     PLUGIN_SERVER_IDLE: boolean
     JOB_QUEUES: string
@@ -104,6 +105,7 @@ export interface PluginsServerConfig extends Record<string, any> {
     SITE_URL: string | null
     NEW_PERSON_PROPERTIES_UPDATE_ENABLED_TEAMS: string
     EXPERIMENTAL_EVENTS_LAST_SEEN_ENABLED: boolean
+    EXPERIMENTAL_EVENT_PROPERTY_TRACKER_ENABLED_TEAMS: string
 }
 
 export interface Hub extends PluginsServerConfig {
@@ -761,6 +763,13 @@ export interface PropertyDefinitionType {
     team_id: number
 }
 
+export interface EventPropertyType {
+    id: number
+    event: string
+    property: string
+    team_id: number
+}
+
 export type PluginFunction = 'onEvent' | 'onAction' | 'processEvent' | 'onSnapshot' | 'pluginTask'
 
 export enum CeleryTriggeredJobOperation {
diff --git a/plugin-server/src/utils/db/db.ts b/plugin-server/src/utils/db/db.ts
index 3e71b342d7f0a..c6dc7ed67d55a 100644
--- a/plugin-server/src/utils/db/db.ts
+++ b/plugin-server/src/utils/db/db.ts
@@ -31,6 +31,7 @@ import {
     ElementGroup,
     Event,
     EventDefinitionType,
+    EventPropertyType,
     Group,
     GroupTypeIndex,
     GroupTypeToColumnIndex,
@@ -1119,6 +1120,13 @@ export class DB {
         ).rows as PropertyDefinitionType[]
     }
 
+    // EventProperty
+
+    public async fetchEventProperties(): Promise<EventPropertyType[]> {
+        return (await this.postgresQuery('SELECT * FROM posthog_eventproperty', undefined, 'fetchEventProperties'))
+            .rows as EventPropertyType[]
+    }
+
     // Action & ActionStep & Action<>Event
 
     public async fetchAllActionsGroupedByTeam(): Promise<Record<Team['id'], Record<Action['id'], Action>>> {
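For context on the schedule change in pluginsServer.ts above: node-schedule accepts a six-field cron spec whose leading field is seconds, so '10 * * * * *' fires once per minute at second 10 (offset from the top of the minute), not every ten seconds, and the broadcast lets each Piscina worker flush its own cache. A minimal standalone sketch, assuming only the node-schedule package:

    import * as schedule from 'node-schedule'

    // Fields: second minute hour day-of-month month day-of-week.
    // '10 * * * * *' -> at second 10 of every minute, i.e. once per minute.
    const job = schedule.scheduleJob('10 * * * * *', () => {
        console.log('flush tick at', new Date().toISOString())
    })
    // schedule.cancelJob(job) stops it, mirroring the shutdown path above.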
diff --git a/plugin-server/src/utils/db/hub.ts b/plugin-server/src/utils/db/hub.ts
index 69cbfaa13feba..f5362e9efe87f 100644
--- a/plugin-server/src/utils/db/hub.ts
+++ b/plugin-server/src/utils/db/hub.ts
@@ -171,12 +171,7 @@ export async function createHub(
     status.info('👍', `Redis`)
 
     const db = new DB(postgres, redisPool, kafkaProducer, clickhouse, statsd)
-    const teamManager = new TeamManager(
-        db,
-        statsd,
-        serverConfig.SITE_URL,
-        serverConfig.EXPERIMENTAL_EVENTS_LAST_SEEN_ENABLED
-    )
+    const teamManager = new TeamManager(db, serverConfig, statsd)
     const organizationManager = new OrganizationManager(db)
     const pluginsApiKeyManager = new PluginsApiKeyManager(db)
     const actionManager = new ActionManager(db)
diff --git a/plugin-server/src/utils/db/postgres-logs-wrapper.ts b/plugin-server/src/utils/db/postgres-logs-wrapper.ts
index 7445d22dd64ff..ecc3ffe585750 100644
--- a/plugin-server/src/utils/db/postgres-logs-wrapper.ts
+++ b/plugin-server/src/utils/db/postgres-logs-wrapper.ts
@@ -36,8 +36,9 @@ export class PostgresLogsWrapper {
             this.flushTimeout = null
         }
         if (this.logs.length > 0) {
-            await this.db.batchInsertPostgresLogs(this.logs)
+            const logs = this.logs
             this.logs = []
+            await this.db.batchInsertPostgresLogs(logs)
         }
     }
 }
diff --git a/plugin-server/src/worker/ingestion/person-manager.ts b/plugin-server/src/worker/ingestion/person-manager.ts
index 7bd30170eb762..5b5fbe5524258 100644
--- a/plugin-server/src/worker/ingestion/person-manager.ts
+++ b/plugin-server/src/worker/ingestion/person-manager.ts
@@ -1,10 +1,9 @@
 import LRU from 'lru-cache'
 
+import { ONE_HOUR } from '../../config/constants'
 import { PluginsServerConfig } from '../../types'
 import { DB } from '../../utils/db/db'
 
-const ONE_HOUR = 60 * 60 * 1000
-
 export class PersonManager {
     personSeen: LRU<string, boolean>
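The postgres-logs-wrapper.ts change above is a small but classic async-buffering fix: the buffer is swapped out synchronously before the first await, so entries appended while the insert is in flight land in the fresh array instead of being wiped by `this.logs = []` after the await. A generic standalone sketch of the pattern, with hypothetical names:

    class FlushBuffer<T> {
        private items: T[] = []

        add(item: T): void {
            this.items.push(item)
        }

        async flush(write: (batch: T[]) => Promise<void>): Promise<void> {
            if (this.items.length === 0) {
                return
            }
            // Detach the batch before awaiting: anything add()-ed during the
            // write goes into the fresh array rather than being lost.
            const batch = this.items
            this.items = []
            await write(batch)
        }
    }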
diff --git a/plugin-server/src/worker/ingestion/team-manager.ts b/plugin-server/src/worker/ingestion/team-manager.ts
index 0a628903b7ac8..1da96378eeb1a 100644
--- a/plugin-server/src/worker/ingestion/team-manager.ts
+++ b/plugin-server/src/worker/ingestion/team-manager.ts
@@ -1,8 +1,10 @@
 import { Properties } from '@posthog/plugin-scaffold'
 import { StatsD } from 'hot-shots'
+import LRU from 'lru-cache'
 import { DateTime } from 'luxon'
 
-import { Team, TeamId } from '../../types'
+import { ONE_HOUR } from '../../config/constants'
+import { PluginsServerConfig, Team, TeamId } from '../../types'
 import { DB } from '../../utils/db/db'
 import { timeoutGuard } from '../../utils/db/utils'
 import { posthog } from '../../utils/posthog'
@@ -14,25 +16,37 @@ type TeamCache<T> = Map<TeamId, [T, number]>
 
 export class TeamManager {
     db: DB
     teamCache: TeamCache<Team | null>
-    eventNamesCache: Map<TeamId, Set<string>>
-    eventLastSeenCache: Map<string, number> // key: ${team_id}_${name}; value: DateTime.valueOf()
+    eventDefinitionsCache: Map<TeamId, Set<string>>
+    eventPropertiesCache: LRU<string, Set<string>> // Map<JSON.stringify([team_id, event]), Set<property>>
+    eventLastSeenCache: Map<string, number> // key: JSON.stringify([team_id, event]); value: timestamp (in ms)
     lastFlushAt: DateTime // time when the `eventLastSeenCache` was last flushed
-    eventPropertiesCache: Map<TeamId, Set<string>>
+    propertyDefinitionsCache: Map<TeamId, Set<string>>
     instanceSiteUrl: string
     experimentalLastSeenAtEnabled: boolean
+    propertyCounterTeams: Set<number>
     statsd?: StatsD
 
-    // TODO: #7422 Remove temporary parameter
-    constructor(db: DB, statsd?: StatsD, instanceSiteUrl?: string | null, experimentalLastSeenAtEnabled?: boolean) {
+    constructor(db: DB, serverConfig: PluginsServerConfig, statsd?: StatsD) {
         this.db = db
         this.statsd = statsd
         this.teamCache = new Map()
-        this.eventNamesCache = new Map()
+        this.eventDefinitionsCache = new Map()
+        this.eventPropertiesCache = new LRU({
+            max: serverConfig.EVENT_PROPERTY_LRU_SIZE, // keep in memory the last 10k team+event combos we have seen
+            maxAge: ONE_HOUR, // and each for up to 1 hour
+            updateAgeOnGet: true,
+        })
         this.eventLastSeenCache = new Map()
-        this.eventPropertiesCache = new Map()
-        this.instanceSiteUrl = instanceSiteUrl || 'unknown'
+        this.propertyDefinitionsCache = new Map()
+        this.instanceSiteUrl = serverConfig.SITE_URL || 'unknown'
         this.lastFlushAt = DateTime.now()
-        this.experimentalLastSeenAtEnabled = experimentalLastSeenAtEnabled ?? false
+
+        // TODO: #7422 Remove temporary EXPERIMENTAL_EVENTS_LAST_SEEN_ENABLED
+        // TODO: #7500 Remove temporary EXPERIMENTAL_EVENT_PROPERTY_TRACKER_ENABLED_TEAMS
+        this.experimentalLastSeenAtEnabled = serverConfig.EXPERIMENTAL_EVENTS_LAST_SEEN_ENABLED ?? false
+        this.propertyCounterTeams = new Set(
+            (serverConfig.EXPERIMENTAL_EVENT_PROPERTY_TRACKER_ENABLED_TEAMS || '').split(',').map((id) => parseInt(id, 10))
+        )
     }
 
     public async fetchTeam(teamId: number): Promise<Team | null> {
@@ -66,12 +80,11 @@ export class TeamManager {
         const events = this.eventLastSeenCache
         this.eventLastSeenCache = new Map()
 
-        for (const event of events) {
-            const [key, value] = event
+        for (const [key, timestamp] of events) {
             const [teamId, eventName] = JSON.parse(key)
-            if (teamId && eventName && value) {
+            if (teamId && eventName && timestamp) {
                 valuesStatements.push(`($${params.length + 1},$${params.length + 2},$${params.length + 3})`)
-                params.push(teamId, eventName, value / 1000)
+                params.push(teamId, eventName, timestamp / 1000)
             }
         }
 
@@ -106,18 +119,31 @@ export class TeamManager {
             ingested: team.ingested_event,
         })
 
-        await this.cacheEventNamesAndProperties(team.id)
+        await this.cacheEventNamesAndProperties(team.id, event)
+        await this.syncEventDefinitions(team, event)
+        await this.syncEventProperties(team, event, Object.keys(properties))
+        await this.syncPropertyDefinitions(properties, team)
+        await this.setTeamIngestedEvent(team, properties)
+
+        clearTimeout(timeout)
+
+        const statsDEvent = this.experimentalLastSeenAtEnabled
+            ? 'updateEventNamesAndProperties.lastSeenAtEnabled'
+            : 'updateEventNamesAndProperties'
+        this.statsd?.timing(statsDEvent, DateTime.now().diff(startTime).as('milliseconds'))
+    }
 
-        if (!this.eventNamesCache.get(team.id)?.has(event)) {
+    private async syncEventDefinitions(team: Team, event: string) {
+        if (!this.eventDefinitionsCache.get(team.id)?.has(event)) {
             // TODO: #7422 Temporary conditional to test experimental feature
             if (this.experimentalLastSeenAtEnabled) {
                 status.info('Inserting new event definition with last_seen_at')
                 await this.db.postgresQuery(
                     `INSERT INTO posthog_eventdefinition (id, name, volume_30_day, query_usage_30_day, team_id, last_seen_at, created_at)` +
-                        ` VALUES ($1, $2, NULL, NULL, $3, NOW(), NOW())` +
+                        ` VALUES ($1, $2, NULL, NULL, $3, $4, NOW())` +
                         ` ON CONFLICT ON CONSTRAINT posthog_eventdefinition_team_id_name_80fa0b87_uniq` +
-                        ` DO UPDATE SET last_seen_at=NOW()`,
-                    [new UUIDT().toString(), event, team.id],
+                        ` DO UPDATE SET last_seen_at=$4`,
+                    [new UUIDT().toString(), event, team.id, DateTime.now()],
                     'insertEventDefinition'
                 )
             } else {
@@ -129,13 +155,14 @@ export class TeamManager {
                     'insertEventDefinition'
                 )
             }
-            this.eventNamesCache.get(team.id)?.add(event)
+            this.eventDefinitionsCache.get(team.id)?.add(event)
         } else {
             // TODO: #7422 Temporary conditional to test experimental feature
             if (this.experimentalLastSeenAtEnabled) {
                 const eventCacheKey = JSON.stringify([team.id, event])
-                if ((this.eventLastSeenCache.get(eventCacheKey) ?? 0) < DateTime.now().valueOf()) {
-                    this.eventLastSeenCache.set(eventCacheKey, DateTime.now().valueOf())
+                const timestamp = DateTime.now().valueOf()
+                if ((this.eventLastSeenCache.get(eventCacheKey) ?? 0) < timestamp) {
+                    this.eventLastSeenCache.set(eventCacheKey, timestamp)
                 }
                 // TODO: Allow configuring this via env vars
                 // We flush here every 2 mins (as a failsafe) because the main thread flushes every minute
@@ -145,18 +172,44 @@ export class TeamManager {
                 }
             }
         }
+    }
 
+    private async syncEventProperties(team: Team, event: string, propertyKeys: string[]) {
+        if (!this.propertyCounterTeams.has(team.id)) {
+            return
+        }
+        const key = JSON.stringify([team.id, event])
+        let properties = this.eventPropertiesCache.get(key)
+        if (!properties) {
+            properties = new Set()
+            this.eventPropertiesCache.set(key, properties)
+        }
+        for (const property of propertyKeys) {
+            if (!properties.has(property)) {
+                properties.add(property)
+                await this.db.postgresQuery(
+                    `INSERT INTO posthog_eventproperty (event, property, team_id) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING`,
+                    [event, property, team.id],
+                    'insertEventProperty'
+                )
+            }
+        }
+    }
+
+    private async syncPropertyDefinitions(properties: Properties, team: Team) {
         for (const [key, value] of Object.entries(properties)) {
-            if (!this.eventPropertiesCache.get(team.id)?.has(key)) {
+            if (!this.propertyDefinitionsCache.get(team.id)?.has(key)) {
                 await this.db.postgresQuery(
                     `INSERT INTO posthog_propertydefinition (id, name, is_numerical, volume_30_day, query_usage_30_day, team_id) VALUES ($1, $2, $3, NULL, NULL, $4) ON CONFLICT DO NOTHING`,
                     [new UUIDT().toString(), key, typeof value === 'number', team.id],
                     'insertPropertyDefinition'
                 )
-                this.eventPropertiesCache.get(team.id)?.add(key)
+                this.propertyDefinitionsCache.get(team.id)?.add(key)
             }
         }
+    }
 
+    private async setTeamIngestedEvent(team: Team, properties: Properties) {
         if (team && !team.ingested_event) {
             await this.db.postgresQuery(
                 `UPDATE posthog_team SET ingested_event = $1 WHERE id = $2`,
@@ -186,34 +239,55 @@ export class TeamManager {
             })
         }
-
-        clearTimeout(timeout)
-        const statsDEvent = this.experimentalLastSeenAtEnabled
-            ? 'updateEventNamesAndProperties.lastSeenAtEnabled'
-            : 'updateEventNamesAndProperties'
-        this.statsd?.timing(statsDEvent, DateTime.now().diff(startTime).as('milliseconds'))
     }
 
-    public async cacheEventNamesAndProperties(teamId: number): Promise<void> {
-        let eventNamesCache = this.eventNamesCache.get(teamId)
-        if (!eventNamesCache) {
-            const eventData = await this.db.postgresQuery(
+    public async cacheEventNamesAndProperties(teamId: number, event: string): Promise<void> {
+        let eventDefinitionsCache = this.eventDefinitionsCache.get(teamId)
+        if (!eventDefinitionsCache) {
+            const eventNames = await this.db.postgresQuery(
                 'SELECT name FROM posthog_eventdefinition WHERE team_id = $1',
                 [teamId],
                 'fetchEventDefinitions'
             )
-            eventNamesCache = new Set(eventData.rows.map((r) => r.name))
-            this.eventNamesCache.set(teamId, eventNamesCache)
+            eventDefinitionsCache = new Set(eventNames.rows.map((r) => r.name))
+            this.eventDefinitionsCache.set(teamId, eventDefinitionsCache)
         }
 
-        let eventPropertiesCache = this.eventPropertiesCache.get(teamId)
-        if (!eventPropertiesCache) {
+        let propertyDefinitionsCache = this.propertyDefinitionsCache.get(teamId)
+        if (!propertyDefinitionsCache) {
             const eventProperties = await this.db.postgresQuery(
                 'SELECT name FROM posthog_propertydefinition WHERE team_id = $1',
                 [teamId],
                 'fetchPropertyDefinitions'
             )
-            eventPropertiesCache = new Set(eventProperties.rows.map((r) => r.name))
-            this.eventPropertiesCache.set(teamId, eventPropertiesCache)
+            propertyDefinitionsCache = new Set(eventProperties.rows.map((r) => r.name))
+            this.propertyDefinitionsCache.set(teamId, propertyDefinitionsCache)
+        }
+
+        // Run only if the feature is enabled for this team
+        if (this.propertyCounterTeams.has(teamId)) {
+            const cacheKey = JSON.stringify([teamId, event])
+            let properties = this.eventPropertiesCache.get(cacheKey)
+            if (!properties) {
+                properties = new Set()
+                this.eventPropertiesCache.set(cacheKey, properties)
+
+                // The code above and below introduces a race condition. At this point we have an empty set in the
+                // cache and are waiting for the query below to return. If, at the same time, we asynchronously start
+                // processing another event with the same name for this team, `syncEventProperties` above will see the
+                // empty cache and start inserting (on conflict do nothing) all the properties for the event. This
+                // continues until either 1) the inserts fill up the cache, or 2) the query below returns.
+                // All in all, not the end of the world, but a slight nuisance.
+
+                const eventProperties = await this.db.postgresQuery(
+                    'SELECT property FROM posthog_eventproperty WHERE team_id = $1 AND event = $2',
+                    [teamId, event],
+                    'fetchEventProperties'
+                )
+                for (const { property } of eventProperties.rows) {
+                    properties.add(property)
+                }
+            }
+        }
     }
 }
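The eventPropertiesCache introduced above deduplicates (team, event, property) inserts in memory, keyed by JSON.stringify([teamId, event]) since tuples cannot key a string-keyed cache directly. A standalone sketch of the same pattern against the lru-cache v6-style options used here (max/maxAge/updateAgeOnGet), with illustrative values:

    import LRU from 'lru-cache'

    const ONE_HOUR = 60 * 60 * 1000

    const seenProperties = new LRU<string, Set<string>>({
        max: 10000, // at most 10k team+event combos held in memory
        maxAge: ONE_HOUR, // entries expire after an hour...
        updateAgeOnGet: true, // ...of not being read
    })

    const key = JSON.stringify([2, 'purchase'])
    let props = seenProperties.get(key)
    if (!props) {
        props = new Set()
        seenProperties.set(key, props)
    }
    if (!props.has('price')) {
        props.add('price')
        // the INSERT ... ON CONFLICT DO NOTHING from syncEventProperties would run here
    }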
diff --git a/plugin-server/tests/clickhouse/e2e.test.ts b/plugin-server/tests/clickhouse/e2e.test.ts
index de4a2de3ee8dc..e6639226274e5 100644
--- a/plugin-server/tests/clickhouse/e2e.test.ts
+++ b/plugin-server/tests/clickhouse/e2e.test.ts
@@ -1,5 +1,6 @@
 import Piscina from '@posthog/piscina'
 
+import { ONE_HOUR } from '../../src/config/constants'
 import { KAFKA_EVENTS_PLUGIN_INGESTION } from '../../src/config/kafka-topics'
 import { startPluginsServer } from '../../src/main/pluginsServer'
 import { LogLevel, PluginsServerConfig } from '../../src/types'
@@ -16,7 +17,6 @@ import { resetTestDatabase } from '../helpers/sql'
 import { delayUntilEventIngested } from '../shared/process-event'
 
 const { console: testConsole } = writeToFile
-const ONE_HOUR = 1000 * 60 * 60
 
 jest.mock('../../src/utils/status')
 jest.setTimeout(60000) // 60 sec timeout
@@ -43,7 +43,7 @@ export async function processEvent (event) {
     }
 
     export function onEvent (event, { global }) {
-        // we use this to mock setupPlugin being 
+        // we use this to mock setupPlugin being
         // run after some events were already ingested
         global.timestampBoundariesForTeam = {
             max: new Date(),
diff --git a/plugin-server/tests/postgres/e2e.test.ts b/plugin-server/tests/postgres/e2e.test.ts
index 5ff88e859b541..5eddf57f3d0b8 100644
--- a/plugin-server/tests/postgres/e2e.test.ts
+++ b/plugin-server/tests/postgres/e2e.test.ts
@@ -1,6 +1,7 @@
 import Piscina from '@posthog/piscina'
 import * as IORedis from 'ioredis'
 
+import { ONE_HOUR } from '../../src/config/constants'
 import { startPluginsServer } from '../../src/main/pluginsServer'
 import { LogLevel } from '../../src/types'
 import { Hub } from '../../src/types'
@@ -15,7 +16,6 @@ import { delayUntilEventIngested } from '../shared/process-event'
 
 const { console: testConsole } = writeToFile
 const HISTORICAL_EVENTS_COUNTER_CACHE_KEY = '@plugin/60/2/historical_events_seen'
-const ONE_HOUR = 1000 * 60 * 60
 
 jest.mock('../../src/utils/status')
 jest.setTimeout(60000) // 60 sec timeout
@@ -34,7 +34,7 @@ const indexJs = `
     }
 
     export function onEvent (event, { global }) {
-        // we use this to mock setupPlugin being 
+        // we use this to mock setupPlugin being
         // run after some events were already ingested
         global.timestampBoundariesForTeam = {
             max: new Date(),
diff --git a/plugin-server/tests/shared/process-event.ts b/plugin-server/tests/shared/process-event.ts
index 60a4dd18c961d..ccc8f18e0a4e1 100644
--- a/plugin-server/tests/shared/process-event.ts
+++ b/plugin-server/tests/shared/process-event.ts
@@ -121,6 +121,7 @@ export const createProcessEventTests = (
         PLUGINS_CELERY_QUEUE: 'test-plugins-celery-queue',
         CELERY_DEFAULT_QUEUE: 'test-celery-default-queue',
         LOG_LEVEL: LogLevel.Log,
+        EXPERIMENTAL_EVENT_PROPERTY_TRACKER_ENABLED_TEAMS: '2',
         ...(extraServerConfig ?? {}),
         ...(additionalProps ?? {}),
     })
@@ -1903,6 +1904,7 @@ export const createProcessEventTests = (
 
     test('team event_properties', async () => {
         expect(await hub.db.fetchEventDefinitions()).toEqual([])
+        expect(await hub.db.fetchEventProperties()).toEqual([])
         expect(await hub.db.fetchPropertyDefinitions()).toEqual([])
 
         await processEvent(
@@ -1955,6 +1957,28 @@ export const createProcessEventTests = (
                 volume_30_day: null,
             },
         ])
+
+        // normally flushed every minute; the flush itself is tested elsewhere
+        expect(await hub.db.fetchEventProperties()).toEqual([
+            {
+                id: expect.any(Number),
+                event: 'purchase',
+                property: 'price',
+                team_id: 2,
+            },
+            {
+                id: expect.any(Number),
+                event: 'purchase',
+                property: 'name',
+                team_id: 2,
+            },
+            {
+                id: expect.any(Number),
+                event: 'purchase',
+                property: '$ip',
+                team_id: 2,
+            },
+        ])
     })
 
     test('event name object json', async () => {
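For a sense of what the new posthog_eventproperty rows enable once ingested: given an event name, the properties previously seen with it can be looked up directly. An illustrative helper (not part of this diff; the import path and function name are hypothetical) using the same postgresQuery shape as above:

    import { DB } from '../../src/utils/db/db'

    async function propertiesForEvent(db: DB, teamId: number, event: string): Promise<string[]> {
        const result = await db.postgresQuery(
            'SELECT property FROM posthog_eventproperty WHERE team_id = $1 AND event = $2 ORDER BY property',
            [teamId, event],
            'propertiesForEvent'
        )
        return result.rows.map((row: { property: string }) => row.property)
    }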
diff --git a/plugin-server/tests/worker/ingestion/team-manager.test.ts b/plugin-server/tests/worker/ingestion/team-manager.test.ts
index f48fd3a77f5a4..4a934ff98ec66 100644
--- a/plugin-server/tests/worker/ingestion/team-manager.test.ts
+++ b/plugin-server/tests/worker/ingestion/team-manager.test.ts
@@ -1,4 +1,4 @@
-import { DateTime } from 'luxon'
+import { DateTime, Settings } from 'luxon'
 import { mocked } from 'ts-jest/utils'
 
 import { defaultConfig } from '../../../src/config/config'
@@ -22,7 +22,9 @@ describe('TeamManager()', () => {
     let teamManager: TeamManager
 
     beforeEach(async () => {
-        ;[hub, closeHub] = await createHub()
+        ;[hub, closeHub] = await createHub({
+            EXPERIMENTAL_EVENT_PROPERTY_TRACKER_ENABLED_TEAMS: '2',
+        })
         await resetTestDatabase()
         teamManager = hub.teamManager
     })
@@ -69,6 +71,7 @@ describe('TeamManager()', () => {
         await hub.db.postgresQuery("UPDATE posthog_team SET ingested_event = 't'", undefined, 'testTag')
         await hub.db.postgresQuery('DELETE FROM posthog_eventdefinition', undefined, 'testTag')
         await hub.db.postgresQuery('DELETE FROM posthog_propertydefinition', undefined, 'testTag')
+        await hub.db.postgresQuery('DELETE FROM posthog_eventproperty', undefined, 'testTag')
         await hub.db.postgresQuery(
             `INSERT INTO posthog_eventdefinition (id, name, volume_30_day, query_usage_30_day, team_id, created_at) VALUES ($1, $2, $3, $4, $5, NOW())`,
             [new UUIDT().toString(), '$pageview', 3, 2, 2],
@@ -89,9 +92,16 @@ describe('TeamManager()', () => {
             [new UUIDT().toString(), 'numeric_prop', true, null, null, 2],
             'testTag'
         )
+        await hub.db.postgresQuery(
+            `INSERT INTO posthog_eventproperty (event, property, team_id) VALUES ($1, $2, $3)`,
+            ['new-event', 'numeric_prop', 2],
+            'testTag'
+        )
     })
 
     it('updates event properties', async () => {
+        jest.spyOn(global.Date, 'now').mockImplementation(() => new Date('2020-02-27T11:00:36.000Z').getTime())
+
         await teamManager.updateEventNamesAndProperties(2, 'new-event', {
             property_name: 'efg',
             number: 4,
@@ -141,6 +151,27 @@ describe('TeamManager()', () => {
             }
         }
 
+        expect(await hub.db.fetchEventProperties()).toEqual([
+            {
+                id: expect.any(Number),
+                event: 'new-event',
+                property: 'numeric_prop',
+                team_id: 2,
+            },
+            {
+                id: expect.any(Number),
+                event: 'new-event',
+                property: 'property_name',
+                team_id: 2,
+            },
+            {
+                id: expect.any(Number),
+                event: 'new-event',
+                property: 'number',
+                team_id: 2,
+            },
+        ])
+
         expect(await hub.db.fetchPropertyDefinitions()).toEqual([
             {
                 id: expect.any(String),
@@ -231,16 +262,17 @@ describe('TeamManager()', () => {
 
     it('flushes lastSeenCache properly', async () => {
         jest.spyOn(global.Date, 'now').mockImplementation(() => new Date('2020-01-01T00:00:00.000Z').getTime())
+        await teamManager.updateEventNamesAndProperties(2, 'new-event', {})
         await hub.db.postgresQuery(
             "UPDATE posthog_eventdefinition SET last_seen_at = to_timestamp(1497307499) WHERE team_id = 2 AND name = '$pageview'",
             undefined,
             'test'
         )
-        teamManager.eventLastSeenCache.set(JSON.stringify([2, '$pageview']), 1497307450) // older than currently last_seen_at
-        teamManager.eventLastSeenCache.set(JSON.stringify([2, 'new-event']), 1626129850) // regular
-        teamManager.eventLastSeenCache.set(JSON.stringify([2, 'another_test_event']), 1623537850)
-        teamManager.eventLastSeenCache.set(JSON.stringify([3, '$pageview']), 1528843450) // inexistent team
+        teamManager.eventLastSeenCache.set(JSON.stringify([2, '$pageview']), 1497307450000) // older than current last_seen_at
+        teamManager.eventLastSeenCache.set(JSON.stringify([2, 'new-event']), 1626129850000) // regular
+        teamManager.eventLastSeenCache.set(JSON.stringify([2, 'another_test_event']), 1623537850000)
+        teamManager.eventLastSeenCache.set(JSON.stringify([3, '$pageview']), 1528843450000) // nonexistent team
 
         jest.spyOn(global.Date, 'now').mockImplementation(() => new Date('2020-03-03T03:03:03Z').getTime())
         jest.spyOn(hub.db, 'postgresQuery')
@@ -249,8 +281,8 @@ describe('TeamManager()', () => {
         expect(teamManager.lastFlushAt.valueOf()).toBe(DateTime.fromISO('2020-03-03T03:03:03Z').valueOf())
         expect(hub.db.postgresQuery).toHaveBeenCalledTimes(1) // only a single query is fired
         expect(hub.db.postgresQuery).toHaveBeenCalledWith(
-            `UPDATE posthog_eventdefinition AS t1 SET last_seen_at = GREATEST(t1.last_seen_at, to_timestamp(t2.last_seen_at::integer))
-            FROM (VALUES ($1, $2, $3),($4, $5, $6),($7, $8, $9),($10, $11, $12)) AS t2(team_id, name, last_seen_at)
+            `UPDATE posthog_eventdefinition AS t1 SET last_seen_at = GREATEST(t1.last_seen_at, to_timestamp(t2.last_seen_at::numeric))
+            FROM (VALUES ($1,$2,$3),($4,$5,$6),($7,$8,$9),($10,$11,$12)) AS t2(team_id, name, last_seen_at)
             WHERE t1.name = t2.name AND t1.team_id = t2.team_id::integer`,
             [
                 2,
@@ -321,7 +353,7 @@ describe('TeamManager()', () => {
 
     it('handles cache invalidation properly', async () => {
         await teamManager.fetchTeam(2)
-        await teamManager.cacheEventNamesAndProperties(2)
+        await teamManager.cacheEventNamesAndProperties(2, '$foobar')
         await hub.db.postgresQuery(
             `INSERT INTO posthog_eventdefinition (id, name, volume_30_day, query_usage_30_day, team_id) VALUES ($1, $2, NULL, NULL, $3) ON CONFLICT DO NOTHING`,
             [new UUIDT().toString(), '$foobar', 2],
@@ -342,7 +374,8 @@ describe('TeamManager()', () => {
         await teamManager.updateEventNamesAndProperties(2, '$newevent', {})
         expect(teamManager.fetchTeam).toHaveBeenCalledTimes(1)
-        expect(hub.db.postgresQuery).toHaveBeenCalledTimes(1)
+        // extra query for the `cacheEventNamesAndProperties` call we made manually above
+        expect(hub.db.postgresQuery).toHaveBeenCalledTimes(2)
     })
 
     describe('first event has not yet been ingested', () => {
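The tests above pin the clock by mocking Date.now, which keeps DateTime.now()-driven cache logic (last-seen timestamps, flush intervals) deterministic. A minimal standalone sketch of the technique:

    // Freeze "now" at a fixed instant.
    const spy = jest
        .spyOn(global.Date, 'now')
        .mockImplementation(() => new Date('2020-02-27T11:00:36.000Z').getTime())

    expect(Date.now()).toBe(1582801236000) // always the pinned instant

    spy.mockRestore() // hand the real clock back afterwards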
diff --git a/posthog/migrations/0192_event_properties.py b/posthog/migrations/0192_event_properties.py
new file mode 100644
index 0000000000000..110fda790dedb
--- /dev/null
+++ b/posthog/migrations/0192_event_properties.py
@@ -0,0 +1,38 @@
+# Generated by Django 3.2.5 on 2021-12-03 09:12
+
+import django.db.models.deletion
+import django.utils.timezone
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ("posthog", "0191_rename_specialmigration_asyncmigration"),
+    ]
+
+    operations = [
+        migrations.CreateModel(
+            name="EventProperty",
+            fields=[
+                ("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")),
+                ("event", models.CharField(max_length=400)),
+                ("property", models.CharField(max_length=400)),
+                ("team", models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to="posthog.team",),),
+            ],
+        ),
+        migrations.AddIndex(
+            model_name="eventproperty",
+            index=models.Index(fields=["team", "event"], name="posthog_eve_team_id_22de03_idx"),
+        ),
+        migrations.AddIndex(
+            model_name="eventproperty",
+            index=models.Index(fields=["team", "property"], name="posthog_eve_team_id_26dbfb_idx"),
+        ),
+        migrations.AddConstraint(
+            model_name="eventproperty",
+            constraint=models.UniqueConstraint(
+                fields=("team", "event", "property"), name="posthog_event_property_unique_team_event_property"
+            ),
+        ),
+    ]
diff --git a/posthog/models/__init__.py b/posthog/models/__init__.py
index d1f35faadd32c..5b7f2c470306c 100644
--- a/posthog/models/__init__.py
+++ b/posthog/models/__init__.py
@@ -8,6 +8,7 @@ from .entity import Entity
 from .event import Event
 from .event_definition import EventDefinition
+from .event_property import EventProperty
 from .experiment import Experiment
 from .feature_flag import FeatureFlag
 from .filters import Filter, RetentionFilter
diff --git a/posthog/models/event_property.py b/posthog/models/event_property.py
new file mode 100644
index 0000000000000..e3f8e050fa73d
--- /dev/null
+++ b/posthog/models/event_property.py
@@ -0,0 +1,23 @@
+from django.db import models
+
+from posthog.models.team import Team
+from posthog.models.utils import sane_repr
+
+
+class EventProperty(models.Model):
+    team: models.ForeignKey = models.ForeignKey(Team, on_delete=models.CASCADE)
+    event: models.CharField = models.CharField(max_length=400, null=False)
+    property: models.CharField = models.CharField(max_length=400, null=False)
+
+    class Meta:
+        constraints = [
+            models.UniqueConstraint(
+                fields=["team", "event", "property"], name="posthog_event_property_unique_team_event_property"
+            ),
+        ]
+        indexes = [
+            models.Index(fields=["team", "event"]),
+            models.Index(fields=["team", "property"]),
+        ]
+
+    __repr__ = sane_repr("event", "property", "team_id")
diff --git a/task-definition.plugins.json b/task-definition.plugins.json
index 55b286b44dde4..fab8df38fc5e8 100644
--- a/task-definition.plugins.json
+++ b/task-definition.plugins.json
@@ -91,6 +91,10 @@
         {
             "name": "PERSON_DISTINCT_ID_OPTIMIZATION_TEAM_IDS",
             "value": "2,2635"
+        },
+        {
+            "name": "EXPERIMENTAL_EVENT_PROPERTY_TRACKER_ENABLED_TEAMS",
+            "value": "2"
         }
     ],
     "secrets": [