diff --git a/session-recordings/src/ingester/index.ts b/session-recordings/src/ingester/index.ts
index a6ca46ff7052c..f6b0a9494ac69 100644
--- a/session-recordings/src/ingester/index.ts
+++ b/session-recordings/src/ingester/index.ts
@@ -1,6 +1,6 @@
 import { Kafka } from 'kafkajs'
 import { PutObjectCommand } from '@aws-sdk/client-s3'
-import { RecordingEvent, RecordingEventGroup, RecordingMessage } from '../types'
+import { KafkaTopic, RecordingEvent, RecordingEventGroup, RecordingMessage } from '../types'
 import { s3Client } from '../s3'
 import { meterProvider, metricRoutes } from './metrics'
 import { performance } from 'perf_hooks'
@@ -10,6 +10,8 @@ import {
     getEventGroupDataString,
     getEventSize,
     getEventSummaryMetadata,
+    getTopicAndPartitionFromKey,
+    getTopicPartitionKey,
 } from './utils'
 import { getHealthcheckRoutes } from './healthcheck'
 import express from 'express'
@@ -26,13 +28,19 @@ const eventGroupSizeUploadThreshold = Number.parseInt(
 )
 
 const RECORDING_EVENTS_DEAD_LETTER_TOPIC = 'recording_events_dead_letter'
-const RECORDING_EVENTS_TOPICS_CONFIGS = {
+
+type TopicConfig = {
+    retryTopic: string
+    timeout: number
+}
+
+const RECORDING_EVENTS_TOPICS_CONFIGS: Record<KafkaTopic, TopicConfig> = {
     recording_events: { timeout: 0, retryTopic: 'recording_events_retry_1' },
     recording_events_retry_1: { timeout: 2 * 1000, retryTopic: 'recording_events_retry_2' },
     recording_events_retry_2: { timeout: 30 * 1000, retryTopic: 'recording_events_retry_3' },
     recording_events_retry_3: { timeout: 5 * 60 * 1000, retryTopic: RECORDING_EVENTS_DEAD_LETTER_TOPIC },
 }
-const RECORDING_EVENTS_TOPICS = Object.keys(RECORDING_EVENTS_TOPICS_CONFIGS)
+const RECORDING_EVENTS_TOPICS = Object.keys(RECORDING_EVENTS_TOPICS_CONFIGS) as KafkaTopic[]
 
 const kafka = new Kafka({
     clientId: 'ingester',
@@ -102,23 +110,32 @@ const retryEventGroup = async (eventGroup: RecordingEventGroup) => {
 
 // TODO: Make this handle multiple topics + partitions
 const getOffsetOfOldestMessageInBuffers = (topic: string, partition: number): number => {
-    const oldestMessageOffsetInEventBuffer = Object.values(eventBuffers).reduce((acc, event) => {
-        return Math.min(acc, event.oldestOffset)
-    }, -1)
+    const oldestMessageOffsetInEventBufferForTopicAndPartition = Object.values(eventBuffers)
+        .filter((event) => {
+            return event.kafkaTopic === topic && event.kafkaPartition === partition
+        })
+        .reduce((acc, event) => {
+            return Math.min(acc, event.oldestOffset)
+        }, -1)
+
+    const topicPartitionKey = getTopicPartitionKey(topic, partition)
 
-    const oldestMessageOffsetInEventGroups = Object.values(eventGroupsBySessionId)
+    const oldestMessageOffsetInEventGroupsForTopicAndPartition = Object.values(eventGroupsBySessionId)
         .flat()
         .reduce((acc, eventGroup) => {
-            return Math.min(acc, eventGroup.oldestOffset)
+            return Math.min(acc, eventGroup.oldestOffsets[topicPartitionKey])
         }, -1)
 
-    return Math.min(oldestMessageOffsetInEventBuffer, oldestMessageOffsetInEventGroups)
+    return Math.min(
+        oldestMessageOffsetInEventBufferForTopicAndPartition,
+        oldestMessageOffsetInEventGroupsForTopicAndPartition
+    )
 }
 
-const commitEventGroupToS3 = async (eventGroupToSend: RecordingEventGroup, topic: string, partition: number) => {
+const commitEventGroupToS3 = async (eventGroupToSend: RecordingEventGroup) => {
     const baseKey = `session_recordings/team_id/${eventGroupToSend.teamId}/session_id/${eventGroupToSend.sessionId}`
-    const dataKey = `${baseKey}/data/${eventGroupToSend.oldestEventTimestamp}-${eventGroupToSend.oldestOffset}`
-    const metaDataEventSummaryKey = `${baseKey}/metadata/event_summaries/${eventGroupToSend.oldestEventTimestamp}-${eventGroupToSend.oldestOffset}`
+    const dataKey = `${baseKey}/data/${eventGroupToSend.oldestEventTimestamp}-${eventGroupToSend.oldestOriginalOffset}`
+    const metaDataEventSummaryKey = `${baseKey}/metadata/event_summaries/${eventGroupToSend.oldestEventTimestamp}-${eventGroupToSend.oldestOriginalOffset}`
     const metaDataKey = `${baseKey}/metadata/metadata.json`
 
     logger.debug({ action: 'committing_event_group', sessionId: eventGroupToSend.sessionId, key: dataKey })
@@ -152,14 +169,14 @@ const commitEventGroupToS3 = async (eventGroupToSend: RecordingEventGroup, topic
         logger.error({
             action: 'failed_to_commit_event_group',
             sessionId: eventGroupToSend.sessionId,
+            teamId: eventGroupToSend.teamId,
             key: dataKey,
             error: err,
-            topic,
         })
         retryEventGroup(eventGroupToSend)
     }
 
-    // We've sent the event group to S3 or teh retry queues, so we can remove it from the buffer
+    // We've sent the event group to S3 or the retry queues, so we can remove it from the buffer
     eventGroupsBySessionId[eventGroupToSend.sessionId] = eventGroupsBySessionId[eventGroupToSend.sessionId].filter(
         (eventGroup) => {
             eventGroup.id !== eventGroupToSend.id
@@ -170,37 +187,43 @@ const commitEventGroupToS3 = async (eventGroupToSend: RecordingEventGroup, topic
         observableResult.observe(Object.keys(eventGroupsBySessionId).flat().length)
     )
 
-    // Update the Kafka offset
-    const oldestOffsetInBuffers = getOffsetOfOldestMessageInBuffers(topic, partition)
-    const offsetToCommit = oldestOffsetInBuffers === -1 ? eventGroupToSend.newestOffset + 1 : oldestOffsetInBuffers
-    consumer.commitOffsets([
-        {
-            topic,
-            partition,
-            offset: offsetToCommit.toString(),
-        },
-    ])
-
-    logger.debug({ action: 'committing_offset', offset: offsetToCommit, partition })
+    // Update the Kafka offsets for each topic/partition in the event group
+    Object.keys(eventGroupToSend.oldestOffsets).forEach((topicPartitionKey) => {
+        const { topic, partition } = getTopicAndPartitionFromKey(topicPartitionKey)
+        const oldestOffsetInBuffers = getOffsetOfOldestMessageInBuffers(topic, partition)
+        const offsetToCommit =
+            oldestOffsetInBuffers === -1 ? eventGroupToSend.newestOffsets[topicPartitionKey] + 1 : oldestOffsetInBuffers
+        consumer.commitOffsets([
+            {
+                topic,
+                partition,
+                offset: offsetToCommit.toString(),
+            },
+        ])
+        logger.debug({ action: 'committing_offset', offset: offsetToCommit, partition, topic })
+    })
+
     eventGroupsCommittedCounter.add(1)
 }
 
 const createEventGroup = (event: RecordingEvent) => {
+    const topicPartitionKey = getTopicPartitionKey(event.kafkaTopic, event.kafkaPartition)
     const eventGroup: RecordingEventGroup = {
         id: randomUUID(),
-        events: {} as Record<string, RecordingEvent>,
+        events: [] as RecordingEvent[],
         size: 0,
         teamId: event.teamId,
         sessionId: event.sessionId,
         oldestEventTimestamp: event.timestamp,
         distinctId: event.distinctId,
-        oldestOffset: event.oldestOffset,
-        newestOffset: event.newestOffset,
+        oldestOffsets: { [topicPartitionKey]: event.oldestOffset },
+        newestOffsets: { [topicPartitionKey]: event.newestOffset },
+        oldestOriginalOffset: event.oldestOriginalOffset,
         status: 'active',
     }
     eventGroup.timer = setTimeout(() => {
         eventGroup.status = 'sending'
-        commitEventGroupToS3({ ...eventGroup }, event.kafkaTopic, event.kafkaPartition)
+        commitEventGroupToS3({ ...eventGroup })
     }, maxEventGroupAge)
     logger.debug({ action: 'create_event_group', sessionId: eventGroup.sessionId })
@@ -215,7 +238,7 @@ const createEventGroup = (event: RecordingEvent) => {
 
 const processCompleteEvent = async (event: RecordingEvent) => {
     logger.debug({
-        action: 'start',
+        action: 'process event',
         uuid: event.eventId,
         sessionId: event.sessionId,
     })
@@ -227,14 +250,22 @@ const processCompleteEvent = async (event: RecordingEvent) => {
         eventGroupsStarted.add(1, { reason: 'no-existing-event-group' })
     }
 
-    eventGroup.events[event.eventId] = event
+    eventGroup.events.push(event)
     eventGroup.size += getEventSize(event)
-    eventGroup.newestOffset = event.newestOffset
+
+    const topicPartitionKey = getTopicPartitionKey(event.kafkaTopic, event.kafkaPartition)
+
+    eventGroup.newestOffsets[topicPartitionKey] = event.newestOffset
+    if (eventGroup.oldestOffsets[topicPartitionKey] === undefined) {
+        eventGroup.oldestOffsets[topicPartitionKey] = event.oldestOffset
+    }
+
+    eventGroup.oldestOriginalOffset = Math.min(eventGroup.oldestOriginalOffset, event.oldestOriginalOffset)
 
     if (eventGroup.size > eventGroupSizeUploadThreshold) {
         clearTimeout(eventGroup.timer)
         eventGroup.status = 'sending'
-        commitEventGroupToS3({ ...eventGroup }, event.kafkaTopic, event.kafkaPartition)
+        commitEventGroupToS3({ ...eventGroup })
     }
 }
 
@@ -254,30 +285,47 @@ const handleMessage = async (message: RecordingMessage) => {
         kafkaPartition: message.kafkaPartition,
         oldestOffset: message.kafkaOffset,
         newestOffset: message.kafkaOffset,
+        oldestOriginalOffset: message.originalKafkaOffset,
     }
 
-    recordingEvent.complete = message.chunkIndex === message.chunkCount - 1
-    recordingEvent.messages.push(message)
-    recordingEvent.newestOffset = message.kafkaOffset
+    // Ensures that a single event only comes from a single topic and therefore a single partition (because the key is the sessionId)
+    if (message.kafkaTopic === recordingEvent.kafkaTopic) {
+        recordingEvent.complete = message.chunkIndex === message.chunkCount - 1
+        recordingEvent.messages.push(message)
+        recordingEvent.newestOffset = message.kafkaOffset
+        if (!recordingEvent.complete) {
+            eventBuffers[message.eventId] = recordingEvent
+        } else {
+            if (message.chunkCount !== recordingEvent.messages.length) {
+                logger.error({
+                    action: 'chunk_count_mismatch',
+                    sessionId: message.sessionId,
+                    eventId: message.eventId,
+                    chunkCount: message.chunkCount,
+                    chunkIndex: message.chunkIndex,
+                    messageCount: recordingEvent.messages.length,
+                })
+            }
+            processCompleteEvent(recordingEvent)
+            delete eventBuffers[message.eventId]
+        }
 
-    if (!recordingEvent.complete) {
-        eventBuffers[message.eventId] = recordingEvent
+        snapshotMessagesProcessed.add(1)
     } else {
-        if (message.chunkCount !== recordingEvent.messages.length) {
-            logger.error({
-                action: 'chunk_count_mismatch',
-                sessionId: message.sessionId,
-                eventId: message.eventId,
-                chunkCount: message.chunkCount,
-                chunkIndex: message.chunkIndex,
-                messageCount: recordingEvent.messages.length,
-            })
-        }
-        processCompleteEvent(recordingEvent)
-        delete eventBuffers[message.eventId]
+        logger.error({
+            action: 'kafka_topic_mismatch',
+            eventId: message.eventId,
+            sessionId: message.sessionId,
+            kafkaTopic: message.kafkaTopic,
+            kafkaPartition: message.kafkaPartition,
+            expectedKafkaTopic: recordingEvent.kafkaTopic,
+            expectedKafkaPartition: recordingEvent.kafkaPartition,
+        })
+        producer.send({
+            topic: RECORDING_EVENTS_DEAD_LETTER_TOPIC,
+            messages: [convertRecordingMessageToKafkaMessage(message)],
+        })
     }
-
-    snapshotMessagesProcessed.add(1)
 }
 
 consumer.run({
@@ -290,10 +338,10 @@ consumer.run({
         // TODO: Handle duplicated data being stored in the case of a consumer restart
         messagesReceived.add(1)
 
-        const recordingMessage = convertKafkaMessageToRecordingMessage(message, topic, partition)
+        const recordingMessage = convertKafkaMessageToRecordingMessage(message, topic as KafkaTopic, partition)
 
-        const timeout_ms =
-            RECORDING_EVENTS_TOPICS_CONFIGS[topic as keyof typeof RECORDING_EVENTS_TOPICS_CONFIGS].timeout
+        // TODO Do this without timeouts so order is maintained
+        const timeout_ms = RECORDING_EVENTS_TOPICS_CONFIGS[topic as KafkaTopic].timeout
         if (timeout_ms !== 0) {
             setTimeout(() => {
                 handleMessage(recordingMessage)
diff --git a/session-recordings/src/ingester/utils.ts b/session-recordings/src/ingester/utils.ts
index cbc2d23fe6c8a..272ae1a0c7603 100644
--- a/session-recordings/src/ingester/utils.ts
+++ b/session-recordings/src/ingester/utils.ts
@@ -1,9 +1,8 @@
 import { KafkaMessage, Message } from 'kafkajs'
-import { RecordingEvent, RecordingEventGroup, RecordingMessage } from '../types'
+import { KafkaTopic, RecordingEvent, RecordingEventGroup, RecordingMessage } from '../types'
 
 export const getEventGroupDataString = (recordingEventGroup: RecordingEventGroup) => {
-    const events = Object.values(recordingEventGroup.events)
-    const eventDataStrings = events
+    const eventDataStrings = recordingEventGroup.events
         .sort((a, b) => a.timestamp - b.timestamp)
         .map((event) => event.messages.map((message) => message.value).join(''))
     return eventDataStrings.join('\n')
@@ -43,13 +42,14 @@ export const convertRecordingMessageToKafkaMessage = (recordingMessage: Recordin
             unixTimestamp: recordingMessage.timestamp.toString(),
             chunkCount: recordingMessage.chunkCount.toString(),
             chunkIndex: recordingMessage.chunkIndex.toString(),
+            originalKafkaOffset: recordingMessage.originalKafkaOffset.toString(),
         },
     }
 }
 
 export const convertKafkaMessageToRecordingMessage = (
     kafkaMessage: KafkaMessage,
-    topic: string,
+    topic: KafkaTopic,
     partition: number
 ): RecordingMessage => {
     return {
@@ -65,8 +65,18 @@ export const convertKafkaMessageToRecordingMessage = (
         chunkIndex: Number.parseInt(kafkaMessage.headers.chunkIndex.toString()),
         value: kafkaMessage.value.toString(),
         kafkaOffset: Number.parseInt(kafkaMessage.offset),
+        originalKafkaOffset:
+            topic === 'recording_events'
+                ? Number.parseInt(kafkaMessage.offset)
+                : Number.parseInt(kafkaMessage.headers.originalKafkaOffset.toString()),
         kafkaTopic: topic,
         kafkaPartition: partition,
         kafkaKey: kafkaMessage.key?.toString(),
     }
 }
+
+export const getTopicPartitionKey = (topic: string, partition: number) => `${topic}-${partition}`
+export const getTopicAndPartitionFromKey = (topicPartitionKey: string) => {
+    const [topic, partition] = topicPartitionKey.split('-')
+    return { topic, partition: Number.parseInt(partition) }
+}
diff --git a/session-recordings/src/types.ts b/session-recordings/src/types.ts
index 1d3dffebdd6b0..240ff2d44898c 100644
--- a/session-recordings/src/types.ts
+++ b/session-recordings/src/types.ts
@@ -1,14 +1,20 @@
+export type KafkaTopic =
+    | 'recording_events'
+    | 'recording_events_retry_1'
+    | 'recording_events_retry_2'
+    | 'recording_events_retry_3'
+
 export type RecordingEventGroup = {
     id: string
     teamId: number
     sessionId: string
     distinctId: string // OK if this distinct ID changes through the recording, we just need to store a single distinct ID
-    // TODO: replace string[] with a file handle that we can append to
-    events: Record<string, RecordingEvent>
+    events: RecordingEvent[]
    size: number
     oldestEventTimestamp: number
-    oldestOffset: number
-    newestOffset: number
+    oldestOffsets: Record<string, number> // Key is '{topic}-{partition}'
+    newestOffsets: Record<string, number> // Key is '{topic}-{partition}'
+    oldestOriginalOffset: number // The original offset of the oldest message in the event group. Used for ordering
     timer?: NodeJS.Timeout
     status?: 'sending' | 'active'
 }
@@ -24,9 +30,10 @@ export type RecordingEvent = {
     sessionId: string
     distinctId: string
     teamId: number
-    kafkaTopic: string
+    kafkaTopic: KafkaTopic
     kafkaPartition: number
     oldestOffset: number
+    oldestOriginalOffset: number // The original offset of the oldest message in the event. Used for ordering
     newestOffset: number
 }
 
@@ -42,8 +49,9 @@ export type RecordingMessage = {
     chunkCount: number
     chunkIndex: number
     teamId: number
-    kafkaTopic: string
+    kafkaTopic: KafkaTopic
     kafkaPartition: number
     kafkaOffset: number
+    originalKafkaOffset: number // If this message was re-sent on a retry topic, this is the original Kafka offset. Used for ordering
     kafkaKey: string
 }
diff --git a/session-recordings/test/ingestion.test.ts b/session-recordings/test/ingestion.test.ts
index 97682d8b0c576..7430ff98a98f2 100644
--- a/session-recordings/test/ingestion.test.ts
+++ b/session-recordings/test/ingestion.test.ts
@@ -2,12 +2,10 @@ import { beforeEach, afterEach, expect, it, describe } from 'vitest'
 import { Kafka, Partitioners, Producer } from 'kafkajs'
 import http from 'http'
 import { v4 as uuidv4 } from 'uuid'
-import { resetIngester } from '../src/ingester/index'
 
 declare module 'vitest' {
     export interface TestContext {
-        producer: Produ
-        cer
+        producer: Producer
     }
 }
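Note on the '{topic}-{partition}' keys introduced above: RecordingEventGroup.oldestOffsets and newestOffsets are keyed by the string built in getTopicPartitionKey, and commitEventGroupToS3 decodes each key back with getTopicAndPartitionFromKey before committing offsets. A minimal round-trip sketch, using local copies of the two helpers from src/ingester/utils.ts (the re-declarations here are for illustration only):

// Local copies of the helpers added in src/ingester/utils.ts, for illustration.
const getTopicPartitionKey = (topic: string, partition: number) => `${topic}-${partition}`

const getTopicAndPartitionFromKey = (topicPartitionKey: string) => {
    // The recording_events* topic names contain no '-', so the partition number is the
    // only dash-separated suffix and split('-') recovers [topic, partition].
    const [topic, partition] = topicPartitionKey.split('-')
    return { topic, partition: Number.parseInt(partition) }
}

// Round trip: encode when recording an offset, decode when committing it.
const key = getTopicPartitionKey('recording_events_retry_1', 3) // 'recording_events_retry_1-3'
const { topic, partition } = getTopicAndPartitionFromKey(key)
console.log(topic, partition) // 'recording_events_retry_1' 3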
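The retry flow in consumer.run keys off the same RECORDING_EVENTS_TOPICS_CONFIGS table: the main topic has a timeout of 0 and is handled immediately, while messages consumed from a retry topic are held back before handleMessage runs. The hunk above is truncated before the setTimeout delay argument, so the sketch below assumes the configured timeout is that delay; scheduleHandling and handle are illustrative stand-ins, not names from the diff.

// Topic names and delays mirror RECORDING_EVENTS_TOPICS_CONFIGS in the diff above.
type KafkaTopic =
    | 'recording_events'
    | 'recording_events_retry_1'
    | 'recording_events_retry_2'
    | 'recording_events_retry_3'

const TOPIC_TIMEOUTS_MS: Record<KafkaTopic, number> = {
    recording_events: 0,
    recording_events_retry_1: 2 * 1000,
    recording_events_retry_2: 30 * 1000,
    recording_events_retry_3: 5 * 60 * 1000,
}

// Assumed shape of the dispatch: handle immediately on the main topic,
// otherwise wait out the retry topic's back-off before handling.
const scheduleHandling = (topic: KafkaTopic, handle: () => void) => {
    const timeoutMs = TOPIC_TIMEOUTS_MS[topic]
    if (timeoutMs === 0) {
        handle()
    } else {
        setTimeout(handle, timeoutMs)
    }
}

scheduleHandling('recording_events_retry_2', () => console.log('handled after ~30s'))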