Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New admin method for fetching group offsets for multiple topics #992

Merged
merged 17 commits into from
Jan 7, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions docs/Admin.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,18 +180,32 @@ await admin.fetchTopicOffsetsByTimestamp(topic, timestamp)

## <a name="fetch-offsets"></a> Fetch consumer group offsets

`fetchOffsets` returns the consumer group offset for a topic.
`fetchOffsets` returns the consumer group offset for a list of topics.

```javascript
await admin.fetchOffsets({ groupId, topic, })
await admin.fetchOffsets({ groupId, topics, })
nirga marked this conversation as resolved.
Show resolved Hide resolved
// [
// { partition: 0, offset: '31004' },
// { partition: 1, offset: '54312' },
// { partition: 2, offset: '32103' },
// { partition: 3, offset: '28' },
// {
// topic: 'topic1',
// partitions: [
// { partition: 0, offset: '31004' },
// { partition: 1, offset: '54312' },
// { partition: 2, offset: '32103' },
// { partition: 3, offset: '28' },
// ],
// },
// {
// topic: 'topic2',
// partitions: [
// { partition: 0, offset: '1234' },
// { partition: 1, offset: '4567' },
// ],
// },
// ]
```

Omit `topics` altogether if you want to get the consumer group offsets for all of its consumed topics.
nirga marked this conversation as resolved.
Show resolved Hide resolved

