Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dlq initial subscription support #250

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Pulsar.Client/Api/Configuration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ type ProducerConfiguration =
BlockIfQueueFull: bool
MessageEncryptor: IMessageEncryptor option
ProducerCryptoFailureAction: ProducerCryptoFailureAction
InitialSubscriptionName: string
}
member this.BatchingPartitionSwitchFrequencyIntervalMs =
this.BatchingPartitionSwitchFrequencyByPublishDelay * (int this.BatchingMaxPublishDelay.TotalMilliseconds)
Expand Down Expand Up @@ -165,6 +166,7 @@ type ProducerConfiguration =
BlockIfQueueFull = false
MessageEncryptor = None
ProducerCryptoFailureAction = ProducerCryptoFailureAction.FAIL
InitialSubscriptionName = ""
}

type ReaderConfiguration =
Expand Down
13 changes: 8 additions & 5 deletions src/Pulsar.Client/Api/ConsumerBuilder.fs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ type ConsumerBuilder<'T> private (createConsumerAsync, createProducerAsync, conf
let deadLettersProcessor (c: ConsumerConfiguration<'T>) (deadLettersPolicy: DeadLetterPolicy) (topic: TopicName) =
let getTopicName () =
topic.ToString()
let createProducer deadLetterTopic =
let createProducer deadLetterTopic initialSubscriptionName =
ProducerBuilder(createProducerAsync, schema)
.Topic(deadLetterTopic)
.InitialSubscriptionName(initialSubscriptionName)
.BlockIfQueueFull(false)
.CreateAsync()
DeadLetterProcessor(deadLettersPolicy, getTopicName, c.SubscriptionName, createProducer) :> IDeadLetterProcessor<'T>
Expand Down Expand Up @@ -62,19 +63,21 @@ type ConsumerBuilder<'T> private (createConsumerAsync, createProducerAsync, conf
let prefixPart = c.SingleTopic.ToString() + "-" + %c.SubscriptionName
let defaultRetryLetterTopic = prefixPart + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX
let defaultDeadLetterTopic = prefixPart + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX
let defaultInitialSubscriptionName = ""
let newPolicy =
match c.DeadLetterPolicy with
| None ->
DeadLetterPolicy(RetryMessageUtil.MAX_RECONSUMETIMES, defaultDeadLetterTopic, defaultRetryLetterTopic)
DeadLetterPolicy(RetryMessageUtil.MAX_RECONSUMETIMES, defaultDeadLetterTopic, defaultRetryLetterTopic, defaultInitialSubscriptionName)
| Some policy ->
let initialSubscriptionName = if String.IsNullOrEmpty policy.InitialSubscriptionName then defaultInitialSubscriptionName else policy.InitialSubscriptionName
let isEmptyDL = String.IsNullOrEmpty policy.DeadLetterTopic
let isEmptyRL = String.IsNullOrEmpty policy.RetryLetterTopic
if isEmptyDL && isEmptyRL then
DeadLetterPolicy(policy.MaxRedeliveryCount, defaultDeadLetterTopic, defaultRetryLetterTopic)
DeadLetterPolicy(policy.MaxRedeliveryCount, defaultDeadLetterTopic, defaultRetryLetterTopic, initialSubscriptionName)
elif isEmptyDL then
DeadLetterPolicy(policy.MaxRedeliveryCount, defaultDeadLetterTopic, policy.RetryLetterTopic)
DeadLetterPolicy(policy.MaxRedeliveryCount, defaultDeadLetterTopic, policy.RetryLetterTopic, initialSubscriptionName)
elif isEmptyRL then
DeadLetterPolicy(policy.MaxRedeliveryCount, policy.DeadLetterTopic, defaultRetryLetterTopic)
DeadLetterPolicy(policy.MaxRedeliveryCount, policy.DeadLetterTopic, defaultRetryLetterTopic, initialSubscriptionName)
else
policy
{ c with
Expand Down
2 changes: 2 additions & 0 deletions src/Pulsar.Client/Api/DeadLetters.fs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ open System.Threading.Tasks
type DeadLetterPolicy(maxRedeliveryCount: int
, [<Optional; DefaultParameterValue(null:string)>] deadLetterTopic: string
, [<Optional; DefaultParameterValue(null:string)>] retryLetterTopic: string
, [<Optional; DefaultParameterValue(null:string)>] initialSubscriptionName: string
) =
member __.MaxRedeliveryCount = maxRedeliveryCount
member __.DeadLetterTopic = deadLetterTopic
member __.RetryLetterTopic = retryLetterTopic
member __.InitialSubscriptionName = initialSubscriptionName

type IDeadLetterProcessor<'T> =
abstract member ClearMessages: unit -> unit
Expand Down
5 changes: 5 additions & 0 deletions src/Pulsar.Client/Api/ProducerBuilder.fs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ type ProducerBuilder<'T> private (сreateProducerAsync, config: ProducerConfigur
MessageEncryptor = Some messageEncryptor }
|> this.With

member internal this.InitialSubscriptionName initialSubscriptionName =
{ config with
InitialSubscriptionName = initialSubscriptionName }
|> this.With

member this.CreateAsync(): Task<IProducer<'T>> =
сreateProducerAsync(verify config, schema, producerInterceptors)

Expand Down
4 changes: 3 additions & 1 deletion src/Pulsar.Client/Common/Commands.fs
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,14 @@ let newLookup (topicName : CompleteTopicName) (requestId : RequestId) (authorita
command |> serializeSimpleCommand

let newProducer (topicName : CompleteTopicName) (producerName: string) (producerId : ProducerId) (requestId : RequestId)
(schemaInfo: SchemaInfo) (epoch: uint64) (txnEnabled: bool) =
(schemaInfo: SchemaInfo) (epoch: uint64) (txnEnabled: bool) (initialSubscriptionName: string) =
let schema = getProtoSchema schemaInfo
let request = CommandProducer(Topic = %topicName, ProducerId = %producerId, RequestId = %requestId,
Epoch = epoch, TxnEnabled = txnEnabled)
if producerName |> String.IsNullOrEmpty |> not then
request.ProducerName <- producerName
if initialSubscriptionName |> String.IsNullOrEmpty |> not then
request.InitialSubscriptionName <- initialSubscriptionName
if schema.``type`` <> Schema.Type.None then
request.Schema <- schema
let command = BaseCommand(``type`` = CommandType.Producer, Producer = request)
Expand Down
6 changes: 3 additions & 3 deletions src/Pulsar.Client/Internal/DeadLetters.fs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type internal DeadLetterProcessor<'T>
(policy: DeadLetterPolicy,
getTopicName: unit -> string,
subscriptionName: SubscriptionName,
createProducer: string -> Task<IProducer<'T>>) =
createProducer: string -> string -> Task<IProducer<'T>>) =

let topicName = getTopicName()
let store = Dictionary<MessageId, Message<'T>>()
Expand All @@ -24,11 +24,11 @@ type internal DeadLetterProcessor<'T>
$"{topicName}-{subscriptionName}{RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX}"

let dlProducer = lazy (
createProducer dlTopicName
createProducer dlTopicName policy.InitialSubscriptionName
)

let rlProducer = lazy (
createProducer policy.RetryLetterTopic
createProducer policy.RetryLetterTopic ""
)

let getOptionalKey (message: Message<'T>) =
Expand Down
2 changes: 1 addition & 1 deletion src/Pulsar.Client/Internal/ProducerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
let requestId = Generators.getNextRequestId()
try
let payload = Commands.newProducer producerConfig.Topic.CompleteTopicName producerConfig.ProducerName
producerId requestId schema.SchemaInfo epoch clientConfig.EnableTransaction
producerId requestId schema.SchemaInfo epoch clientConfig.EnableTransaction producerConfig.InitialSubscriptionName
let! response = clientCnx.SendAndWaitForReply requestId payload
let success = response |> PulsarResponseType.GetProducerSuccess
if String.IsNullOrEmpty producerName then
Expand Down
Loading