
kafka: Add support for time lag #17735

Open · wants to merge 8 commits into master

Conversation


@adithyachakilam (Contributor) commented Feb 17, 2025

Description

Record lag alone doesn't tell us how long it would take to clear a given Kafka lag, since processing time differs for every stream. To measure SLAs correctly, we need to express the lag in terms of time. This PR adds support for calculating the lag of a Kafka stream in terms of time.
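As a rough illustration of the general idea (all names below are hypothetical, not this PR's actual code), per-partition time lag is the gap between the timestamp of the newest record available in the stream and the timestamp of the newest record the ingestion tasks have already read:

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the time-lag computation; all names are illustrative.
public class TimeLagSketch
{
  /** Both maps are partition id -> record timestamp in epoch millis. */
  public static Map<Integer, Long> computeTimeLag(
      Map<Integer, Long> latestTimestampInStream,
      Map<Integer, Long> highestIngestedTimestamp
  )
  {
    final Map<Integer, Long> timeLagMillis = new HashMap<>();
    latestTimestampInStream.forEach((partition, latest) -> {
      final Long ingested = highestIngestedTimestamp.get(partition);
      if (ingested != null) {
        // Clamp at zero so out-of-order timestamps never report negative lag.
        timeLagMillis.put(partition, Math.max(0L, latest - ingested));
      }
    });
    return timeLagMillis;
  }
}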

Release note

The Kafka supervisor now emits an additional lag metric that reports how many minutes of data ingestion is falling behind.


Key changed/added classes in this PR
  • KafkaSupervisor

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@kfaraz (Contributor) left a comment


@adithyachakilam , thanks for the changes.
The approach makes sense to me.
But we can probably simplify it as follows:

  • Add field long timestamp to OrderedPartitionableRecord.
  • Update KafkaSupervisor.poll() to also set the timestamp in the records.
  • Add a config (maybe in KafkaSupervisorIOConfig) to enable emitting the new metric, mostly because I am not too sure of the impact of the additional polls.
  • Update method updatePartitionLagFromStream() as suggested below. With this approach, we can avoid additional seeks and polls and hold the lock for a shorter period of time. Kafka topics may sometimes have several partitions, and polling each one separately can be inefficient.
  • Compute the difference between highestIngestedTimestamps and latestTimestampsFromStream in method getPartitionTimeLag() (a sketch of this step follows the code block below).
  • I would also advise adding a metric ingest/%s/updateOffsets/time in SeekableStreamSupervisor.updateCurrentAndLatestOffsets(), which measures the total time spent in that method.
  • Please include results of any cluster testing in the PR description.

Please let me know what you think.

@Override
protected void updatePartitionLagFromStream()
{
  // "isEmitTimeLagMetrics" is a placeholder name for the proposed config flag
  if (getIoConfig().isEmitTimeLagMetrics()) {
    updatePartitionTimeAndRecordLagFromStream();
    return;
  }

  // existing code flow
}

/**
 * This method is similar to updatePartitionLagFromStream()
 * but also determines time lag. Once this method has been
 * tested, we can remove the older one.
 */
private void updatePartitionTimeAndRecordLagFromStream()
{
  // NEW CODE - determine the highest of the current offsets across all tasks
  final Map<KafkaTopicPartition, Long> highestCurrentOffsets = getHighestCurrentOffsets();

  getRecordSupplierLock().lock();
  try {
    Set<KafkaTopicPartition> partitionIds;
    try {
      partitionIds = recordSupplier.getPartitionIds(getIoConfig().getStream());
    }
    catch (Exception e) {
      log.warn("Could not fetch partitions for topic/stream [%s]", getIoConfig().getStream());
      throw new StreamException(e);
    }

    // NEW CODE - seek all partitions to the highest current offsets
    for (Map.Entry<KafkaTopicPartition, Long> entry : highestCurrentOffsets.entrySet()) {
      if (partitionIds.contains(entry.getKey())) {
        recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue());
      }
    }

    final Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
        .stream()
        .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
        .collect(Collectors.toSet());

    // NEW CODE - poll records for all partitions at the highest current offsets
    // ("timeout" is a placeholder for the poll timeout in millis)
    final List<OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> lastIngestedRecords =
        recordSupplier.poll(timeout);

    // NEW CODE - determine the max ingested timestamp for each partition using lastIngestedRecords.
    // Make sure to keep only the relevant records, as the consumer may have returned records
    // with a higher offset than the one requested.

    recordSupplier.seekToLatest(partitions);

    // This isn't actually computing the lag, just fetching the latest offsets from the stream.
    // This is because we currently only have record lag for Kafka, which can be lazily computed
    // by subtracting the highest task offsets from the latest offsets from the stream when needed.

    // NEW CODE - poll records for all partitions at the latest offsets
    final List<OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> latestRecordsInStream =
        recordSupplier.poll(timeout);

    // NEW CODE - iterate over latestRecordsInStream to determine latestTimestampsFromStream
    // and latestSequencesFromStream
  }
  catch (InterruptedException e) {
    throw new StreamException(e);
  }
  finally {
    getRecordSupplierLock().unlock();
  }
}
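For the getPartitionTimeLag() step mentioned in the list above, a minimal sketch could look like the following. The field names latestTimestampsFromStream and highestIngestedTimestamps are taken from this comment; everything else is an assumption, and both maps are assumed to hold per-partition record timestamps in epoch millis, populated by the method above:

// Sketch only, not this PR's actual implementation.
private Map<KafkaTopicPartition, Long> getPartitionTimeLag()
{
  final Map<KafkaTopicPartition, Long> timeLag = new HashMap<>();
  for (Map.Entry<KafkaTopicPartition, Long> entry : latestTimestampsFromStream.entrySet()) {
    final Long ingestedTimestamp = highestIngestedTimestamps.get(entry.getKey());
    if (ingestedTimestamp != null) {
      // Clamp at zero so empty polls or clock skew never yield negative lag.
      timeLag.put(entry.getKey(), Math.max(0L, entry.getValue() - ingestedTimestamp));
    }
  }
  return timeLag;
}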


@kfaraz (Contributor) left a comment


Done a partial review; I still need to take another look at the new method updatePartitionTimeAndRecordLagFromStream().