Include the optional `resolveOffsets` flag to resolve the offsets without having to start a consumer, useful when fetching directly after calling [resetOffets](#a-name-reset-offsets-a-reset-consumer-group-offsets):

```javascript
Expand Down
79 changes: 73 additions & 6 deletions src/admin/__tests__/fetchOffsets.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ const {
} = require('testHelpers')

describe('Admin', () => {
let admin, cluster, groupId, logger, topicName
let admin, cluster, groupId, logger, topicName, anotherTopicName, yetAnotherTopicName

beforeEach(async () => {
topicName = `test-topic-${secureRandom()}`
anotherTopicName = `another-topic-${secureRandom()}`
yetAnotherTopicName = `yet-another-topic-${secureRandom()}`
groupId = `consumer-group-id-${secureRandom()}`

await createTopic({ topic: topicName })
Expand All @@ -40,11 +42,16 @@ describe('Admin', () => {
)
})

test('throws an error if the topic name is not a valid string', async () => {
await expect(admin.fetchOffsets({ groupId: 'groupId', topic: null })).rejects.toHaveProperty(
'message',
'Invalid topic null'
)
test('throws an error if the topics argument is not a valid list', async () => {
await expect(
admin.fetchOffsets({ groupId: 'groupId', topics: topicName })
).rejects.toHaveProperty('message', `Expected topic or topics array to be set`)
nirga marked this conversation as resolved.
Show resolved Hide resolved
})

test('throws an error if both topic and topics are set', async () => {
await expect(
admin.fetchOffsets({ groupId: 'groupId', topic: topicName, topics: [topicName] })
).rejects.toHaveProperty('message', `Either topic or topics must be set, not both`)
})

test('returns unresolved consumer group offsets', async () => {
Expand All @@ -71,6 +78,66 @@ describe('Admin', () => {
expect(offsets).toEqual([{ partition: 0, offset: '13', metadata: null }])
})

test('returns consumer group offsets for all topics', async () => {
await admin.setOffsets({
groupId,
topic: topicName,
partitions: [{ partition: 0, offset: 13 }],
})
await admin.setOffsets({
groupId,
topic: anotherTopicName,
partitions: [{ partition: 0, offset: 23 }],
})
await admin.setOffsets({
groupId,
topic: yetAnotherTopicName,
partitions: [{ partition: 0, offset: 42 }],
})

const offsets = await admin.fetchOffsets({
groupId,
})

// There's no guarantee for the order of topics so we compare sets to avoid flaky tests.
expect(new Set(offsets)).toEqual(
new Set([
{
topic: yetAnotherTopicName,
partitions: [{ partition: 0, offset: '42', metadata: null }],
},
{ topic: anotherTopicName, partitions: [{ partition: 0, offset: '23', metadata: null }] },
{ topic: topicName, partitions: [{ partition: 0, offset: '13', metadata: null }] },
])
)
nirga marked this conversation as resolved.
Show resolved Hide resolved
})

test('returns consumer group offsets for list of topics', async () => {
await admin.setOffsets({
groupId,
topic: topicName,
partitions: [{ partition: 0, offset: 13 }],
})
await admin.setOffsets({
groupId,
topic: anotherTopicName,
partitions: [{ partition: 0, offset: 42 }],
})

const offsets = await admin.fetchOffsets({
groupId,
topics: [topicName, anotherTopicName],
})

// There's no guarantee for the order of topics so we compare sets to avoid flaky tests.
expect(new Set(offsets)).toEqual(
new Set([
{ topic: anotherTopicName, partitions: [{ partition: 0, offset: '42', metadata: null }] },
{ topic: topicName, partitions: [{ partition: 0, offset: '13', metadata: null }] },
])
)
})

describe('when used with the resolvedOffsets option', () => {
let producer, consumer

Expand Down
98 changes: 63 additions & 35 deletions src/admin/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -371,60 +371,88 @@ module.exports = ({
/**
* @param {string} groupId
* @param {string} topic
* @param {string[]} topics
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how to write the JSDoc for this, now that the interplay between topic and topics is more complicated.

Both topic and topics are optional. If neither is set, the default is for topics to get the value []. Do you know if there's any way to express that in JSDoc, @ankon? Not a big deal, but I just don't know.

Copy link
Contributor

@ankon ankon Jan 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, from the top of my head: I think this might be too much to express with JSDoc. There could be a way to do it in the TypeScript definition, and then you could do magic with @type here.

But it also might not be needed: If these are optional, then from the callers point that's all they need to know. The specific defaulting could just be considered an implementation detail.

I guess I would just go with

/**
 * Fetch offsets for a topic or multiple topics
 *
 * Note that `topics` will be defaulted to `[]` if neither `topic` nor `topics` is provided.
 * 
 * @param {object} options
 * @param {string} options.groupId
 * @param {string} [options.topic]
 * @param {string[]} [options.topics]
 * @param {boolean} [optins.resolveOffsets=false]
 */

I guess the reason you need both topic and topics is that the function is part of the API, i.e. you're providing backwards-compatibility. Could this not be achieved by adding a new function (say fetchTopicOffsets), and then eventually deprecate the old one?


Another approach for consideration, closer to what the .d.ts would do given your later comment :)

/**
 * @param {import("../../types").FetchOffsetsTopicOptions|import("../../types").FetchOffsetsTopicsOptions} options
 * @returns {Promise<...>}
 */

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added comments as @ankon suggested, inline in the parameter list as well.

* @param {boolean} [resolveOffsets=false]
* @return {Promise}
*/
const fetchOffsets = async ({ groupId, topic, resolveOffsets = false }) => {
const fetchOffsets = async ({ groupId, topic, topics, resolveOffsets = false }) => {
if (!groupId) {
throw new KafkaJSNonRetriableError(`Invalid groupId ${groupId}`)
}

if (!topic) {
throw new KafkaJSNonRetriableError(`Invalid topic ${topic}`)
if (!topic && !topics) {
topics = []
}

const partitions = await findTopicPartitions(cluster, topic)
const coordinator = await cluster.findGroupCoordinator({ groupId })
const partitionsToFetch = partitions.map(partition => ({ partition }))
if (!topic && !Array.isArray(topics)) {
throw new KafkaJSNonRetriableError(`Expected topic or topics array to be set`)
}

if (topic && topics) {
throw new KafkaJSNonRetriableError(`Either topic or topics must be set, not both`)
}

if (topic) {
topics = [topic]
}

const coordinator = await cluster.findGroupCoordinator({ groupId })
const topicsToFetch = await Promise.all(
topics.map(async topic => {
const partitions = await findTopicPartitions(cluster, topic)
const partitionsToFetch = partitions.map(partition => ({ partition }))
return { topic, partitions: partitionsToFetch }
})
)
let { responses: consumerOffsets } = await coordinator.offsetFetch({
groupId,
topics: [{ topic, partitions: partitionsToFetch }],
topics: topicsToFetch,
})

if (resolveOffsets) {
const indexedOffsets = indexByPartition(await fetchTopicOffsets(topic))
consumerOffsets = consumerOffsets.map(({ topic, partitions }) => ({
topic,
partitions: partitions.map(({ offset, partition, ...props }) => {
let resolvedOffset = offset
if (Number(offset) === EARLIEST_OFFSET) {
resolvedOffset = indexedOffsets[partition].low
}
if (Number(offset) === LATEST_OFFSET) {
resolvedOffset = indexedOffsets[partition].high
}
consumerOffsets = await Promise.all(
consumerOffsets.map(async ({ topic, partitions }) => {
const indexedOffsets = indexByPartition(await fetchTopicOffsets(topic))
const recalculatedPartitions = partitions.map(({ offset, partition, ...props }) => {
let resolvedOffset = offset
if (Number(offset) === EARLIEST_OFFSET) {
resolvedOffset = indexedOffsets[partition].low
}
if (Number(offset) === LATEST_OFFSET) {
resolvedOffset = indexedOffsets[partition].high
}
return {
partition,
offset: resolvedOffset,
...props,
}
})

await setOffsets({ groupId, topic, partitions: recalculatedPartitions })

return {
partition,
offset: resolvedOffset,
...props,
topic,
partitions: recalculatedPartitions,
}
}),
}))
const [{ partitions }] = consumerOffsets
await setOffsets({ groupId, topic, partitions })
})
)
}

return consumerOffsets
.filter(response => response.topic === topic)
.map(({ partitions }) =>
partitions.map(({ partition, offset, metadata }) => ({
partition,
offset,
metadata: metadata || null,
}))
)
.pop()
const result = consumerOffsets.map(({ topic, partitions }) => {
const completePartitions = partitions.map(({ partition, offset, metadata }) => ({
partition,
offset,
metadata: metadata || null,
Nevon marked this conversation as resolved.
Show resolved Hide resolved
}))

return { topic, partitions: completePartitions }
})

if (topic) {
return result.pop().partitions
} else {
return result
}
}

/**
Expand Down
3 changes: 2 additions & 1 deletion types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,8 @@ export type Admin = {
fetchTopicMetadata(options?: { topics: string[] }): Promise<{ topics: Array<ITopicMetadata> }>
fetchOffsets(options: {
groupId: string
topic: string
topic?: string
Copy link
Collaborator

@Nevon Nevon Jan 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Github won't let me create a suggestion that goes outside of the lines of the diff, but there's an issue here where the return type of this function is different depending on the input, so we need to be a little more clever here. There's probably some fancy-pants thing you can do with conditional types, but we can also just use plain old method overloading:

interface FetchOffsetsBaseOptions {
  groupId: string,
  resolveOffsets?: boolean
}

export interface FetchOffsetsTopicOptions extends FetchOffsetsBaseOptions {
  topic: string
}

export interface FetchOffsetsTopicsOptions extends FetchOffsetsBaseOptions {
  topics?: string[]
}

export type Admin = {
  /**
   * @deprecated "topic: string" property now replaced by "topics: string[]"
   */
  fetchOffsets(options: FetchOffsetsTopicOptions): Promise<Array<PartitionOffset & { metadata: string | null }>>,
  fetchOffsets(options: FetchOffsetsTopicsOptions): Promise<Array<{ topic: string, partitions: Array<PartitionOffset & { metadata: string | null }>}>>,
}

This has a few benefits. The first is that the type system will prevent admin.fetchOffsets({ topic: 'foo', topics: ['bar'] }), because the topic and topics properties are mutually exclusive. The second is that it allows us to define the relationship between the input type and the return type. The return type is a bit of an abomination. If we didn't return metadata it could be simplified to just Promise<TopicOffsets[]> and Promise<PartitionOffset[]>, but alas, here we are.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I've opted to keep the options parameter inline though, but moved the Array<PartitionOffset & { metadata: string | null }> to a new type. LMK if you prefer the options to be extracted to a type as well.

topics?: string[]
resolveOffsets?: boolean
}): Promise<Array<SeekEntry & { metadata: string | null }>>
fetchTopicOffsets(topic: string): Promise<Array<SeekEntry & { high: string; low: string }>>
Expand Down
3 changes: 3 additions & 0 deletions types/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ const runAdmin = async () => {

await admin.listTopics()

await admin.fetchOffsets({ groupId: 'test-group', topic: 'topic1' })
nirga marked this conversation as resolved.
Show resolved Hide resolved
await admin.fetchOffsets({ groupId: 'test-group', topics: ['topic1', 'topic2'] })

await admin.createTopics({
topics: [{ topic, numPartitions: 10, replicationFactor: 1 }],
timeout: 30000,
Expand Down