fix(recording-ingester): handle offsets #11040

Merged 1 commit on Jul 29, 2022
session-recordings/src/ingester/index.ts (160 changes: 104 additions & 56 deletions)
@@ -1,6 +1,6 @@
import { Kafka } from 'kafkajs'
import { PutObjectCommand } from '@aws-sdk/client-s3'
import { RecordingEvent, RecordingEventGroup, RecordingMessage } from '../types'
import { KafkaTopic, RecordingEvent, RecordingEventGroup, RecordingMessage } from '../types'
import { s3Client } from '../s3'
import { meterProvider, metricRoutes } from './metrics'
import { performance } from 'perf_hooks'
@@ -10,6 +10,8 @@ import {
getEventGroupDataString,
getEventSize,
getEventSummaryMetadata,
getTopicAndPartitionFromKey,
getTopicPartitionKey,
} from './utils'
import { getHealthcheckRoutes } from './healthcheck'
import express from 'express'
@@ -26,13 +28,19 @@ const eventGroupSizeUploadThreshold = Number.parseInt(
)

const RECORDING_EVENTS_DEAD_LETTER_TOPIC = 'recording_events_dead_letter'
const RECORDING_EVENTS_TOPICS_CONFIGS = {

type TopicConfig = {
retryTopic: string
timeout: number
}

const RECORDING_EVENTS_TOPICS_CONFIGS: Record<KafkaTopic, TopicConfig> = {
recording_events: { timeout: 0, retryTopic: 'recording_events_retry_1' },
recording_events_retry_1: { timeout: 2 * 1000, retryTopic: 'recording_events_retry_2' },
recording_events_retry_2: { timeout: 30 * 1000, retryTopic: 'recording_events_retry_3' },
recording_events_retry_3: { timeout: 5 * 60 * 1000, retryTopic: RECORDING_EVENTS_DEAD_LETTER_TOPIC },
}
const RECORDING_EVENTS_TOPICS = Object.keys(RECORDING_EVENTS_TOPICS_CONFIGS)
const RECORDING_EVENTS_TOPICS = Object.keys(RECORDING_EVENTS_TOPICS_CONFIGS) as KafkaTopic[]

const kafka = new Kafka({
clientId: 'ingester',
@@ -102,23 +110,32 @@ const retryEventGroup = async (eventGroup: RecordingEventGroup) => {

// TODO: Make this handle multiple topics + partitions
const getOffsetOfOldestMessageInBuffers = (topic: string, partition: number): number => {
const oldestMessageOffsetInEventBuffer = Object.values(eventBuffers).reduce((acc, event) => {
return Math.min(acc, event.oldestOffset)
}, -1)
const oldestMessageOffsetInEventBufferForTopicAndPartition = Object.values(eventBuffers)
.filter((event) => {
return event.kafkaTopic === topic && event.kafkaPartition === partition
})
.reduce((acc, event) => {
return Math.min(acc, event.oldestOffset)
}, -1)

const topicPartitionKey = getTopicPartitionKey(topic, partition)

const oldestMessageOffsetInEventGroups = Object.values(eventGroupsBySessionId)
const oldestMessageOffsetInEventGroupsForTopicAndPartition = Object.values(eventGroupsBySessionId)
.flat()
.reduce((acc, eventGroup) => {
return Math.min(acc, eventGroup.oldestOffset)
return Math.min(acc, eventGroup.oldestOffsets[topicPartitionKey])
}, -1)

return Math.min(oldestMessageOffsetInEventBuffer, oldestMessageOffsetInEventGroups)
return Math.min(
oldestMessageOffsetInEventBufferForTopicAndPartition,
oldestMessageOffsetInEventGroupsForTopicAndPartition
)
}

const commitEventGroupToS3 = async (eventGroupToSend: RecordingEventGroup, topic: string, partition: number) => {
const commitEventGroupToS3 = async (eventGroupToSend: RecordingEventGroup) => {
const baseKey = `session_recordings/team_id/${eventGroupToSend.teamId}/session_id/${eventGroupToSend.sessionId}`
const dataKey = `${baseKey}/data/${eventGroupToSend.oldestEventTimestamp}-${eventGroupToSend.oldestOffset}`
const metaDataEventSummaryKey = `${baseKey}/metadata/event_summaries/${eventGroupToSend.oldestEventTimestamp}-${eventGroupToSend.oldestOffset}`
const dataKey = `${baseKey}/data/${eventGroupToSend.oldestEventTimestamp}-${eventGroupToSend.oldestOriginalOffset}`
const metaDataEventSummaryKey = `${baseKey}/metadata/event_summaries/${eventGroupToSend.oldestEventTimestamp}-${eventGroupToSend.oldestOriginalOffset}`
const metaDataKey = `${baseKey}/metadata/metadata.json`

logger.debug({ action: 'committing_event_group', sessionId: eventGroupToSend.sessionId, key: dataKey })
@@ -152,14 +169,14 @@ const commitEventGroupToS3 = async (eventGroupToSend: RecordingEventGroup, topic
logger.error({
action: 'failed_to_commit_event_group',
sessionId: eventGroupToSend.sessionId,
teamId: eventGroupToSend.teamId,
key: dataKey,
error: err,
topic,
})
retryEventGroup(eventGroupToSend)
}

// We've sent the event group to S3 or teh retry queues, so we can remove it from the buffer
// We've sent the event group to S3 or the retry queues, so we can remove it from the buffer
eventGroupsBySessionId[eventGroupToSend.sessionId] = eventGroupsBySessionId[eventGroupToSend.sessionId].filter(
(eventGroup) => {
eventGroup.id !== eventGroupToSend.id
@@ -170,37 +187,43 @@ const commitEventGroupToS3 = async (eventGroupToSend: RecordingEventGroup, topic
observableResult.observe(Object.keys(eventGroupsBySessionId).flat().length)
)

// Update the Kafka offset
const oldestOffsetInBuffers = getOffsetOfOldestMessageInBuffers(topic, partition)
const offsetToCommit = oldestOffsetInBuffers === -1 ? eventGroupToSend.newestOffset + 1 : oldestOffsetInBuffers
consumer.commitOffsets([
{
topic,
partition,
offset: offsetToCommit.toString(),
},
])

logger.debug({ action: 'committing_offset', offset: offsetToCommit, partition })
// Update the Kafka offsets for each topic/partition in the event group
Object.keys(eventGroupToSend.oldestOffsets).forEach((topicPartitionKey) => {
const { topic, partition } = getTopicAndPartitionFromKey(topicPartitionKey)
const oldestOffsetInBuffers = getOffsetOfOldestMessageInBuffers(topic, partition)
const offsetToCommit =
oldestOffsetInBuffers === -1 ? eventGroupToSend.newestOffsets[topicPartitionKey] + 1 : oldestOffsetInBuffers
consumer.commitOffsets([
{
topic,
partition,
offset: offsetToCommit.toString(),
},
])
logger.debug({ action: 'committing_offset', offset: offsetToCommit, partition, topic })
})

eventGroupsCommittedCounter.add(1)
}

const createEventGroup = (event: RecordingEvent) => {
const topicPartitionKey = getTopicPartitionKey(event.kafkaTopic, event.kafkaPartition)
const eventGroup: RecordingEventGroup = {
id: randomUUID(),
events: {} as Record<string, RecordingEvent>,
events: [] as RecordingEvent[],
size: 0,
teamId: event.teamId,
sessionId: event.sessionId,
oldestEventTimestamp: event.timestamp,
distinctId: event.distinctId,
oldestOffset: event.oldestOffset,
newestOffset: event.newestOffset,
oldestOffsets: { [topicPartitionKey]: event.oldestOffset },
newestOffsets: { [topicPartitionKey]: event.newestOffset },
oldestOriginalOffset: event.oldestOriginalOffset,
status: 'active',
}
eventGroup.timer = setTimeout(() => {
eventGroup.status = 'sending'
commitEventGroupToS3({ ...eventGroup }, event.kafkaTopic, event.kafkaPartition)
commitEventGroupToS3({ ...eventGroup })
}, maxEventGroupAge)
logger.debug({ action: 'create_event_group', sessionId: eventGroup.sessionId })

@@ -215,7 +238,7 @@ const createEventGroup = (event: RecordingEvent) => {

const processCompleteEvent = async (event: RecordingEvent) => {
logger.debug({
action: 'start',
action: 'process event',
uuid: event.eventId,
sessionId: event.sessionId,
})
@@ -227,14 +250,22 @@ const processCompleteEvent = async (event: RecordingEvent) => {
eventGroupsStarted.add(1, { reason: 'no-existing-event-group' })
}

eventGroup.events[event.eventId] = event
eventGroup.events.push(event)
eventGroup.size += getEventSize(event)
eventGroup.newestOffset = event.newestOffset

const topicPartitionKey = getTopicPartitionKey(event.kafkaTopic, event.kafkaPartition)

eventGroup.newestOffsets[topicPartitionKey] = event.newestOffset
if (eventGroup.oldestOffsets[topicPartitionKey] === undefined) {
eventGroup.oldestOffsets[topicPartitionKey] = event.oldestOffset
}

eventGroup.oldestOriginalOffset = Math.min(eventGroup.oldestOriginalOffset, event.oldestOriginalOffset)

if (eventGroup.size > eventGroupSizeUploadThreshold) {
clearTimeout(eventGroup.timer)
eventGroup.status = 'sending'
commitEventGroupToS3({ ...eventGroup }, event.kafkaTopic, event.kafkaPartition)
commitEventGroupToS3({ ...eventGroup })
}
}

@@ -254,30 +285,47 @@ const handleMessage = async (message: RecordingMessage) => {
kafkaPartition: message.kafkaPartition,
oldestOffset: message.kafkaOffset,
newestOffset: message.kafkaOffset,
oldestOriginalOffset: message.originalKafkaOffset,
}

recordingEvent.complete = message.chunkIndex === message.chunkCount - 1
recordingEvent.messages.push(message)
recordingEvent.newestOffset = message.kafkaOffset
// Ensures that a single event only comes from a single topic and therefore a single partition (because the key is the sessionId)
if (message.kafkaTopic === recordingEvent.kafkaTopic) {
recordingEvent.complete = message.chunkIndex === message.chunkCount - 1
recordingEvent.messages.push(message)
recordingEvent.newestOffset = message.kafkaOffset
if (!recordingEvent.complete) {
eventBuffers[message.eventId] = recordingEvent
} else {
if (message.chunkCount !== recordingEvent.messages.length) {
logger.error({
action: 'chunk_count_mismatch',
sessionId: message.sessionId,
eventId: message.eventId,
chunkCount: message.chunkCount,
chunkIndex: message.chunkIndex,
messageCount: recordingEvent.messages.length,
})
}
processCompleteEvent(recordingEvent)
delete eventBuffers[message.eventId]
}

if (!recordingEvent.complete) {
eventBuffers[message.eventId] = recordingEvent
snapshotMessagesProcessed.add(1)
} else {
if (message.chunkCount !== recordingEvent.messages.length) {
logger.error({
action: 'chunk_count_mismatch',
sessionId: message.sessionId,
eventId: message.eventId,
chunkCount: message.chunkCount,
chunkIndex: message.chunkIndex,
messageCount: recordingEvent.messages.length,
})
}
processCompleteEvent(recordingEvent)
delete eventBuffers[message.eventId]
logger.error({
action: 'kafka_topic_mismatch',
eventId: message.eventId,
sessionId: message.sessionId,
kafkaTopic: message.kafkaTopic,
kafkaPartition: message.kafkaPartition,
expectedKafkaTopic: recordingEvent.kafkaTopic,
expectedKafkaPartition: recordingEvent.kafkaPartition,
})
producer.send({
topic: RECORDING_EVENTS_DEAD_LETTER_TOPIC,
messages: [convertRecordingMessageToKafkaMessage(message)],
})
}

snapshotMessagesProcessed.add(1)
}

consumer.run({
@@ -290,10 +338,10 @@ consumer.run({
// TODO: Handle duplicated data being stored in the case of a consumer restart
messagesReceived.add(1)

const recordingMessage = convertKafkaMessageToRecordingMessage(message, topic, partition)
const recordingMessage = convertKafkaMessageToRecordingMessage(message, topic as KafkaTopic, partition)

const timeout_ms =
RECORDING_EVENTS_TOPICS_CONFIGS[topic as keyof typeof RECORDING_EVENTS_TOPICS_CONFIGS].timeout
// TODO Do this without timeouts so order is maintained
const timeout_ms = RECORDING_EVENTS_TOPICS_CONFIGS[topic as KafkaTopic].timeout
if (timeout_ms !== 0) {
setTimeout(() => {
handleMessage(recordingMessage)
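For context on the offset handling in this file: once an event group is flushed, the consumer commits, for each topic/partition the group touched, either the oldest offset still held in memory for that topic/partition or, when nothing is buffered, the flushed group's newest offset plus one. The TypeScript sketch below is illustrative only and is not part of the diff; it assumes a plain array of buffered offsets rather than the ingester's real eventBuffers and eventGroupsBySessionId structures.

// Sketch only: pick the offset to commit for a single topic/partition.
const chooseOffsetToCommit = (bufferedOffsets: number[], newestOffsetInFlushedGroup: number): number => {
    // Nothing buffered: everything up to the flushed group is durable, so commit
    // the offset right after the group's newest message.
    if (bufferedOffsets.length === 0) {
        return newestOffsetInFlushedGroup + 1
    }
    // Otherwise commit the oldest offset still in memory so those messages are
    // re-consumed after a restart rather than lost.
    return Math.min(...bufferedOffsets)
}

// chooseOffsetToCommit([], 41) === 42; chooseOffsetToCommit([37, 40], 41) === 37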
session-recordings/src/ingester/utils.ts (18 changes: 14 additions & 4 deletions)
@@ -1,9 +1,8 @@
import { KafkaMessage, Message } from 'kafkajs'
import { RecordingEvent, RecordingEventGroup, RecordingMessage } from '../types'
import { KafkaTopic, RecordingEvent, RecordingEventGroup, RecordingMessage } from '../types'

export const getEventGroupDataString = (recordingEventGroup: RecordingEventGroup) => {
const events = Object.values(recordingEventGroup.events)
const eventDataStrings = events
const eventDataStrings = recordingEventGroup.events
.sort((a, b) => a.timestamp - b.timestamp)
.map((event) => event.messages.map((message) => message.value).join(''))
return eventDataStrings.join('\n')
@@ -43,13 +42,14 @@ export const convertRecordingMessageToKafkaMessage = (recordingMessage: Recordin
unixTimestamp: recordingMessage.timestamp.toString(),
chunkCount: recordingMessage.chunkCount.toString(),
chunkIndex: recordingMessage.chunkIndex.toString(),
originalKafkaOffset: recordingMessage.originalKafkaOffset.toString(),
},
}
}

export const convertKafkaMessageToRecordingMessage = (
kafkaMessage: KafkaMessage,
topic: string,
topic: KafkaTopic,
partition: number
): RecordingMessage => {
return {
@@ -65,8 +65,18 @@
chunkIndex: Number.parseInt(kafkaMessage.headers.chunkIndex.toString()),
value: kafkaMessage.value.toString(),
kafkaOffset: Number.parseInt(kafkaMessage.offset),
originalKafkaOffset:
topic === 'recording_events'
? Number.parseInt(kafkaMessage.offset)
: Number.parseInt(kafkaMessage.headers.originalKafkaOffset.toString()),
kafkaTopic: topic,
kafkaPartition: partition,
kafkaKey: kafkaMessage.key?.toString(),
}
}

export const getTopicPartitionKey = (topic: string, partition: number) => `${topic}-${partition}`
export const getTopicAndPartitionFromKey = (topicPartitionKey: string) => {
const [topic, partition] = topicPartitionKey.split('-')
return { topic, partition: Number.parseInt(partition) }
}
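For reference, the two helpers added at the bottom of utils.ts round-trip a '{topic}-{partition}' key; the plain split('-') is safe here because the recording topics use underscores rather than hyphens. An illustrative usage sketch (assuming the helpers are imported from './utils', as the diff exports them):

import { getTopicAndPartitionFromKey, getTopicPartitionKey } from './utils'

const key = getTopicPartitionKey('recording_events_retry_1', 3)
// key === 'recording_events_retry_1-3'
const { topic, partition } = getTopicAndPartitionFromKey(key)
// topic === 'recording_events_retry_1', partition === 3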
session-recordings/src/types.ts (20 changes: 14 additions & 6 deletions)
@@ -1,14 +1,20 @@
export type KafkaTopic =
| 'recording_events'
| 'recording_events_retry_1'
| 'recording_events_retry_2'
| 'recording_events_retry_3'

export type RecordingEventGroup = {
id: string
teamId: number
sessionId: string
distinctId: string // OK if this distinct ID changes through the recording, we just need to store a single distinct ID
// TODO: replace string[] with a file handle that we can append to
events: Record<string, RecordingEvent>
events: RecordingEvent[]
size: number
oldestEventTimestamp: number
oldestOffset: number
newestOffset: number
oldestOffsets: Record<string, number> // Key is '{topic}-{partition}'
newestOffsets: Record<string, number> // Key is '{topic}-{partition}'
oldestOriginalOffset: number // The original offset of the oldest message in the event group. Used for ordering
timer?: NodeJS.Timeout
status?: 'sending' | 'active'
}
@@ -24,9 +30,10 @@ export type RecordingEvent = {
sessionId: string
distinctId: string
teamId: number
kafkaTopic: string
kafkaTopic: KafkaTopic
kafkaPartition: number
oldestOffset: number
oldestOriginalOffset: number // The original offset of the oldest message in the event. Used for ordering
newestOffset: number
}

@@ -42,8 +49,9 @@ export type RecordingMessage = {
chunkCount: number
chunkIndex: number
teamId: number
kafkaTopic: string
kafkaTopic: KafkaTopic
kafkaPartition: number
kafkaOffset: number
originalKafkaOffset: number // If this message was re-sent on a retry topic, this is the original Kafka offset. Used for ordering
kafkaKey: string
}
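To make the new offset bookkeeping in RecordingEventGroup concrete, here is a hypothetical snapshot of the offset fields, with invented values and not taken from the diff: oldestOffsets and newestOffsets hold one entry per '{topic}-{partition}' the group has consumed from, while oldestOriginalOffset carries the ordering anchor from the original recording_events topic even when messages arrive via a retry topic.

// Hypothetical example values for illustration only.
const offsetFieldsExample = {
    oldestOffsets: { 'recording_events-0': 1204, 'recording_events_retry_1-0': 88 },
    newestOffsets: { 'recording_events-0': 1251, 'recording_events_retry_1-0': 91 },
    // The retried messages originally sat at recording_events offsets starting at 1100,
    // so that is the group's oldest original offset, used to build ordered S3 keys.
    oldestOriginalOffset: 1100,
}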
session-recordings/test/ingestion.test.ts (4 changes: 1 addition & 3 deletions)
@@ -2,12 +2,10 @@ import { beforeEach, afterEach, expect, it, describe } from 'vitest'
import { Kafka, Partitioners, Producer } from 'kafkajs'
import http from 'http'
import { v4 as uuidv4 } from 'uuid'
import { resetIngester } from '../src/ingester/index'

declare module 'vitest' {
export interface TestContext {
producer: Produ
cer
producer: Producer
}
}
