kafka: Add support for time lag #17735

Open · wants to merge 8 commits into base: master

Showing changes from all commits.
docs/operations/metrics.md (5 additions, 0 deletions)
@@ -221,6 +221,10 @@ These metrics apply to the [Kafka indexing service](../ingestion/kafka-ingestion.md)
|`ingest/kafka/maxLag`|Max lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, should not be a very high number. |
|`ingest/kafka/avgLag`|Average lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, should not be a very high number. |
|`ingest/kafka/partitionLag`|Partition-wise lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `partition`, `tags`|Greater than 0, should not be a very high number. |
|`ingest/kafka/updateOffsets/time`|Total time (in milliseconds) taken to fetch the latest offsets from the Kafka stream and from the ingestion tasks.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Generally a few seconds at most.|
|`ingest/kafka/lag/time`|Total lag time in milliseconds between the current message sequence number consumed by the Kafka indexing tasks and the latest sequence number in Kafka across all partitions. Minimum emission period for this metric is a minute. Emitted only when `publishTimeLag` is set to true in the supervisor ioConfig.|`dataSource`, `stream`, `tags`|Greater than 0, up to max Kafka retention period in milliseconds. |
|`ingest/kafka/maxLag/time`|Max lag time in milliseconds between the current message sequence number consumed by the Kafka indexing tasks and the latest sequence number in Kafka across all partitions. Minimum emission period for this metric is a minute. Emitted only when `publishTimeLag` is set to true in the supervisor ioConfig.|`dataSource`, `stream`, `tags`|Greater than 0, up to max Kafka retention period in milliseconds. |
|`ingest/kafka/avgLag/time`|Average lag time in milliseconds between the current message sequence number consumed by the Kafka indexing tasks and the latest sequence number in Kafka across all partitions. Minimum emission period for this metric is a minute. Emitted only when `publishTimeLag` is set to true in the supervisor ioConfig.|`dataSource`, `stream`, `tags`|Greater than 0, up to max Kafka retention period in milliseconds. |
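To make the new time lag concrete (illustrative numbers, not from this patch): if the newest message in a partition carries a broker timestamp of 1,700,000,060,000 ms and the message at the tasks' current offset carries 1,700,000,000,000 ms, that partition lags by 60,000 ms; `ingest/kafka/maxLag/time` then reports the largest such value across partitions and `ingest/kafka/avgLag/time` the mean.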

### Ingestion metrics for Kinesis

@@ -232,6 +236,7 @@ These metrics apply to the [Kinesis indexing service](../ingestion/kinesis-ingestion.md)
|`ingest/kinesis/maxLag/time`|Max lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, up to max Kinesis retention period in milliseconds. |
|`ingest/kinesis/avgLag/time`|Average lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, up to max Kinesis retention period in milliseconds. |
|`ingest/kinesis/partitionLag/time`|Partition-wise lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `partition`, `tags`|Greater than 0, up to max Kinesis retention period in milliseconds. |
|`ingest/kinesis/updateOffsets/time`|Total time (in milliseconds) taken to fetch the latest offsets from Kafka stream and the ingestion tasks.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Generally a few seconds at most.|
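Note that the Kinesis time lag metrics above are emitted unconditionally, while the new Kafka equivalents are opt-in via `publishTimeLag`.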

### Compaction metrics

@@ -43,7 +43,6 @@
import org.apache.druid.metadata.DynamicConfigProvider;

import javax.annotation.Nonnull;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
@@ -38,6 +38,7 @@
import org.apache.druid.java.util.metrics.Monitor;
import org.apache.druid.metadata.DynamicConfigProvider;
import org.apache.druid.metadata.PasswordProvider;
import org.apache.druid.utils.CollectionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
@@ -166,7 +167,8 @@ public List<OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>>
record.topic(),
new KafkaTopicPartition(multiTopic, record.topic(), record.partition()),
record.offset(),
record.value() == null ? null : ImmutableList.of(new KafkaRecordEntity(record))
record.value() == null ? null : ImmutableList.of(new KafkaRecordEntity(record)),
record.timestamp()
));
}
return polledRecords;
@@ -206,6 +208,21 @@ public Long getPosition(StreamPartition<KafkaTopicPartition> partition)
return wrapExceptions(() -> consumer.position(partition.getPartitionId().asTopicPartition(partition.getStream())));
}

@Override
public Map<KafkaTopicPartition, Long> getLatestSequenceNumbers(Set<StreamPartition<KafkaTopicPartition>> partitions)
{
return wrapExceptions(() -> CollectionUtils.mapKeys(
consumer.endOffsets(
partitions
.stream()
.map(e -> e.getPartitionId().asTopicPartition(e.getStream()))
.collect(Collectors.toList()
)
),
p -> new KafkaTopicPartition(multiTopic, p.topic(), p.partition())
));
}

@Override
public Set<KafkaTopicPartition> getPartitionIds(String stream)
{
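The new `getLatestSequenceNumbers` wraps `consumer.endOffsets` and re-keys the result by `KafkaTopicPartition`. Downstream, record lag falls out by subtracting each partition's highest consumed offset from its end offset; below is a minimal sketch of that subtraction, with class, helper, and variable names that are illustrative rather than taken from this PR:

```java
import java.util.HashMap;
import java.util.Map;

public final class RecordLagSketch
{
  // Record lag per partition = end offset reported by the broker minus the
  // highest offset the indexing tasks have consumed, floored at zero.
  public static <K> Map<K, Long> computeRecordLag(Map<K, Long> endOffsets, Map<K, Long> consumedOffsets)
  {
    final Map<K, Long> lag = new HashMap<>();
    for (Map.Entry<K, Long> entry : endOffsets.entrySet()) {
      final long consumed = consumedOffsets.getOrDefault(entry.getKey(), 0L);
      lag.put(entry.getKey(), Math.max(0L, entry.getValue() - consumed));
    }
    return lag;
  }
}
```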
@@ -46,6 +46,7 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
@@ -91,6 +92,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<KafkaTopicPartition, Long, KafkaRecordEntity>

private final Pattern pattern;
private volatile Map<KafkaTopicPartition, Long> latestSequenceFromStream;
private volatile Map<KafkaTopicPartition, Long> partitionToTimeLag;

private final KafkaSupervisorSpec spec;

@@ -171,6 +173,7 @@ protected SeekableStreamSupervisorReportPayload<KafkaTopicPartition, Long> createReportPayload(
ioConfig.getTaskDuration().getMillis() / 1000,
includeOffsets ? latestSequenceFromStream : null,
includeOffsets ? partitionLag : null,
includeOffsets ? getPartitionTimeLag() : null,
includeOffsets ? partitionLag.values().stream().mapToLong(x -> Math.max(x, 0)).sum() : null,
includeOffsets ? sequenceLastUpdated : null,
spec.isSuspended(),
@@ -273,8 +276,7 @@ protected Map<KafkaTopicPartition, Long> getPartitionRecordLag()
@Override
protected Map<KafkaTopicPartition, Long> getPartitionTimeLag()
{
// time lag not currently support with kafka
return null;
return partitionToTimeLag;
}

// suppress use of CollectionUtils.mapValues() since the valueMapper function is dependent on map key here
@@ -380,6 +382,108 @@ public LagStats computeLagStats()
return computeLags(partitionRecordLag);
}

/**
* This method is similar to {@link #updatePartitionLagFromStream()} but also
* determines time lag. Once this method has been tested, we can remove the older one.
*/
private void updatePartitionTimeAndRecordLagFromStream()
{
final Map<KafkaTopicPartition, Long> highestCurrentOffsets = getHighestCurrentOffsets();

getRecordSupplierLock().lock();
try {
Set<KafkaTopicPartition> partitionIds;
try {
partitionIds = recordSupplier.getPartitionIds(getIoConfig().getStream());
}
catch (Exception e) {
log.warn("Could not fetch partitions for topic/stream [%s]", getIoConfig().getStream());
throw new StreamException(e);
}

final Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
.stream()
.map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
.collect(Collectors.toSet());

final Set<KafkaTopicPartition> yetToReadPartitions = new HashSet<>();
for (Map.Entry<KafkaTopicPartition, Long> entry : highestCurrentOffsets.entrySet()) {
if (partitionIds.contains(entry.getKey())) {
if (highestCurrentOffsets.get(entry.getKey()) == null || highestCurrentOffsets.get(entry.getKey()) == 0) {
yetToReadPartitions.add(entry.getKey());
continue;
}

recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 1);
}
}

final Map<KafkaTopicPartition, Long> lastIngestedTimestamps = getTimestampPerPartitionAtCurrentOffset(partitions);
// TODO: this might give weird values for lag when the tasks are yet to start processing
yetToReadPartitions.forEach(p -> lastIngestedTimestamps.put(p, 0L));

recordSupplier.seekToLatest(partitions);
// this method isn't actually computing the lag, just fetching the latest offsets from the stream. This is
// because we currently only have record lag for Kafka, which can be lazily computed by subtracting the highest
// task offsets from the latest offsets from the stream when it is needed
latestSequenceFromStream = recordSupplier.getLatestSequenceNumbers(partitions);

for (Map.Entry<KafkaTopicPartition, Long> entry : latestSequenceFromStream.entrySet()) {
// if there are no messages, getEndOffset would return 0, but if there are n messages it would return n + 1,
// hence we need to seek to n - 2 to get the nth message in the next poll.
if (entry.getValue() != 0) {
recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 2);
}
}

partitionToTimeLag = getTimestampPerPartitionAtCurrentOffset(partitions)
.entrySet().stream().filter(e -> lastIngestedTimestamps.containsKey(e.getKey()))
.collect(
Collectors.toMap(
Entry::getKey,
e -> e.getValue() - lastIngestedTimestamps.get(e.getKey())
)
);
}
catch (InterruptedException e) {
throw new StreamException(e);
}
finally {
getRecordSupplierLock().unlock();
}
}

private Map<KafkaTopicPartition, Long> getTimestampPerPartitionAtCurrentOffset(Set<StreamPartition<KafkaTopicPartition>> allPartitions)
{
Map<KafkaTopicPartition, Long> result = new HashMap<>();
Set<StreamPartition<KafkaTopicPartition>> remainingPartitions = new HashSet<>(allPartitions);

try {
int maxPolls = 5;
while (!remainingPartitions.isEmpty() && maxPolls-- > 0) {
for (OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity> record : recordSupplier.poll(getIoConfig().getPollTimeout())) {
if (!result.containsKey(record.getPartitionId())) {
result.put(record.getPartitionId(), record.getTimestamp());
remainingPartitions.remove(new StreamPartition<>(getIoConfig().getStream(), record.getPartitionId()));
if (remainingPartitions.isEmpty()) {
break;
}
}
recordSupplier.assign(remainingPartitions);
}
}
}
finally {
recordSupplier.assign(allPartitions);
}

if (!remainingPartitions.isEmpty()) {
log.info("Couldn't fetch the latest timestamp for the following partitions: [%s]", remainingPartitions);
}
return result;
}

/**
* Fetches the latest offsets from the Kafka stream and updates the map
* {@link #latestSequenceFromStream}. The actual lag is computed lazily in
@@ -388,6 +492,11 @@
@Override
protected void updatePartitionLagFromStream()
{
if (getIoConfig().isPublishTimeLag()) {
updatePartitionTimeAndRecordLagFromStream();
return;
}

getRecordSupplierLock().lock();
try {
Set<KafkaTopicPartition> partitionIds;
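Two practical notes on the supervisor changes above. First, `getTimestampPerPartitionAtCurrentOffset` polls at most five times, so each refresh spends at most `5 × pollTimeout` milliseconds sampling timestamps and skips (and logs) any partition that returns no record within that budget. Second, the per-partition `partitionToTimeLag` map feeds the emitted metrics; here is a sketch of the roll-ups, assuming the usual Druid max/average aggregation over partition lags (names are illustrative):

```java
import java.util.Map;

final class TimeLagRollups
{
  // Assumed roll-ups behind ingest/kafka/maxLag/time and ingest/kafka/avgLag/time:
  // negative lags (e.g. from clock skew) are floored at zero first.
  static long maxLagMillis(Map<?, Long> partitionTimeLag)
  {
    return partitionTimeLag.values().stream().mapToLong(lag -> Math.max(lag, 0L)).max().orElse(0L);
  }

  static long avgLagMillis(Map<?, Long> partitionTimeLag)
  {
    return partitionTimeLag.isEmpty()
           ? 0L
           : partitionTimeLag.values().stream().mapToLong(lag -> Math.max(lag, 0L)).sum() / partitionTimeLag.size();
  }
}
```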
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.common.config.Configs;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
@@ -51,6 +52,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
private final KafkaConfigOverrides configOverrides;
private final String topic;
private final String topicPattern;
private final boolean publishTimeLag;

@JsonCreator
public KafkaSupervisorIOConfig(
@@ -72,7 +74,8 @@ public KafkaSupervisorIOConfig(
@JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime,
@JsonProperty("configOverrides") KafkaConfigOverrides configOverrides,
@JsonProperty("idleConfig") IdleConfig idleConfig,
@JsonProperty("stopTaskCount") Integer stopTaskCount
@JsonProperty("stopTaskCount") Integer stopTaskCount,
@Nullable @JsonProperty("publishTimeLag") Boolean publishTimeLag
)
{
super(
@@ -102,6 +105,7 @@ public KafkaSupervisorIOConfig(
this.configOverrides = configOverrides;
this.topic = topic;
this.topicPattern = topicPattern;
this.publishTimeLag = Configs.valueOrDefault(publishTimeLag, false);
}

/**
@@ -151,6 +155,15 @@ public boolean isMultiTopic()
return topicPattern != null;
}

/**
* @return true if the supervisor should publish time lag metrics.
*/
@JsonProperty
public boolean isPublishTimeLag()
{
return publishTimeLag;
}

@Override
public String toString()
{
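`KafkaSupervisorIOConfig` accepts the new flag as a nullable `Boolean` and defaults it to `false` through `Configs.valueOrDefault`, so existing supervisor specs deserialize unchanged and time lag stays off unless explicitly requested. A standalone sketch of that defaulting pattern (the class below is hypothetical; the real one wires the value through Jackson's `@JsonProperty("publishTimeLag")`):

```java
final class PublishTimeLagFlag
{
  private final boolean publishTimeLag;

  // Mirrors Configs.valueOrDefault(publishTimeLag, false): a null input means
  // "absent from the supervisor spec", which must behave like false.
  PublishTimeLagFlag(Boolean publishTimeLag)
  {
    this.publishTimeLag = publishTimeLag != null ? publishTimeLag : false;
  }

  boolean isPublishTimeLag()
  {
    return publishTimeLag;
  }
}
```

In a supervisor spec this surfaces as `"publishTimeLag": true` inside the `ioConfig` block.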
@@ -38,6 +38,7 @@ public KafkaSupervisorReportPayload(
long durationSeconds,
@Nullable Map<KafkaTopicPartition, Long> latestOffsets,
@Nullable Map<KafkaTopicPartition, Long> minimumLag,
@Nullable Map<KafkaTopicPartition, Long> minimumLagMillis,
@Nullable Long aggregateLag,
@Nullable DateTime offsetsLastUpdated,
boolean suspended,
@@ -56,7 +57,7 @@
latestOffsets,
minimumLag,
aggregateLag,
null,
minimumLagMillis,
null,
offsetsLastUpdated,
suspended,
@@ -165,7 +165,8 @@ public void testSample()
null,
null,
null,
null
null,
false
),
null,
null,
@@ -218,7 +219,8 @@ public void testSampleWithTopicPattern()
null,
null,
null,
null
null,
false
),
null,
null,
@@ -280,7 +282,8 @@ public void testSampleKafkaInputFormat()
null,
null,
null,
null
null,
false
),
null,
null,
@@ -384,7 +387,8 @@ public void testWithInputRowParser() throws IOException
null,
null,
null,
null
null,
false
),
null,
null,
@@ -568,7 +572,8 @@ public void testInvalidKafkaConfig()
null,
null,
null,
null
null,
false
),
null,
null,
@@ -624,7 +629,8 @@ public void testGetInputSourceResources()
null,
null,
null,
null
null,
false
),
null,
null,
@@ -337,7 +337,8 @@ public void testAutoScalingConfigSerde() throws JsonProcessingException
null,
null,
null,
null
null,
false
);
String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);
KafkaSupervisorIOConfig kafkaSupervisorIOConfig1 = mapper.readValue(ioConfig, KafkaSupervisorIOConfig.class);
@@ -380,7 +381,8 @@ public void testIdleConfigSerde() throws JsonProcessingException
null,
null,
mapper.convertValue(idleConfig, IdleConfig.class),
null
null,
false
);
String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);
KafkaSupervisorIOConfig kafkaSupervisorIOConfig1 = mapper.readValue(ioConfig, KafkaSupervisorIOConfig.class);