Skip to content
This repository has been archived by the owner on Nov 4, 2021. It is now read-only.

Refactor event names and properties #331

Merged
merged 17 commits into from
Apr 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ jobs:
- uses: actions/cache@v2
with:
path: ${{ env.pythonLocation }}
key: ${{ env.pythonLocation }}-${{ hashFiles('posthog/requirements.txt') }}
key: ${{ env.pythonLocation }}-v1-${{ hashFiles('posthog/requirements.txt') }}

- name: Install requirements.txt dependencies with pip
run: |
Expand Down Expand Up @@ -151,7 +151,7 @@ jobs:
- uses: actions/cache@v2
with:
path: ${{ env.pythonLocation }}
key: ${{ env.pythonLocation }}-${{ hashFiles('posthog/requirements.txt') }}
key: ${{ env.pythonLocation }}-v1-${{ hashFiles('posthog/requirements.txt') }}

- name: Install requirements.txt dependencies with pip
run: |
Expand Down Expand Up @@ -241,7 +241,7 @@ jobs:
- uses: actions/cache@v2
with:
path: ${{ env.pythonLocation }}
key: ${{ env.pythonLocation }}-${{ hashFiles('posthog/requirements.txt') }}
key: ${{ env.pythonLocation }}-v1-${{ hashFiles('posthog/requirements.txt') }}

- name: Install requirements.txt dependencies with pip
run: |
Expand Down Expand Up @@ -332,7 +332,7 @@ jobs:
- uses: actions/cache@v2
with:
path: ${{ env.pythonLocation }}
key: ${{ env.pythonLocation }}-${{ hashFiles('posthog/requirements.txt') }}
key: ${{ env.pythonLocation }}-v1-${{ hashFiles('posthog/requirements.txt') }}

- name: Install requirements.txt dependencies with pip
run: |
Expand Down Expand Up @@ -423,7 +423,7 @@ jobs:
- uses: actions/cache@v2
with:
path: ${{ env.pythonLocation }}
key: ${{ env.pythonLocation }}-${{ hashFiles('posthog/requirements.txt') }}
key: ${{ env.pythonLocation }}-v1-${{ hashFiles('posthog/requirements.txt') }}

- name: Install requirements.txt dependencies with pip
run: |
Expand Down Expand Up @@ -499,7 +499,7 @@ jobs:
- uses: actions/cache@v2
with:
path: ${{ env.pythonLocation }}
key: ${{ env.pythonLocation }}-${{ hashFiles('posthog/requirements.txt') }}
key: ${{ env.pythonLocation }}-v1-${{ hashFiles('posthog/requirements.txt') }}

- name: Install requirements.txt dependencies with pip
run: |
Expand Down Expand Up @@ -574,7 +574,7 @@ jobs:
- uses: actions/cache@v2
with:
path: ${{ env.pythonLocation }}
key: ${{ env.pythonLocation }}-${{ hashFiles('posthog/requirements.txt') }}
key: ${{ env.pythonLocation }}-v1-${{ hashFiles('posthog/requirements.txt') }}

- name: Install requirements.txt dependencies with pip
run: |
Expand Down Expand Up @@ -649,7 +649,7 @@ jobs:
- uses: actions/cache@v2
with:
path: ${{ env.pythonLocation }}
key: ${{ env.pythonLocation }}-${{ hashFiles('posthog/requirements.txt') }}
key: ${{ env.pythonLocation }}-v1-${{ hashFiles('posthog/requirements.txt') }}

- name: Install requirements.txt dependencies with pip
run: |
Expand Down
13 changes: 13 additions & 0 deletions src/shared/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ import {
Element,
ElementGroup,
Event,
EventDefinitionType,
Person,
PersonDistinctId,
PluginConfig,
PluginLogEntry,
PluginLogEntrySource,
PluginLogEntryType,
PostgresSessionRecordingEvent,
PropertyDefinitionType,
RawOrganization,
RawPerson,
SessionRecordingEvent,
Expand Down Expand Up @@ -642,4 +644,15 @@ export class DB {

return entry
}

public async fetchEventDefinitions(): Promise<EventDefinitionType[]> {
mariusandra marked this conversation as resolved.
Show resolved Hide resolved
return (await this.postgresQuery('SELECT * FROM posthog_eventdefinition', undefined, 'fetchEventDefinitions'))
.rows as EventDefinitionType[]
}

public async fetchPropertyDefinitions(): Promise<PropertyDefinitionType[]> {
return (
await this.postgresQuery('SELECT * FROM posthog_propertydefinition', undefined, 'fetchPropertyDefinitions')
).rows as PropertyDefinitionType[]
}
}
22 changes: 17 additions & 5 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,11 +281,6 @@ export interface Team {
api_token: string
app_urls: string[]
completed_snippet_onboarding: boolean
event_names: string[]
event_properties: string[]
event_properties_numerical: string[]
event_names_with_usage: EventUsage[]
event_properties_with_usage: PropertyUsage[]
opt_out_capture: boolean
slack_incoming_webhook: string
session_recording_opt_in: boolean
Expand Down Expand Up @@ -415,3 +410,20 @@ export interface ScheduleControl {
}

export type IngestEventResponse = { success?: boolean; error?: string }

export interface EventDefinitionType {
id: string
name: string
volume_30_day: number | null
query_usage_30_day: number | null
team_id: number
}

export interface PropertyDefinitionType {
id: string
name: string
is_numerical: boolean
volume_30_day: number | null
query_usage_30_day: number | null
team_id: number
}
4 changes: 2 additions & 2 deletions src/worker/ingestion/process-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ export class EventsProcessor {
}))
}

