Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Belated changes for KCL v3 migration #56

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

rtyley
Copy link
Member

@rtyley rtyley commented Jan 17, 2025

@rtyley rtyley force-pushed the changes-for-kcl-v3-migration branch 4 times, most recently from 4b5f8be to 2b5535f Compare January 21, 2025 12:07
@rtyley rtyley force-pushed the changes-for-kcl-v3-migration branch from 2b5535f to 4e2ea55 Compare January 21, 2025 12:10

import scala.concurrent.duration._

class ContentApiFirehoseConsumer(
val kinesisStreamReaderConfig: KinesisStreamReaderConfig,
override val credentialsProvider: AwsCredentialsProvider,
val streamListener: StreamListener,
val filterProductionMonitoring: Boolean = false) extends KinesisStreamReader {
val filterProductionMonitoring: Boolean = false,
val clientVersionCompatibility: CoordinatorConfig.ClientVersionConfig // see https://github.com/guardian/content-api-firehose-client/pull/56
Copy link
Member Author

@rtyley rtyley Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Migration Step 3: Migration-related configuration

I haven't provided a default value for clientVersionCompatibility here, in order to force consideration in projects that use KCL in more than one place - they need to be aware that by taking this library upgrade, they're using KCL v3 everywhere, and this ClientVersionConfig setting should be made on the coordinatorConfig of every KCL client that's in the project.

Recommended value of clientVersionCompatibility

Use CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X, for at least one deploy, until all workers are emitting WorkerMetricStats that show they have upgraded.

Comment on lines +96 to +103
val ongoingCalls = ongoingProcessRecordCallsPhaser.getUnarrivedParties - 1
if (ongoingCalls > 0)
logger.info(s"Shutdown requested but waiting for processRecords() to complete - ongoingCalls=$ongoingCalls")

// Ensure that all records we've received have been processed before we call checkpoint and exit
Try(ongoingProcessRecordCallsPhaser.awaitAdvanceInterruptibly(ongoingProcessRecordCallsPhaser.arrive(), 10, SECONDS)).recover {
case e: TimeoutException => logger.error("Timeout while waiting for processRecords() to complete", e)
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Migration Step 4: Graceful lease handoff

If you are performing any asynchronous processing, make sure that all delivered records to the downstream were processed before invoking checkpointing.

I wasn't sure how to interpret this, but I think it is possible for a call to processRecords() (invoked by software.amazon.kinesis.lifecycle.ProcessTask) to be ongoing while shutdownRequested() is called (invoked by software.amazon.kinesis.lifecycle.ShutdownNotificationTask).

If that's the case, we want to make sure we've finished our processing work in processRecords() before we checkpoint in shutdownRequested(), while being conscious that it could be occurring in a different thread.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant