read metadata from object storage #10810

Merged
49 changes: 45 additions & 4 deletions posthog/api/session_recording.py
@@ -1,5 +1,7 @@
import dataclasses
from typing import Any, Union
import json
from datetime import datetime, timezone
from typing import Any, List, Union

from rest_framework import exceptions, request, response, serializers, viewsets
from rest_framework.decorators import action
@@ -8,14 +10,16 @@

from posthog.api.person import PersonSerializer
from posthog.api.routing import StructuredViewSetMixin
from posthog.helpers.session_recording import RecordingEventSummary, get_metadata_from_event_summaries
from posthog.models import Filter, PersonDistinctId
from posthog.models.filters.session_recordings_filter import SessionRecordingsFilter
from posthog.models.person import Person
from posthog.models.session_recording_event import SessionRecordingViewed
from posthog.permissions import ProjectMembershipNecessaryPermissions, TeamMemberAccessPermission
from posthog.queries.session_recordings.session_recording import SessionRecording
from posthog.queries.session_recordings.session_recording import RecordingMetadata, SessionRecording
from posthog.queries.session_recordings.session_recording_list import SessionRecordingList
from posthog.utils import format_query_params_absolute_url
from posthog.storage.object_storage import list_all_objects, read, read_all
from posthog.utils import format_query_params_absolute_url, should_read_recordings_from_object_storage

DEFAULT_RECORDING_CHUNK_LIMIT = 20 # Should be tuned to find the best value

@@ -57,7 +61,44 @@ def _get_session_recording_snapshots(self, request, session_recording_id, limit,
request=request, team=self.team, session_recording_id=session_recording_id
).get_snapshots(limit, offset)

def _get_session_recording_metadata_from_object_storage(self, session_recording_id):
base_folder = f"session_recordings/team_id/{self.team_id}/session_id/{session_recording_id}/metadata/"
metadata_file = f"{base_folder}metadata.json"
metadata_file_content = read(metadata_file)
distinct_id = json.loads(metadata_file_content).get("distinctId")

event_summary_files = list_all_objects(f"{base_folder}event_summaries/")

event_summary_file_names = [object["Key"] for object in event_summary_files]
event_summary_file_contents = read_all(event_summary_file_names)
# Each event summary file contains newline-delimited JSON, so collect every non-empty line
event_summary_strings = [
event_summary_string
for _, file_content in event_summary_file_contents
for event_summary_string in file_content.split("\n")
if event_summary_string
]
event_summary_json_string = "[" + ",".join(event_summary_strings) + "]"
event_summaries = json.loads(event_summary_json_string)
parsed_event_summaries: List[RecordingEventSummary] = [
RecordingEventSummary(
timestamp=datetime.fromtimestamp(event_summary.get("timestamp", 0) / 1000, timezone.utc),
window_id=event_summary.get("windowId", ""),
type=event_summary.get("eventType"),
source=event_summary.get("eventSource"),
)
for event_summary in event_summaries
]

segments, start_and_end_times_by_window_id = get_metadata_from_event_summaries(parsed_event_summaries)

return RecordingMetadata(
segments=segments,
start_and_end_times_by_window_id=start_and_end_times_by_window_id,
distinct_id=distinct_id,
)

def _get_session_recording_meta_data(self, request, session_recording_id):
if should_read_recordings_from_object_storage(self.team_id):
return self._get_session_recording_metadata_from_object_storage(session_recording_id)
return SessionRecording(
request=request, team=self.team, session_recording_id=session_recording_id
).get_metadata()
@@ -133,7 +174,7 @@ def retrieve(self, request: request.Request, *args: Any, **kwargs: Any) -> respo
persondistinctid__distinct_id=session_recording_meta_data.distinct_id,
persondistinctid__team_id=self.team,
team=self.team,
)
) if session_recording_meta_data.distinct_id else None
except Person.DoesNotExist:
person = None

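As context for the read path above: the endpoint expects a metadata.json file plus a folder of newline-delimited JSON event-summary files under the recording's metadata/ prefix, and it merges every line from every summary file into a single JSON array before parsing. Below is a minimal, self-contained sketch of that parsing step; the file names and payloads are invented for illustration and are not taken from real object storage.

import json
from datetime import datetime, timezone

# Pretend these are the (key, content) pairs returned by read_all() for the event_summaries/ prefix
event_summary_file_contents = [
    ("event_summaries/0001", '{"timestamp": 1656000000000, "windowId": "w1", "eventType": 3, "eventSource": 2}'),
    (
        "event_summaries/0002",
        '{"timestamp": 1656000005000, "windowId": "w1", "eventType": 3, "eventSource": 1}\n'
        '{"timestamp": 1656000012000, "windowId": "w2", "eventType": 4, "eventSource": null}',
    ),
]

# Merge every non-empty line into one JSON array and parse it in a single pass
event_summary_strings = [
    line for _, content in event_summary_file_contents for line in content.split("\n") if line
]
event_summaries = json.loads("[" + ",".join(event_summary_strings) + "]")

for summary in event_summaries:
    print(
        datetime.fromtimestamp(summary.get("timestamp", 0) / 1000, timezone.utc),
        summary.get("windowId", ""),
        summary.get("eventType"),
        summary.get("eventSource"),
    )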
188 changes: 168 additions & 20 deletions posthog/helpers/session_recording.py
@@ -3,8 +3,8 @@
import gzip
import json
from collections import defaultdict
from datetime import datetime, timedelta
from typing import DefaultDict, Dict, Generator, List, Optional
from datetime import datetime, timedelta, timezone
from typing import DefaultDict, Dict, Generator, List, Optional, Tuple, cast

from sentry_sdk.api import capture_exception, capture_message

@@ -37,6 +37,14 @@ class EventActivityData:
is_active: bool


@dataclasses.dataclass
class RecordingEventSummary:
timestamp: datetime
window_id: str
type: int
source: Optional[int]


@dataclasses.dataclass
class RecordingSegment:
start_time: datetime
@@ -130,13 +138,68 @@ def decompress(base64data: str) -> str:
return gzip.decompress(compressed_bytes).decode("utf-16", "surrogatepass")


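# --- Illustrative sketch only, not part of this change. ---
# An inverse of decompress() above, showing how a list of snapshot events could be
# gzipped, base64-encoded and split into chunks before being stored. The chunk size
# and the round-trip below are invented for illustration.
import base64


def compress_and_chunk(events: List[Dict], chunk_size: int = 512) -> List[str]:
    # gzip the JSON payload, base64 it, then split the base64 string into fixed-size chunks
    payload = json.dumps(events).encode("utf-16", "surrogatepass")
    b64 = base64.b64encode(gzip.compress(payload)).decode("ascii")
    return [b64[i : i + chunk_size] for i in range(0, len(b64), chunk_size)]


# Joining the chunks back together in order and running them through decompress()
# restores the original events:
#   json.loads(decompress("".join(compress_and_chunk(events)))) == events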
def get_event_summaries_from_compressed_snapshot_data(
team_id: int, session_recording_id: str, all_recording_events: List[SnapshotDataTaggedWithWindowId],
) -> List[RecordingEventSummary]:
"""
This function takes in chunked + compressed snapshot data and returns
a list of event summaries that can be used for metadata calculation.

It tries to do this conversion in a way that never holds an entire decompressed recording in memory,
in order to keep memory usage down.
"""

if len(all_recording_events) == 0:
return []

# Group the (still compressed) recording events into chunks by chunk_id
chunks_collector: DefaultDict[str, List[SnapshotDataTaggedWithWindowId]] = defaultdict(list)
for event in all_recording_events:
chunks_collector[event.snapshot_data["chunk_id"]].append(event)

chunk_list: List[List[SnapshotDataTaggedWithWindowId]] = list(chunks_collector.values())

event_summaries: List[RecordingEventSummary] = []

# Decompress each complete chunk and summarize its events, tagging them with the chunk's window_id
for chunk in chunk_list:
window_id = chunk[0].window_id
if len(chunk) != chunk[0].snapshot_data["chunk_count"]:
capture_message(
"Did not find all session recording chunks! Team: {}, Session: {}, Chunk-id: {}. Found {} of {} expected chunks".format(
team_id,
session_recording_id,
chunk[0].snapshot_data["chunk_id"],
len(chunk),
chunk[0].snapshot_data["chunk_count"],
)
)
continue

b64_compressed_data = "".join(
chunk.snapshot_data["data"] for chunk in sorted(chunk, key=lambda c: c.snapshot_data["chunk_index"])
)
decompressed_data = json.loads(decompress(b64_compressed_data))
event_summaries.extend(
[
RecordingEventSummary(
timestamp=datetime.fromtimestamp(recording_event.get("timestamp", 0) / 1000, timezone.utc),
window_id=window_id,
type=recording_event.get("type"),
source=recording_event.get("data", {}).get("source"),
)
for recording_event in decompressed_data
]
)
return event_summaries


def decompress_chunked_snapshot_data(
team_id: int,
session_recording_id: str,
all_recording_events: List[SnapshotDataTaggedWithWindowId],
limit: Optional[int] = None,
offset: int = 0,
return_only_activity_data: bool = False,
) -> DecompressedRecordingData:
"""
Before data is stored in clickhouse, it is compressed and then chunked. This function
@@ -146,8 +209,7 @@ def decompress_chunked_snapshot_data(
you can't decompress an incomplete chunk).

Depending on the size of the recording, this function can return a lot of data. To decrease the
memory used, you should either use the pagination parameters or pass in 'return_only_activity_data' which
drastically reduces the size of the data returned if you only want the activity data (used for metadata calculation)
memory used, you should use the pagination parameters.
"""

if len(all_recording_events) == 0:
@@ -194,21 +256,11 @@
)
decompressed_data = json.loads(decompress(b64_compressed_data))

# Decompressed data can be large, and in metadata calculations, we only care if the event is "active"
# This pares down the data returned, so we're not passing around a massive object
if return_only_activity_data:
events_with_only_activity_data = [
{"timestamp": recording_event.get("timestamp"), "is_active": is_active_event(recording_event)}
for recording_event in decompressed_data
]
snapshot_data_by_window_id[chunks[0].window_id].extend(events_with_only_activity_data)

else:
snapshot_data_by_window_id[chunks[0].window_id].extend(decompressed_data)
snapshot_data_by_window_id[chunks[0].window_id].extend(decompressed_data)
return DecompressedRecordingData(has_next=has_next, snapshot_data_by_window_id=snapshot_data_by_window_id)


def is_active_event(event: SnapshotData) -> bool:
def is_active_event(event: RecordingEventSummary) -> bool:
"""
Determines which rr-web events are "active" - meaning user generated
"""
@@ -222,21 +274,21 @@ def is_active_event(event: SnapshotData) -> bool:
7, # "MediaInteraction"
12, # "Drag"
]
return event.get("type") == 3 and event.get("data", {}).get("source") in active_rr_web_sources
return event.type == 3 and event.source in active_rr_web_sources


ACTIVITY_THRESHOLD_SECONDS = 60


def get_active_segments_from_event_list(
event_list: List[EventActivityData], window_id: WindowId, activity_threshold_seconds=ACTIVITY_THRESHOLD_SECONDS
event_list: List[RecordingEventSummary], window_id: WindowId, activity_threshold_seconds=ACTIVITY_THRESHOLD_SECONDS
) -> List[RecordingSegment]:
"""
Processes a list of events for a specific window_id to determine
the segments of the recording where the user is "active". An active segment ends
when there isn't another active event for activity_threshold_seconds
"""
active_event_timestamps = [event.timestamp for event in event_list if event.is_active]
active_event_timestamps = [event.timestamp for event in event_list if is_active_event(event)]

active_recording_segments: List[RecordingSegment] = []
current_active_segment: Optional[RecordingSegment] = None
@@ -368,3 +420,99 @@ def get_session_recording_events_for_object_storage(
)
)
return recording_events_for_object_storage


def get_metadata_from_event_summaries(
event_summaries: List[RecordingEventSummary],
) -> Tuple[List[RecordingSegment], Dict[WindowId, Dict]]:
"""
This function processes the recording events into metadata.

A recording can be composed of events from multiple windows/tabs. Recording events are separated by
`window_id`, so the playback experience is consistent (changes in one tab don't impact the recording
of a different tab). However, we still want to play the recording back to the end user in the order in which
they interacted with the product.

This function creates a "playlist" of recording segments that designates the order in which the front end
should flip between players of different windows/tabs. To create this playlist, this function does the following:

(1) For each recording event, we determine if it is "active" or not. An active event designates user
activity (e.g. mouse movement).

(2) We then generate "active segments" based on these lists of events. Active segments are segments
of recordings where the maximum time between events determined to be active is less than a threshold (set to 60 seconds).

(3) Next, we merge the active segments from all of the window_ids and sort them by start time. This gives us the
full list of active segments. (Note: active segments can overlap if a user flips back
and forth between tabs.)

(4) To complete the recording, we fill in the gaps between active segments with "inactive segments". In
determining which window should be used for the inactive segment, we try to minimize the switching of windows.
"""

event_summaries_by_window_id: DefaultDict[WindowId, List[RecordingEventSummary]] = defaultdict(list)
for event_summary in event_summaries:
event_summaries_by_window_id[event_summary.window_id].append(event_summary)

start_and_end_times_by_window_id = {}

# Get the active segments for each window_id
all_active_segments: List[RecordingSegment] = []
for window_id, event_list in event_summaries_by_window_id.items():
# Not sure why, but events are sometimes slightly out of order
event_list.sort(key=lambda x: x.timestamp)

active_segments_for_window_id = get_active_segments_from_event_list(event_list, window_id)

all_active_segments.extend(active_segments_for_window_id)

start_and_end_times_by_window_id[window_id] = {
"start_time": event_list[0].timestamp,
"end_time": event_list[-1].timestamp,
}

# Sort the active segments by start time. This will interleave active segments
# from different windows
all_active_segments.sort(key=lambda segment: segment.start_time)

# These start and end times are used to make sure the segments span the entire recording
first_start_time = min([cast(datetime, x["start_time"]) for x in start_and_end_times_by_window_id.values()])
last_end_time = max([cast(datetime, x["end_time"]) for x in start_and_end_times_by_window_id.values()])

# Now, we fill in the gaps between the active segments with inactive segments
all_segments = []
current_timestamp = first_start_time
current_window_id: WindowId = sorted(
start_and_end_times_by_window_id, key=lambda x: start_and_end_times_by_window_id[x]["start_time"]
)[0]

for index, segment in enumerate(all_active_segments):
# It's possible that segments overlap and we don't need to fill a gap
if segment.start_time > current_timestamp:
all_segments.extend(
generate_inactive_segments_for_range(
current_timestamp,
segment.start_time,
current_window_id,
start_and_end_times_by_window_id,
is_first_segment=index == 0,
)
)
all_segments.append(segment)
current_window_id = segment.window_id
current_timestamp = max(segment.end_time, current_timestamp)

# If the last segment ends before the recording ends, we need to fill in the gap
if current_timestamp < last_end_time:
all_segments.extend(
generate_inactive_segments_for_range(
current_timestamp,
last_end_time,
current_window_id,
start_and_end_times_by_window_id,
is_last_segment=True,
is_first_segment=current_timestamp == first_start_time,
)
)

return all_segments, start_and_end_times_by_window_id
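To make the segment "playlist" logic above concrete, here is a rough usage sketch. The event values are invented, and it assumes the helpers added in this diff are importable (i.e. it runs inside a configured PostHog environment); it is an illustration, not a test of the real implementation.

from datetime import datetime, timedelta, timezone

from posthog.helpers.session_recording import RecordingEventSummary, get_metadata_from_event_summaries

t0 = datetime(2022, 6, 1, 12, 0, 0, tzinfo=timezone.utc)

# Two windows: mouse activity in window "w1", then, after a gap of a few minutes, activity in window "w2"
summaries = [
    RecordingEventSummary(timestamp=t0, window_id="w1", type=3, source=2),
    RecordingEventSummary(timestamp=t0 + timedelta(seconds=10), window_id="w1", type=3, source=1),
    RecordingEventSummary(timestamp=t0 + timedelta(seconds=200), window_id="w2", type=3, source=2),
    RecordingEventSummary(timestamp=t0 + timedelta(seconds=210), window_id="w2", type=3, source=1),
]

segments, start_and_end_times_by_window_id = get_metadata_from_event_summaries(summaries)

# Expect an active segment for "w1", inactive segment(s) covering the gap, then an active segment for "w2"
for segment in segments:
    print(segment.window_id, segment.start_time, segment.end_time)
print(start_and_end_times_by_window_id)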