Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configuration update #46

Merged
merged 3 commits into from
Jul 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 23 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,20 +113,34 @@ eventually causing the system to run out of memory.
## Consumer Configuration
The Kinesis Consumer `ConsumerConfig` can be configured via HOCON configuration, which is common for Akka projects
```hocon
kinesis-consumer {
application-name = "test-app" # name of the application (consumer group)
stream-name = "test-stream" # name of the stream to connect to

position {
initial = "latest" # (latest, trim-horizon, at-timestamp). defaults to latest
time = "" # Only set if position is at-timestamp. Supports a valid Java Date parseable datetime string
com.contxt.kinesis {
consumer {
application-name = "test-app" # name of the application (consumer group)
stream-name = "test-stream" # name of the stream to connect to

position {
initial = "latest" # (latest, trim-horizon, at-timestamp) Defaults to latest.
time = "" # Only required if position is at-timestamp. Supports a valid Java Date parseable datetime string
}

# Note: Configurations below need to be in this location (com.contxt.kinesis.consumer) to be found

# Optional stats reporting class that implements com.contxt.kinesis.ConsumerStats
stats-class-name = "com.contxt.kinesis.NoopConsumerStats"

# Optional checkpoint configuration
shard-checkpoint-config {
checkpoint-period = 60 seconds
checkpoint-after-processing-nr-of-records = 10000
max-wait-for-completion-on-stream-shutdown = 5 seconds
}
}
}
```

Then configuring the consumer using `ConsumerConfig.fromConfig`.
Then configure the consumer using the convenience method `ConsumerConfig.fromConfig`.
```scala
ConsumerConfig.fromConfig(system.settings.config.getConfig("kinesis-consumer"))
ConsumerConfig.fromConfig(system.settings.config.getConfig("com.contxt.kinesis.consumer"))
```

The `ConsumerConfig` class also has methods for accepting raw AWS SDK clients which can be configured.
Expand Down
7 changes: 4 additions & 3 deletions src/it/scala/com/contxt/kinesis/TestStreamConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ case class TestStreamConfig(
streamName,
applicationName,
workerId,
kinesisClient,
DynamoDbAsyncClient.builder.region(Region.of(regionName)).build(),
CloudWatchAsyncClient.builder.region(Region.of(regionName)).build(),
initialPositionInStream,
coordinatorConfig = Some(
new CoordinatorConfig(applicationName)
Expand All @@ -52,6 +49,10 @@ case class TestStreamConfig(
.retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
.initialPositionInStreamExtended(initialPositionInStream)
)
)(
kinesisClient,
DynamoDbAsyncClient.builder.region(Region.of(regionName)).build(),
CloudWatchAsyncClient.builder.region(Region.of(regionName)).build()
)
}

Expand Down
171 changes: 104 additions & 67 deletions src/main/scala/com/contxt/kinesis/ConsumerConfig.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.contxt.kinesis

import java.sql.Date
import java.text.DateFormat
import java.util.UUID

Expand All @@ -21,87 +20,125 @@ case class ConsumerConfig(
kinesisClient: KinesisAsyncClient,
dynamoClient: DynamoDbAsyncClient,
cloudwatchClient: CloudWatchAsyncClient,
initialPositionInStreamExtended: InitialPositionInStreamExtended =
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST),
coordinatorConfig: Option[CoordinatorConfig] = None,
leaseManagementConfig: Option[LeaseManagementConfig] = None,
metricsConfig: Option[MetricsConfig] = None,
retrievalConfig: Option[RetrievalConfig] = None
) {
def withInitialStreamPosition(position: InitialPositionInStream): ConsumerConfig =
this.copy(initialPositionInStreamExtended = InitialPositionInStreamExtended.newInitialPosition(position))

def withInitialStreamPositionAtTimestamp(time: Date): ConsumerConfig =
this.copy(initialPositionInStreamExtended = InitialPositionInStreamExtended.newInitialPositionAtTimestamp(time))

def withCoordinatorConfig(config: CoordinatorConfig): ConsumerConfig =
this.copy(coordinatorConfig = Some(config))

def withLeaseManagementConfig(config: LeaseManagementConfig): ConsumerConfig =
this.copy(leaseManagementConfig = Some(config))

def withMetricsConfig(config: MetricsConfig): ConsumerConfig =
this.copy(metricsConfig = Some(config))

def withRetrievalConfig(config: RetrievalConfig): ConsumerConfig =
this.copy(retrievalConfig = Some(config))

def withWorkerId(workerId: String): ConsumerConfig =
this.copy(workerId = workerId)
}
initialPositionInStreamExtended: InitialPositionInStreamExtended,
coordinatorConfig: Option[CoordinatorConfig],
leaseManagementConfig: Option[LeaseManagementConfig],
metricsConfig: Option[MetricsConfig],
retrievalConfig: Option[RetrievalConfig]
)

