Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: retain message, max delivery, empty header #181

Draft
wants to merge 27 commits into
base: master
Choose a base branch
from

Conversation

shellphy
Copy link

@shellphy shellphy commented Nov 15, 2024

Reason for This PR

purge will cause Message deleted, so remove purge, if you do not want to persist message, use only delete_stream_on_stop

The default ack timeout for nats is 30 seconds. If there is no ack after 30 seconds, the message will be delivered again, this is a bug, resulting in a duplicate task, so MaxDeliver = 1 option shoud be add. refer to this document.

Description of Changes

License Acceptance

By submitting this pull request, I confirm that my contribution is made under
the terms of the MIT license.

PR Checklist

[Author TODO: Meet these criteria.]
[Reviewer TODO: Verify that these criteria are met. Request changes if not]

  • All commits in this PR are signed (git commit -s).
  • The reason for this PR is clearly provided (issue no. or explanation).
  • The description of changes is clear and encompassing.
  • Any required documentation changes (code and docs) are included in this PR.
  • Any user-facing changes are mentioned in CHANGELOG.md.
  • All added/changed functionality is tested.

Summary by CodeRabbit

  • New Features

    • Introduced a configuration option to retain stream messages after the driver stops, enhancing message lifecycle management.
    • Added a maximum delivery count setting to prevent duplicate task processing in message consumption.
    • Added a new JsonItem type for improved serialization of job items with headers.
  • Bug Fixes

    • Enhanced error handling and data unpacking logic for improved robustness in processing job items.
  • Documentation

    • Improved comments and formatting for better clarity.

Copy link

coderabbitai bot commented Nov 15, 2024

Walkthrough

The changes introduce a new boolean field retainStreamMessages to the Driver struct in the natsjobs package, impacting the retention of messages in streams when the driver is stopped. Additionally, the listenerInit method is modified to include a MaxDeliver configuration option in the jetstream.ConsumerConfig struct, which controls message redelivery behavior. The Push, requeue, and unpack methods are updated to utilize a new JsonItem struct for improved data handling. Overall, these modifications enhance message lifecycle management without introducing new methods or altering existing public entity declarations.

Changes

File Change Summary
natsjobs/driver.go Added retainStreamMessages boolean field to Driver struct; modified Stop method to check this field before purging stream messages. Updated FromConfig, FromPipeline, Push, and requeue methods to initialize and marshal JsonItem.
natsjobs/listener.go Modified listenerInit method to add MaxDeliver option to jetstream.ConsumerConfig struct, setting its value to 1 to prevent duplicate task processing. No changes to listenerStart.
natsjobs/item.go Introduced new JsonItem type that embeds Item struct and adds Headers field for JSON serialization of headers.
natsjobs/unpack.go Updated unpack method to use JsonItem for unmarshalling JSON data and populating item headers.

Poem

In the meadow where messages flow,
A new field in the driver does glow.
With retainStreamMessages in sight,
Our streams will hold tight,
While MaxDeliver keeps tasks in tow,
No duplicates here, just a smooth show! 🐇✨


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Outside diff range and nitpick comments (4)
natsjobs/driver.go (1)

410-413: Consider potential race condition

The atomic load of c.listeners and the check of c.retainStreamMessages are not performed atomically. While this might not cause issues in practice due to the Stop method's usage pattern, consider using a mutex to ensure thread safety.

