Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add a session recording metadata table #10294

Closed
wants to merge 26 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
ea37715
quick first steps of adding a snapshot_data table
pauldambra Jun 10, 2022
6f7e6f2
feat: add a session_recordings_metadata table
pauldambra Jun 14, 2022
dbbc03a
remove sql comment
pauldambra Jun 14, 2022
b48d00c
remove uuid and timestamp
pauldambra Jun 14, 2022
e8b46e7
remove index granularity setting
pauldambra Jun 14, 2022
037531c
pre-calculate duration
pauldambra Jun 14, 2022
ae430d9
assume replicated
pauldambra Jun 14, 2022
dfedf6b
does order of order by clauses matter (big to small to affect locatio…
pauldambra Jun 14, 2022
563baa5
why tohour on date after date
pauldambra Jun 14, 2022
fffd962
can query by only start or only end so need both in order?
pauldambra Jun 14, 2022
c57a062
Merge branch 'master' into feat/metadata-table
pauldambra Jun 14, 2022
a8491a3
move file to non-ee location
pauldambra Jun 14, 2022
57d9af9
remove unnecessary format parameters
pauldambra Jun 15, 2022
6bcb58e
update partition and order by
pauldambra Jun 15, 2022
fa0e137
slightly more deliberately
pauldambra Jun 15, 2022
13a4af6
rename file to match others
pauldambra Jun 16, 2022
98938da
add a comma
pauldambra Jun 16, 2022
68a9074
Merge branch 'master' into feat/metadata-table
pauldambra Jun 17, 2022
59d1013
Merge branch 'master' into feat/metadata-table
pauldambra Jun 17, 2022
717c48c
Merge branch 'master' into feat/metadata-table
pauldambra Jun 17, 2022
30226e3
Merge branch 'master' into feat/metadata-table
pauldambra Jun 21, 2022
13427dd
add calculated metadata to table
pauldambra Jun 21, 2022
bde70e8
fix column definitions
pauldambra Jun 21, 2022
410bd74
metadata is across windows
pauldambra Jun 21, 2022
a4f4ac5
it's not a metadata table, it's a table with metadata
pauldambra Jun 21, 2022
64a26fb
merge from master
pauldambra Jun 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions ee/clickhouse/migrations/0030_session_recording_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from infi.clickhouse_orm import migrations

from posthog.models.session_recordings.sql import (
DISTRIBUTED_SESSION_RECORDINGS_TABLE_SQL,
KAFKA_SESSION_RECORDINGS_TABLE_SQL,
SESSION_RECORDINGS_TABLE_MV_SQL,
SESSION_RECORDINGS_TABLE_SQL,
WRITABLE_SESSION_RECORDINGS_TABLE_SQL,
)

# Order matters here:
#  1. the sharded storage table must exist first,
#  2. the writable Distributed table must exist before the materialized view
#     is created, because the view's `TO` target must already exist in
#     ClickHouse at CREATE MATERIALIZED VIEW time,
#  3. the Kafka source table must exist before the view that reads from it.
# The original order created the MV before writable_session_recordings,
# which fails on a fresh cluster.
operations = [
    migrations.RunSQL(SESSION_RECORDINGS_TABLE_SQL()),
    migrations.RunSQL(WRITABLE_SESSION_RECORDINGS_TABLE_SQL()),
    migrations.RunSQL(DISTRIBUTED_SESSION_RECORDINGS_TABLE_SQL()),
    migrations.RunSQL(KAFKA_SESSION_RECORDINGS_TABLE_SQL()),
    migrations.RunSQL(SESSION_RECORDINGS_TABLE_MV_SQL()),
]
Empty file added ee/kafka_client/topics.py
Empty file.
1 change: 1 addition & 0 deletions plugin-server/src/config/kafka-topics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export const KAFKA_PERSON = `${prefix}clickhouse_person${suffix}`
export const KAFKA_PERSON_UNIQUE_ID = `${prefix}clickhouse_person_unique_id${suffix}`
export const KAFKA_PERSON_DISTINCT_ID = `${prefix}clickhouse_person_distinct_id${suffix}`
export const KAFKA_SESSION_RECORDING_EVENTS = `${prefix}clickhouse_session_recording_events${suffix}`
// NOTE(review): topic string must match KAFKA_SESSION_RECORDINGS in
// posthog/kafka_client/topics.py ("clickhouse_session_recordings"), which is
// the topic the ClickHouse Kafka engine table consumes — the original
// "clickhouse_session_recording_metadata" would produce to a topic nothing
// reads. Consider also renaming this constant to match the Python side.
export const KAFKA_SESSION_RECORDING_METADATA = `${prefix}clickhouse_session_recordings${suffix}`
export const KAFKA_EVENTS_PLUGIN_INGESTION = `${prefix}events_plugin_ingestion${suffix}`
export const KAFKA_PLUGIN_LOG_ENTRIES = `${prefix}plugin_log_entries${suffix}`
export const KAFKA_EVENTS_DEAD_LETTER_QUEUE = `${prefix}events_dead_letter_queue${suffix}`
Expand Down
1 change: 1 addition & 0 deletions posthog/kafka_client/topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
KAFKA_PERSON_UNIQUE_ID = f"{KAFKA_PREFIX}clickhouse_person_unique_id{SUFFIX}"
KAFKA_PERSON_DISTINCT_ID = f"{KAFKA_PREFIX}clickhouse_person_distinct_id{SUFFIX}"
KAFKA_SESSION_RECORDING_EVENTS = f"{KAFKA_PREFIX}clickhouse_session_recording_events{SUFFIX}"
# Topic consumed by the session recordings ClickHouse Kafka engine table
# (see posthog/models/session_recordings/sql.py). Producers — including the
# plugin server's kafka-topics.ts — must write to this same topic name.
KAFKA_SESSION_RECORDINGS = f"{KAFKA_PREFIX}clickhouse_session_recordings{SUFFIX}"
KAFKA_PLUGIN_LOG_ENTRIES = f"{KAFKA_PREFIX}plugin_log_entries{SUFFIX}"
KAFKA_DEAD_LETTER_QUEUE = f"{KAFKA_PREFIX}events_dead_letter_queue{SUFFIX}"
KAFKA_GROUPS = f"{KAFKA_PREFIX}clickhouse_groups{SUFFIX}"
Empty file.
81 changes: 81 additions & 0 deletions posthog/models/session_recordings/sql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from django.conf import settings

from posthog.kafka_client.topics import KAFKA_SESSION_RECORDINGS
from posthog.clickhouse.kafka_engine import KAFKA_COLUMNS, kafka_engine
from posthog.clickhouse.table_engines import Distributed, ReplacingMergeTree, ReplicationScheme

SESSION_RECORDINGS_DATA_TABLE = "sharded_session_recordings"

SESSION_RECORDINGS_TABLE_BASE_SQL = """
CREATE TABLE IF NOT EXISTS {table_name} ON CLUSTER '{cluster}'
(
team_id Int64,
distinct_id VARCHAR,
session_id VARCHAR,
session_start DateTime64(6, 'UTC'),
session_end DateTime64(6, 'UTC'),
duration Int64,
metadata VARCHAR,
snapshot_data_location VARCHAR
{extra_fields}
) ENGINE = {engine}
"""

def SESSION_RECORDINGS_DATA_TABLE_ENGINE():
    """Engine for the sharded storage table.

    ReplacingMergeTree deduplicates rows sharing the same ORDER BY key,
    keeping the row with the greatest `_timestamp` version column.
    """
    return ReplacingMergeTree(
        "session_recordings",
        ver="_timestamp",
        replication_scheme=ReplicationScheme.SHARDED,
    )

def SESSION_RECORDINGS_TABLE_SQL():
    """CREATE TABLE statement for the sharded storage table.

    Partitions by day of session start; the ORDER BY key also serves as the
    ReplacingMergeTree deduplication key.
    """
    template = SESSION_RECORDINGS_TABLE_BASE_SQL + (
        "PARTITION BY toYYYYMMDD(session_start)\n"
        "ORDER BY (team_id, toStartOfHour(session_start), session_id)\n"
    )
    return template.format(
        table_name=SESSION_RECORDINGS_DATA_TABLE,
        cluster=settings.CLICKHOUSE_CLUSTER,
        extra_fields=KAFKA_COLUMNS,
        engine=SESSION_RECORDINGS_DATA_TABLE_ENGINE(),
    )

def KAFKA_SESSION_RECORDINGS_TABLE_SQL():
    """CREATE TABLE statement for the Kafka engine table that consumes the
    session recordings topic; the materialized view reads from this table."""
    return SESSION_RECORDINGS_TABLE_BASE_SQL.format(
        table_name="kafka_session_recordings",
        cluster=settings.CLICKHOUSE_CLUSTER,
        engine=kafka_engine(topic=KAFKA_SESSION_RECORDINGS),
        # No extra storage columns on the Kafka source table itself.
        extra_fields="",
    )

# Materialized view that moves rows from the Kafka source table into the
# writable Distributed table.
# Fix: the original SELECT omitted `metadata`, even though both the Kafka
# source table and the writable target define that column (see
# SESSION_RECORDINGS_TABLE_BASE_SQL) — without it, metadata would never be
# populated in storage.
SESSION_RECORDINGS_TABLE_MV_SQL = lambda: """
CREATE MATERIALIZED VIEW session_recordings_mv ON CLUSTER '{cluster}'
TO {database}.{target_table}
AS SELECT
team_id,
distinct_id,
session_id,
session_start,
session_end,
duration,
metadata,
snapshot_data_location,
_timestamp,
_offset
FROM {database}.kafka_session_recordings
""".format(
    target_table="writable_session_recordings",
    cluster=settings.CLICKHOUSE_CLUSTER,
    database=settings.CLICKHOUSE_DATABASE,
)

def WRITABLE_SESSION_RECORDINGS_TABLE_SQL():
    """CREATE TABLE statement for the write-side Distributed table.

    The materialized view writes here; rows are routed to shards of
    sharded_session_recordings by sipHash64(session_id), so all rows of a
    session land on the same shard.
    """
    return SESSION_RECORDINGS_TABLE_BASE_SQL.format(
        table_name="writable_session_recordings",
        cluster=settings.CLICKHOUSE_CLUSTER,
        engine=Distributed(
            data_table=SESSION_RECORDINGS_DATA_TABLE,
            sharding_key="sipHash64(session_id)",
        ),
        extra_fields=KAFKA_COLUMNS,
    )

def DISTRIBUTED_SESSION_RECORDINGS_TABLE_SQL():
    """CREATE TABLE statement for the read-side Distributed table.

    `session_recordings` fans queries out across all shards of
    sharded_session_recordings (the original comment said
    session_recording_events, but the data_table below is the sharded
    session recordings table).
    """
    return SESSION_RECORDINGS_TABLE_BASE_SQL.format(
        table_name="session_recordings",
        cluster=settings.CLICKHOUSE_CLUSTER,
        engine=Distributed(
            data_table=SESSION_RECORDINGS_DATA_TABLE,
            sharding_key="sipHash64(session_id)",
        ),
        extra_fields=KAFKA_COLUMNS,
    )