const team = await this.teamManager.fetchTeam(teamId, eventUuid)
const team = await this.teamManager.fetchTeam(teamId)

if (!team) {
throw new Error(`No team found with ID ${teamId}. Can't ingest event.`)
Expand All @@ -383,7 +383,7 @@ export class EventsProcessor {
properties['$ip'] = ip
}

await this.teamManager.updateEventNamesAndProperties(teamId, event, eventUuid, properties, this.posthog)
await this.teamManager.updateEventNamesAndProperties(teamId, event, properties, this.posthog)

if (await this.personManager.isNewPerson(this.db, teamId, distinctId)) {
// Catch race condition where in between getting and creating, another request already created this user
Expand Down
125 changes: 54 additions & 71 deletions src/worker/ingestion/team-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,27 @@ import { nodePostHog } from 'posthog-js-lite/dist/src/targets/node'

import { DB } from '../../shared/db'
import { timeoutGuard } from '../../shared/ingestion/utils'
import { UUIDT } from '../../shared/utils'
import { Team, TeamId } from '../../types'

interface TeamWithEventUuid extends Team {
__fetch_event_uuid?: string
}
type TeamCache<T> = Map<TeamId, [T, number]>

export class TeamManager {
db: DB
teamCache: TeamCache<TeamWithEventUuid | null>
teamCache: TeamCache<Team | null>
eventNamesCache: Map<TeamId, Set<string>>
eventPropertiesCache: Map<TeamId, Set<string>>
shouldSendWebhooksCache: TeamCache<boolean>

constructor(db: DB) {
this.db = db
this.teamCache = new Map()
this.eventNamesCache = new Map()
this.eventPropertiesCache = new Map()
this.shouldSendWebhooksCache = new Map()
}

public async fetchTeam(teamId: number, eventUuid?: string): Promise<TeamWithEventUuid | null> {
public async fetchTeam(teamId: number): Promise<Team | null> {
const cachedTeam = this.getByAge(this.teamCache, teamId)
if (cachedTeam) {
return cachedTeam
Expand All @@ -34,10 +36,7 @@ export class TeamManager {
[teamId],
'selectTeam'
)
const team: TeamWithEventUuid | null = teamQueryResult.rows[0] || null
if (team) {
team.__fetch_event_uuid = eventUuid
}
const team: Team | null = teamQueryResult.rows[0] || null

this.teamCache.set(teamId, [team, Date.now()])
return team
Expand Down Expand Up @@ -87,11 +86,10 @@ export class TeamManager {
public async updateEventNamesAndProperties(
teamId: number,
event: string,
eventUuid: string,
properties: Properties,
posthog: ReturnType<typeof nodePostHog>
): Promise<void> {
let team: TeamWithEventUuid | null = await this.fetchTeam(teamId)
const team: Team | null = await this.fetchTeam(teamId)

if (!team) {
return
Expand All @@ -101,42 +99,36 @@ export class TeamManager {
event: event,
ingested: team.ingested_event,
})
let shouldUpdate = this.calculateUpdates(team, event, properties)
if (shouldUpdate && team.__fetch_event_uuid !== eventUuid) {
mariusandra marked this conversation as resolved.
Show resolved Hide resolved
// :TRICKY: Double-check if we're updating based on cached data, if so, re-validate.
// :TODO: Switch all of this to a sane schema that can be updated without races.
this.teamCache.delete(teamId)
team = await this.fetchTeam(teamId, eventUuid)
shouldUpdate = this.calculateUpdates(team, event, properties)
}
if (team && shouldUpdate) {
const timeout2 = timeoutGuard(
'Still running "updateEventNamesAndProperties" save. Timeout warning after 30 sec!',
{ event }
)

await this.cacheEventNamesAndProperties(team.id)

if (!this.eventNamesCache.get(team.id)?.has(event)) {
await this.db.postgresQuery(
`UPDATE posthog_team SET
ingested_event = $1,
event_names = $2,
event_names_with_usage = $3,
event_properties = $4,
event_properties_with_usage = $5,
event_properties_numerical = $6
WHERE id = $7`,
[
true,
JSON.stringify(team.event_names),
JSON.stringify(team.event_names_with_usage),
JSON.stringify(team.event_properties),
JSON.stringify(team.event_properties_with_usage),
JSON.stringify(team.event_properties_numerical),
team.id,
],
'updateEventNamesAndProperties'
`INSERT INTO posthog_eventdefinition (id, name, volume_30_day, query_usage_30_day, team_id) VALUES ($1, $2, NULL, NULL, $3) ON CONFLICT DO NOTHING`,
[new UUIDT().toString(), event, team.id],
'insertEventDefinition'
)
clearTimeout(timeout2)
this.eventNamesCache.get(team.id)?.add(event)
}

for (const [key, value] of Object.entries(properties)) {
if (!this.eventPropertiesCache.get(team.id)?.has(key)) {
await this.db.postgresQuery(
`INSERT INTO posthog_propertydefinition (id, name, is_numerical, volume_30_day, query_usage_30_day, team_id) VALUES ($1, $2, $3, NULL, NULL, $4) ON CONFLICT DO NOTHING`,
[new UUIDT().toString(), key, typeof value === 'number', team.id],
'insertPropertyDefinition'
)
mariusandra marked this conversation as resolved.
Show resolved Hide resolved
this.eventPropertiesCache.get(team.id)?.add(key)
}
}

if (team && !team.ingested_event) {
await this.db.postgresQuery(
`UPDATE posthog_team SET ingested_event = $1 WHERE id = $2`,
[true, team.id],
mariusandra marked this conversation as resolved.
Show resolved Hide resolved
'setTeamIngestedEvent'
)

// First event for the team captured
const organizationMembers = await this.db.postgresQuery(
'SELECT distinct_id FROM posthog_user JOIN posthog_organizationmembership ON posthog_user.id = posthog_organizationmembership.user_id WHERE organization_id = $1',
Expand All @@ -157,37 +149,28 @@ export class TeamManager {
clearTimeout(timeout)
}

private calculateUpdates(team: Team | null, event: string, properties: Properties): boolean {
if (!team) {
return false
}

let shouldUpdate = false
if (!team.ingested_event) {
shouldUpdate = true
public async cacheEventNamesAndProperties(teamId: number): Promise<void> {
let eventNamesCache = this.eventNamesCache.get(teamId)
if (!eventNamesCache) {
const eventNames = await this.db.postgresQuery(
'SELECT name FROM posthog_eventdefinition WHERE team_id = $1',
[teamId],
'fetchEventDefinitions'
)
eventNamesCache = new Set(eventNames.rows.map((r) => r.name))
this.eventNamesCache.set(teamId, eventNamesCache)
}

if (team.event_names && !team.event_names.includes(event)) {
shouldUpdate = true
team.event_names.push(event)
team.event_names_with_usage.push({ event: event, usage_count: null, volume: null })
}
for (const [key, value] of Object.entries(properties)) {
if (!team.event_properties || !team.event_properties.includes(key)) {
team.event_properties.push(key)
team.event_properties_with_usage.push({ key: key, usage_count: null, volume: null })
shouldUpdate = true
}
if (
typeof value === 'number' &&
(!team.event_properties_numerical || !team.event_properties_numerical.includes(key))
) {
team.event_properties_numerical.push(key)
shouldUpdate = true
}
let eventPropertiesCache = this.eventPropertiesCache.get(teamId)
if (!eventPropertiesCache) {
const eventProperties = await this.db.postgresQuery(
'SELECT name FROM posthog_propertydefinition WHERE team_id = $1',
[teamId],
'fetchPropertyDefinitions'
)
eventPropertiesCache = new Set(eventProperties.rows.map((r) => r.name))
this.eventPropertiesCache.set(teamId, eventPropertiesCache)
}

return shouldUpdate
}

private getByAge<K, V>(cache: Map<K, [V, number]>, key: K, maxAgeMs = 30_000): V | undefined {
Expand Down
1 change: 1 addition & 0 deletions src/worker/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export const createTaskRunner = (server: PluginsServer): TaskWorker => async ({
if (task === 'flushKafkaMessages') {
await server.kafkaProducer?.flush()
}

server.statsd?.timing(`piscina_task.${task}`, timer)
return response
}
2 changes: 2 additions & 0 deletions tests/helpers/sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ export async function resetTestDatabase(
${config.ENABLE_PERSISTENT_CONSOLE ? 'DELETE FROM posthog_pluginlogentry;' : '' /* TODO: remove this if */}
DELETE FROM posthog_pluginconfig;
DELETE FROM posthog_plugin;
DELETE FROM posthog_eventdefinition;
DELETE FROM posthog_propertydefinition;
DELETE FROM posthog_team;
DELETE FROM posthog_organizationmembership;
DELETE FROM posthog_organization;
Expand Down
Loading