Skip to content

Commit

Permalink
Refactor validation into separate method
Browse files Browse the repository at this point in the history
  • Loading branch information
richardhuaaa committed Dec 6, 2024
1 parent 2d898bd commit 367567f
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 27 deletions.
1 change: 1 addition & 0 deletions pkg/db/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ SELECT DISTINCT ON (originator_node_id)
FROM
gateway_envelopes
ORDER BY
originator_node_id,
originator_sequence_id DESC;

-- name: GetAddressLogs :many
Expand Down
1 change: 1 addition & 0 deletions pkg/db/queries/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

59 changes: 32 additions & 27 deletions pkg/sync/syncWorker.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,41 +335,46 @@ func (s *syncWorker) listenToStream(
err)
}
s.log.Debug("Received envelopes", zap.Any("numEnvelopes", len(envs.Envelopes)))
for _, envProto := range envs.Envelopes {
env, err := envUtils.NewOriginatorEnvelope(envProto)
if err != nil {
s.log.Error("Failed to unmarshal originator envelope", zap.Error(err))
continue
}
if env.OriginatorNodeID() != stream.nodeID {
s.log.Error("Received envelope from wrong node", zap.Any("nodeID", env.OriginatorNodeID()))
continue
}
for _, env := range envs.Envelopes {
s.validateAndInsertEnvelope(stream, env)
}
}
}

var lastSequenceID uint64 = 0
var lastNs int64 = 0
if stream.lastEnvelope != nil {
lastSequenceID = stream.lastEnvelope.OriginatorSequenceID()
lastNs = stream.lastEnvelope.OriginatorNs()
}
if env.OriginatorSequenceID() != lastSequenceID+1 || env.OriginatorNs() < lastNs {
// TODO(rich) Submit misbehavior report and continue
s.log.Error("Received out of order envelope")
}
func (s *syncWorker) validateAndInsertEnvelope(stream *originatorStream, envProto *envelopes.OriginatorEnvelope) {
env, err := envUtils.NewOriginatorEnvelope(envProto)
if err != nil {
s.log.Error("Failed to unmarshal originator envelope", zap.Error(err))
return
}

if env.OriginatorSequenceID() > stream.lastEnvelope.OriginatorSequenceID() {
stream.lastEnvelope = env
}
s.insertEnvelope(env)
}
if env.OriginatorNodeID() != stream.nodeID {
s.log.Error("Received envelope from wrong node", zap.Any("nodeID", env.OriginatorNodeID()))
return
}

var lastSequenceID uint64 = 0
var lastNs int64 = 0
if stream.lastEnvelope != nil {
lastSequenceID = stream.lastEnvelope.OriginatorSequenceID()
lastNs = stream.lastEnvelope.OriginatorNs()
}
if env.OriginatorSequenceID() != lastSequenceID+1 || env.OriginatorNs() < lastNs {
// TODO(rich) Submit misbehavior report and continue
s.log.Error("Received out of order envelope")
}

if env.OriginatorSequenceID() > stream.lastEnvelope.OriginatorSequenceID() {
stream.lastEnvelope = env
}

// TODO Validation logic - share code with API service and publish worker
// Signatures, topic type, etc
s.insertEnvelope(env)
}

func (s *syncWorker) insertEnvelope(env *envUtils.OriginatorEnvelope) {
s.log.Debug("Replication server received envelope", zap.Any("envelope", env))
// Here also
// TODO(nm) Validation logic - share code with API service and publish worker
originatorBytes, err := env.Bytes()
if err != nil {
s.log.Error("Failed to marshal originator envelope", zap.Error(err))
Expand Down

0 comments on commit 367567f

Please sign in to comment.