-
Notifications
You must be signed in to change notification settings - Fork 976
/
behaviour.rs
3421 lines (3077 loc) · 133 KB
/
behaviour.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2020 Sigma Prime Pty Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use std::{
cmp::{max, Ordering, Ordering::Equal},
collections::{BTreeSet, HashMap, HashSet, VecDeque},
fmt,
fmt::Debug,
net::IpAddr,
task::{Context, Poll},
time::Duration,
};
use futures::FutureExt;
use futures_timer::Delay;
use libp2p_core::{
multiaddr::Protocol::{Ip4, Ip6},
transport::PortUse,
Endpoint, Multiaddr,
};
use libp2p_identity::{Keypair, PeerId};
use libp2p_swarm::{
behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, FromSwarm},
dial_opts::DialOpts,
ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, THandler, THandlerInEvent,
THandlerOutEvent, ToSwarm,
};
use prometheus_client::registry::Registry;
use quick_protobuf::{MessageWrite, Writer};
use rand::{seq::SliceRandom, thread_rng};
use web_time::{Instant, SystemTime};
use crate::{
backoff::BackoffStorage,
config::{Config, ValidationMode},
gossip_promises::GossipPromises,
handler::{Handler, HandlerEvent, HandlerIn},
mcache::MessageCache,
metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty},
peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason},
protocol::SIGNING_PREFIX,
rpc::Sender,
rpc_proto::proto,
subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter},
time_cache::DuplicateCache,
topic::{Hasher, Topic, TopicHash},
transform::{DataTransform, IdentityTransform},
types::{
ControlAction, Graft, IHave, IWant, Message, MessageAcceptance, MessageId, PeerConnections,
PeerInfo, PeerKind, Prune, RawMessage, RpcOut, Subscription, SubscriptionAction,
},
FailedMessages, PublishError, SubscriptionError, TopicScoreParams, ValidationError,
};
#[cfg(test)]
mod tests;
/// Determines if published messages should be signed or not.
///
/// Without signing, a number of privacy preserving modes can be selected.
///
/// NOTE: The default validation settings are to require signatures. The [`ValidationMode`]
/// should be updated in the [`Config`] to allow for unsigned messages.
#[derive(Clone)]
pub enum MessageAuthenticity {
/// Message signing is enabled. The author will be the owner of the key and the sequence number
/// will be linearly increasing.
Signed(Keypair),
/// Message signing is disabled.
///
/// The specified [`PeerId`] will be used as the author of all published messages. The sequence
/// number will be randomized.
Author(PeerId),
/// Message signing is disabled.
///
/// A random [`PeerId`] will be used when publishing each message. The sequence number will be
/// randomized.
RandomAuthor,
/// Message signing is disabled.
///
/// The author of the message and the sequence numbers are excluded from the message.
///
/// NOTE: Excluding these fields may make these messages invalid by other nodes who
/// enforce validation of these fields. See [`ValidationMode`] in the [`Config`]
/// for how to customise this for rust-libp2p gossipsub. A custom `message_id`
/// function will need to be set to prevent all messages from a peer being filtered
/// as duplicates.
Anonymous,
}
impl MessageAuthenticity {
/// Returns true if signing is enabled.
pub fn is_signing(&self) -> bool {
matches!(self, MessageAuthenticity::Signed(_))
}
pub fn is_anonymous(&self) -> bool {
matches!(self, MessageAuthenticity::Anonymous)
}
}
/// Event that can be emitted by the gossipsub behaviour.
#[derive(Debug)]
pub enum Event {
/// A message has been received.
Message {
/// The peer that forwarded us this message.
propagation_source: PeerId,
/// The [`MessageId`] of the message. This should be referenced by the application when
/// validating a message (if required).
message_id: MessageId,
/// The decompressed message itself.
message: Message,
},
/// A remote subscribed to a topic.
Subscribed {
/// Remote that has subscribed.
peer_id: PeerId,
/// The topic it has subscribed to.
topic: TopicHash,
},
/// A remote unsubscribed from a topic.
Unsubscribed {
/// Remote that has unsubscribed.
peer_id: PeerId,
/// The topic it has subscribed from.
topic: TopicHash,
},
/// A peer that does not support gossipsub has connected.
GossipsubNotSupported { peer_id: PeerId },
/// A peer is not able to download messages in time.
SlowPeer {
/// The peer_id
peer_id: PeerId,
/// The types and amounts of failed messages that are occurring for this peer.
failed_messages: FailedMessages,
},
}
/// A data structure for storing configuration for publishing messages. See [`MessageAuthenticity`]
/// for further details.
#[allow(clippy::large_enum_variant)]
enum PublishConfig {
Signing {
keypair: Keypair,
author: PeerId,
inline_key: Option<Vec<u8>>,
last_seq_no: SequenceNumber,
},
Author(PeerId),
RandomAuthor,
Anonymous,
}
/// A strictly linearly increasing sequence number.
///
/// We start from the current time as unix timestamp in milliseconds.
#[derive(Debug)]
struct SequenceNumber(u64);
impl SequenceNumber {
fn new() -> Self {
let unix_timestamp = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("time to be linear")
.as_nanos();
Self(unix_timestamp as u64)
}
fn next(&mut self) -> u64 {
self.0 = self
.0
.checked_add(1)
.expect("to not exhaust u64 space for sequence numbers");
self.0
}
}
impl PublishConfig {
pub(crate) fn get_own_id(&self) -> Option<&PeerId> {
match self {
Self::Signing { author, .. } => Some(author),
Self::Author(author) => Some(author),
_ => None,
}
}
}
impl From<MessageAuthenticity> for PublishConfig {
fn from(authenticity: MessageAuthenticity) -> Self {
match authenticity {
MessageAuthenticity::Signed(keypair) => {
let public_key = keypair.public();
let key_enc = public_key.encode_protobuf();
let key = if key_enc.len() <= 42 {
// The public key can be inlined in [`rpc_proto::proto::::Message::from`], so we
// don't include it specifically in the
// [`rpc_proto::proto::Message::key`] field.
None
} else {
// Include the protobuf encoding of the public key in the message.
Some(key_enc)
};
PublishConfig::Signing {
keypair,
author: public_key.to_peer_id(),
inline_key: key,
last_seq_no: SequenceNumber::new(),
}
}
MessageAuthenticity::Author(peer_id) => PublishConfig::Author(peer_id),
MessageAuthenticity::RandomAuthor => PublishConfig::RandomAuthor,
MessageAuthenticity::Anonymous => PublishConfig::Anonymous,
}
}
}
/// Network behaviour that handles the gossipsub protocol.
///
/// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`Config`] instance. If
/// message signing is disabled, the [`ValidationMode`] in the config should be adjusted to an
/// appropriate level to accept unsigned messages.
///
/// The DataTransform trait allows applications to optionally add extra encoding/decoding
/// functionality to the underlying messages. This is intended for custom compression algorithms.
///
/// The TopicSubscriptionFilter allows applications to implement specific filters on topics to
/// prevent unwanted messages being propagated and evaluated.
pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
/// Configuration providing gossipsub performance parameters.
config: Config,
/// Events that need to be yielded to the outside when polling.
events: VecDeque<ToSwarm<Event, HandlerIn>>,
/// Information used for publishing messages.
publish_config: PublishConfig,
/// An LRU Time cache for storing seen messages (based on their ID). This cache prevents
/// duplicates from being propagated to the application and on the network.
duplicate_cache: DuplicateCache<MessageId>,
/// A set of connected peers, indexed by their [`PeerId`] tracking both the [`PeerKind`] and
/// the set of [`ConnectionId`]s.
connected_peers: HashMap<PeerId, PeerConnections>,
/// A set of all explicit peers. These are peers that remain connected and we unconditionally
/// forward messages to, outside of the scoring system.
explicit_peers: HashSet<PeerId>,
/// A list of peers that have been blacklisted by the user.
/// Messages are not sent to and are rejected from these peers.
blacklisted_peers: HashSet<PeerId>,
/// Overlay network of connected peers - Maps topics to connected gossipsub peers.
mesh: HashMap<TopicHash, BTreeSet<PeerId>>,
/// Map of topics to list of peers that we publish to, but don't subscribe to.
fanout: HashMap<TopicHash, BTreeSet<PeerId>>,
/// The last publish time for fanout topics.
fanout_last_pub: HashMap<TopicHash, Instant>,
/// Storage for backoffs
backoffs: BackoffStorage,
/// Message cache for the last few heartbeats.
mcache: MessageCache,
/// Heartbeat interval stream.
heartbeat: Delay,
/// Number of heartbeats since the beginning of time; this allows us to amortize some resource
/// clean up -- eg backoff clean up.
heartbeat_ticks: u64,
/// We remember all peers we found through peer exchange, since those peers are not considered
/// as safe as randomly discovered outbound peers. This behaviour diverges from the go
/// implementation to avoid possible love bombing attacks in PX. When disconnecting peers will
/// be removed from this list which may result in a true outbound rediscovery.
px_peers: HashSet<PeerId>,
/// Set of connected outbound peers (we only consider true outbound peers found through
/// discovery and not by PX).
outbound_peers: HashSet<PeerId>,
/// Stores optional peer score data together with thresholds, decay interval and gossip
/// promises.
peer_score: Option<(PeerScore, PeerScoreThresholds, Delay, GossipPromises)>,
/// Counts the number of `IHAVE` received from each peer since the last heartbeat.
count_received_ihave: HashMap<PeerId, usize>,
/// Counts the number of `IWANT` that we sent the each peer since the last heartbeat.
count_sent_iwant: HashMap<PeerId, usize>,
/// Short term cache for published message ids. This is used for penalizing peers sending
/// our own messages back if the messages are anonymous or use a random author.
published_message_ids: DuplicateCache<MessageId>,
/// The filter used to handle message subscriptions.
subscription_filter: F,
/// A general transformation function that can be applied to data received from the wire before
/// calculating the message-id and sending to the application. This is designed to allow the
/// user to implement arbitrary topic-based compression algorithms.
data_transform: D,
/// Keep track of a set of internal metrics relating to gossipsub.
metrics: Option<Metrics>,
/// Tracks the numbers of failed messages per peer-id.
failed_messages: HashMap<PeerId, FailedMessages>,
}
impl<D, F> Behaviour<D, F>
where
D: DataTransform + Default,
F: TopicSubscriptionFilter + Default,
{
/// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
/// [`Config`]. This has no subscription filter and uses no compression.
pub fn new(privacy: MessageAuthenticity, config: Config) -> Result<Self, &'static str> {
Self::new_with_subscription_filter_and_transform(
privacy,
config,
None,
F::default(),
D::default(),
)
}
/// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
/// [`Config`]. This has no subscription filter and uses no compression.
/// Metrics can be evaluated by passing a reference to a [`Registry`].
pub fn new_with_metrics(
privacy: MessageAuthenticity,
config: Config,
metrics_registry: &mut Registry,
metrics_config: MetricsConfig,
) -> Result<Self, &'static str> {
Self::new_with_subscription_filter_and_transform(
privacy,
config,
Some((metrics_registry, metrics_config)),
F::default(),
D::default(),
)
}
}
impl<D, F> Behaviour<D, F>
where
D: DataTransform + Default,
F: TopicSubscriptionFilter,
{
/// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
/// [`Config`] and a custom subscription filter.
pub fn new_with_subscription_filter(
privacy: MessageAuthenticity,
config: Config,
metrics: Option<(&mut Registry, MetricsConfig)>,
subscription_filter: F,
) -> Result<Self, &'static str> {
Self::new_with_subscription_filter_and_transform(
privacy,
config,
metrics,
subscription_filter,
D::default(),
)
}
}
impl<D, F> Behaviour<D, F>
where
D: DataTransform,
F: TopicSubscriptionFilter + Default,
{
/// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
/// [`Config`] and a custom data transform.
pub fn new_with_transform(
privacy: MessageAuthenticity,
config: Config,
metrics: Option<(&mut Registry, MetricsConfig)>,
data_transform: D,
) -> Result<Self, &'static str> {
Self::new_with_subscription_filter_and_transform(
privacy,
config,
metrics,
F::default(),
data_transform,
)
}
}
impl<D, F> Behaviour<D, F>
where
D: DataTransform,
F: TopicSubscriptionFilter,
{
/// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
/// [`Config`] and a custom subscription filter and data transform.
pub fn new_with_subscription_filter_and_transform(
privacy: MessageAuthenticity,
config: Config,
metrics: Option<(&mut Registry, MetricsConfig)>,
subscription_filter: F,
data_transform: D,
) -> Result<Self, &'static str> {
// Set up the router given the configuration settings.
// We do not allow configurations where a published message would also be rejected if it
// were received locally.
validate_config(&privacy, config.validation_mode())?;
Ok(Behaviour {
metrics: metrics.map(|(registry, cfg)| Metrics::new(registry, cfg)),
events: VecDeque::new(),
publish_config: privacy.into(),
duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
explicit_peers: HashSet::new(),
blacklisted_peers: HashSet::new(),
mesh: HashMap::new(),
fanout: HashMap::new(),
fanout_last_pub: HashMap::new(),
backoffs: BackoffStorage::new(
&config.prune_backoff(),
config.heartbeat_interval(),
config.backoff_slack(),
),
mcache: MessageCache::new(config.history_gossip(), config.history_length()),
heartbeat: Delay::new(config.heartbeat_interval() + config.heartbeat_initial_delay()),
heartbeat_ticks: 0,
px_peers: HashSet::new(),
outbound_peers: HashSet::new(),
peer_score: None,
count_received_ihave: HashMap::new(),
count_sent_iwant: HashMap::new(),
connected_peers: HashMap::new(),
published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()),
config,
subscription_filter,
data_transform,
failed_messages: Default::default(),
})
}
}
impl<D, F> Behaviour<D, F>
where
D: DataTransform + Send + 'static,
F: TopicSubscriptionFilter + Send + 'static,
{
/// Lists the hashes of the topics we are currently subscribed to.
pub fn topics(&self) -> impl Iterator<Item = &TopicHash> {
self.mesh.keys()
}
/// Lists all mesh peers for a certain topic hash.
pub fn mesh_peers(&self, topic_hash: &TopicHash) -> impl Iterator<Item = &PeerId> {
self.mesh.get(topic_hash).into_iter().flat_map(|x| x.iter())
}
pub fn all_mesh_peers(&self) -> impl Iterator<Item = &PeerId> {
let mut res = BTreeSet::new();
for peers in self.mesh.values() {
res.extend(peers);
}
res.into_iter()
}
/// Lists all known peers and their associated subscribed topics.
pub fn all_peers(&self) -> impl Iterator<Item = (&PeerId, Vec<&TopicHash>)> {
self.connected_peers
.iter()
.map(|(peer_id, peer)| (peer_id, peer.topics.iter().collect()))
}
/// Lists all known peers and their associated protocol.
pub fn peer_protocol(&self) -> impl Iterator<Item = (&PeerId, &PeerKind)> {
self.connected_peers.iter().map(|(k, v)| (k, &v.kind))
}
/// Returns the gossipsub score for a given peer, if one exists.
pub fn peer_score(&self, peer_id: &PeerId) -> Option<f64> {
self.peer_score
.as_ref()
.map(|(score, ..)| score.score(peer_id))
}
/// Subscribe to a topic.
///
/// Returns [`Ok(true)`] if the subscription worked. Returns [`Ok(false)`] if we were already
/// subscribed.
pub fn subscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> Result<bool, SubscriptionError> {
tracing::debug!(%topic, "Subscribing to topic");
let topic_hash = topic.hash();
if !self.subscription_filter.can_subscribe(&topic_hash) {
return Err(SubscriptionError::NotAllowed);
}
if self.mesh.contains_key(&topic_hash) {
tracing::debug!(%topic, "Topic is already in the mesh");
return Ok(false);
}
// send subscription request to all peers
for peer_id in self.connected_peers.keys().copied().collect::<Vec<_>>() {
tracing::debug!(%peer_id, "Sending SUBSCRIBE to peer");
let event = RpcOut::Subscribe(topic_hash.clone());
self.send_message(peer_id, event);
}
// call JOIN(topic)
// this will add new peers to the mesh for the topic
self.join(&topic_hash);
tracing::debug!(%topic, "Subscribed to topic");
Ok(true)
}
/// Unsubscribes from a topic.
///
/// Returns `true` if we were subscribed to this topic.
pub fn unsubscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> bool {
tracing::debug!(%topic, "Unsubscribing from topic");
let topic_hash = topic.hash();
if !self.mesh.contains_key(&topic_hash) {
tracing::debug!(topic=%topic_hash, "Already unsubscribed from topic");
// we are not subscribed
return false;
}
// announce to all peers
for peer in self.connected_peers.keys().copied().collect::<Vec<_>>() {
tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer");
let event = RpcOut::Unsubscribe(topic_hash.clone());
self.send_message(peer, event);
}
// call LEAVE(topic)
// this will remove the topic from the mesh
self.leave(&topic_hash);
tracing::debug!(topic=%topic_hash, "Unsubscribed from topic");
true
}
/// Publishes a message with multiple topics to the network.
pub fn publish(
&mut self,
topic: impl Into<TopicHash>,
data: impl Into<Vec<u8>>,
) -> Result<MessageId, PublishError> {
let data = data.into();
let topic = topic.into();
// Transform the data before building a raw_message.
let transformed_data = self
.data_transform
.outbound_transform(&topic, data.clone())?;
// check that the size doesn't exceed the max transmission size.
if transformed_data.len() > self.config.max_transmit_size() {
return Err(PublishError::MessageTooLarge);
}
let raw_message = self.build_raw_message(topic, transformed_data)?;
// calculate the message id from the un-transformed data
let msg_id = self.config.message_id(&Message {
source: raw_message.source,
data, // the uncompressed form
sequence_number: raw_message.sequence_number,
topic: raw_message.topic.clone(),
});
// Check the if the message has been published before
if self.duplicate_cache.contains(&msg_id) {
// This message has already been seen. We don't re-publish messages that have already
// been published on the network.
tracing::warn!(
message=%msg_id,
"Not publishing a message that has already been published"
);
return Err(PublishError::Duplicate);
}
tracing::trace!(message=%msg_id, "Publishing message");
let topic_hash = raw_message.topic.clone();
let mut peers_on_topic = self
.connected_peers
.iter()
.filter(|(_, p)| p.topics.contains(&topic_hash))
.map(|(peer_id, _)| peer_id)
.peekable();
if peers_on_topic.peek().is_none() {
return Err(PublishError::InsufficientPeers);
}
let mut recipient_peers = HashSet::new();
if self.config.flood_publish() {
// Forward to all peers above score and all explicit peers
recipient_peers.extend(peers_on_topic.filter(|p| {
self.explicit_peers.contains(*p)
|| !self.score_below_threshold(p, |ts| ts.publish_threshold).0
}));
} else {
match self.mesh.get(&topic_hash) {
// Mesh peers
Some(mesh_peers) => {
// We have a mesh set. We want to make sure to publish to at least `mesh_n`
// peers (if possible).
let needed_extra_peers = self.config.mesh_n().saturating_sub(mesh_peers.len());
if needed_extra_peers > 0 {
// We don't have `mesh_n` peers in our mesh, we will randomly select extras
// and publish to them.
// Get a random set of peers that are appropriate to send messages too.
let peer_list = get_random_peers(
&self.connected_peers,
&topic_hash,
needed_extra_peers,
|peer| {
!mesh_peers.contains(peer)
&& !self.explicit_peers.contains(peer)
&& !self
.score_below_threshold(peer, |pst| pst.publish_threshold)
.0
},
);
recipient_peers.extend(peer_list);
}
recipient_peers.extend(mesh_peers);
}
// Gossipsub peers
None => {
tracing::debug!(topic=%topic_hash, "Topic not in the mesh");
// If we have fanout peers add them to the map.
if self.fanout.contains_key(&topic_hash) {
for peer in self.fanout.get(&topic_hash).expect("Topic must exist") {
recipient_peers.insert(*peer);
}
} else {
// We have no fanout peers, select mesh_n of them and add them to the fanout
let mesh_n = self.config.mesh_n();
let new_peers =
get_random_peers(&self.connected_peers, &topic_hash, mesh_n, {
|p| {
!self.explicit_peers.contains(p)
&& !self
.score_below_threshold(p, |pst| pst.publish_threshold)
.0
}
});
// Add the new peers to the fanout and recipient peers
self.fanout.insert(topic_hash.clone(), new_peers.clone());
for peer in new_peers {
tracing::debug!(%peer, "Peer added to fanout");
recipient_peers.insert(peer);
}
}
// We are publishing to fanout peers - update the time we published
self.fanout_last_pub
.insert(topic_hash.clone(), Instant::now());
}
}
// Explicit peers that are part of the topic
recipient_peers
.extend(peers_on_topic.filter(|peer_id| self.explicit_peers.contains(peer_id)));
// Floodsub peers
for (peer, connections) in &self.connected_peers {
if connections.kind == PeerKind::Floodsub
&& !self
.score_below_threshold(peer, |ts| ts.publish_threshold)
.0
{
recipient_peers.insert(*peer);
}
}
}
// If the message isn't a duplicate and we have sent it to some peers add it to the
// duplicate cache and memcache.
self.duplicate_cache.insert(msg_id.clone());
self.mcache.put(&msg_id, raw_message.clone());
// If the message is anonymous or has a random author add it to the published message ids
// cache.
if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config {
if !self.config.allow_self_origin() {
self.published_message_ids.insert(msg_id.clone());
}
}
// Send to peers we know are subscribed to the topic.
let mut publish_failed = true;
for peer_id in recipient_peers.iter() {
tracing::trace!(peer=%peer_id, "Sending message to peer");
if self.send_message(
*peer_id,
RpcOut::Publish {
message: raw_message.clone(),
timeout: Delay::new(self.config.publish_queue_duration()),
},
) {
publish_failed = false
}
}
if recipient_peers.is_empty() {
return Err(PublishError::InsufficientPeers);
}
if publish_failed {
return Err(PublishError::AllQueuesFull(recipient_peers.len()));
}
tracing::debug!(message=%msg_id, "Published message");
if let Some(metrics) = self.metrics.as_mut() {
metrics.register_published_message(&topic_hash);
}
Ok(msg_id)
}
/// This function should be called when [`Config::validate_messages()`] is `true` after
/// the message got validated by the caller. Messages are stored in the ['Memcache'] and
/// validation is expected to be fast enough that the messages should still exist in the cache.
/// There are three possible validation outcomes and the outcome is given in acceptance.
///
/// If acceptance = [`MessageAcceptance::Accept`] the message will get propagated to the
/// network. The `propagation_source` parameter indicates who the message was received by and
/// will not be forwarded back to that peer.
///
/// If acceptance = [`MessageAcceptance::Reject`] the message will be deleted from the memcache
/// and the P₄ penalty will be applied to the `propagation_source`.
//
/// If acceptance = [`MessageAcceptance::Ignore`] the message will be deleted from the memcache
/// but no P₄ penalty will be applied.
///
/// This function will return true if the message was found in the cache and false if was not
/// in the cache anymore.
///
/// This should only be called once per message.
pub fn report_message_validation_result(
&mut self,
msg_id: &MessageId,
propagation_source: &PeerId,
acceptance: MessageAcceptance,
) -> bool {
let reject_reason = match acceptance {
MessageAcceptance::Accept => {
let (raw_message, originating_peers) = match self.mcache.validate(msg_id) {
Some((raw_message, originating_peers)) => {
(raw_message.clone(), originating_peers)
}
None => {
tracing::warn!(
message=%msg_id,
"Message not in cache. Ignoring forwarding"
);
if let Some(metrics) = self.metrics.as_mut() {
metrics.memcache_miss();
}
return false;
}
};
if let Some(metrics) = self.metrics.as_mut() {
metrics.register_msg_validation(&raw_message.topic, &acceptance);
}
self.forward_msg(
msg_id,
raw_message,
Some(propagation_source),
originating_peers,
);
return true;
}
MessageAcceptance::Reject => RejectReason::ValidationFailed,
MessageAcceptance::Ignore => RejectReason::ValidationIgnored,
};
if let Some((raw_message, originating_peers)) = self.mcache.remove(msg_id) {
if let Some(metrics) = self.metrics.as_mut() {
metrics.register_msg_validation(&raw_message.topic, &acceptance);
}
// Tell peer_score about reject
// Reject the original source, and any duplicates we've seen from other peers.
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.reject_message(
propagation_source,
msg_id,
&raw_message.topic,
reject_reason,
);
for peer in originating_peers.iter() {
peer_score.reject_message(peer, msg_id, &raw_message.topic, reject_reason);
}
}
true
} else {
tracing::warn!(message=%msg_id, "Rejected message not in cache");
false
}
}
/// Adds a new peer to the list of explicitly connected peers.
pub fn add_explicit_peer(&mut self, peer_id: &PeerId) {
tracing::debug!(peer=%peer_id, "Adding explicit peer");
self.explicit_peers.insert(*peer_id);
self.check_explicit_peer_connection(peer_id);
}
/// This removes the peer from explicitly connected peers, note that this does not disconnect
/// the peer.
pub fn remove_explicit_peer(&mut self, peer_id: &PeerId) {
tracing::debug!(peer=%peer_id, "Removing explicit peer");
self.explicit_peers.remove(peer_id);
}
/// Blacklists a peer. All messages from this peer will be rejected and any message that was
/// created by this peer will be rejected.
pub fn blacklist_peer(&mut self, peer_id: &PeerId) {
if self.blacklisted_peers.insert(*peer_id) {
tracing::debug!(peer=%peer_id, "Peer has been blacklisted");
}
}
/// Removes a peer from the blacklist if it has previously been blacklisted.
pub fn remove_blacklisted_peer(&mut self, peer_id: &PeerId) {
if self.blacklisted_peers.remove(peer_id) {
tracing::debug!(peer=%peer_id, "Peer has been removed from the blacklist");
}
}
/// Activates the peer scoring system with the given parameters. This will reset all scores
/// if there was already another peer scoring system activated. Returns an error if the
/// params are not valid or if they got already set.
pub fn with_peer_score(
&mut self,
params: PeerScoreParams,
threshold: PeerScoreThresholds,
) -> Result<(), String> {
self.with_peer_score_and_message_delivery_time_callback(params, threshold, None)
}
/// Activates the peer scoring system with the given parameters and a message delivery time
/// callback. Returns an error if the parameters got already set.
pub fn with_peer_score_and_message_delivery_time_callback(
&mut self,
params: PeerScoreParams,
threshold: PeerScoreThresholds,
callback: Option<fn(&PeerId, &TopicHash, f64)>,
) -> Result<(), String> {
params.validate()?;
threshold.validate()?;
if self.peer_score.is_some() {
return Err("Peer score set twice".into());
}
let interval = Delay::new(params.decay_interval);
let peer_score = PeerScore::new_with_message_delivery_time_callback(params, callback);
self.peer_score = Some((peer_score, threshold, interval, GossipPromises::default()));
Ok(())
}
/// Sets scoring parameters for a topic.
///
/// The [`Self::with_peer_score()`] must first be called to initialise peer scoring.
pub fn set_topic_params<H: Hasher>(
&mut self,
topic: Topic<H>,
params: TopicScoreParams,
) -> Result<(), &'static str> {
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.set_topic_params(topic.hash(), params);
Ok(())
} else {
Err("Peer score must be initialised with `with_peer_score()`")
}
}
/// Returns a scoring parameters for a topic if existent.
pub fn get_topic_params<H: Hasher>(&self, topic: &Topic<H>) -> Option<&TopicScoreParams> {
self.peer_score.as_ref()?.0.get_topic_params(&topic.hash())
}
/// Sets the application specific score for a peer. Returns true if scoring is active and
/// the peer is connected or if the score of the peer is not yet expired, false otherwise.
pub fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.set_application_score(peer_id, new_score)
} else {
false
}
}
/// Gossipsub JOIN(topic) - adds topic peers to mesh and sends them GRAFT messages.
fn join(&mut self, topic_hash: &TopicHash) {
tracing::debug!(topic=%topic_hash, "Running JOIN for topic");
// if we are already in the mesh, return
if self.mesh.contains_key(topic_hash) {
tracing::debug!(topic=%topic_hash, "JOIN: The topic is already in the mesh, ignoring JOIN");
return;
}
let mut added_peers = HashSet::new();
if let Some(m) = self.metrics.as_mut() {
m.joined(topic_hash)
}
// check if we have mesh_n peers in fanout[topic] and add them to the mesh if we do,
// removing the fanout entry.
if let Some((_, mut peers)) = self.fanout.remove_entry(topic_hash) {
tracing::debug!(
topic=%topic_hash,
"JOIN: Removing peers from the fanout for topic"
);
// remove explicit peers, peers with negative scores, and backoffed peers
peers.retain(|p| {
!self.explicit_peers.contains(p)
&& !self.score_below_threshold(p, |_| 0.0).0
&& !self.backoffs.is_backoff_with_slack(topic_hash, p)
});
// Add up to mesh_n of them to the mesh
// NOTE: These aren't randomly added, currently FIFO
let add_peers = std::cmp::min(peers.len(), self.config.mesh_n());
tracing::debug!(
topic=%topic_hash,
"JOIN: Adding {:?} peers from the fanout for topic",
add_peers
);
added_peers.extend(peers.iter().take(add_peers));
self.mesh.insert(
topic_hash.clone(),
peers.into_iter().take(add_peers).collect(),
);
// remove the last published time
self.fanout_last_pub.remove(topic_hash);
}
let fanaout_added = added_peers.len();
if let Some(m) = self.metrics.as_mut() {
m.peers_included(topic_hash, Inclusion::Fanout, fanaout_added)
}
// check if we need to get more peers, which we randomly select
if added_peers.len() < self.config.mesh_n() {
// get the peers
let new_peers = get_random_peers(
&self.connected_peers,
topic_hash,
self.config.mesh_n() - added_peers.len(),
|peer| {
!added_peers.contains(peer)
&& !self.explicit_peers.contains(peer)
&& !self.score_below_threshold(peer, |_| 0.0).0