diff --git a/docs/Admin.md b/docs/Admin.md index 02ca1ffd0..f1f932827 100644 --- a/docs/Admin.md +++ b/docs/Admin.md @@ -180,18 +180,32 @@ await admin.fetchTopicOffsetsByTimestamp(topic, timestamp) ## Fetch consumer group offsets -`fetchOffsets` returns the consumer group offset for a topic. +`fetchOffsets` returns the consumer group offset for a list of topics. ```javascript -await admin.fetchOffsets({ groupId, topic, }) +await admin.fetchOffsets({ groupId, topics: ['topic1', 'topic2'] }) // [ -// { partition: 0, offset: '31004' }, -// { partition: 1, offset: '54312' }, -// { partition: 2, offset: '32103' }, -// { partition: 3, offset: '28' }, +// { +// topic: 'topic1', +// partitions: [ +// { partition: 0, offset: '31004' }, +// { partition: 1, offset: '54312' }, +// { partition: 2, offset: '32103' }, +// { partition: 3, offset: '28' }, +// ], +// }, +// { +// topic: 'topic2', +// partitions: [ +// { partition: 0, offset: '1234' }, +// { partition: 1, offset: '4567' }, +// ], +// }, // ] ``` +Omit `topics` altogether if you want to get the consumer group offsets for all topics with committed offsets. + Include the optional `resolveOffsets` flag to resolve the offsets without having to start a consumer, useful when fetching directly after calling [resetOffets](#a-name-reset-offsets-a-reset-consumer-group-offsets): ```javascript diff --git a/src/admin/__tests__/fetchOffsets.spec.js b/src/admin/__tests__/fetchOffsets.spec.js index d11a2dbb6..39cf1fd36 100644 --- a/src/admin/__tests__/fetchOffsets.spec.js +++ b/src/admin/__tests__/fetchOffsets.spec.js @@ -11,12 +11,15 @@ const { generateMessages, testIfKafkaAtLeast_0_11, } = require('testHelpers') +const { KafkaJSNonRetriableError } = require('../../errors') describe('Admin', () => { - let admin, cluster, groupId, logger, topicName + let admin, cluster, groupId, logger, topicName, anotherTopicName, yetAnotherTopicName beforeEach(async () => { topicName = `test-topic-${secureRandom()}` + anotherTopicName = `another-topic-${secureRandom()}` + yetAnotherTopicName = `yet-another-topic-${secureRandom()}` groupId = `consumer-group-id-${secureRandom()}` await createTopic({ topic: topicName }) @@ -34,19 +37,25 @@ describe('Admin', () => { describe('fetchOffsets', () => { test('throws an error if the groupId is invalid', async () => { - await expect(admin.fetchOffsets({ groupId: null })).rejects.toHaveProperty( - 'message', + await expect(admin.fetchOffsets({ groupId: null })).rejects.toThrow( + KafkaJSNonRetriableError, 'Invalid groupId null' ) }) - test('throws an error if the topic name is not a valid string', async () => { - await expect(admin.fetchOffsets({ groupId: 'groupId', topic: null })).rejects.toHaveProperty( - 'message', - 'Invalid topic null' + test('throws an error if the topics argument is not a valid list', async () => { + await expect(admin.fetchOffsets({ groupId: 'groupId', topics: topicName })).rejects.toThrow( + KafkaJSNonRetriableError, + 'Expected topic or topics array to be set' ) }) + test('throws an error if both topic and topics are set', async () => { + await expect( + admin.fetchOffsets({ groupId: 'groupId', topic: topicName, topics: [topicName] }) + ).rejects.toThrow(KafkaJSNonRetriableError, 'Either topic or topics must be set, not both') + }) + test('returns unresolved consumer group offsets', async () => { const offsets = await admin.fetchOffsets({ groupId, @@ -71,6 +80,61 @@ describe('Admin', () => { expect(offsets).toEqual([{ partition: 0, offset: '13', metadata: null }]) }) + test('returns consumer group offsets for all topics', async () => { + await admin.setOffsets({ + groupId, + topic: topicName, + partitions: [{ partition: 0, offset: 13 }], + }) + await admin.setOffsets({ + groupId, + topic: anotherTopicName, + partitions: [{ partition: 0, offset: 23 }], + }) + await admin.setOffsets({ + groupId, + topic: yetAnotherTopicName, + partitions: [{ partition: 0, offset: 42 }], + }) + + const offsets = await admin.fetchOffsets({ + groupId, + }) + + expect(offsets).toIncludeSameMembers([ + { + topic: yetAnotherTopicName, + partitions: [{ partition: 0, offset: '42', metadata: null }], + }, + { topic: anotherTopicName, partitions: [{ partition: 0, offset: '23', metadata: null }] }, + { topic: topicName, partitions: [{ partition: 0, offset: '13', metadata: null }] }, + ]) + }) + + test('returns consumer group offsets for list of topics', async () => { + await admin.setOffsets({ + groupId, + topic: topicName, + partitions: [{ partition: 0, offset: 13 }], + }) + await admin.setOffsets({ + groupId, + topic: anotherTopicName, + partitions: [{ partition: 0, offset: 42 }], + }) + + const offsets = await admin.fetchOffsets({ + groupId, + topics: [topicName, anotherTopicName], + }) + + // There's no guarantee for the order of topics so we compare sets to avoid flaky tests. + expect(offsets).toIncludeSameMembers([ + { topic: anotherTopicName, partitions: [{ partition: 0, offset: '42', metadata: null }] }, + { topic: topicName, partitions: [{ partition: 0, offset: '13', metadata: null }] }, + ]) + }) + describe('when used with the resolvedOffsets option', () => { let producer, consumer diff --git a/src/admin/index.js b/src/admin/index.js index e1c2d20f9..550dbfab0 100644 --- a/src/admin/index.js +++ b/src/admin/index.js @@ -369,62 +369,94 @@ module.exports = ({ } /** + * Fetch offsets for a topic or multiple topics + * + * Note: set either topic or topics but not both. + * * @param {string} groupId - * @param {string} topic + * @param {string} topic - deprecated, use the `topics` parameter. Topic to fetch offsets for. + * @param {string[]} topics - list of topics to fetch offsets for, defaults to `[]` which fetches all topics for `groupId`. * @param {boolean} [resolveOffsets=false] * @return {Promise} */ - const fetchOffsets = async ({ groupId, topic, resolveOffsets = false }) => { + const fetchOffsets = async ({ groupId, topic, topics, resolveOffsets = false }) => { if (!groupId) { throw new KafkaJSNonRetriableError(`Invalid groupId ${groupId}`) } - if (!topic) { - throw new KafkaJSNonRetriableError(`Invalid topic ${topic}`) + if (!topic && !topics) { + topics = [] } - const partitions = await findTopicPartitions(cluster, topic) - const coordinator = await cluster.findGroupCoordinator({ groupId }) - const partitionsToFetch = partitions.map(partition => ({ partition })) + if (!topic && !Array.isArray(topics)) { + throw new KafkaJSNonRetriableError(`Expected topic or topics array to be set`) + } + + if (topic && topics) { + throw new KafkaJSNonRetriableError(`Either topic or topics must be set, not both`) + } + if (topic) { + topics = [topic] + } + + const coordinator = await cluster.findGroupCoordinator({ groupId }) + const topicsToFetch = await Promise.all( + topics.map(async topic => { + const partitions = await findTopicPartitions(cluster, topic) + const partitionsToFetch = partitions.map(partition => ({ partition })) + return { topic, partitions: partitionsToFetch } + }) + ) let { responses: consumerOffsets } = await coordinator.offsetFetch({ groupId, - topics: [{ topic, partitions: partitionsToFetch }], + topics: topicsToFetch, }) if (resolveOffsets) { - const indexedOffsets = indexByPartition(await fetchTopicOffsets(topic)) - consumerOffsets = consumerOffsets.map(({ topic, partitions }) => ({ - topic, - partitions: partitions.map(({ offset, partition, ...props }) => { - let resolvedOffset = offset - if (Number(offset) === EARLIEST_OFFSET) { - resolvedOffset = indexedOffsets[partition].low - } - if (Number(offset) === LATEST_OFFSET) { - resolvedOffset = indexedOffsets[partition].high - } + consumerOffsets = await Promise.all( + consumerOffsets.map(async ({ topic, partitions }) => { + const indexedOffsets = indexByPartition(await fetchTopicOffsets(topic)) + const recalculatedPartitions = partitions.map(({ offset, partition, ...props }) => { + let resolvedOffset = offset + if (Number(offset) === EARLIEST_OFFSET) { + resolvedOffset = indexedOffsets[partition].low + } + if (Number(offset) === LATEST_OFFSET) { + resolvedOffset = indexedOffsets[partition].high + } + return { + partition, + offset: resolvedOffset, + ...props, + } + }) + + await setOffsets({ groupId, topic, partitions: recalculatedPartitions }) + return { - partition, - offset: resolvedOffset, - ...props, + topic, + partitions: recalculatedPartitions, } - }), - })) - const [{ partitions }] = consumerOffsets - await setOffsets({ groupId, topic, partitions }) + }) + ) } - return consumerOffsets - .filter(response => response.topic === topic) - .map(({ partitions }) => - partitions.map(({ partition, offset, metadata }) => ({ - partition, - offset, - metadata: metadata || null, - })) - ) - .pop() + const result = consumerOffsets.map(({ topic, partitions }) => { + const completePartitions = partitions.map(({ partition, offset, metadata }) => ({ + partition, + offset, + metadata: metadata || null, + })) + + return { topic, partitions: completePartitions } + }) + + if (topic) { + return result.pop().partitions + } else { + return result + } } /** diff --git a/types/index.d.ts b/types/index.d.ts index 104acc783..873ae9fac 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -356,6 +356,7 @@ export type RequestQueueSizeEvent = InstrumentationEvent<{ export type SeekEntry = PartitionOffset +export type FetchOffsetsPartitions = Array export interface Acl { principal: string host: string @@ -436,7 +437,12 @@ export type Admin = { groupId: string topic: string resolveOffsets?: boolean - }): Promise> + }): Promise + fetchOffsets(options: { + groupId: string + topics?: string[] + resolveOffsets?: boolean + }): Promise> fetchTopicOffsets(topic: string): Promise> fetchTopicOffsetsByTimestamp(topic: string, timestamp?: number): Promise> describeCluster(): Promise<{ diff --git a/types/tests.ts b/types/tests.ts index 03d16c118..4b8cd19b5 100644 --- a/types/tests.ts +++ b/types/tests.ts @@ -180,6 +180,10 @@ const runAdmin = async () => { await admin.listTopics() + await admin.fetchOffsets({ groupId: 'test-group' }) + await admin.fetchOffsets({ groupId: 'test-group', topic: 'topic1' }) + await admin.fetchOffsets({ groupId: 'test-group', topics: ['topic1', 'topic2'] }) + await admin.createTopics({ topics: [{ topic, numPartitions: 10, replicationFactor: 1 }], timeout: 30000,