object ConsumerConfig {
def apply(streamName: String, appName: String): ConsumerConfig = {
val kinesisClient = KinesisAsyncClient.builder.build()
val dynamoClient = DynamoDbAsyncClient.builder.build()
val cloudWatchClient = CloudWatchAsyncClient.builder.build()

withNames(streamName, appName)(kinesisClient, dynamoClient, cloudWatchClient)
def apply(
streamName: String,
appName: String,
workerId: String = ConsumerConfig.generateWorkerId,
initialPositionInStreamExtended: InitialPositionInStreamExtended = defaultInitialPosition,
coordinatorConfig: Option[CoordinatorConfig] = None,
leaseManagementConfig: Option[LeaseManagementConfig] = None,
metricsConfig: Option[MetricsConfig] = None,
retrievalConfig: Option[RetrievalConfig] = None
)(implicit
kinesisClient: KinesisAsyncClient = defaultKinesisClient,
dynamoClient: DynamoDbAsyncClient = defaultDynamoClient,
cloudwatchClient: CloudWatchAsyncClient = defaultCloudwatchClient
): ConsumerConfig = {
ConsumerConfig(
streamName,
appName,
workerId,
kinesisClient,
dynamoClient,
cloudwatchClient,
initialPositionInStreamExtended,
coordinatorConfig,
leaseManagementConfig,
metricsConfig,
retrievalConfig
)
}

def withNames(
streamName: String,
appName: String
)(implicit kac: KinesisAsyncClient, dac: DynamoDbAsyncClient, cwac: CloudWatchAsyncClient): ConsumerConfig =
ConsumerConfig(streamName, appName, generateWorkerId(), kac, dac, cwac)

def fromConfig(config: Config)(implicit
kac: KinesisAsyncClient = null,
dac: DynamoDbAsyncClient = null,
cwac: CloudWatchAsyncClient = null
def fromConfig(
config: Config,
workerId: String = ConsumerConfig.generateWorkerId,
coordinatorConfig: Option[CoordinatorConfig] = None,
leaseManagementConfig: Option[LeaseManagementConfig] = None,
metricsConfig: Option[MetricsConfig] = None,
retrievalConfig: Option[RetrievalConfig] = None
)(implicit
kac: KinesisAsyncClient = defaultKinesisClient,
dac: DynamoDbAsyncClient = defaultDynamoClient,
cwac: CloudWatchAsyncClient = defaultCloudwatchClient
): ConsumerConfig = {
def getOpt[A](key: String, lookup: String => A): Option[A] =
if (config.hasPath(key)) Some(lookup(key)) else None
val parsedConfig = ParsedConsumerConfig(config)

ConsumerConfig(
parsedConfig.streamName,
parsedConfig.appName,
workerId,
kac,
dac,
cwac,
parsedConfig.initialPositionInStreamExtended.getOrElse(defaultInitialPosition),
coordinatorConfig,
leaseManagementConfig,
metricsConfig,
retrievalConfig
)
}

def getStringOpt(key: String): Option[String] =
getOpt(key, config.getString)
private def defaultInitialPosition: InitialPositionInStreamExtended =
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)

val StreamPositionLatest = "latest"
val StreamPositionHorizon = "trim-horizon"
val StreamPositionTimestamp = "at-timestamp"
private def defaultKinesisClient: KinesisAsyncClient =
KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder())

val streamName = config.getString("stream-name")
val name = config.getString("application-name")
val latestPos = InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)
private def defaultDynamoClient: DynamoDbAsyncClient = DynamoDbAsyncClient.builder.build()

val streamPosition = getStringOpt("position.initial")
private def defaultCloudwatchClient: CloudWatchAsyncClient = CloudWatchAsyncClient.builder.build()

private def generateWorkerId: String = UUID.randomUUID().toString
}

private case class ParsedConsumerConfig(
streamName: String,
appName: String,
initialPositionInStreamExtended: Option[InitialPositionInStreamExtended]
)

private object ParsedConsumerConfig {
val KeyAppName = "application-name"
val KeyStreamName = "stream-name"
val KeyInitialPosition = "position.initial"
val KeyInitialPositionAtTimestampTime = "position.time"

val InitialPositionLatest = "latest"
val InitialPositionTrimHorizon = "trim-horizon"
val InitialPositionAtTimestamp = "at-timestamp"

def apply(config: Config): ParsedConsumerConfig = {
ParsedConsumerConfig(
streamName = config.getString(KeyStreamName),
appName = config.getString(KeyAppName),
initialPositionInStreamExtended = getStreamPosition(config)
)
}

private def getStreamPosition(config: Config): Option[InitialPositionInStreamExtended] = {
getStringOption(config, KeyInitialPosition)
.map {
case StreamPositionLatest => latestPos
case StreamPositionHorizon =>
case InitialPositionLatest =>
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)
case InitialPositionTrimHorizon =>
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
case StreamPositionTimestamp =>
case InitialPositionAtTimestamp =>
InitialPositionInStreamExtended.newInitialPositionAtTimestamp(
DateFormat.getInstance().parse(config.getString("position.time"))
DateFormat.getInstance().parse(config.getString(KeyInitialPositionAtTimestampTime))
)
}
.getOrElse(latestPos)

val kinesisClient = Option(kac).getOrElse(KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder()))
val dynamoClient = Option(dac).getOrElse(DynamoDbAsyncClient.builder.build())
val cloudWatchClient = Option(cwac).getOrElse(CloudWatchAsyncClient.builder.build())

