diff --git a/pkg/db/queries.sql b/pkg/db/queries.sql index b6b151f6..b6431cd6 100644 --- a/pkg/db/queries.sql +++ b/pkg/db/queries.sql @@ -53,6 +53,7 @@ SELECT DISTINCT ON (originator_node_id) FROM gateway_envelopes ORDER BY + originator_node_id, originator_sequence_id DESC; -- name: GetAddressLogs :many diff --git a/pkg/db/queries/queries.sql.go b/pkg/db/queries/queries.sql.go index fa8a2e82..e9c2c443 100644 --- a/pkg/db/queries/queries.sql.go +++ b/pkg/db/queries/queries.sql.go @@ -346,6 +346,7 @@ SELECT DISTINCT ON (originator_node_id) FROM gateway_envelopes ORDER BY + originator_node_id, originator_sequence_id DESC ` diff --git a/pkg/sync/syncWorker.go b/pkg/sync/syncWorker.go index d7df9963..acf31332 100644 --- a/pkg/sync/syncWorker.go +++ b/pkg/sync/syncWorker.go @@ -335,41 +335,46 @@ func (s *syncWorker) listenToStream( err) } s.log.Debug("Received envelopes", zap.Any("numEnvelopes", len(envs.Envelopes))) - for _, envProto := range envs.Envelopes { - env, err := envUtils.NewOriginatorEnvelope(envProto) - if err != nil { - s.log.Error("Failed to unmarshal originator envelope", zap.Error(err)) - continue - } - if env.OriginatorNodeID() != stream.nodeID { - s.log.Error("Received envelope from wrong node", zap.Any("nodeID", env.OriginatorNodeID())) - continue - } + for _, env := range envs.Envelopes { + s.validateAndInsertEnvelope(stream, env) + } + } +} - var lastSequenceID uint64 = 0 - var lastNs int64 = 0 - if stream.lastEnvelope != nil { - lastSequenceID = stream.lastEnvelope.OriginatorSequenceID() - lastNs = stream.lastEnvelope.OriginatorNs() - } - if env.OriginatorSequenceID() != lastSequenceID+1 || env.OriginatorNs() < lastNs { - // TODO(rich) Submit misbehavior report and continue - s.log.Error("Received out of order envelope") - } +func (s *syncWorker) validateAndInsertEnvelope(stream *originatorStream, envProto *envelopes.OriginatorEnvelope) { + env, err := envUtils.NewOriginatorEnvelope(envProto) + if err != nil { + s.log.Error("Failed to unmarshal originator envelope", zap.Error(err)) + return + } - if env.OriginatorSequenceID() > stream.lastEnvelope.OriginatorSequenceID() { - stream.lastEnvelope = env - } - s.insertEnvelope(env) - } + if env.OriginatorNodeID() != stream.nodeID { + s.log.Error("Received envelope from wrong node", zap.Any("nodeID", env.OriginatorNodeID())) + return + } + + var lastSequenceID uint64 = 0 + var lastNs int64 = 0 + if stream.lastEnvelope != nil { + lastSequenceID = stream.lastEnvelope.OriginatorSequenceID() + lastNs = stream.lastEnvelope.OriginatorNs() + } + if env.OriginatorSequenceID() != lastSequenceID+1 || env.OriginatorNs() < lastNs { + // TODO(rich) Submit misbehavior report and continue + s.log.Error("Received out of order envelope") + } + + if env.OriginatorSequenceID() > stream.lastEnvelope.OriginatorSequenceID() { + stream.lastEnvelope = env } + // TODO Validation logic - share code with API service and publish worker + // Signatures, topic type, etc + s.insertEnvelope(env) } func (s *syncWorker) insertEnvelope(env *envUtils.OriginatorEnvelope) { s.log.Debug("Replication server received envelope", zap.Any("envelope", env)) - // Here also - // TODO(nm) Validation logic - share code with API service and publish worker originatorBytes, err := env.Bytes() if err != nil { s.log.Error("Failed to marshal originator envelope", zap.Error(err))