concurrency test
rcmarron committed Jul 26, 2022
1 parent aefd393 · commit cbdb50c
Showing 1 changed file with 58 additions and 63 deletions.
session-recordings/test/ingestion.test.ts (121 changes: 58 additions & 63 deletions)
@@ -112,69 +112,6 @@ describe.concurrent('ingester', () => {
        expect(sessionRecording.events[0]).toMatchObject(JSON.parse(fullEventString))
    })

    // TODO: Handle this case and re-enable the test
    it.skip('handles duplicate parts of chunked events', async ({ producer }) => {
        const teamId = '1'
        const sessionId = uuidv4()
        const windowId = uuidv4()
        const eventUuid = uuidv4()

        const fullEventString = `{"uuid": "${eventUuid}", "type": 3, "data": {"source": 1, "positions": [{"x": 829, "y": 154, "id": 367, "timeOffset": 0}]}, "timestamp": 1657682896740}`

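        // First chunk (0 of 2) of the event; it is produced twice below to simulate a duplicate message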
        const firstEvent = {
            partition: 0,
            headers: {
                unixTimestamp: '1657682896740',
                eventId: eventUuid,
                sessionId: sessionId,
                windowId: windowId,
                distinctId: '123',
                chunkIndex: '0',
                chunkCount: '2',
                teamId: '1',
                eventSource: '1',
                eventType: '3',
            },
            value: fullEventString.slice(0, 100),
        }
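        // Send the same first chunk twice to simulate duplicate delivery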
        const [first] = await producer.send({
            topic: RECORDING_EVENTS_TOPIC,
            messages: [firstEvent],
        })
        await producer.send({
            topic: RECORDING_EVENTS_TOPIC,
            messages: [firstEvent],
        })

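        // Send the second and final chunk, referencing the Kafka offset of the first chunk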
        await producer.send({
            topic: RECORDING_EVENTS_TOPIC,
            messages: [
                {
                    partition: 0,
                    headers: {
                        unixTimestamp: '1657682896740',
                        eventId: eventUuid,
                        sessionId: sessionId,
                        windowId: windowId,
                        distinctId: '123',
                        chunkIndex: '1',
                        chunkCount: '2',
                        chunkOffset: first.baseOffset,
                        teamId: '1',
                        eventSource: '1',
                        eventType: '3',
                    },
                    value: fullEventString.slice(100),
                },
            ],
        })

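        // Despite the duplicated first chunk, exactly one complete event should be reconstructed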
        const sessionRecording = await waitForSessionRecording(teamId, sessionId, eventUuid)

        expect(sessionRecording.events.length).toBe(1)
        expect(sessionRecording.events[0]).toMatchObject(JSON.parse(fullEventString))
    })

    // TODO: Handle this case and re-enable the test
    it.skip('does not write incomplete events', async ({ producer }) => {
        const teamId = '1'
@@ -286,6 +223,64 @@ describe.concurrent('ingester', () => {
        )
    })

    it.skip('handles the timer being interrupted by a large event', async ({ producer }) => {
        const teamId = '1'
        const sessionId = uuidv4()
        const windowId = uuidv4()
        const firstEventUuid = uuidv4()

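        // First, a normal-sized single-chunk event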
        await producer.send({
            topic: RECORDING_EVENTS_TOPIC,
            messages: [
                {
                    partition: 0,
                    headers: {
                        unixTimestamp: '1657682896740',
                        eventId: firstEventUuid,
                        sessionId: sessionId,
                        windowId: windowId,
                        distinctId: '123',
                        chunkIndex: '0',
                        chunkCount: '1',
                        teamId: '1',
                        eventSource: '1',
                        eventType: '3',
                    },
                    value: `{"uuid": "${firstEventUuid}", "type": 3, "data": {"source": 1, "positions": [{"x": 829, "y": 154, "id": 367, "timeOffset": 0}]}, "timestamp": 1657682896740}`,
                },
            ],
        })
        const secondEventUuid = uuidv4()

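        // Then a much larger event (its data is an array of 10,000 numbers), which per the test name
        // should interrupt the flush timer rather than be dropped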
        await producer.send({
            topic: RECORDING_EVENTS_TOPIC,
            messages: [
                {
                    partition: 0,
                    headers: {
                        unixTimestamp: '1657682896740',
                        eventId: secondEventUuid,
                        sessionId: sessionId,
                        windowId: windowId,
                        distinctId: '123',
                        chunkIndex: '0',
                        chunkCount: '1',
                        teamId: '1',
                        eventSource: '1',
                        eventType: '3',
                    },
                    value: `{"uuid": "${secondEventUuid}", "type": 3, "data": ${JSON.stringify(
                        Array.from(new Array(10000).keys())
                    )}, "timestamp": 1657682896740}`,
                },
            ],
        })

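        // Both events should end up in the recording, in order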
        const sessionRecording = await waitForSessionRecording(teamId, sessionId, secondEventUuid)

        expect(sessionRecording.events.map((event) => event.uuid)).toStrictEqual([firstEventUuid, secondEventUuid])
    })

    // TODO: implement producing messages for ClickHouse to read
    it.skip('produces messages to Kafka for the purposes of analytics, e.g. ClickHouse queries', () => {})
})