ConsumerConfig(streamName, name, generateWorkerId(), kinesisClient, dynamoClient, cloudWatchClient, streamPosition)
}

private def generateWorkerId(): String = UUID.randomUUID().toString
private def getStringOption(config: Config, key: String): Option[String] = {
if (config.hasPath(key)) Some(config.getString(key)) else None
}
}
3 changes: 2 additions & 1 deletion src/main/scala/com/contxt/kinesis/KinesisRecord.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ case class KinesisRecord(
* records out of order in an asynchronous fashion. Each record must be eventually marked as processed by
* calling `markProcessed()`, or the steam must be terminated with an exception. If the stream continues
* after failing to process a record, and not marking it as processed, then no further records can be checkpointed,
* eventually causing the system to run out of memory. */
* eventually causing the system to run out of memory.
*/
def markProcessed(): Unit = completionPromise.trySuccess(Done)

private[kinesis] def offsetString: String = {
Expand Down
6 changes: 4 additions & 2 deletions src/main/scala/com/contxt/kinesis/KinesisSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,17 @@ import scala.util.control.NonFatal
object KinesisSource {

/** Creates a Source backed by Kinesis Consumer Library, with materialized valued of Future[Done] which completes
* when the stream has terminated and the Kinesis worker has fully shutdown. */
* when the stream has terminated and the Kinesis worker has fully shutdown.
*/
def apply(kclConfig: ConsumerConfig, config: Config = ConfigFactory.load()): Source[KinesisRecord, Future[Done]] = {
val shardCheckpointConfig = ShardCheckpointConfig(config)
val consumerStats = ConsumerStats.getInstance(config)
KinesisSource(createKclWorker, kclConfig, shardCheckpointConfig, consumerStats)
}

/** Creates a Source backed by Kinesis Consumer Library, with materialized valued of Future[Done] which completes
* when the stream has terminated and the Kinesis worker has fully shutdown. */
* when the stream has terminated and the Kinesis worker has fully shutdown.
*/
def apply(
kclConfig: ConsumerConfig,
shardCheckpointConfig: ShardCheckpointConfig,
Expand Down
10 changes: 10 additions & 0 deletions src/test/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
com.contxt.kinesis {
consumer {
application-name = "unit-test-app"
stream-name = "unit-test-stream"

position {
initial = "trim-horizon"
}
}
}
58 changes: 58 additions & 0 deletions src/test/scala/com/contxt/kinesis/ConfigTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.contxt.kinesis

import com.typesafe.config.ConfigFactory
import org.scalamock.scalatest.MockFactory
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
import software.amazon.kinesis.common.InitialPositionInStream

class ConfigTest extends AnyWordSpec with Matchers with MockFactory {

"ConsumerConfig" should {

"Use implicit clients" in {
implicit val kinesis: KinesisAsyncClient = mock[KinesisAsyncClient]
implicit val dynamo: DynamoDbAsyncClient = mock[DynamoDbAsyncClient]
implicit val cloudwatch: CloudWatchAsyncClient = mock[CloudWatchAsyncClient]

def consumerConfig =
ConsumerConfig(
streamName = "streamName",
appName = "applicationName"
)

assert(consumerConfig.kinesisClient.eq(kinesis))
assert(consumerConfig.dynamoClient.eq(dynamo))
assert(consumerConfig.cloudwatchClient.eq(cloudwatch))
}

"Read HOCON configuration" in {
val config =
ConfigFactory
.load()
.getConfig(
"com.contxt.kinesis.consumer"
)

// verify fromConfig also accepts implicits
implicit val kinesis: KinesisAsyncClient = mock[KinesisAsyncClient]
implicit val dynamo: DynamoDbAsyncClient = mock[DynamoDbAsyncClient]
implicit val cloudwatch: CloudWatchAsyncClient = mock[CloudWatchAsyncClient]

val consumerConfig = ConsumerConfig.fromConfig(config)

consumerConfig.streamName shouldBe "unit-test-stream"
consumerConfig.appName shouldBe "unit-test-app"
consumerConfig.initialPositionInStreamExtended.getInitialPositionInStream shouldBe InitialPositionInStream.TRIM_HORIZON

assert(consumerConfig.kinesisClient.eq(kinesis))
assert(consumerConfig.dynamoClient.eq(dynamo))
assert(consumerConfig.cloudwatchClient.eq(cloudwatch))
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,11 @@ class KinesisSourceFactoryTest
)

private def clientConfig =
new ConsumerConfig(
ConsumerConfig(
streamName = "streamName1",
appName = "applicationName1",
workerId = "workerId",
workerId = "workerId"
)(
kinesisClient = mock[KinesisAsyncClient],
dynamoClient = mock[DynamoDbAsyncClient],
cloudwatchClient = mock[CloudWatchAsyncClient]
Expand Down