From dba58f4467d14662d4d5113229be3ac6ada1a9f2 Mon Sep 17 00:00:00 2001
From: Karl-Aksel Puulmann
Date: Wed, 27 Jan 2021 23:12:55 +0200
Subject: [PATCH] Hotfix for missing session recording events (#3069)

* Hotfix for missing session recording events

Temporarily solves https://github.com/PostHog/posthog/issues/2927, though
the error will rear its head again with plugin-based ingestion.

Opened https://github.com/PostHog/posthog/issues/3068 for a long-term solution.

* kwargs everywhere

* use settings over kwargs
---
 ee/clickhouse/client.py                       | 26 +++++++++----------
 .../models/session_recording_event.py         | 25 +++++++++++++-----
 2 files changed, 32 insertions(+), 19 deletions(-)

diff --git a/ee/clickhouse/client.py b/ee/clickhouse/client.py
index d8f9fbc7dd048..f3f1c4a48590c 100644
--- a/ee/clickhouse/client.py
+++ b/ee/clickhouse/client.py
@@ -9,7 +9,7 @@
 from asgiref.sync import async_to_sync
 from clickhouse_driver import Client as SyncClient
 from clickhouse_pool import ChPool
-from django.conf import settings
+from django.conf import settings as app_settings
 from django.core.cache import cache
 from django.utils.timezone import now
 from sentry_sdk.api import capture_exception
@@ -37,13 +37,13 @@
     ch_client = None  # type: Client
     ch_sync_pool = None  # type: ChPool
 
-    def async_execute(query, args=None):
+    def async_execute(query, args=None, settings=None):
         return
 
-    def sync_execute(query, args=None):
+    def sync_execute(query, args=None, settings=None):
         return
 
-    def cache_sync_execute(query, args=None, redis_client=None, ttl=None):
+    def cache_sync_execute(query, args=None, redis_client=None, ttl=None, settings=None):
         return
 
 
@@ -60,9 +60,9 @@ def cache_sync_execute(query, args=None, redis_client=None, ttl=None):
         )
 
         @async_to_sync
-        async def async_execute(query, args=None):
+        async def async_execute(query, args=None, settings=None):
             loop = asyncio.get_event_loop()
-            task = loop.create_task(ch_client.execute(query, args))
+            task = loop.create_task(ch_client.execute(query, args, settings=settings))
             return task
 
     else:
@@ -77,8 +77,8 @@ async def async_execute(query, args=None):
             verify=CLICKHOUSE_VERIFY,
         )
 
-        def async_execute(query, args=None):
-            return sync_execute(query, args)
+        def async_execute(query, args=None, settings=None):
+            return sync_execute(query, args, settings=settings)
 
     ch_sync_pool = ChPool(
         host=CLICKHOUSE_HOST,
@@ -92,7 +92,7 @@ def async_execute(query, args=None):
         connections_max=100,
     )
 
-    def cache_sync_execute(query, args=None, redis_client=None, ttl=CACHE_TTL):
+    def cache_sync_execute(query, args=None, redis_client=None, ttl=CACHE_TTL, settings=None):
         if not redis_client:
             redis_client = redis.get_client()
         key = _key_hash(query, args)
@@ -100,18 +100,18 @@ def cache_sync_execute(query, args=None, redis_client=None, ttl=CACHE_TTL):
             result = _deserialize(redis_client.get(key))
             return result
         else:
-            result = sync_execute(query, args)
+            result = sync_execute(query, args, settings=settings)
             redis_client.set(key, _serialize(result), ex=ttl)
             return result
 
-    def sync_execute(query, args=None):
+    def sync_execute(query, args=None, settings=None):
         start_time = time()
         try:
             with ch_sync_pool.get_client() as client:
-                result = client.execute(query, args)
+                result = client.execute(query, args, settings=settings)
         finally:
             execution_time = time() - start_time
-            if settings.SHELL_PLUS_PRINT_SQL:
+            if app_settings.SHELL_PLUS_PRINT_SQL:
                 print(format_sql(query, args))
                 print("Execution time: %.6fs" % (execution_time,))
             if _save_query_user_id:
diff --git a/ee/clickhouse/models/session_recording_event.py b/ee/clickhouse/models/session_recording_event.py
index bb786d9987cc6..6ea5eaab2fcc2 100644
--- a/ee/clickhouse/models/session_recording_event.py
+++ b/ee/clickhouse/models/session_recording_event.py
@@ -1,16 +1,22 @@
 import datetime
 import json
+import logging
 import uuid
-from typing import Dict, List, Optional, Tuple, Union
+from typing import Union
 
-from dateutil.parser import isoparse
-from django.utils import timezone
+from sentry_sdk import capture_exception
 
+from ee.clickhouse.client import sync_execute
 from ee.clickhouse.models.util import cast_timestamp_or_now
 from ee.clickhouse.sql.session_recording_events import INSERT_SESSION_RECORDING_EVENT_SQL
 from ee.kafka_client.client import ClickhouseProducer
 from ee.kafka_client.topics import KAFKA_SESSION_RECORDING_EVENTS
 
+logger = logging.getLogger(__name__)
+
+MAX_KAFKA_MESSAGE_LENGTH = 800_000
+MAX_INSERT_LENGTH = 15_000_000
+
 
 def create_session_recording_event(
     uuid: uuid.UUID,
@@ -22,15 +28,22 @@ def create_session_recording_event(
 ) -> str:
     timestamp = cast_timestamp_or_now(timestamp)
+    snapshot_data_json = json.dumps(snapshot_data)
     data = {
         "uuid": str(uuid),
         "team_id": team_id,
         "distinct_id": distinct_id,
         "session_id": session_id,
-        "snapshot_data": json.dumps(snapshot_data),
+        "snapshot_data": snapshot_data_json,
         "timestamp": timestamp,
         "created_at": timestamp,
     }
-    p = ClickhouseProducer()
-    p.produce(sql=INSERT_SESSION_RECORDING_EVENT_SQL, topic=KAFKA_SESSION_RECORDING_EVENTS, data=data)
+    if len(snapshot_data_json) <= MAX_KAFKA_MESSAGE_LENGTH:
+        p = ClickhouseProducer()
+        p.produce(sql=INSERT_SESSION_RECORDING_EVENT_SQL, topic=KAFKA_SESSION_RECORDING_EVENTS, data=data)
+    elif len(snapshot_data_json) <= MAX_INSERT_LENGTH:
+        sync_execute(INSERT_SESSION_RECORDING_EVENT_SQL, data, settings={"max_query_size": MAX_INSERT_LENGTH})
+    else:
+        capture_exception(Exception(f"Session recording event data too large - {len(snapshot_data_json)}"))
+
 
     return str(uuid)
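
Usage note on the client.py half of the patch: the new settings kwarg is forwarded as-is to clickhouse_driver's Client.execute(), so callers can override ClickHouse settings for a single query (the patch itself uses it for max_query_size). A minimal sketch, assuming the patched ee.clickhouse.client module is importable; the query, team id, and 30-second limit below are illustrative only, not part of the patch:

    from ee.clickhouse.client import sync_execute

    # Per-query ClickHouse settings override: only this statement runs with the
    # 30-second execution cap; other queries keep the server defaults.
    rows = sync_execute(
        "SELECT count() FROM session_recording_events WHERE team_id = %(team_id)s",
        {"team_id": 1},
        settings={"max_execution_time": 30},
    )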
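
And for the session_recording_event.py half: a minimal sketch of how the size-based routing behaves from a caller's perspective, assuming the patched module is importable in a configured PostHog environment; the team id, distinct/session ids, timestamp, and snapshot payload are illustrative values. Payloads whose JSON form fits within MAX_KAFKA_MESSAGE_LENGTH go through the Kafka producer, larger ones up to MAX_INSERT_LENGTH fall back to a direct sync_execute INSERT with max_query_size raised, and anything bigger is only reported to Sentry:

    import uuid

    from ee.clickhouse.models.session_recording_event import (
        MAX_KAFKA_MESSAGE_LENGTH,
        create_session_recording_event,
    )

    # JSON-encoded size just over the Kafka limit, so this event takes the
    # direct-insert path rather than the Kafka producer path.
    snapshot_data = {"events": "x" * (MAX_KAFKA_MESSAGE_LENGTH + 1)}

    event_uuid = create_session_recording_event(
        uuid=uuid.uuid4(),
        team_id=1,
        distinct_id="illustrative-distinct-id",
        session_id="illustrative-session-id",
        timestamp="2021-01-27 23:12:55",
        snapshot_data=snapshot_data,
    )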