-if atomic.LoadUint32(&c.listeners) > 0 && !c.retainStreamMessages {
+c.consumerLock.RLock()
+shouldPurge := atomic.LoadUint32(&c.listeners) > 0 && !c.retainStreamMessages
+c.consumerLock.RUnlock()
+if shouldPurge {
   err := c.stream.Purge(ctx)
   if err != nil {
     c.log.Error("drain error", zap.Error(err))
   }
   c.stopCh <- struct{}{}
}
natsjobs/item.go (3)

6-6: File is not formatted with goimports.

Please run goimports to format the code and ensure imports are correctly grouped and ordered.

🧰 Tools
🪛 golangci-lint

6-6: File is not goimports-ed

(goimports)


113-137: Use appropriate logging levels for heartbeat messages.

Currently, the heartbeat is logged at the Error level, which may not be appropriate for regular operation. Consider using log.Info or log.Debug for heartbeat messages to reduce noise in the logs and reserve log.Error for actual error conditions.

Apply this diff to adjust the logging level:

-    				log.Error("heartbeat")
+    				log.Info("heartbeat")

118-118: Consider making the heartbeat interval configurable.

The heartbeat interval is hardcoded to 20 seconds. To provide flexibility and accommodate different NATS timeout settings, consider making the heartbeat interval a configurable parameter.

You can modify the code as follows:

Add a new field in the Options struct:

+    	HeartbeatInterval time.Duration

Update the startHeartbeat method to use the configurable interval:

-    		ticker := time.NewTicker(20 * time.Second)
+    		ticker := time.NewTicker(i.Options.HeartbeatInterval)

Ensure that HeartbeatInterval is set during initialization, possibly with a default value if not specified.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between fb53018 and 908c8f2.

⛔ Files ignored due to path filters (1)
  • go.work.sum is excluded by !**/*.sum
📒 Files selected for processing (5)
  • natsjobs/config.go (1 hunks)
  • natsjobs/driver.go (4 hunks)
  • natsjobs/item.go (4 hunks)
  • natsjobs/listener.go (1 hunks)
  • schema.json (1 hunks)
🧰 Additional context used
🪛 golangci-lint
natsjobs/item.go

6-6: File is not goimports-ed

(goimports)

🔇 Additional comments (10)
natsjobs/config.go (2)

32-33: Clarify interaction between DeleteStreamOnStop and RetainStreamMessages

These two boolean flags appear to have overlapping or potentially conflicting behaviors:

  • DeleteStreamOnStop: Deletes the stream when stopping
  • RetainStreamMessages: Retains messages in the stream

Please clarify how these flags interact:

  1. What happens if both are true?
  2. Should they be mutually exclusive?
  3. Which flag takes precedence?

Consider adding validation logic to prevent conflicting configurations.

#!/bin/bash
# Look for any validation or flag interaction handling
rg -A 5 "DeleteStreamOnStop|RetainStreamMessages" --type go

17-17: Verify configuration key naming

The PR objectives mention reserve_stream_on_stop as the configuration key, but the code implements retain_stream_messages. This inconsistency could lead to confusion for users following the PR documentation.

natsjobs/listener.go (3)

70-72: Verify interaction between heartbeat and message retention.

The PR adds message retention functionality, but we need to ensure the heartbeat mechanism properly interacts with it.

Let's verify the retention implementation:

#!/bin/bash
# Description: Check retention implementation and shutdown handling
# Expected: Find proper cleanup of heartbeat on shutdown

# Search for retention configuration
rg 'retain.*stream.*message'

# Check shutdown handling
ast-grep --pattern 'func ($_ *Driver) Stop() {
  $$$
}'

Consider these architectural points:

  1. Ensure heartbeat is properly stopped when the driver is stopped
  2. Document the interaction between message retention and heartbeat in comments
  3. Add metrics/logging to track heartbeat status for retained messages

70-72: 🛠️ Refactor suggestion

Verify heartbeat implementation and consider adding timeout configuration.

The heartbeat mechanism is a good addition to prevent duplicate tasks due to message timeouts. However, there are a few considerations:

  1. The heartbeat is started before context propagation, which means it might continue even if the context is canceled.
  2. There's no configurable timeout for the heartbeat interval.

Let's verify the heartbeat implementation in the Item struct:

#!/bin/bash
# Description: Check the heartbeat implementation details
# Expected: Find the heartbeat implementation with proper context handling

# Search for the heartbeat implementation
ast-grep --pattern 'func (i *Item) startHeartbeat($_) {
  $$$
}'

# Check for context usage in heartbeat
rg -A 10 'startHeartbeat'

Consider these improvements:

  1. Start the heartbeat after context propagation to ensure proper cancellation
  2. Add configurable heartbeat interval:
-item.startHeartbeat(c.log)
+ctx := c.prop.Extract(context.Background(), propagation.HeaderCarrier(item.headers))
+ctx, span := c.tracer.Tracer(tracerName).Start(ctx, "nats_listener")
+item.startHeartbeat(ctx, c.log)

70-71: Consider adding error handling for heartbeat initialization.

While simplifying error handling is good, we should ensure failures are properly handled to prevent message redelivery issues.

Let's check the error handling in the codebase:

Consider adding basic error handling:

-item.Options.inProgressFunc = m.InProgress
-item.startHeartbeat(c.log)
+item.Options.inProgressFunc = m.InProgress
+if err := item.startHeartbeat(c.log); err != nil {
+    c.log.Error("failed to start heartbeat", zap.Error(err))
+    // Consider if we should Nak the message here
+}
natsjobs/driver.go (5)

57-65: LGTM: Driver struct changes are well-organized

The new retainStreamMessages field is appropriately placed in the config section and follows Go naming conventions.


410-413: LGTM: Stream retention logic is correctly implemented

The conditional stream purge based on retainStreamMessages aligns with the PR objectives.


227-236: Verify intentional difference in default values

The retainStreamMessages default value is set to true in FromPipeline but might be different in FromConfig. This could lead to inconsistent behavior depending on how the driver is initialized.

#!/bin/bash
# Description: Compare default values between config and pipeline initialization
# Expected: Find default value in config initialization

# Search for default value initialization in config
rg "RetainStreamMessages.*default" 

145-154: Verify config parsing implementation

The initialization of retainStreamMessages looks correct. Let's verify the config struct implementation.

✅ Verification successful

Config parsing implementation is correctly implemented

The retainStreamMessages field is properly defined in the config struct with the correct mapstructure tag retain_stream_messages. The implementation is consistent across:

  • Config struct definition in natsjobs/config.go
  • Schema definition in schema.json
  • Driver initialization using the config value
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify config struct implementation for retain_stream_messages
# Expected: Find the config struct definition with the new field

# Search for config struct definition
rg -A 10 "type\s+config\s+struct" 

Length of output: 662


Script:

#!/bin/bash
# Let's search for the complete config struct definition to find retainStreamMessages
rg -A 20 "type\s+config\s+struct" 

# Also search for any references to retain_stream_messages in the codebase
rg "retain_stream_messages" -i

Length of output: 1384


Line range hint 1-1: Missing heartbeat implementation

The PR objectives mention adding a heartbeat mechanism to handle the 30-second NATS timeout, but the implementation for this feature is missing in the changes.

Would you like help implementing the heartbeat mechanism mentioned in the PR objectives?

natsjobs/config.go Outdated Show resolved Hide resolved
schema.json Outdated Show resolved Hide resolved
natsjobs/config.go Outdated Show resolved Hide resolved
natsjobs/driver.go Outdated Show resolved Hide resolved
natsjobs/item.go Outdated
@@ -107,7 +110,41 @@ func (i *Item) Context() ([]byte, error) {
return ctx, nil
}

func (i *Item) startHeartbeat(log *zap.Logger) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, refer to the NATS docs about hearbeats.

natsjobs/item.go Outdated Show resolved Hide resolved
natsjobs/item.go Outdated Show resolved Hide resolved
natsjobs/listener.go Outdated Show resolved Hide resolved
schema.json Outdated Show resolved Hide resolved
@shellphy
Copy link
Author

shellphy commented Nov 15, 2024

I set MaxDeliver = 1, so heartbeat is not needed

@shellphy shellphy changed the title retain message and add heartbeat retain message、max deliver、empty header Nov 16, 2024
@shellphy shellphy changed the title retain message、max deliver、empty header fix: retain message、max deliver、empty header Nov 16, 2024
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Outside diff range and nitpick comments (2)
natsjobs/item.go (1)

31-34: Consider header access consistency and thread safety

The introduction of JsonItem with a public Headers field alongside Item's private headers creates potential issues:

  1. Inconsistent access patterns (direct vs method-based)
  2. Possible race conditions with concurrent modifications
  3. Risk of header state divergence

Consider these alternatives:

  1. Use JSON struct tags on the private headers field
  2. Implement custom MarshalJSON/UnmarshalJSON methods
  3. Add synchronization if concurrent access is needed
🧰 Tools
🪛 golangci-lint

[warning] 31-31: var-naming: type JsonItem should be JSONItem

(revive)

natsjobs/driver.go (1)

Implementation does not match PR objectives

The codebase analysis confirms that the message retention configuration and heartbeat functionality are not implemented:

  • The configuration struct in natsjobs/config.go does not contain any fields for message retention settings.
  • Stream configurations in natsjobs/driver.go and natsjobs/listener.go do not include retention-related parameters.
  • While MaxDeliver: 1 is set in natsjobs/listener.go, there are no heartbeat-related configurations or implementations.

Please implement:

  • Add the proposed retain_stream_messages configuration to the config struct
  • Implement the retention logic in the stream configuration
  • Document why heartbeat is not needed if that's the intended approach
🔗 Analysis chain

Line range hint 1-1: Implementation does not match PR objectives

The PR objectives mention adding message retention configuration (retain_stream_messages) and heartbeat functionality, but these changes are not visible in the implementation. While the author's comment suggests that heartbeat isn't needed with MaxDeliver = 1, the message retention feature is still missing.

Let's verify if these features exist elsewhere:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Look for retention and heartbeat related code
# Test: Search for retention configuration
rg -i 'retain.*stream.*message'

# Test: Search for heartbeat related code
rg -i 'heartbeat'

# Test: Search for MaxDeliver configuration
ast-grep --pattern 'MaxDeliver.*=.*1'

Length of output: 91


Script:

#!/bin/bash
# Let's try a broader search for retention and configuration related code
# Search for retention-related terms
rg -i 'retention|retain'

# Search for stream configuration
rg -i 'stream.*config|config.*stream'

# Search for MaxDeliver in any form
rg -i 'maxdeliver'

# Look for configuration structs
ast-grep --pattern 'type $_ struct {
  $$$
}'

Length of output: 6049

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 60be35e and abe45fb.

📒 Files selected for processing (3)
  • natsjobs/driver.go (2 hunks)
  • natsjobs/item.go (1 hunks)
  • natsjobs/unpack.go (2 hunks)
🧰 Additional context used
🪛 golangci-lint
natsjobs/item.go

[warning] 31-31: var-naming: type JsonItem should be JSONItem

(revive)

🔇 Additional comments (4)
natsjobs/unpack.go (2)

Line range hint 1-30: Verify alignment with PR objectives.

While the code changes improve message handling, I don't see direct implementation of the message retention functionality mentioned in the PR objectives (retain_stream_messages: true). Could you clarify how this file's changes contribute to the message retention feature?

#!/bin/bash
# Description: Look for retention-related configuration and implementation

# Search for retention-related configuration
rg -i 'retain.*stream|stream.*retain' --type go --type yaml

# Search for JetStream configuration
ast-grep --pattern 'jetstream.Config{
  $$$
}'

15-16: LGTM! Consider verifying performance impact.

The introduction of the auxiliary JsonItem structure improves the separation of concerns and makes the code more maintainable. However, since this is in the message processing path, it's worth verifying that the additional allocation doesn't impact performance in high-throughput scenarios.

✅ Verification successful

Based on the analysis of the codebase, I can now generate the final response:

Performance impact is negligible, change is safe

The JsonItem auxiliary structure is used consistently throughout the codebase and follows a common pattern for JSON unmarshaling in Go. The implementation is lightweight with minimal overhead:

  • The struct only adds a single Headers field to the base Item
  • Used only during unmarshaling, not kept in memory
  • No evidence of high-throughput performance requirements in tests
  • Similar unmarshaling pattern used elsewhere in the codebase
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Look for benchmark tests that could be affected by this change

# Search for benchmark tests related to message processing
rg -l 'func.*Benchmark.*' --type go

# Search for any performance-related comments or documentation
rg -i '(performance|benchmark|throughput)' --type go

Length of output: 274


Script:

#!/bin/bash
# Let's try a different approach to find performance-related code and tests

# Look for benchmark tests
fd -e go -x grep -l "func.*Benchmark" {}

# Look for any test files that might contain performance tests
fd "_test.go$"

# Look for any performance-related comments
rg -i "performance|throughput" --type go

# Check if there are any existing JsonItem usages or similar patterns
rg "type.*JsonItem" --type go
rg "json.Unmarshal.*aux" --type go -A 2

Length of output: 419


Script:

#!/bin/bash
# Let's examine the test files and JsonItem implementation for better context

# Check the JsonItem struct definition
cat natsjobs/item.go

# Look at the test files to understand usage patterns and any performance considerations
cat tests/jobs_nats_test.go
cat tests/jobs_nats_durability_test.go

# Check for any other similar JSON unmarshaling patterns in the codebase
ast-grep --pattern 'json.Unmarshal($_, $_)'

Length of output: 33813

natsjobs/item.go (1)

31-34: Verify alignment with PR objectives

The introduction of JsonItem appears to be supporting infrastructure, but the connection to the PR's main objectives (message retention and heartbeat mechanism) isn't clear. Could you clarify:

  1. How does this change support message retention after pipeline stop?
  2. Is this serialization change related to the heartbeat implementation?

Let's check for related changes in other files:

🧰 Tools
🪛 golangci-lint

[warning] 31-31: var-naming: type JsonItem should be JSONItem

(revive)

natsjobs/driver.go (1)

254-257: Verify consumer-side compatibility with the new message format

The message format has been changed to include both item and headers in a single JSON object. While this is a good practice for data cohesion, we need to ensure that all consumers are updated to handle this new format.

Let's verify the consumer-side implementation:

✅ Verification successful

Message format change is properly handled

The verification shows that the message format change is safe and properly implemented:

  • JsonItem is well-defined in natsjobs/item.go with proper JSON tags
  • The unmarshalling is handled correctly in unpack() method which properly deserializes both the item and headers
  • The same JsonItem structure is used consistently in both publishing (Push) and requeuing operations
  • The change maintains backward compatibility as the unmarshalling gracefully handles the payload even when it doesn't match the expected format (falls back to creating a new Item with raw payload)
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for JsonItem struct definition and its usage in consumer code
# Test: Look for JsonItem struct definition
ast-grep --pattern 'type JsonItem struct {
  $$$
}'

# Test: Look for JsonItem unmarshalling in consumer code
rg -A 5 'json\.Unmarshal.*JsonItem'

Length of output: 249


Script:

#!/bin/bash
# Let's check how the JsonItem is used in consumer/subscriber code
rg -A 10 "Subscribe|QueueSubscribe|Pull" natsjobs/

# Check for any unmarshalling of messages in the codebase
rg -A 5 "json\.Unmarshal.*\(" natsjobs/

# Look for any message handling or processing code
ast-grep --pattern 'func ($_) Handle($$$) {
  $$$
}'

Length of output: 438


Script:

#!/bin/bash
# Let's check the implementation of message processing in the driver
rg -A 15 "func.*Process" natsjobs/

# Check how messages are consumed/pulled
rg -A 15 "Pull.*Message|Fetch.*Message" natsjobs/

# Look at the implementation of JsonItem usage
rg -C 10 "JsonItem" natsjobs/

Length of output: 3993

natsjobs/unpack.go Show resolved Hide resolved
natsjobs/item.go Show resolved Hide resolved
natsjobs/driver.go Show resolved Hide resolved
@@ -28,6 +28,11 @@ type Item struct {
Options *Options `json:"options,omitempty"`
}

type JsonItem struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Copy link
Author

@shellphy shellphy Nov 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found an another bug where header fields are lost during NATS message passing. The issue occurs because the Item struct has an unexported headers field that won't be included in JSON marshaling:

type Item struct {
    Job     string              `json:"job"`
    Ident   string             `json:"id"`
    Payload []byte             `json:"payload"`
    headers map[string][]string  // unexported, gets dropped during marshaling
    Options *Options           `json:"options,omitempty"`
}

Interestingly, the Context() method already handles headers correctly through a custom JSON struct, but this wasn't implemented for the main message passing to nats. To fix this, we can add:

type JsonItem struct {
    *Item
    Headers map[string][]string `json:"headers"`
}

This simple solution ensures headers are properly preserved when passing messages through NATS.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a simple solution, not a solution at all. Headers should be passed via NATS message headers and then populated via unpack method. I agree, that atm, headers are not properly passed, but they should not be passed like this.
The simple solution is the following: pass headers to the NATS message in the Push method. Get them in the listener m.Headers() and pass into the Item.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like this: #182

@rustatian rustatian changed the title fix: retain message、max deliver、empty header fix: retain message, max delivery, empty header Nov 16, 2024
@rustatian
Copy link
Member

I set MaxDeliver = 1, so heartbeat is not needed

This is not a good practice to hide some behavior. If you wanted to expose this option - it should be exposed via configuration, but not hidden like this.

@rustatian rustatian marked this pull request as draft November 16, 2024 23:56
@shellphy
Copy link
Author

I set MaxDeliver = 1, so heartbeat is not needed

This is not a good practice to hide some behavior. If you wanted to expose this option - it should be exposed via configuration, but not hidden like this.

I understand your perspective, but implementing this option would introduce unnecessary complexity without significant practical benefits. While adding this limitation would align with other job drivers' behavior, it goes against Roadrunner's core strength - providing a clean abstraction layer that elegantly encapsulates implementation details. The power of Roadrunner lies precisely in its ability to simplify complex workflows through unified abstractions.

@rustatian
Copy link
Member

Yes, simplify complex workflows, you're right. But this is not the same - introduce some hidden behavior. For you, for example, max_deliver: 1 is sufficient. But imagine cases, when the user wants to re-deliver the message until it'd be successfully processed (Acked). In this case, you introduced a breaking change. This is a good option, but should not be hardcoded. It should be moved to the configuration + added docs for it + added to the schema.json.

@rustatian rustatian added the bug Something isn't working label Nov 17, 2024
@shellphy
Copy link
Author

Indeed, it should be made configurable. However, if a task times out, NATS will resend the message, and RoadRunner will spawn a new worker to handle it. Should we keep track of the task execution status in PHP to address this issue?

@rustatian
Copy link
Member

RR does not spawn workers per-request in a regular mode. Only in the debug mode.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
Status: 🏗 In progress
Development

Successfully merging this pull request may close these issues.

2 participants