wip(session-recordings): store session recordings to S3 #10142

Closed. Wants to merge 24 commits.

Commits
253705b
wip(session-recordings): store session recordings to S3
Jun 6, 2022
40778dc
add todo for cleanup.policy=compact
Jun 6, 2022
a19afaf
so that kafka connect starts on m1
pauldambra Jun 7, 2022
9f64674
flyby: shift arm64 services npm scripts to dev
pauldambra Jun 7, 2022
5373bfc
update services npm script to launch dbs too
pauldambra Jun 7, 2022
d0391ca
create connector in migrate script
pauldambra Jun 7, 2022
2c4f87d
add gzip middleware
pauldambra Jun 7, 2022
b4dcf3a
don't explode on empty s3 result
pauldambra Jun 7, 2022
08be323
remove services
pauldambra Jun 7, 2022
a064720
Merge branch 'master' into feat/session-recordings-to-s3
pauldambra Jun 7, 2022
f25b979
add to migrate with waiting
pauldambra Jun 7, 2022
7ec2317
don't add middleware in this PR
pauldambra Jun 7, 2022
28f1da6
behind an instance setting at team level
pauldambra Jun 7, 2022
aedebf3
only read session recordings from storage when flag is on
pauldambra Jun 7, 2022
96dec97
correct docker compose args
pauldambra Jun 7, 2022
f8b4817
account for the new query for instance settings
pauldambra Jun 7, 2022
8683cc6
obey mypy
pauldambra Jun 7, 2022
1cb7fa5
remove TODO that will be handled by #10171
pauldambra Jun 7, 2022
a6151e1
there's one run where django cloud tests have a different name for ob…
pauldambra Jun 7, 2022
2ae1a57
correct docker compose in several places
pauldambra Jun 7, 2022
fd52a14
test the read side
pauldambra Jun 7, 2022
d5dfdc3
less magical path generation
pauldambra Jun 7, 2022
d0dca97
fix tests from object_storage refactor :/
pauldambra Jun 7, 2022
6cb6163
no kafka connect in master for cloud tests yet
pauldambra Jun 8, 2022
2 changes: 1 addition & 1 deletion .github/actions/run-backend-tests/action.yml
@@ -37,7 +37,7 @@ runs:
run: |
export CLICKHOUSE_SERVER_IMAGE=${{ inputs.clickhouse-server-image }}
docker-compose -f docker-compose.dev.yml down
- docker-compose -f docker-compose.dev.yml up -d db clickhouse zookeeper kafka redis object_storage &
+ docker-compose -f docker-compose.dev.yml up -d db clickhouse zookeeper kafka redis object-storage kafka-connect &

- name: Set up Python
uses: actions/setup-python@v2
2 changes: 1 addition & 1 deletion .github/workflows/ci-plugin-server.yml
@@ -87,7 +87,7 @@ jobs:
ping -c 1 kafka
ping -c 1 zookeeper
- name: Start Kafka, ClickHouse, Zookeeper, Object Storage
- run: docker-compose -f docker-compose.dev.yml up -d zookeeper kafka clickhouse object_storage
+ run: docker-compose -f docker-compose.dev.yml up -d zookeeper kafka clickhouse object-storage

- name: Set up Python 3.8.12
uses: actions/setup-python@v2
2 changes: 1 addition & 1 deletion .github/workflows/e2e.yml
@@ -73,7 +73,7 @@ jobs:
- name: Start stack with Docker Compose
run: |
docker-compose -f docker-compose.dev.yml down
- docker-compose -f docker-compose.dev.yml up -d db clickhouse zookeeper kafka redis object_storage &
+ docker-compose -f docker-compose.dev.yml up -d db clickhouse zookeeper kafka redis object-storage kafka-connect &
- name: Add kafka host to /etc/hosts for kafka connectivity
run: sudo echo "127.0.0.1 kafka" | sudo tee -a /etc/hosts

44 changes: 44 additions & 0 deletions bin/create_connector.sh
@@ -0,0 +1,44 @@
#!/bin/bash

# Check Kafka Connect is available at all
while true; do
KAFKA_CONNECT=$(curl -sb -I 'http://localhost:8083')
if [[ $KAFKA_CONNECT =~ .*version.* ]]; then
break
fi

echo 'Waiting for Kafka Connect...' && sleep 1
done

while true; do
RESULT=$(# Creates a Kafka Connect S3 Sink for session recordings
curl -X PUT http://localhost:8083/connectors/session-recordings/config \
-H "Content-Type: application/json" \
-d '{
"name": "session-recordings",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"topics": "session-recordings",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": true,
"partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
"partition.field.name": "team_id,session_id,window_id",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"s3.bucket.name": "posthog",
"s3.region": "us-east-1",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"store.url": "http://object-storage:19000",
"topics.dir": "session-recordings",
"flush.size": 10,
"s3.part.size": 5242880
}'
)

if [[ $RESULT =~ .*session-recordings.* ]]; then
echo "✅ session recordings connector is created"
break
fi

echo 'Creating session recordings connector...' && sleep 1
done

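The loop above only greps the PUT response for the connector name. A stricter check, sketched here in Python (not part of this PR), polls the Kafka Connect REST API until the sink reports a RUNNING state; the port and connector name are taken from the script above.

# Minimal sketch: wait until the "session-recordings" sink created by
# bin/create_connector.sh is registered and RUNNING. Assumes Kafka Connect
# is reachable on localhost:8083 (as in docker-compose.dev.yml) and that
# the `requests` package is installed. Illustrative only, not PR code.
import time

import requests

CONNECT_URL = "http://localhost:8083"
CONNECTOR = "session-recordings"


def wait_for_connector(timeout_seconds: int = 60) -> dict:
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        try:
            # GET /connectors lists the names of registered connectors
            names = requests.get(f"{CONNECT_URL}/connectors", timeout=5).json()
            if CONNECTOR in names:
                # GET /connectors/<name>/status reports connector and task state
                status = requests.get(f"{CONNECT_URL}/connectors/{CONNECTOR}/status", timeout=5).json()
                if status["connector"]["state"] == "RUNNING":
                    return status
        except requests.RequestException:
            pass  # Connect may still be starting up
        time.sleep(1)
    raise TimeoutError(f"{CONNECTOR} connector did not reach RUNNING state")


if __name__ == "__main__":
    print(wait_for_connector())
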
4 changes: 2 additions & 2 deletions bin/e2e-test-runner
@@ -23,8 +23,8 @@ export PGPASSWORD="${PGPASSWORD:=posthog}"
export PGPORT="${PGPORT:=5432}"
export DATABASE_URL="postgres://${PGUSER}:${PGPASSWORD}@${PGHOST}:${PGPORT}/${DATABASE}"

- nc -z localhost 9092 || ( echo -e "\033[0;31mKafka isn't running. Please run\n\tdocker compose -f docker-compose.arm64.yml up zookeeper kafka clickhouse db redis\nI'll wait while you do that.\033[0m" ; bin/check_kafka_clickhouse_up )
- wget -nv -t1 --spider 'http://localhost:8123/' || ( echo -e "\033[0;31mClickhouse isn't running. Please run\n\tdocker compose -f docker-compose.arm64.yml up zookeeper kafka clickhouse db redis.\nI'll wait while you do that.\033[0m" ; bin/check_kafka_clickhouse_up )
+ nc -z localhost 9092 || ( echo -e "\033[0;31mKafka isn't running. Please run\n\tdocker compose -f docker-compose.dev.yml up zookeeper kafka clickhouse db redis object-storage kafka-connect\nI'll wait while you do that.\033[0m" ; bin/check_kafka_clickhouse_up )
+ wget -nv -t1 --spider 'http://localhost:8123/' || ( echo -e "\033[0;31mClickhouse isn't running. Please run\n\tdocker compose -f docker-compose.dev.yml up zookeeper kafka clickhouse db redis object-storage kafka-connect.\nI'll wait while you do that.\033[0m" ; bin/check_kafka_clickhouse_up )


trap "trap - SIGTERM && yarn remove cypress cypress-terminal-report @cypress/react @cypress/webpack-preprocessor && kill -- -$$" SIGINT SIGTERM EXIT
4 changes: 3 additions & 1 deletion bin/migrate
@@ -4,4 +4,6 @@ set -e
python manage.py migrate
python manage.py migrate_clickhouse
python manage.py run_async_migrations --check
- python manage.py sync_replicated_schema
+ python manage.py sync_replicated_schema

./bin/create_connector.sh
48 changes: 45 additions & 3 deletions docker-compose.dev.yml
@@ -62,6 +62,48 @@ services:
KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
ALLOW_PLAINTEXT_LISTENER: 'true'

# TODO: add Kafka Connect support to Helm chart
# TODO: use MSK Connect in production
kafka-connect:
image: confluentinc/cp-kafka-connect-base:7.1.1
platform: linux/amd64
depends_on:
- zookeeper
- kafka
ports:
- '8083:8083'
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka:9092
CONNECT_REST_PORT: '8083'
CONNECT_GROUP_ID: connect
CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: connect-status
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect'
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
AWS_ACCESS_KEY_ID: object_storage_root_user
AWS_SECRET_ACCESS_KEY: object_storage_root_password
command:
- bash
- -c
- |
# TODO: create connect-configs, connect-offsets, connect-status
# topics with cleanup.policy=compact

echo "Installing S3 connector sink"
confluent-hub install --no-prompt confluentinc/kafka-connect-s3:10.0.8
#
echo "Launching Kafka Connect worker"
/etc/confluent/docker/run &
#
sleep infinity

worker: &worker
build:
context: .
@@ -89,7 +131,7 @@ services:
- redis
- clickhouse
- kafka
- - object_storage
+ - object-storage
web:
<<: *worker
command: '${CH_WEB_SCRIPT:-./ee/bin/docker-ch-dev-web}'
@@ -115,9 +157,9 @@
- redis
- clickhouse
- kafka
- - object_storage
+ - object-storage

- object_storage:
+ object-storage:
image: minio/minio
restart: on-failure
ports:
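Once the sink is running, it flushes batches (flush.size 10) of session recording messages into the posthog bucket on this MinIO container, under keys derived from the FieldPartitioner fields (team_id, session_id, window_id). A quick way to see what has landed is to list the bucket; the endpoint, port mapping, and credentials below are assumptions taken from docker-compose.dev.yml and bin/create_connector.sh rather than code from this PR.

# Sketch: list the objects the Kafka Connect S3 sink has written to MinIO.
# Assumes the object-storage service publishes MinIO on localhost:19000 and
# accepts the object_storage_root_user/object_storage_root_password credentials;
# with the FieldPartitioner above, keys land roughly under
#   session-recordings/session-recordings/team_id=<t>/session_id=<s>/window_id=<w>/
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:19000",
    aws_access_key_id="object_storage_root_user",
    aws_secret_access_key="object_storage_root_password",
    region_name="us-east-1",
)

paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket="posthog", Prefix="session-recordings/"):
    for obj in page.get("Contents", []):
        print(obj["Key"], obj["Size"])
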
@@ -49,6 +49,7 @@ const EDITABLE_INSTANCE_SETTINGS = [
'EMAIL_REPLY_TO',
'AGGREGATE_BY_DISTINCT_IDS_TEAMS',
'ENABLE_ACTOR_ON_EVENTS_TEAMS',
'ENABLE_SESSION_RECORDING_INGESTION_TO_STORAGE_TEAMS',
]

export const systemStatusLogic = kea<systemStatusLogicType>({
4 changes: 2 additions & 2 deletions plugin-server/package.json
@@ -23,9 +23,9 @@
"prepublishOnly": "yarn build",
"setup:dev:clickhouse": "cd .. && DEBUG=1 python manage.py migrate_clickhouse",
"setup:test": "cd .. && TEST=1 python manage.py setup_test_environment",
"services:start": "cd .. && docker-compose -f docker-compose.dev.yml up zookeeper kafka clickhouse object_storage",
"services:start": "cd .. && docker-compose -f docker-compose.dev.yml up zookeeper kafka clickhouse object-storage",
"services:stop": "cd .. && docker-compose -f docker-compose.dev.yml down",
"services:clean": "cd .. && docker-compose -f docker-compose.dev.yml rm -v zookeeper kafka clickhouse object_storage",
"services:clean": "cd .. && docker-compose -f docker-compose.dev.yml rm -v zookeeper kafka clickhouse object-storage",
"services": "yarn services:stop && yarn services:clean && yarn services:start"
},
"bin": {
82 changes: 81 additions & 1 deletion posthog/api/capture.py
@@ -1,5 +1,8 @@
import base64
import gzip
import hashlib
import json
import logging
import re
from datetime import datetime
from typing import Any, Dict, Optional
@@ -27,7 +30,9 @@
from posthog.exceptions import generate_exception_response
from posthog.helpers.session_recording import preprocess_session_recording_events
from posthog.models.feature_flag import get_overridden_feature_flags
from posthog.models.instance_setting import get_instance_setting
from posthog.models.utils import UUIDT
from posthog.settings import get_list
from posthog.utils import cors_response, get_ip_address


@@ -59,11 +64,86 @@ def log_event(data: Dict, event_name: str, partition_key: str) -> None:

# TODO: Handle Kafka being unavailable with exponential backoff retries
try:
if (
# TODO what happens to sessions that are running when this is enabled
any(
int(team) == data.get("team_id", None)
for team in get_list(get_instance_setting("ENABLE_SESSION_RECORDING_INGESTION_TO_STORAGE_TEAMS"))
)
and event_name == "$snapshot"
):
# If we have a rrweb event, push it to a separate topic
# TODO: remove unneeded wrapping of rrweb data
# TODO: remove pushing of message to events topic, instead we can
# just read into ClickHouse, from the 'session-recordings' topic the
# details we need to e.g. generate the "session length" etc.
# TODO: possibly find a nicer way to handle JSON Schemas
# TODO: avoid doing any expensive processing e.g. base64/gzip in
# this method (we could just offload it to read time for now as
# modifying at source may be too much for one refactor)
session_recording_data = json.loads(data["data"])
session_recording_data = {
"payload": {
# Get rid of the multiple encodings and compression, we'll
# have compression at S3 and Kafka levels and on transport
# to handle making this smaller, plus it makes it difficult
# to debug. The base64 likely increases the compression
# ratio overall(?)
#
# We also need to ensure that it is a string as the Kafka
# Connect JsonConverter doesn't use Json Schema and I can't
# find an equivalent of `additionalProperties: true`
"data": json.dumps(
{
**data,
"data": {
**session_recording_data,
"properties": {
**session_recording_data["properties"],
"$snapshot_data": {
**session_recording_data["properties"]["$snapshot_data"],
"data": json.loads(
gzip.decompress(
base64.b64decode(
session_recording_data["properties"]["$snapshot_data"]["data"]
)
).decode("utf-16", "surrogatepass")
),
},
},
},
}
),
"window_id": session_recording_data["properties"]["$window_id"],
"session_id": session_recording_data["properties"]["$session_id"],
"team_id": data["team_id"],
},
# To avoid needing to use a SchemaRegistry but still be able to
# use fields as partition key, we add an inline schema as
# expected by Kafka Connect JsonConverter. Perhaps a simpler
# approach would be to push with an appropriate key and use
# that.
"schema": {
"type": "struct",
"optional": False,
"version": 1,
"fields": [
{"field": "data", "type": "string", "optional": False},
{"field": "team_id", "type": "int32", "optional": False,},
{"field": "session_id", "type": "string", "optional": False,},
{"field": "window_id", "type": "string", "optional": False,},
],
},
}
KafkaProducer().produce(topic="session-recordings", data=session_recording_data, key=partition_key)

KafkaProducer().produce(topic=KAFKA_EVENTS_PLUGIN_INGESTION_TOPIC, data=data, key=partition_key)
statsd.incr("posthog_cloud_plugin_server_ingestion")
except Exception as e:
statsd.incr("capture_endpoint_log_event_error")
print(f"Failed to produce event to Kafka topic {KAFKA_EVENTS_PLUGIN_INGESTION_TOPIC} with error:", e)
logging.exception(
f"Failed to produce event to Kafka topic {KAFKA_EVENTS_PLUGIN_INGESTION_TOPIC} with error:", e
)
raise e


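For reference, the nested expression above reverses how posthog-js packages snapshot data: $snapshot_data["data"] arrives gzip-compressed and then base64-encoded, and the decompressed bytes are a UTF-16 encoded JSON string (hence the surrogatepass error handler). Pulled out into a standalone helper it looks roughly like this; the function name is illustrative and is not defined in this PR.

# Sketch of the unwrapping performed inline in log_event above, extracted
# into a helper for readability. Mirrors the PR's decode; the function
# itself is illustrative, not part of the change.
import base64
import gzip
import json
from typing import Any, Dict


def decode_snapshot_data(encoded: str) -> Dict[str, Any]:
    compressed = base64.b64decode(encoded)      # undo base64 transport encoding
    decompressed = gzip.decompress(compressed)  # undo gzip compression
    # "surrogatepass" tolerates lone surrogates that can occur in rrweb payloads
    return json.loads(decompressed.decode("utf-16", "surrogatepass"))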