Validate node ID, sequence ID and timestamp on synced payloads #311
Conversation
Walkthrough

The pull request introduces modifications to SQL queries and their corresponding Go functions.
```go
var lastSequenceID uint64 = 0
var lastNs int64 = 0
if stream.lastEnvelope != nil {
	lastSequenceID = stream.lastEnvelope.OriginatorSequenceID()
	lastNs = stream.lastEnvelope.OriginatorNs()
}
if env.OriginatorSequenceID() != lastSequenceID+1 || env.OriginatorNs() < lastNs {
	// TODO(rich) Submit misbehavior report and continue
	s.log.Error("Received out of order envelope")
}

if env.OriginatorSequenceID() > lastSequenceID {
	stream.lastEnvelope = env
}
```
do we check this anywhere at stream startup? What if we rebuild the stream after network failure?
If you look at where the originatorStream is constructed, we are fetching the last payload out of the database before stream construction.
Actionable comments posted: 4
🧹 Outside diff range and nitpick comments (6)
pkg/sync/syncWorker.go (3)
36-41: Consider adding documentation comments for the new originatorStream struct

The newly introduced originatorStream struct encapsulates stream-related data. Adding a comment explaining its purpose and usage would improve code readability and maintainability.
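A possible shape for such a comment, as a sketch only — the field types below are inferred from how the struct is used in this diff (the stream field in particular is a placeholder), not the actual definition:

```go
// originatorStream tracks the sync state for a single originator node.
//
// nodeID is the originator node this stream subscribes to, stream is the
// underlying subscription, and lastEnvelope is the most recently accepted
// envelope; each incoming envelope's sequence ID and timestamp are validated
// against it. lastEnvelope is nil until it is seeded from the database or
// the first envelope arrives.
type originatorStream struct {
	nodeID       uint32
	stream       subscriptionStream // placeholder for the real stream type
	lastEnvelope *envUtils.OriginatorEnvelope
}
```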
Line range hint 280-320: Handle potential nil pointer dereference in setupStream

In the setupStream function, when iterating over result to assign lastEnvelope, there is no check for a nil lastEnvelope. If envUtils.NewOriginatorEnvelopeFromBytes returns an error, lastEnvelope could be nil, potentially leading to a nil pointer dereference later. Consider adding error handling to address this issue:

```diff
 originatorStream := &originatorStream{nodeID: nodeID, stream: stream}
 for _, row := range result {
 	if uint32(row.OriginatorNodeID) == nodeID {
 		lastEnvelope, err := envUtils.NewOriginatorEnvelopeFromBytes(row.OriginatorEnvelope)
 		if err != nil {
 			return nil, err
 		}
+		if lastEnvelope == nil {
+			s.log.Error("Last envelope is nil")
+			continue
+		}
 		originatorStream.lastEnvelope = lastEnvelope
 	}
 }
```
Line range hint 324-339: Handle errors returned by validateAndInsertEnvelope

In the listenToStream function, the validateAndInsertEnvelope method is called, but any errors it returns are not handled. This could lead to silent failures during envelope validation and insertion. Consider modifying the code to handle errors:

```diff
 for _, env := range envs.Envelopes {
-	s.validateAndInsertEnvelope(stream, env)
+	err := s.validateAndInsertEnvelope(stream, env)
+	if err != nil {
+		s.log.Error("Failed to validate and insert envelope", zap.Error(err))
+		// Decide whether to continue or return the error
+	}
 }
```

pkg/envelopes/originator.go (1)
61-64: Add documentation for the new OriginatorNs method

The OriginatorNs method provides access to the originator's timestamp but lacks a comment explaining its purpose. Adding documentation would enhance code readability. Consider adding a comment:

```go
// OriginatorNs returns the timestamp of the originator envelope in nanoseconds.
func (o *OriginatorEnvelope) OriginatorNs() int64 {
	return o.UnsignedOriginatorEnvelope.OriginatorNs()
}
```

pkg/db/queries/queries.sql.go (2)
342-356: Update struct tags for SelectVectorClockRow

The SelectVectorClockRow struct now includes the OriginatorEnvelope field but lacks struct tags that might be necessary for JSON marshalling or other operations. Ensure consistency with other structs. Consider adding struct tags if applicable:

```go
type SelectVectorClockRow struct {
	OriginatorNodeID     int32  `json:"originator_node_id"`
	OriginatorSequenceID int64  `json:"originator_sequence_id"`
	OriginatorEnvelope   []byte `json:"originator_envelope"`
}
```
Line range hint 368-374: Handle potential errors when scanning rows

In the SelectVectorClock method, ensure that all errors during row scanning are handled. Currently, if rows.Scan returns an error, it immediately returns, but there might be resources to clean up or additional logging needed. Consider enhancing error handling:

```go
for rows.Next() {
	var i SelectVectorClockRow
	if err := rows.Scan(&i.OriginatorNodeID, &i.OriginatorSequenceID, &i.OriginatorEnvelope); err != nil {
		rows.Close()
		s.log.Error("Failed to scan row", zap.Error(err))
		return nil, err
	}
	items = append(items, i)
}
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
⛔ Files ignored due to path filters (1)
- pkg/proto/mls/message_contents/content.pb.go is excluded by !**/*.pb.go, !pkg/proto/**

📒 Files selected for processing (4)
- pkg/db/queries.sql (1 hunks)
- pkg/db/queries/queries.sql.go (2 hunks)
- pkg/envelopes/originator.go (1 hunks)
- pkg/sync/syncWorker.go (8 hunks)
🔇 Additional comments (1)
pkg/db/queries.sql (1)
49-57: Verify the impact of removing the GROUP BY clause

The previous version of the SelectVectorClock query used GROUP BY to aggregate sequence IDs. Changing to DISTINCT ON affects how duplicates are handled. Ensure that this modification does not introduce unintended side effects.
Run the following script to compare results between the old and new queries:
✅ Verification successful
Let me analyze the SQL queries statically since we can't execute them in the sandbox environment.
Based on the code context found, I can now provide a definitive response about the query change.
The change from GROUP BY to DISTINCT ON is safe and functionally equivalent

The new query using DISTINCT ON with ORDER BY originator_sequence_id DESC is actually more precise than the old version as it:
- Maintains the same grouping by originator_node_id
- Ensures we get the highest sequence ID per node (via ORDER BY ... DESC)
- Additionally preserves the originator_envelope data, which is used in the ToVectorClock conversion
Evidence from the codebase:
- The query results are used in ToVectorClock (pkg/db/types.go), which only needs the latest sequence ID per node
- The syncWorker.setupStream and subscribeWorker implementations rely on this vector clock for synchronization
- The ON CONFLICT DO NOTHING in envelope insertion (pkg/db/queries/queries.sql.go) ensures data consistency
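To make the "only needs the latest sequence ID per node" point concrete, here is a rough sketch of such a conversion, assuming the SelectVectorClockRow shape shown earlier in this review; the function name is illustrative, not the actual pkg/db/types.go code:

```go
// rowsToVectorClock collapses the SelectVectorClock rows into a vector clock:
// originator node ID -> latest sequence ID. The originator_envelope column is
// ignored here, which is why GROUP BY and DISTINCT ON are interchangeable for
// this consumer as long as the highest sequence ID per node wins.
func rowsToVectorClock(rows []SelectVectorClockRow) map[uint32]uint64 {
	vc := make(map[uint32]uint64, len(rows))
	for _, row := range rows {
		vc[uint32(row.OriginatorNodeID)] = uint64(row.OriginatorSequenceID)
	}
	return vc
}
```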
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Compare results of the old and new SelectVectorClock queries.
# Test: Execute both queries and diff the results. Expect: Results should match in terms of latest sequence IDs per node.
# Old query
psql -c "
SELECT
originator_node_id,
max(originator_sequence_id)::BIGINT AS originator_sequence_id
FROM
gateway_envelopes
GROUP BY
originator_node_id;
" > old_query_results.txt
# New query
psql -c "
SELECT DISTINCT ON (originator_node_id)
originator_node_id,
originator_sequence_id
FROM
gateway_envelopes
ORDER BY
originator_node_id,
originator_sequence_id DESC;
" > new_query_results.txt
# Compare results
diff old_query_results.txt new_query_results.txt
Length of output: 610
Script:
#!/bin/bash
# Let's check if there are any tests or documentation that might help us understand the behavior
rg -g '!*.sql' "SelectVectorClock|gateway_envelopes" -A 5 -B 5
Length of output: 5578
```diff
@@ -46,13 +46,15 @@ DELETE FROM staged_originator_envelopes
 WHERE id = @id;

 -- name: SelectVectorClock :many
-SELECT
+SELECT DISTINCT ON (originator_node_id)
```
can you help me understand this change?
We are now returning the last envelope as well, so using GROUP BY is not going to work. What we want is one row per originator_node_id, not just the largest sequence ID value.
There's a good section on DISTINCT ON
in https://www.postgresql.org/docs/current/sql-select.html if that helps!
I’m going to suggest we merge this after we turn off auto-deploy to the testnet, given how big the change is.
The golang race test seems to be failing on main (unrelated to this PR), can investigate that separately
Actionable comments posted: 2
🧹 Nitpick comments (1)
pkg/sync/syncWorker.go (1)
358-371: Enhance validation error handling and logging.

The sequence ID and timestamp validation is correct, but the error handling could be more informative. Consider this improvement:

```diff
 var lastSequenceID uint64 = 0
 var lastNs int64 = 0
 if stream.lastEnvelope != nil {
 	lastSequenceID = stream.lastEnvelope.OriginatorSequenceID()
 	lastNs = stream.lastEnvelope.OriginatorNs()
 }
-if env.OriginatorSequenceID() != lastSequenceID+1 || env.OriginatorNs() < lastNs {
-	// TODO(rich) Submit misbehavior report and continue
-	s.log.Error("Received out of order envelope")
+if env.OriginatorSequenceID() != lastSequenceID+1 {
+	s.log.Error("Sequence ID mismatch",
+		zap.Uint64("expected", lastSequenceID+1),
+		zap.Uint64("received", env.OriginatorSequenceID()))
+	// TODO(rich) Submit misbehavior report and continue
+} else if env.OriginatorNs() < lastNs {
+	s.log.Error("Timestamp regression",
+		zap.Int64("lastNs", lastNs),
+		zap.Int64("receivedNs", env.OriginatorNs()))
+	// TODO(rich) Submit misbehavior report and continue
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- pkg/sync/syncWorker.go (7 hunks)
🧰 Additional context used
📓 Learnings (1)
pkg/sync/syncWorker.go (1)
Learnt from: richardhuaaa
PR: xmtp/xmtpd#311
File: pkg/sync/syncWorker.go:379-394
Timestamp: 2024-12-06T17:51:56.809Z
Learning: In `pkg/sync/syncWorker.go`, the `insertEnvelope` function's database insert operation is intended to be idempotent; it should do nothing if a duplicate already exists.
🔇 Additional comments (3)
pkg/sync/syncWorker.go (3)
36-40: LGTM! Well-structured stream encapsulation.
The originatorStream struct effectively encapsulates stream-related data and state, providing a clean abstraction for managing node-specific stream information.
Line range hint 279-319: LGTM! Robust stream initialization with proper state management.
The method effectively:
- Initializes the vector clock from the database
- Sets up the subscription with correct node filtering
- Properly initializes the lastEnvelope state for sequence validation
Line range hint 378-401: LGTM! Well-implemented idempotent envelope insertion.
The method properly handles:
- Envelope serialization
- Database insertion
- Error cases
- Duplicate detection (intentionally idempotent as per requirements)
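For reference, the duplicate handling mentioned above can be pictured with a standalone sketch using only the standard context and database/sql packages; the column list is simplified and this is not the project's sqlc-generated code, only an illustration of the ON CONFLICT DO NOTHING pattern:

```go
// insertEnvelopeIdempotent sketches the duplicate handling: ON CONFLICT DO
// NOTHING means re-inserting an envelope that already exists affects zero
// rows instead of failing, so the operation is safe to repeat after a stream
// rebuild or a replayed message.
func insertEnvelopeIdempotent(ctx context.Context, db *sql.DB, nodeID int32, sequenceID int64, envelope []byte) error {
	const stmt = `
		INSERT INTO gateway_envelopes (originator_node_id, originator_sequence_id, originator_envelope)
		VALUES ($1, $2, $3)
		ON CONFLICT DO NOTHING`
	res, err := db.ExecContext(ctx, stmt, nodeID, sequenceID, envelope)
	if err != nil {
		return err
	}
	if n, _ := res.RowsAffected(); n == 0 {
		// Zero rows affected: the envelope was already stored; treat as success.
	}
	return nil
}
```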
```go
for _, row := range result {
	if uint32(row.OriginatorNodeID) == nodeID {
		lastEnvelope, err := envUtils.NewOriginatorEnvelopeFromBytes(row.OriginatorEnvelope)
		if err != nil {
```
Are there cases where we would actually want to continue here? I worry about an invalid ClientEnvelope
blocking replication forever because it errors.
Maybe it's the originator's fault for not validating properly before putting it in the DB and we should stop? Would only be an issue if the validation rules changed.
Just want to make sure we've considered all the failure modes.
Yes, it's a tricky call. I think it's similar to the general programming question of whether we should assert things that should always be true. I'm a bit more in the camp of a simpler programming model where things fail if our assumptions are violated, and we fix them - there are also other places in the codebase that rely on the DB deserialization working (e.g. when a query is made) and it would be preferable not to mask them in our responses to clients.
Maybe it's the originator's fault for not validating properly before putting it in the DB and we should stop? Would only be an issue if the validation rules changed.
Yes, I think the assumption is that the originator should have serialized it properly before writing to the DB. Note we're not doing any validation on the envelope here - we're just checking that it can be deserialized. So it shouldn't be a problem if the validation rules changed, but it could happen if we make a breaking proto change without preparing it properly.
Maybe the safest thing is to leave it as-is and if we have nodes that get jammed up we know we have a problem to fix.
Note we're not doing any validation on the envelope here - we're just checking that it can be deserialized
FWIW, we do a little bit of validation on the ClientEnvelope
to make sure the topic matches the payload type. But that all should be checked before it ever goes in the DB.
This adds initial validation on synced payloads.
Still to be done:
#302
Summary by CodeRabbit

New Features
- Added access to the OriginatorNs value in the OriginatorEnvelope struct.

Bug Fixes

Refactor