From 8ac1294e8346d21d3aef22bd6bc8115863db2c8c Mon Sep 17 00:00:00 2001 From: Rick Marron Date: Thu, 14 Jul 2022 18:22:05 -0700 Subject: [PATCH 1/6] it works --- posthog/api/session_recording.py | 49 ++++- posthog/helpers/session_recording.py | 188 ++++++++++++++++-- .../session_recordings/session_recording.py | 116 +---------- posthog/settings/object_storage.py | 6 +- posthog/storage/object_storage.py | 70 ++++++- posthog/utils.py | 4 + session-recordings/src/ingester/index.ts | 21 +- session-recordings/src/ingester/utils.ts | 8 +- session-recordings/src/s3.ts | 6 +- session-recordings/src/types.ts | 1 + 10 files changed, 323 insertions(+), 146 deletions(-) diff --git a/posthog/api/session_recording.py b/posthog/api/session_recording.py index 292548f5af595..0caae51acc8a7 100644 --- a/posthog/api/session_recording.py +++ b/posthog/api/session_recording.py @@ -1,5 +1,7 @@ import dataclasses -from typing import Any, Union +import json +from datetime import datetime, timezone +from typing import Any, List, Union from rest_framework import exceptions, request, response, serializers, viewsets from rest_framework.decorators import action @@ -8,14 +10,16 @@ from posthog.api.person import PersonSerializer from posthog.api.routing import StructuredViewSetMixin +from posthog.helpers.session_recording import RecordingEventSummary, get_metadata_from_event_summaries from posthog.models import Filter, PersonDistinctId from posthog.models.filters.session_recordings_filter import SessionRecordingsFilter from posthog.models.person import Person from posthog.models.session_recording_event import SessionRecordingViewed from posthog.permissions import ProjectMembershipNecessaryPermissions, TeamMemberAccessPermission -from posthog.queries.session_recordings.session_recording import SessionRecording +from posthog.queries.session_recordings.session_recording import RecordingMetadata, SessionRecording from posthog.queries.session_recordings.session_recording_list import SessionRecordingList -from posthog.utils import format_query_params_absolute_url +from posthog.storage.object_storage import list_all_objects, read, read_all +from posthog.utils import format_query_params_absolute_url, should_read_recordings_from_object_storage DEFAULT_RECORDING_CHUNK_LIMIT = 20 # Should be tuned to find the best value @@ -57,7 +61,44 @@ def _get_session_recording_snapshots(self, request, session_recording_id, limit, request=request, team=self.team, session_recording_id=session_recording_id ).get_snapshots(limit, offset) + def _get_session_recording_metadata_from_object_storage(self, session_recording_id): + base_folder = f"session_recordings/team_id/{self.team_id}/session_id/{session_recording_id}/metadata/" + metadata_file = f"{base_folder}metadata.json" + metadata_file_content = read(metadata_file) + distinct_id = json.loads(metadata_file_content).get("distinctId") + + event_summary_files = list_all_objects(f"{base_folder}event_summaries/") + + event_summary_file_names = [object["Key"] for object in event_summary_files] + event_summary_file_contents = read_all(event_summary_file_names) + event_summary_strings = [ + event_summary_string + for _, metadata_string in event_summary_file_contents + for event_summary_string in metadata_string.split("\n") + ] + event_summary_json_string = "[" + ",".join(event_summary_strings) + "]" + event_summaries = json.loads(event_summary_json_string) + parsed_event_summaries: List[RecordingEventSummary] = [ + RecordingEventSummary( + timestamp=datetime.fromtimestamp(event_summary.get("timestamp", 
0) / 1000, timezone.utc),
+                window_id=event_summary.get("windowId", ""),
+                type=event_summary.get("eventType"),
+                source=event_summary.get("eventSource"),
+            )
+            for event_summary in event_summaries
+        ]
+
+        segments, start_and_end_times_by_window_id = get_metadata_from_event_summaries(parsed_event_summaries)
+
+        return RecordingMetadata(
+            segments=segments,
+            start_and_end_times_by_window_id=start_and_end_times_by_window_id,
+            distinct_id=distinct_id,
+        )
+
     def _get_session_recording_meta_data(self, request, session_recording_id):
+        if should_read_recordings_from_object_storage(self.team_id):
+            return self._get_session_recording_metadata_from_object_storage(session_recording_id)
         return SessionRecording(
             request=request, team=self.team, session_recording_id=session_recording_id
         ).get_metadata()
@@ -133,7 +174,7 @@ def retrieve(self, request: request.Request, *args: Any, **kwargs: Any) -> respo
                 persondistinctid__distinct_id=session_recording_meta_data.distinct_id,
                 persondistinctid__team_id=self.team,
                 team=self.team,
-            )
+            ) if session_recording_meta_data.distinct_id else None
         except Person.DoesNotExist:
             person = None
 
diff --git a/posthog/helpers/session_recording.py b/posthog/helpers/session_recording.py
index 4b6c21dcdc687..98b87a77937df 100644
--- a/posthog/helpers/session_recording.py
+++ b/posthog/helpers/session_recording.py
@@ -3,8 +3,8 @@
 import gzip
 import json
 from collections import defaultdict
-from datetime import datetime, timedelta
-from typing import DefaultDict, Dict, Generator, List, Optional
+from datetime import datetime, timedelta, timezone
+from typing import DefaultDict, Dict, Generator, List, Optional, Tuple, cast
 
 from sentry_sdk.api import capture_exception, capture_message
 
@@ -37,6 +37,14 @@ class EventActivityData:
     is_active: bool
 
 
+@dataclasses.dataclass
+class RecordingEventSummary:
+    timestamp: datetime
+    window_id: str
+    type: int
+    source: Optional[int]
+
+
 @dataclasses.dataclass
 class RecordingSegment:
     start_time: datetime
@@ -130,13 +138,68 @@ def decompress(base64data: str) -> str:
     return gzip.decompress(compressed_bytes).decode("utf-16", "surrogatepass")
 
 
+def get_event_summaries_from_compressed_snapshot_data(
+    team_id: int, session_recording_id: str, all_recording_events: List[SnapshotDataTaggedWithWindowId],
+) -> List[RecordingEventSummary]:
+    """
+    This function takes in chunked + compressed snapshot data and returns
+    a list of event summaries that can be used for metadata calculation.
+
+    It tries to do this conversion in a way that never holds an entire decompressed recording
+    in memory, to decrease memory usage.
+    """
+
+    if len(all_recording_events) == 0:
+        return []
+
+    # Split the compressed recording events into their chunks
+    chunks_collector: DefaultDict[str, List[SnapshotDataTaggedWithWindowId]] = defaultdict(list)
+    for event in all_recording_events:
+        chunks_collector[event.snapshot_data["chunk_id"]].append(event)
+
+    chunk_list: List[List[SnapshotDataTaggedWithWindowId]] = list(chunks_collector.values())
+
+    event_summaries: List[RecordingEventSummary] = []
+
+    # Decompress the chunks and split the resulting events by window_id
+    for chunk in chunk_list:
+        window_id = chunk[0].window_id
+        if len(chunk) != chunk[0].snapshot_data["chunk_count"]:
+            capture_message(
+                "Did not find all session recording chunks! Team: {}, Session: {}, Chunk-id: {}. 
Found {} of {} expected chunks".format( + team_id, + session_recording_id, + chunk[0].snapshot_data["chunk_id"], + len(chunk), + chunk[0].snapshot_data["chunk_count"], + ) + ) + continue + + b64_compressed_data = "".join( + chunk.snapshot_data["data"] for chunk in sorted(chunk, key=lambda c: c.snapshot_data["chunk_index"]) + ) + decompressed_data = json.loads(decompress(b64_compressed_data)) + event_summaries.extend( + [ + RecordingEventSummary( + timestamp=datetime.fromtimestamp(recording_event.get("timestamp", 0) / 1000, timezone.utc), + window_id=window_id, + type=recording_event.get("type"), + source=recording_event.get("data", {}).get("source"), + ) + for recording_event in decompressed_data + ] + ) + return event_summaries + + def decompress_chunked_snapshot_data( team_id: int, session_recording_id: str, all_recording_events: List[SnapshotDataTaggedWithWindowId], limit: Optional[int] = None, offset: int = 0, - return_only_activity_data: bool = False, ) -> DecompressedRecordingData: """ Before data is stored in clickhouse, it is compressed and then chunked. This function @@ -146,8 +209,7 @@ def decompress_chunked_snapshot_data( you can't decompress an incomplete chunk). Depending on the size of the recording, this function can return a lot of data. To decrease the - memory used, you should either use the pagination parameters or pass in 'return_only_activity_data' which - drastically reduces the size of the data returned if you only want the activity data (used for metadata calculation) + memory used, you should use the pagination parameters. """ if len(all_recording_events) == 0: @@ -194,21 +256,11 @@ def decompress_chunked_snapshot_data( ) decompressed_data = json.loads(decompress(b64_compressed_data)) - # Decompressed data can be large, and in metadata calculations, we only care if the event is "active" - # This pares down the data returned, so we're not passing around a massive object - if return_only_activity_data: - events_with_only_activity_data = [ - {"timestamp": recording_event.get("timestamp"), "is_active": is_active_event(recording_event)} - for recording_event in decompressed_data - ] - snapshot_data_by_window_id[chunks[0].window_id].extend(events_with_only_activity_data) - - else: - snapshot_data_by_window_id[chunks[0].window_id].extend(decompressed_data) + snapshot_data_by_window_id[chunks[0].window_id].extend(decompressed_data) return DecompressedRecordingData(has_next=has_next, snapshot_data_by_window_id=snapshot_data_by_window_id) -def is_active_event(event: SnapshotData) -> bool: +def is_active_event(event: RecordingEventSummary) -> bool: """ Determines which rr-web events are "active" - meaning user generated """ @@ -222,21 +274,21 @@ def is_active_event(event: SnapshotData) -> bool: 7, # "MediaInteraction" 12, # "Drag" ] - return event.get("type") == 3 and event.get("data", {}).get("source") in active_rr_web_sources + return event.type == 3 and event.source in active_rr_web_sources ACTIVITY_THRESHOLD_SECONDS = 60 def get_active_segments_from_event_list( - event_list: List[EventActivityData], window_id: WindowId, activity_threshold_seconds=ACTIVITY_THRESHOLD_SECONDS + event_list: List[RecordingEventSummary], window_id: WindowId, activity_threshold_seconds=ACTIVITY_THRESHOLD_SECONDS ) -> List[RecordingSegment]: """ Processes a list of events for a specific window_id to determine the segments of the recording where the user is "active". 
An active segment ends when there isn't another active event for activity_threshold_seconds seconds
     """
-    active_event_timestamps = [event.timestamp for event in event_list if event.is_active]
+    active_event_timestamps = [event.timestamp for event in event_list if is_active_event(event)]
 
     active_recording_segments: List[RecordingSegment] = []
     current_active_segment: Optional[RecordingSegment] = None
@@ -368,3 +420,99 @@ def get_session_recording_events_for_object_storage(
         )
     )
     return recording_events_for_object_storage
+
+
+def get_metadata_from_event_summaries(
+    event_summaries: List[RecordingEventSummary],
+) -> Tuple[List[RecordingSegment], Dict[WindowId, Dict]]:
+    """
+    This function processes the recording events into metadata.
+
+    A recording can be composed of events from multiple windows/tabs. Recording events are separated by
+    `window_id`, so the playback experience is consistent (changes in one tab don't impact the recording
+    of a different tab). However, we still want to play back the recording to the end user as the user interacted
+    with their product.
+
+    This function creates a "playlist" of recording segments that designates the order in which the front end
+    should flip between players of different windows/tabs. To create this playlist, this function does the following:
+
+    (1) For each recording event, we determine if it is "active" or not. An active event designates user
+    activity (e.g. mouse movement).
+
+    (2) We then generate "active segments" based on these lists of events. Active segments are segments
+    of recordings where the maximum time between events determined to be active is less than a threshold (set to 60 seconds).
+
+    (3) Next, we merge the active segments from all of the window_ids + sort them by start time. We now have the
+    list of active segments. (note, it's very possible that active segments overlap if a user is flipping back
+    and forth between tabs)
+
+    (4) To complete the recording, we fill in the gaps between active segments with "inactive segments". In
+    determining which window should be used for the inactive segment, we try to minimize the switching of windows.
+    """
+
+    event_summaries_by_window_id: DefaultDict[WindowId, List[RecordingEventSummary]] = defaultdict(list)
+    for event_summary in event_summaries:
+        event_summaries_by_window_id[event_summary.window_id].append(event_summary)
+
+    start_and_end_times_by_window_id = {}
+
+    # Get the active segments for each window_id
+    all_active_segments: List[RecordingSegment] = []
+    for window_id, event_list in event_summaries_by_window_id.items():
+        # Not sure why, but events are sometimes slightly out of order
+        event_list.sort(key=lambda x: x.timestamp)
+
+        active_segments_for_window_id = get_active_segments_from_event_list(event_list, window_id)
+
+        all_active_segments.extend(active_segments_for_window_id)
+
+        start_and_end_times_by_window_id[window_id] = {
+            "start_time": event_list[0].timestamp,
+            "end_time": event_list[-1].timestamp,
+        }
+
+    # Sort the active segments by start time. 
This will interleave active segments + # from different windows + all_active_segments.sort(key=lambda segment: segment.start_time) + + # These start and end times are used to make sure the segments span the entire recording + first_start_time = min([cast(datetime, x["start_time"]) for x in start_and_end_times_by_window_id.values()]) + last_end_time = max([cast(datetime, x["end_time"]) for x in start_and_end_times_by_window_id.values()]) + + # Now, we fill in the gaps between the active segments with inactive segments + all_segments = [] + current_timestamp = first_start_time + current_window_id: WindowId = sorted( + start_and_end_times_by_window_id, key=lambda x: start_and_end_times_by_window_id[x]["start_time"] + )[0] + + for index, segment in enumerate(all_active_segments): + # It's possible that segments overlap and we don't need to fill a gap + if segment.start_time > current_timestamp: + all_segments.extend( + generate_inactive_segments_for_range( + current_timestamp, + segment.start_time, + current_window_id, + start_and_end_times_by_window_id, + is_first_segment=index == 0, + ) + ) + all_segments.append(segment) + current_window_id = segment.window_id + current_timestamp = max(segment.end_time, current_timestamp) + + # If the last segment ends before the recording ends, we need to fill in the gap + if current_timestamp < last_end_time: + all_segments.extend( + generate_inactive_segments_for_range( + current_timestamp, + last_end_time, + current_window_id, + start_and_end_times_by_window_id, + is_last_segment=True, + is_first_segment=current_timestamp == first_start_time, + ) + ) + + return all_segments, start_and_end_times_by_window_id diff --git a/posthog/queries/session_recordings/session_recording.py b/posthog/queries/session_recordings/session_recording.py index 6938e613a5eae..99fe3328d273b 100644 --- a/posthog/queries/session_recordings/session_recording.py +++ b/posthog/queries/session_recordings/session_recording.py @@ -1,20 +1,18 @@ import dataclasses import json -from datetime import datetime, timezone -from typing import Dict, List, Optional, Tuple, cast +from typing import Dict, List, Optional, cast from rest_framework.request import Request from posthog.client import sync_execute from posthog.helpers.session_recording import ( DecompressedRecordingData, - EventActivityData, RecordingSegment, SnapshotDataTaggedWithWindowId, WindowId, decompress_chunked_snapshot_data, - generate_inactive_segments_for_range, - get_active_segments_from_event_list, + get_event_summaries_from_compressed_snapshot_data, + get_metadata_from_event_summaries, ) from posthog.models import SessionRecordingEvent, Team @@ -85,112 +83,14 @@ def get_metadata(self) -> Optional[RecordingMetadata]: if len(all_snapshots) == 0: return None - segments, start_and_end_times_by_window_id = self._process_snapshots_for_metadata(all_snapshots) + event_summaries = get_event_summaries_from_compressed_snapshot_data( + self._team.pk, self._session_recording_id, all_snapshots + ) + + segments, start_and_end_times_by_window_id = get_metadata_from_event_summaries(event_summaries) return RecordingMetadata( segments=segments, start_and_end_times_by_window_id=start_and_end_times_by_window_id, distinct_id=cast(str, distinct_id), ) - - def _process_snapshots_for_metadata(self, all_snapshots) -> Tuple[List[RecordingSegment], Dict[WindowId, Dict]]: - """ - This function processes the recording events into metadata. - - A recording can be composed of events from multiple windows/tabs. 
Recording events are seperated by - `window_id`, so the playback experience is consistent (changes in one tab don't impact the recording - of a different tab). However, we still want to playback the recording to the end user as the user interacted - with their product. - - This function creates a "playlist" of recording segments that designates the order in which the front end - should flip between players of different windows/tabs. To create this playlist, this function does the following: - - (1) For each recording event, we determine if it is "active" or not. An active event designates user - activity (e.g. mouse movement). - - (2) We then generate "active segments" based on these lists of events. Active segments are segments - of recordings where the maximum time between events determined to be active is less than a threshold (set to 60 seconds). - - (3) Next, we merge the active segments from all of the window_ids + sort them by start time. We now have the - list of active segments. (note, it's very possible that active segments overlap if a user is flipping back - and forth between tabs) - - (4) To complete the recording, we fill in the gaps between active segments with "inactive segments". In - determining which window should be used for the inactive segment, we try to minimize the switching of windows. - """ - - decompressed_recording_data = decompress_chunked_snapshot_data( - self._team.pk, self._session_recording_id, all_snapshots, return_only_activity_data=True - ) - - start_and_end_times_by_window_id = {} - - # Get the active segments for each window_id - all_active_segments: List[RecordingSegment] = [] - for window_id, event_list in decompressed_recording_data.snapshot_data_by_window_id.items(): - events_with_processed_timestamps = [ - EventActivityData( - timestamp=datetime.fromtimestamp(event.get("timestamp", 0) / 1000, timezone.utc), - is_active=event.get("is_active", False), - ) - for event in event_list - ] - # Not sure why, but events are sometimes slightly out of order - events_with_processed_timestamps.sort(key=lambda x: cast(datetime, x.timestamp)) - - active_segments_for_window_id = get_active_segments_from_event_list( - events_with_processed_timestamps, window_id - ) - - all_active_segments.extend(active_segments_for_window_id) - - start_and_end_times_by_window_id[window_id] = { - "start_time": events_with_processed_timestamps[0].timestamp, - "end_time": events_with_processed_timestamps[-1].timestamp, - } - - # Sort the active segments by start time. 
This will interleave active segments - # from different windows - all_active_segments.sort(key=lambda segment: segment.start_time) - - # These start and end times are used to make sure the segments span the entire recording - first_start_time = min([cast(datetime, x["start_time"]) for x in start_and_end_times_by_window_id.values()]) - last_end_time = max([cast(datetime, x["end_time"]) for x in start_and_end_times_by_window_id.values()]) - - # Now, we fill in the gaps between the active segments with inactive segments - all_segments = [] - current_timestamp = first_start_time - current_window_id: WindowId = sorted( - start_and_end_times_by_window_id, key=lambda x: start_and_end_times_by_window_id[x]["start_time"] - )[0] - - for index, segment in enumerate(all_active_segments): - # It's possible that segments overlap and we don't need to fill a gap - if segment.start_time > current_timestamp: - all_segments.extend( - generate_inactive_segments_for_range( - current_timestamp, - segment.start_time, - current_window_id, - start_and_end_times_by_window_id, - is_first_segment=index == 0, - ) - ) - all_segments.append(segment) - current_window_id = segment.window_id - current_timestamp = max(segment.end_time, current_timestamp) - - # If the last segment ends before the recording ends, we need to fill in the gap - if current_timestamp < last_end_time: - all_segments.extend( - generate_inactive_segments_for_range( - current_timestamp, - last_end_time, - current_window_id, - start_and_end_times_by_window_id, - is_last_segment=True, - is_first_segment=current_timestamp == first_start_time, - ) - ) - - return all_segments, start_and_end_times_by_window_id diff --git a/posthog/settings/object_storage.py b/posthog/settings/object_storage.py index 16a38a19953ac..43d2a58ba9475 100644 --- a/posthog/settings/object_storage.py +++ b/posthog/settings/object_storage.py @@ -26,5 +26,9 @@ WRITE_RECORDINGS_TO_OBJECT_STORAGE_FOR_TEAM = get_from_env( - "WRITE_RECORDINGS_TO_OBJECT_STORAGE_FOR_TEAM", None, type_cast=int + "WRITE_RECORDINGS_TO_OBJECT_STORAGE_FOR_TEAM", None, type_cast=int, optional=True +) + +READ_RECORDINGS_FROM_OBJECT_STORAGE_FOR_TEAM = get_from_env( + "READ_RECORDINGS_FROM_OBJECT_STORAGE_FOR_TEAM", None, type_cast=int, optional=True ) diff --git a/posthog/storage/object_storage.py b/posthog/storage/object_storage.py index 301386ce253c0..39345165934e1 100644 --- a/posthog/storage/object_storage.py +++ b/posthog/storage/object_storage.py @@ -1,5 +1,6 @@ import abc -from typing import Optional, Union +from threading import Thread +from typing import List, Optional, Tuple, Union import structlog from boto3 import client @@ -28,6 +29,14 @@ def read(self, bucket: str, key: str) -> Optional[str]: def read_bytes(self, bucket: str, key: str) -> Optional[bytes]: pass + @abc.abstractmethod + def list_all_objects(self, bucket: str, prefix: str) -> Optional[List[str]]: + pass + + @abc.abstractmethod + def read_all(self, bucket: str, keys: List[str]) -> Optional[List[Tuple[str, str]]]: + pass + @abc.abstractmethod def write(self, bucket: str, key: str, content: Union[str, bytes]) -> None: pass @@ -43,6 +52,12 @@ def read(self, bucket: str, key: str) -> Optional[str]: def read_bytes(self, bucket: str, key: str) -> Optional[bytes]: pass + def list_all_objects(self, bucket: str, prefix: str) -> Optional[List[str]]: + pass + + def read_all(self, bucket: str, keys: List[str]) -> Optional[List[Tuple[str, str]]]: + pass + def write(self, bucket: str, key: str, content: Union[str, bytes]) -> None: pass @@ -74,6 +89,49 @@ 
def read_bytes(self, bucket: str, key: str) -> Optional[bytes]: logger.error("object_storage.read_failed", bucket=bucket, file_name=key, error=e, s3_response=s3_response) raise ObjectStorageError("read failed") from e + def list_all_objects(self, bucket: str, prefix: str) -> Optional[List[str]]: + objects = [] + try: + has_next = True + while has_next: + last_key = objects[-1]["Key"] if len(objects) > 0 else "" + s3_response = self.aws_client.list_objects_v2( + Bucket=bucket, Prefix=prefix, Delimiter="/", StartAfter=last_key + ) + has_next = s3_response["IsTruncated"] + objects.extend(s3_response["Contents"]) + return objects + except Exception as e: + logger.error("object_storage.list_all_objects_failed", bucket=bucket, prefix=prefix, error=e) + raise ObjectStorageError("list_all_objects failed") from e + + def _read_for_threading(self, result: List, index: int, bucket: str, key: str): + result[index] = self.read(bucket, key) + + def read_all(self, bucket: str, keys: List[str], max_concurrent_requests: int) -> Optional[List[Tuple[str, str]]]: + all_results: List[Tuple[str, str]] = [] + chunked_keys = [ + keys[0 + offset : max_concurrent_requests + offset] + for offset in range(0, len(keys), max_concurrent_requests) + ] + for chunk_of_keys in chunked_keys: + jobs: List[Thread] = [] + chunk_result: List[Optional[str]] = [None] * len(chunk_of_keys) + for index, key in enumerate(chunk_of_keys): + thread = Thread(target=self._read_for_threading, args=(chunk_result, index, bucket, key)) + jobs.append(thread) + + for j in jobs: + j.start() + + for j in jobs: + j.join() + + for index, key in enumerate(chunk_of_keys): + all_results.append((key, chunk_result[index])) + + return all_results + def write(self, bucket: str, key: str, content: Union[str, bytes]) -> None: s3_response = {} try: @@ -114,6 +172,16 @@ def read(file_name: str) -> Optional[str]: return object_storage_client().read(bucket=settings.OBJECT_STORAGE_BUCKET, key=file_name) +def list_all_objects(prefix: str) -> Optional[List[str]]: + return object_storage_client().list_all_objects(bucket=settings.OBJECT_STORAGE_BUCKET, prefix=prefix) + + +def read_all(file_names: List[str], max_concurrent_requests: int = 100) -> Optional[List[Tuple[str, str]]]: + return object_storage_client().read_all( + bucket=settings.OBJECT_STORAGE_BUCKET, keys=file_names, max_concurrent_requests=max_concurrent_requests + ) + + def read_bytes(file_name: str) -> Optional[bytes]: return object_storage_client().read_bytes(bucket=settings.OBJECT_STORAGE_BUCKET, key=file_name) diff --git a/posthog/utils.py b/posthog/utils.py index 669de5169fc96..5737a6a81123d 100644 --- a/posthog/utils.py +++ b/posthog/utils.py @@ -994,3 +994,7 @@ def get_crontab(schedule: Optional[str]) -> Optional[crontab]: def should_write_recordings_to_object_storage(team_id: int) -> bool: return settings.OBJECT_STORAGE_ENABLED and team_id == settings.WRITE_RECORDINGS_TO_OBJECT_STORAGE_FOR_TEAM + + +def should_read_recordings_from_object_storage(team_id: int) -> bool: + return settings.OBJECT_STORAGE_ENABLED and team_id == settings.READ_RECORDINGS_FROM_OBJECT_STORAGE_FOR_TEAM diff --git a/session-recordings/src/ingester/index.ts b/session-recordings/src/ingester/index.ts index 657b95dbfbe67..2d1f9c8c36bbd 100644 --- a/session-recordings/src/ingester/index.ts +++ b/session-recordings/src/ingester/index.ts @@ -4,7 +4,7 @@ import { RecordingEvent, RecordingEventGroup } from '../types' import { s3Client } from '../s3' import { meterProvider } from './metrics' import { performance } from 'perf_hooks' 
-import { getEventGroupDataString, getEventGroupMetadata } from './utils' +import { getEventGroupDataString, getEventSummaryMetadata } from './utils' const maxEventGroupAge = Number.parseInt( process.env.MAX_EVENT_GROUP_AGE || process.env.NODE_ENV === 'dev' ? '1000' : '300000' @@ -59,6 +59,7 @@ consumer.run({ const sessionId = message.headers.sessionId.toString() const windowId = message.headers.windowId.toString() const eventId = message.headers.eventId.toString() + const distinctId = message.headers.distinctId.toString() const eventSource = Number.parseInt(message.headers.eventSource.toString()) const eventType = Number.parseInt(message.headers.eventType.toString()) const teamId = Number.parseInt(message.headers.teamId.toString()) @@ -75,19 +76,26 @@ consumer.run({ }) const commitEventGroupToS3 = async () => { - const dataKey = `team_id/${eventGroup.teamId}/session_id/${eventGroup.sessionId}/data/${eventGroup.oldestEventTimestamp}-${eventGroup.oldestOffset}` - - // TODO: calculate metadata and write it to this key - const metaDataKey = `team_id/${eventGroup.teamId}/session_id/${eventGroup.sessionId}/metadata/${eventGroup.oldestEventTimestamp}-${eventGroup.oldestOffset}` + const baseKey = `session_recordings/team_id/${eventGroup.teamId}/session_id/${eventGroup.sessionId}` + const dataKey = `${baseKey}/data/${eventGroup.oldestEventTimestamp}-${eventGroup.oldestOffset}` + const metaDataEventSummaryKey = `${baseKey}/metadata/event_summaries/${eventGroup.oldestEventTimestamp}-${eventGroup.oldestOffset}` + const metaDataKey = `${baseKey}/metadata/metadata.json` console.debug({ action: 'committing_event_group', sessionId: eventGroup.sessionId, key: dataKey }) const sendStartTime = performance.now() + await s3Client.send( + new PutObjectCommand({ + Bucket: 'posthog', + Key: metaDataEventSummaryKey, + Body: getEventSummaryMetadata(eventGroup), + }) + ) await s3Client.send( new PutObjectCommand({ Bucket: 'posthog', Key: metaDataKey, - Body: getEventGroupMetadata(eventGroup), + Body: JSON.stringify({ distinctId: eventGroup.distinctId }), }) ) await s3Client.send( @@ -145,6 +153,7 @@ consumer.run({ teamId: teamId, sessionId: sessionId, oldestEventTimestamp: unixTimestamp, + distinctId: distinctId, oldestOffset: message.offset, newestOffset: message.offset, } diff --git a/session-recordings/src/ingester/utils.ts b/session-recordings/src/ingester/utils.ts index b390754c13fb0..2121e7e17d99c 100644 --- a/session-recordings/src/ingester/utils.ts +++ b/session-recordings/src/ingester/utils.ts @@ -1,4 +1,4 @@ -import { RecordingEvent, RecordingEventGroup } from '../types' +import { RecordingEventGroup } from '../types' export const getEventGroupDataString = (recordingEventGroup: RecordingEventGroup) => { const events = Object.values(recordingEventGroup.events) @@ -6,14 +6,14 @@ export const getEventGroupDataString = (recordingEventGroup: RecordingEventGroup return eventDataStrings.join('\n') } -export const getEventGroupMetadata = (recordingEventGroup: RecordingEventGroup) => { +export const getEventSummaryMetadata = (recordingEventGroup: RecordingEventGroup) => { const eventSummaries = Object.values(recordingEventGroup.events) .filter((event) => event.complete) .sort((a, b) => a.timestamp - b.timestamp) .map((event) => JSON.stringify({ - eventSource: event.eventSource, - eventType: event.eventType, + source: event.eventSource, + type: event.eventType, windowId: event.windowId, timestamp: event.timestamp, }) diff --git a/session-recordings/src/s3.ts b/session-recordings/src/s3.ts index 
7d992a67f02d8..aef1a424c9315 100644 --- a/session-recordings/src/s3.ts +++ b/session-recordings/src/s3.ts @@ -1,7 +1,9 @@ import { S3Client } from '@aws-sdk/client-s3' -const ACCESS_KEY_ID = process.env.NODE_ENV === 'dev' ? 'root' : process.env.OBJECT_STORAGE_ACCESS_KEY_ID -const SECRET_ACCESS_KEY = process.env.NODE_ENV === 'dev' ? 'password' : process.env.OBJECT_STORAGE_SECRET_ACCESS_KEY +const ACCESS_KEY_ID = + process.env.NODE_ENV === 'dev' ? 'object_storage_root_user' : process.env.OBJECT_STORAGE_ACCESS_KEY_ID +const SECRET_ACCESS_KEY = + process.env.NODE_ENV === 'dev' ? 'object_storage_root_password' : process.env.OBJECT_STORAGE_SECRET_ACCESS_KEY const ENDPOINT = process.env.NODE_ENV === 'dev' ? 'http://localhost:19000' : process.env.OBJECT_STORAGE_ENDPOINT const credentials = ACCESS_KEY_ID diff --git a/session-recordings/src/types.ts b/session-recordings/src/types.ts index ec1dba2751147..011f8bb9c8e97 100644 --- a/session-recordings/src/types.ts +++ b/session-recordings/src/types.ts @@ -1,6 +1,7 @@ export type RecordingEventGroup = { teamId: number sessionId: string + distinctId: string // OK if this distinct ID changes through the recording, we just need to store a single distinct ID // TODO: replace string[] with a file handle that we can append to events: Record size: number From dd3c1e2363e21437325054d47eb79f3d6925b33a Mon Sep 17 00:00:00 2001 From: Rick Marron Date: Thu, 14 Jul 2022 18:32:08 -0700 Subject: [PATCH 2/6] update username + password in docker --- session-recordings/docker-compose.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/session-recordings/docker-compose.yml b/session-recordings/docker-compose.yml index 67c8af9db99e0..0509c4a5ca28d 100644 --- a/session-recordings/docker-compose.yml +++ b/session-recordings/docker-compose.yml @@ -27,7 +27,7 @@ services: - '19000:19000' - '19001:19001' environment: - MINIO_ROOT_USER: root - MINIO_ROOT_PASSWORD: password + MINIO_ROOT_USER: object_storage_root_user + MINIO_ROOT_PASSWORD: object_storage_root_password entrypoint: sh command: -c 'mkdir -p /data/posthog && minio server --address ":19000" --console-address ":19001" /data' # create the 'posthog' bucket before starting the service From 30ffaa3e9544780ef4e4af6813a910246c2d0aec Mon Sep 17 00:00:00 2001 From: Rick Marron Date: Thu, 14 Jul 2022 18:42:46 -0700 Subject: [PATCH 3/6] fix some tests --- session-recordings/src/api.ts | 2 +- session-recordings/test/ingestion.test.ts | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/session-recordings/src/api.ts b/session-recordings/src/api.ts index 562b5467d3ba3..ee393fad8c88f 100644 --- a/session-recordings/src/api.ts +++ b/session-recordings/src/api.ts @@ -11,7 +11,7 @@ routes.get('/api/team/:teamId/session_recordings/:sessionId', async ({ params: { // Fetch events for the specified session recording // TODO: habdle time range querying, list pagination - const prefix = `team_id/${teamId}/session_id/${sessionId}/data/` + const prefix = `session_recordings/team_id/${teamId}/session_id/${sessionId}/data/` console.debug({ action: 'fetch_session', teamId, sessionId, prefix }) diff --git a/session-recordings/test/ingestion.test.ts b/session-recordings/test/ingestion.test.ts index cc32b8ed027fd..ece44c4cf78b9 100644 --- a/session-recordings/test/ingestion.test.ts +++ b/session-recordings/test/ingestion.test.ts @@ -189,6 +189,7 @@ describe.concurrent('ingester', () => { eventId: eventUuid, sessionId: sessionId, windowId: windowId, + distinctId: 'abc123', chunkIndex: '0', chunkCount: '2', 
teamId: '1', @@ -221,6 +222,7 @@ describe.concurrent('ingester', () => { eventId: eventUuid, sessionId: sessionId, windowId: windowId, + distinctId: 'abc123', chunkIndex: '0', chunkCount: '1', teamId: teamId, @@ -253,6 +255,7 @@ describe.concurrent('ingester', () => { eventId: eventUuid, sessionId: sessionId, windowId: windowId, + distinctId: 'abc123', chunkIndex: '0', chunkCount: '1', teamId: teamId, From 432ce3db10434fe51f22baf5f0ce5fe33b931d21 Mon Sep 17 00:00:00 2001 From: Harry Waye Date: Fri, 15 Jul 2022 12:36:28 +0000 Subject: [PATCH 4/6] add distinctIds to tests --- session-recordings/test/ingestion.test.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/session-recordings/test/ingestion.test.ts b/session-recordings/test/ingestion.test.ts index ece44c4cf78b9..2276b7330c007 100644 --- a/session-recordings/test/ingestion.test.ts +++ b/session-recordings/test/ingestion.test.ts @@ -36,6 +36,7 @@ describe.concurrent('ingester', () => { eventId: eventUuid, sessionId: sessionId, windowId: windowId, + distinctId: '123', chunkIndex: '0', chunkCount: '1', teamId: '1', @@ -70,6 +71,7 @@ describe.concurrent('ingester', () => { eventId: eventUuid, sessionId: sessionId, windowId: windowId, + distinctId: '123', chunkIndex: '0', chunkCount: '2', teamId: '1', @@ -91,6 +93,7 @@ describe.concurrent('ingester', () => { eventId: eventUuid, sessionId: sessionId, windowId: windowId, + distinctId: '123', chunkIndex: '1', chunkCount: '2', chunkOffset: first.baseOffset, @@ -125,6 +128,7 @@ describe.concurrent('ingester', () => { eventId: eventUuid, sessionId: sessionId, windowId: windowId, + distinctId: '123', chunkIndex: '0', chunkCount: '2', teamId: '1', @@ -152,6 +156,7 @@ describe.concurrent('ingester', () => { eventId: eventUuid, sessionId: sessionId, windowId: windowId, + distinctId: '123', chunkIndex: '1', chunkCount: '2', chunkOffset: first.baseOffset, From fe33f05392115baf9a8a6a3638e8b93cd1186931 Mon Sep 17 00:00:00 2001 From: Harry Waye Date: Mon, 18 Jul 2022 10:51:23 +0000 Subject: [PATCH 5/6] update tests to use new minio creds --- session-recordings/bin/test.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/session-recordings/bin/test.sh b/session-recordings/bin/test.sh index 94ccdd348bc1d..ce567d77e256a 100755 --- a/session-recordings/bin/test.sh +++ b/session-recordings/bin/test.sh @@ -12,8 +12,8 @@ docker-compose up -d CONTAINER_NAME=session-recordings-test trap 'docker kill $(docker ps -q --filter="name=$CONTAINER_NAME")' EXIT docker run --rm \ - -e OBJECT_STORAGE_ACCESS_KEY_ID=root \ - -e OBJECT_STORAGE_SECRET_ACCESS_KEY=password \ + -e OBJECT_STORAGE_ACCESS_KEY_ID=object_storage_root_user \ + -e OBJECT_STORAGE_SECRET_ACCESS_KEY=object_storage_root_password \ -e OBJECT_STORAGE_ENDPOINT=http://localhost:19000 \ -e MAX_EVENT_GROUP_AGE=1000 \ -e MAX_EVENT_GROUP_SIZE=1000 \ From 19d6cc39a723c8e029961dc1a56906f78d42a490 Mon Sep 17 00:00:00 2001 From: Harry Waye Date: Tue, 19 Jul 2022 14:21:09 +0000 Subject: [PATCH 6/6] add healthcheck --- session-recordings/bin/test.sh | 2 +- .../src/ingester/healthcheck.ts | 44 +++++++++++++++++++ session-recordings/src/ingester/index.ts | 17 +++++-- session-recordings/src/ingester/metrics.ts | 33 ++++++++++---- 4 files changed, 83 insertions(+), 13 deletions(-) create mode 100644 session-recordings/src/ingester/healthcheck.ts diff --git a/session-recordings/bin/test.sh b/session-recordings/bin/test.sh index ce567d77e256a..d53a9f0286435 100755 --- a/session-recordings/bin/test.sh +++ b/session-recordings/bin/test.sh @@ -22,7 +22,7 
@@ docker run --rm \ "${1:-session-recordings}" & # Wait for the ingester to be up -until (curl --silent http://localhost:3001/metrics); +until (curl --silent http://localhost:3001/_readyz); do echo "Waiting for instance to come up" sleep 5 diff --git a/session-recordings/src/ingester/healthcheck.ts b/session-recordings/src/ingester/healthcheck.ts new file mode 100644 index 0000000000000..cab56d76dc5a4 --- /dev/null +++ b/session-recordings/src/ingester/healthcheck.ts @@ -0,0 +1,44 @@ +import { Router } from 'express' +import { Consumer, ConsumerConfig } from 'kafkajs' + +export const getHealthcheckRoutes = ({ + consumer, + consumerConfig: { sessionTimeout }, +}: { + consumer: Consumer + consumerConfig: ConsumerConfig +}) => { + const { HEARTBEAT } = consumer.events + let lastHeartbeat: number | undefined + consumer.on(HEARTBEAT, ({ timestamp }) => (lastHeartbeat = timestamp)) + + const routes = Router() + + routes.get('/_livez', async (_, res) => { + // For liveness we just check that the Node event loop is still running, + // nothing else. It's possible that the pod is in an unrecoverable + // state regarding consuming, in which case this liveness check wouldn't + // help. + return res.status(200).json({ http: true }) + }) + + routes.get('/_readyz', async (_, res) => { + // Consumer has heartbeat within the session timeout, + // so it is healthy + if (lastHeartbeat && Date.now() - lastHeartbeat < sessionTimeout) { + return res.status(200).json({ consumer: true }) + } + + // Consumer has no heartbeat, but maybe it's because the group is currently rebalancing + try { + const { state } = await consumer.describeGroup() + + const ready = ['CompletingRebalance', 'PreparingRebalance'].includes(state) + return res.status(200).json({ consumer: ready }) + } catch (err) { + return res.status(503).json({ consumer: false }) + } + }) + + return routes +} diff --git a/session-recordings/src/ingester/index.ts b/session-recordings/src/ingester/index.ts index 2d1f9c8c36bbd..9938e2ee6a706 100644 --- a/session-recordings/src/ingester/index.ts +++ b/session-recordings/src/ingester/index.ts @@ -2,9 +2,11 @@ import { Kafka } from 'kafkajs' import { PutObjectCommand } from '@aws-sdk/client-s3' import { RecordingEvent, RecordingEventGroup } from '../types' import { s3Client } from '../s3' -import { meterProvider } from './metrics' +import { meterProvider, metricRoutes } from './metrics' import { performance } from 'perf_hooks' import { getEventGroupDataString, getEventSummaryMetadata } from './utils' +import { getHealthcheckRoutes } from './healthcheck' +import express from 'express' const maxEventGroupAge = Number.parseInt( process.env.MAX_EVENT_GROUP_AGE || process.env.NODE_ENV === 'dev' ? 
'1000' : '300000' @@ -20,13 +22,21 @@ const kafka = new Kafka({ brokers: ['localhost:9092'], }) -const consumer = kafka.consumer({ +const consumerConfig = { groupId: `object-storage-ingester`, -}) + sessionTimeout: 30000, + heartbeatInterval: 6000, +} +const consumer = kafka.consumer(consumerConfig) consumer.connect() consumer.subscribe({ topic: RECORDING_EVENTS_TOPIC }) +const app = express() +app.use(getHealthcheckRoutes({ consumer, consumerConfig })) +app.use(metricRoutes) +const httpServer = app.listen(3001) + const eventsBySessionId: { [key: string]: RecordingEventGroup } = {} // Define the metrics we'll be exposing at /metrics @@ -220,6 +230,7 @@ signalTraps.map((type) => { process.once(type, async () => { try { await consumer.disconnect() + await httpServer.close() } finally { process.kill(process.pid, type) } diff --git a/session-recordings/src/ingester/metrics.ts b/session-recordings/src/ingester/metrics.ts index 0e8764c3ff15d..ed233b44939d1 100644 --- a/session-recordings/src/ingester/metrics.ts +++ b/session-recordings/src/ingester/metrics.ts @@ -8,19 +8,34 @@ * together. */ -import { PrometheusExporter } from '@opentelemetry/exporter-prometheus' -import { MeterProvider } from '@opentelemetry/sdk-metrics-base' +import { PrometheusSerializer } from '@opentelemetry/exporter-prometheus' +import { AggregationTemporality, MeterProvider, MetricReader } from '@opentelemetry/sdk-metrics-base' +import { Router } from 'express' -const exporter = new PrometheusExporter({ port: 3001, host: '0.0.0.0', preventServerStart: false }) export const meterProvider = new MeterProvider() +class Exporter extends MetricReader { + selectAggregationTemporality() { + return AggregationTemporality.CUMULATIVE + } + + protected onForceFlush(): Promise { + return + } + + protected onShutdown(): Promise { + return + } +} + +const exporter = new Exporter() + meterProvider.addMetricReader(exporter) -// Make sure we kill the exporter on shutdown -const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2'] +export const metricRoutes = Router() -signalTraps.map((type) => { - process.once(type, async () => { - await exporter.stopServer() - }) +metricRoutes.get('/_metrics', async (req, res) => { + const results = await exporter.collect() + res.setHeader('content-type', 'text/plain') + return res.send(new PrometheusSerializer().serialize(results.resourceMetrics)) })
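
For reference, below is a minimal sketch of how the endpoints introduced in the last two patches can be exercised once the ingester is running. Only the port (3001, from app.listen(3001) in session-recordings/src/ingester/index.ts) and the /_livez, /_readyz and /_metrics paths come from the patches above; the standalone script, its file name, and the use of Node's built-in http module are illustrative assumptions, not part of the patch series.

// probe-ingester.ts: illustrative only; assumes the session-recordings ingester
// from the patches above is running locally and listening on port 3001.
import * as http from 'http'

const BASE_URL = 'http://localhost:3001'

// GET a path and resolve with the HTTP status code and response body.
const probe = (path: string): Promise<{ status: number | undefined; body: string }> =>
    new Promise((resolve, reject) => {
        http.get(`${BASE_URL}${path}`, (res) => {
            let body = ''
            res.on('data', (chunk) => (body += chunk))
            res.on('end', () => resolve({ status: res.statusCode, body }))
        }).on('error', reject)
    })

const main = async () => {
    // Liveness: only proves the Node event loop is responding.
    console.log('/_livez', await probe('/_livez'))

    // Readiness: reports whether the Kafka consumer has heartbeated within its
    // session timeout (or whether the consumer group is currently rebalancing).
    console.log('/_readyz', await probe('/_readyz'))

    // Metrics: Prometheus text exposition produced by the custom MetricReader.
    console.log('/_metrics', (await probe('/_metrics')).body.split('\n').slice(0, 5).join('\n'))
}

main().catch(console.error)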