
feat: add a session recording metadata table #10294

Closed
wants to merge 26 commits

Conversation

pauldambra
Member

Problem

see #2142 or https://github.com/PostHog/product-internal/pull/316 for context

Changes

Adds a session_recording_metadata table. It has a schema and setup similar to the session_recording_events table. It stores the start and end time of a session and adds a column for data about snapshot locations.

The table is populated via Kafka and uses the ReplacingMergeTree engine. If a session is processed more than once, this allows a client to load the metadata, alter it, and write a "new" metadata row to the table.
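For context, a minimal illustration of the ReplacingMergeTree behaviour this relies on (table and column names here are illustrative, not the schema added in this PR):

    # rows sharing the ORDER BY key are collapsed on merge, keeping the one with the
    # highest version column, so re-processing a session can simply write a new row
    EXAMPLE_REPLACING_TABLE_SQL = """
    CREATE TABLE example_session_metadata
    (
        session_id VARCHAR,
        snapshot_data_location VARCHAR,
        _timestamp DateTime
    )
    ENGINE = ReplacingMergeTree(_timestamp)
    ORDER BY (session_id)
    """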


How did you test this code?

running it to see that the tables are created

@pauldambra requested a review from hazzadous June 14, 2022 12:00
SESSION_RECORDING_METADATA_TABLE_SQL = lambda: (
SESSION_RECORDING_METADATA_TABLE_BASE_SQL
+ """PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (toHour(timestamp), session_id, timestamp, uuid)
Member Author

Should order by have team id since we will query by team id and session id?

Contributor

Yes. See ee/clickhouse/sql/session_recording_events.py - this should be the first key.
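For illustration, a sketch of the lambda with team_id leading the sort key (mirroring the diff above; not the final definition):

    SESSION_RECORDING_METADATA_TABLE_SQL = lambda: (
        SESSION_RECORDING_METADATA_TABLE_BASE_SQL
        + """PARTITION BY toYYYYMMDD(timestamp)
    ORDER BY (team_id, toHour(timestamp), session_id, timestamp, uuid)
    """
    )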

@pauldambra requested a review from macobo June 14, 2022 12:01
@pauldambra
Member Author

Not certain how best to test this...

@hazzadous what's the best way to create new kafka topics... Terraform to create in staging and then manually in prod?

@hazzadous
Contributor

@pauldambra this is manual at the moment. Yakko was implementing something here although it looks like it still needs work. Something along these lines would be preferable over Terraform IMO so we can handle this uniformly across the types of deployments.

@hazzadous
Contributor

(fwiw I believe self-hosted may be configured to automatically create topics although I'd need to check that)
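For reference, a minimal sketch of creating the topic manually with kafka-python's admin client (the broker address, partition count, and replication factor are assumptions for a local setup):

    from kafka.admin import KafkaAdminClient, NewTopic

    from ee.kafka_client.topics import KAFKA_SESSION_RECORDING_METADATA

    # assumed local broker; staging/prod would point at the real brokers
    admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
    admin.create_topics([NewTopic(name=KAFKA_SESSION_RECORDING_METADATA, num_partitions=1, replication_factor=1)])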

SESSION_RECORDING_METADATA_TABLE_BASE_SQL = """
CREATE TABLE IF NOT EXISTS {table_name} ON CLUSTER '{cluster}'
(
uuid UUID,
Contributor

What is this UUID? Who sets it?

Member Author

Ah! It's the UUID of the snapshot event... so redundant here!

window_id VARCHAR,
session_start DateTime64(6, 'UTC'),
session_end DateTime64(6, 'UTC'),
snapshot_data_location VARCHAR -- no trailing comma, extra_fields leads with one
Contributor

Can we remove this comment? This would generate confusion in the SQL log.

SESSION_RECORDING_METADATA_TABLE_BASE_SQL
+ """PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (toHour(timestamp), session_id, timestamp, uuid)
SETTINGS index_granularity=512
Contributor

Why the lower granularity?

Member Author

Was copied from session_recording_events :)

I guess it's there to reduce the amount of snapshot_data ClickHouse reads unnecessarily while querying, so it's not applicable to this table.

""".format(
target_table=(
"writable_session_recording_metadata"
if settings.CLICKHOUSE_REPLICATION
Contributor

Assume this is always true in this code.

from ee.kafka_client.topics import KAFKA_SESSION_RECORDING_METADATA

SESSION_RECORDING_METADATA_DATA_TABLE = (
lambda: "sharded_session_recording_metadata" if settings.CLICKHOUSE_REPLICATION else "session_recording_metadata"
Contributor

Build this as if CLICKHOUSE_REPLICATION was true.
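A sketch of that simplification, keeping the lambda shape from the diff but dropping the conditional:

    SESSION_RECORDING_METADATA_DATA_TABLE = lambda: "sharded_session_recording_metadata"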

migrations.RunSQL(SESSION_RECORDING_METADATA_TABLE_MV_SQL()),
]

if CLICKHOUSE_REPLICATION:
Contributor

Build this as if CLICKHOUSE_REPLICATION was true.

@pauldambra
Member Author

https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka/

Says "Process streams as they become available."

Which I'm reading as "it is safe to add ClickHouse changes before the kafka topic is available"

CREATE TABLE IF NOT EXISTS {table_name} ON CLUSTER '{cluster}'
(
uuid UUID,
timestamp DateTime64(6, 'UTC'),
Contributor

What does this timestamp represent?

Member Author

As with UUID. It's the snapshot event timestamp. Not applicable here!

SESSION_RECORDING_METADATA_TABLE_SQL = lambda: (
SESSION_RECORDING_METADATA_TABLE_BASE_SQL
+ """PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (toHour(timestamp), session_id, timestamp, uuid)
Contributor

This order by key is out of whack. Basically this should match your list/fetch query for efficient lookups.

  1. Include team_id
  2. What is timestamp? I don't think this would be used in any queries, meaning all queries devolve into full table scans with this order by?

Member Author

Interesting...

This is copied over from session_recording_events

When loading the snapshot data we query by team_id and session_id.

But when listing sessions...

MIN(timestamp) AS start_time,
MAX(timestamp) AS end_time,
dateDiff('second', toDateTime(MIN(timestamp)), toDateTime(MAX(timestamp))) as duration,

The listing API allows date_from and date_to filters

We query on the API filter date_from being after the recording start_time and the API filter date_to being before the recording end_time, both explicitly on the aggregates above and implicitly by checking that the snapshot event timestamp falls within that range.

So we would be querying by start and end time... (and by duration, which needs adding to the table setup in this PR)

In case it affects the order by: we'll be joining this table with the events table on distinct_id.
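For illustration, the duration column mentioned above could be added to the base SQL along these lines (a hypothetical excerpt; the column type is an assumption):

    # hypothetical fragment of SESSION_RECORDING_METADATA_TABLE_BASE_SQL with duration added
    SESSION_RECORDING_METADATA_DURATION_SKETCH = """
        session_start DateTime64(6, 'UTC'),
        session_end DateTime64(6, 'UTC'),
        duration Int64,
        snapshot_data_location VARCHAR
    """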

SESSION_RECORDING_METADATA_TABLE_SQL = lambda: (
SESSION_RECORDING_METADATA_TABLE_BASE_SQL
+ """PARTITION BY toYYYYMMDD(session_end)
ORDER BY (team_id, session_id, session_start, session_end)
Contributor

@macobo Jun 15, 2022

This sort key doesn't work for listing sessions - all queries basically devolve into full table scans since session_id is unique per session and doesn't help with filtering. Having session_start/session_end after that in the sort key doesn't affect behavior at all.

Can you list out what the full list and fetch queries look like against this table? We should design the sort key accordingly.

Member Author

Loading the recordings list page

The payload by default includes date_from: -7d and
session_recording_duration: {"type":"recording","key":"duration","value":60,"operator":"gt"}, and lets you edit these as well as add person/cohort and event filters.

These get templated into ClickhouseSessionRecordingList

We'll be joining between session_recording_events and session_recording_metadata. Ignoring the join (to make the query smaller here), this will generate a query close to:

        SELECT
            session_id,
            window_id,
            session_start,
            session_end,
            duration,
            distinct_id
        FROM session_recording_metadata
        WHERE team_id = %(team_id)s
        AND session_start >= %(start_time)s -- defaults to seven days ago
        AND session_end <= %(end_time)s -- defaults to end of today
        AND duration > %(recording_duration)s -- defaults to one minute

That generates the session recording list. It would be joined with the equivalent for session_recording_events and person, and sometimes with events, to allow filtering. I'm assuming you don't need that context here.

Loading Snapshot Data (playback)

(if we don't, or before we, calculate and load directly from object storage)

    SELECT session_id, window_id, snapshot_data_location
    FROM session_recording_metadata
    WHERE
        team_id = %(team_id)s
        AND session_id = %(session_id)s
    ORDER BY session_start

The API expects to use this query to get the snapshot data, decompress it, reconstruct the chunks, and then return paged chunks grouped by window_id.
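A rough sketch of the grouping-by-window_id part of that step (names are illustrative, not the actual API code; decompression and paging are omitted):

    from itertools import groupby
    from operator import itemgetter

    def chunks_by_window(rows):
        # rows: (session_id, window_id, snapshot_data_location) tuples from the query above,
        # assumed already ordered so that rows for the same window_id are adjacent
        return {
            window_id: [location for _, _, location in group]
            for window_id, group in groupby(rows, key=itemgetter(1))
        }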

Loading Metadata

This follows "Loading Snapshot Data" and then calculates "segments" and "start and end times by window id".

For session_recording_metadata these could be written as JSON to ClickHouse or to object storage and loaded directly. Probably best in object storage, to support loading everything for a session without going to the DB.

Contributor

@macobo Jun 15, 2022

Suggestion for partition and sort key based on these queries:

PARTITION BY toYYYYMMDD(session_start)
ORDER BY (team_id, toStartOfHour(session_start), session_id)

Why this ordering? It's optimized for the list query - we only look at the hours of data where a session started.

I also include session_id for the single session fetching.
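Applied to the lambda from this diff, that suggestion would read roughly (a sketch of the proposal, not the final definition):

    SESSION_RECORDING_METADATA_TABLE_SQL = lambda: (
        SESSION_RECORDING_METADATA_TABLE_BASE_SQL
        + """PARTITION BY toYYYYMMDD(session_start)
    ORDER BY (team_id, toStartOfHour(session_start), session_id)
    """
    )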

@pauldambra
Member Author

I wanted to see this work locally before proposing to merge it...

But I get an error whether I use a Python or a console producer. With this script:

    import json
    import time

    from kafka import KafkaProducer

    # import paths below are assumptions for running this inside the posthog repo
    from ee.kafka_client.topics import KAFKA_SESSION_RECORDING_METADATA
    from posthog.client import sync_execute
    from posthog.settings import KAFKA_HOSTS, KAFKA_PRODUCER_RETRIES

    producer = KafkaProducer(
        retries=KAFKA_PRODUCER_RETRIES,
        bootstrap_servers=KAFKA_HOSTS,
        security_protocol="PLAINTEXT",
        request_timeout_ms=2000,
    )

    producer.send(
        topic=KAFKA_SESSION_RECORDING_METADATA,
        value=json.dumps(
            {
                "team_id": 1,
                "distinct_id": "12345",
                "session_id": "12345",
                "window_id": "12345",
                "session_start": "2012-04-01T12:34:56",
                "session_end": "2012-04-01T16:34:56",
                "duration": 4,
                "snapshot_data_location": "somewhere",
            }
        ).encode("utf-8"),
        key="12345".encode("utf-8"),
    )

    # poll until the row shows up in ClickHouse
    query_result = []
    while not query_result:
        query_result = sync_execute(
            """
            select * from session_recording_metadata
            """
        )
        print(f"query result is: {query_result}")
        time.sleep(1)

I can see that the message reaches Kafka, but ClickHouse prints an error:

Code: 41. DB::ParsingException: Cannot parse datetime: Cannot parse DateTime from String: while converting source column _timestamp to destination column _timestamp: while executing 'FUNCTION _CAST(_timestamp :: 7, DateTime :: 9) -> _CAST(_timestamp, DateTime) DateTime : 10': while pushing to view default.session_recording_metadata_mv (2c160edd-2e26-4dfe-b832-c784ada963a7). (CANNOT_PARSE_DATETIME), Stack trace (when copying this message, always include the lines below):

cc @macobo in case this means anything to you :)

@pauldambra
Member Author

The materialized view table was missing a comma in the definition. This didn't fail the migration, because it meant _timestamp was actually being defined as an alias for snapshot_data_location.

(screenshot attached: 2022-06-16 at 12:49:51)
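A sketch of the relevant part of the fix (a hypothetical excerpt; the full materialized view SQL is in the diff):

    # without the comma, "snapshot_data_location _timestamp" is parsed as
    # "snapshot_data_location AS _timestamp", which is what produced the DateTime cast error
    MV_SELECT_EXCERPT = """
        snapshot_data_location,
        _timestamp
    """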

I've now produced to the Kafka topic from Python and via the console producer in the bitnami Kafka image, and seen the data arrive in the table.

@pauldambra requested a review from macobo June 16, 2022 11:53
@pauldambra marked this pull request as ready for review June 16, 2022 11:53
Contributor

@macobo left a comment

This looks reasonable to me.

A couple of considerations:

  • It might be wise to hold off merging this until you have other prototype PRs ready, in case there's anything needing fixing up here. Adding additional migrations is more expensive.
  • If you have a review buddy for this project, have them review this as well!

@macobo
Contributor

macobo commented Jun 17, 2022

Actually, please also update posthog/clickhouse/schema.py.

Note that @EDsCODE also seems to have recently updated code conventions around these files so paths likely need updating.

@posthog-bot
Contributor

This PR hasn't seen activity in a week! Should it be merged, closed, or further worked on? If you want to keep it open, post a comment or remove the stale label – otherwise this will be closed in another week.

