Skip to content

Commit

Permalink
Merge branch 'master' into jwinkler2083233/5967-no-hotstuff-mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
jwinkler2083233 authored Oct 30, 2021
2 parents d12fa68 + 38957a7 commit 1d31060
Show file tree
Hide file tree
Showing 90 changed files with 3,476 additions and 1,986 deletions.
2 changes: 1 addition & 1 deletion cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ func unstakedNetworkMsgValidators(log zerolog.Logger, idProvider id.IdentityProv
validator.NewAnyValidator(
// message should be either from a valid staked node
validator.NewOriginValidator(
id.NewFilteredIdentifierProvider(filter.IsValidCurrentEpochParticipant, idProvider),
id.NewIdentityFilterIdentifierProvider(filter.IsValidCurrentEpochParticipant, idProvider),
),
// or the message should be specifically targeted for this node
validator.ValidateTarget(log, selfID),
Expand Down
25 changes: 13 additions & 12 deletions cmd/access/node_builder/staked_access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (fnb *StakedAccessNodeBuilder) InitIDProviders() {
fnb.IdentityProvider = idCache

fnb.SyncEngineParticipantsProviderFactory = func() id.IdentifierProvider {
return id.NewFilteredIdentifierProvider(
return id.NewIdentityFilterIdentifierProvider(
filter.And(
filter.HasRole(flow.RoleConsensus),
filter.Not(filter.HasNodeID(node.Me.NodeID())),
Expand All @@ -61,10 +61,6 @@ func (fnb *StakedAccessNodeBuilder) InitIDProviders() {

fnb.IDTranslator = p2p.NewHierarchicalIDTranslator(idCache, p2p.NewUnstakedNetworkIDTranslator())

if !fnb.supportsUnstakedFollower {
fnb.NetworkingIdentifierProvider = id.NewFilteredIdentifierProvider(p2p.NotEjectedFilter, idCache)
}

return nil
})
}
Expand Down Expand Up @@ -196,6 +192,13 @@ func (builder *StakedAccessNodeBuilder) initLibP2PFactory(nodeID flow.Identifier
// The staked nodes act as the DHT servers
dhtOptions := []dht.Option{p2p.AsServer(true)}

psOpts := append(p2p.DefaultPubsubOptions(p2p.DefaultMaxPubSubMsgSize),
func(_ context.Context, h host.Host) (pubsub.Option, error) {
return pubsub.WithSubscriptionFilter(p2p.NewRoleBasedFilter(
h.ID(), builder.RootBlock.ID(), builder.IdentityProvider,
)), nil
})

myAddr := builder.NodeConfig.Me.Address()
if builder.BaseConfig.BindAddr != cmd.NotSet {
myAddr = builder.BaseConfig.BindAddr
Expand All @@ -206,12 +209,10 @@ func (builder *StakedAccessNodeBuilder) initLibP2PFactory(nodeID flow.Identifier
resolver := dns.NewResolver(builder.Metrics.Network, dns.WithTTL(builder.BaseConfig.DNSCacheTTL))

return func(ctx context.Context) (*p2p.Node, error) {
psOpts := p2p.DefaultPubsubOptions(p2p.DefaultMaxPubSubMsgSize)
psOpts = append(psOpts, func(_ context.Context, h host.Host) (pubsub.Option, error) {
return pubsub.WithSubscriptionFilter(p2p.NewRoleBasedFilter(
h.ID(), builder.RootBlock.ID(), builder.IdentityProvider,
)), nil
})
streamFactory, err := p2p.LibP2PStreamCompressorFactoryFunc(builder.BaseConfig.LibP2PStreamCompression)
if err != nil {
return nil, fmt.Errorf("could not convert stream factory: %w", err)
}
libp2pNode, err := p2p.NewDefaultLibP2PNodeBuilder(nodeID, myAddr, networkKey).
SetRootBlockID(builder.RootBlock.ID()).
// no connection gater
Expand All @@ -221,7 +222,7 @@ func (builder *StakedAccessNodeBuilder) initLibP2PFactory(nodeID flow.Identifier
SetPubsubOptions(psOpts...).
SetLogger(builder.Logger).
SetResolver(resolver).
SetStreamCompressor(p2p.WithGzipCompression).
SetStreamCompressor(streamFactory).
Build(ctx)
if err != nil {
return nil, err
Expand Down
62 changes: 52 additions & 10 deletions cmd/access/node_builder/unstaked_access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"errors"
"fmt"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"

"github.com/onflow/flow-go/cmd"
"github.com/onflow/flow-go/crypto"
Expand All @@ -26,6 +28,7 @@ import (

type UnstakedAccessNodeBuilder struct {
*FlowAccessNodeBuilder
peerID peer.ID
}

func NewUnstakedAccessNodeBuilder(anb *FlowAccessNodeBuilder) *UnstakedAccessNodeBuilder {
Expand All @@ -45,12 +48,12 @@ func (anb *UnstakedAccessNodeBuilder) initNodeInfo() error {
return fmt.Errorf("could not load networking public key: %w", err)
}

peerID, err := peer.IDFromPublicKey(pubKey)
anb.peerID, err = peer.IDFromPublicKey(pubKey)
if err != nil {
return fmt.Errorf("could not get peer ID from public key: %w", err)
}

anb.NodeID, err = p2p.NewUnstakedNetworkIDTranslator().GetFlowID(peerID)
anb.NodeID, err = p2p.NewUnstakedNetworkIDTranslator().GetFlowID(anb.peerID)
if err != nil {
return fmt.Errorf("could not get flow node ID: %w", err)
}
Expand All @@ -74,13 +77,26 @@ func (anb *UnstakedAccessNodeBuilder) InitIDProviders() {

// use the default identifier provider
anb.SyncEngineParticipantsProviderFactory = func() id.IdentifierProvider {

// use the middleware that should have now been initialized
middleware, ok := anb.Middleware.(*p2p.Middleware)
if !ok {
anb.Logger.Fatal().Msg("middleware was of unexpected type")
}
return middleware.IdentifierProvider()
return id.NewCustomIdentifierProvider(func() flow.IdentifierList {
var result flow.IdentifierList

pids := anb.LibP2PNode.GetPeersForProtocol(p2p.FlowProtocolID(anb.RootBlock.ID()))

for _, pid := range pids {
// exclude own Identifier
if pid == anb.peerID {
continue
}

if flowID, err := anb.IDTranslator.GetFlowID(pid); err != nil {
anb.Logger.Err(err).Str("peer", pid.Pretty()).Msg("failed to translate to Flow ID")
} else {
result = append(result, flowID)
}
}

return result
})
}

return nil
Expand Down Expand Up @@ -174,7 +190,32 @@ func (builder *UnstakedAccessNodeBuilder) initLibP2PFactory(nodeID flow.Identifi

resolver := dns.NewResolver(builder.Metrics.Network, dns.WithTTL(builder.BaseConfig.DNSCacheTTL))

var pis []peer.AddrInfo
for _, b := range builder.bootstrapIdentities {
pi, err := p2p.PeerAddressInfo(*b)
if err != nil {
return nil, fmt.Errorf("could not extract peer address info from bootstrap identity %v: %w", b, err)
}
pis = append(pis, pi)
}

psOpts := append(p2p.DefaultPubsubOptions(p2p.DefaultMaxPubSubMsgSize),
func(_ context.Context, h host.Host) (pubsub.Option, error) {
return pubsub.WithSubscriptionFilter(p2p.NewRoleBasedFilter(
h.ID(), builder.RootBlock.ID(), builder.IdentityProvider,
)), nil
},
// Note: using the WithDirectPeers option will automatically store these addresses
// as permanent addresses in the Peerstore and try to connect to them when the
// PubSubRouter starts up
p2p.PubSubOptionWrapper(pubsub.WithDirectPeers(pis)),
)

return func(ctx context.Context) (*p2p.Node, error) {
streamFactory, err := p2p.LibP2PStreamCompressorFactoryFunc(builder.BaseConfig.LibP2PStreamCompression)
if err != nil {
return nil, fmt.Errorf("could not convert stream factory: %w", err)
}
libp2pNode, err := p2p.NewDefaultLibP2PNodeBuilder(nodeID, builder.BaseConfig.BindAddr, networkKey).
SetRootBlockID(builder.RootBlock.ID()).
SetConnectionManager(connManager).
Expand All @@ -183,7 +224,8 @@ func (builder *UnstakedAccessNodeBuilder) initLibP2PFactory(nodeID flow.Identifi
SetDHTOptions(dhtOptions...).
SetLogger(builder.Logger).
SetResolver(resolver).
SetStreamCompressor(p2p.WithGzipCompression).
SetPubsubOptions(psOpts...).
SetStreamCompressor(streamFactory).
Build(ctx)
if err != nil {
return nil, err
Expand Down
3 changes: 3 additions & 0 deletions cmd/bootstrap/transit/cmd/flags.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package cmd

import "time"

var (
flagBootDir string // Bootstrap dir path
flagBucketName string = "flow-genesis-bootstrap" // The name of the bucket
flagToken string // the key directory
flagAccessAddress string
flagNodeRole string
flagTimeout time.Duration

flagWrapID string // wrap ID
flagVoteFile string
Expand Down
3 changes: 2 additions & 1 deletion cmd/bootstrap/transit/cmd/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func init() {
func addPullCmdFlags() {
pullCmd.Flags().StringVarP(&flagToken, "token", "t", "", "token provided by the Flow team to access the Transit server")
pullCmd.Flags().StringVarP(&flagNodeRole, "role", "r", "", `node role (can be "collection", "consensus", "execution", "verification" or "access")`)
pullCmd.Flags().DurationVar(&flagTimeout, "timeout", time.Second*300, `timeout for pull, default: 5m`)

_ = pullCmd.MarkFlagRequired("token")
_ = pullCmd.MarkFlagRequired("role")
Expand All @@ -38,7 +39,7 @@ func addPullCmdFlags() {
func pull(cmd *cobra.Command, args []string) {
log.Info().Msg("running pull")

ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
ctx, cancel := context.WithTimeout(context.Background(), flagTimeout)
defer cancel()

nodeID, err := readNodeID()
Expand Down
15 changes: 10 additions & 5 deletions cmd/consensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,18 +497,23 @@ func main() {
return prov, err
}).
Component("ingestion engine", func(builder cmd.NodeBuilder, node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
ing, err := ingestion.New(
core := ingestion.NewCore(
node.Logger,
node.Tracer,
node.Metrics.Engine,
conMetrics,
node.Metrics.Mempool,
node.Network,
node.State,
node.Storage.Headers,
node.Me,
guarantees,
)

ing, err := ingestion.New(
node.Logger,
node.Metrics.Engine,
node.Network,
node.Me,
core,
)

return ing, err
}).
Component("consensus components", func(nodebuilder cmd.NodeBuilder, node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/execution/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ func main() {
return syncEngine, nil
}).
Component("grpc server", func(builder cmd.NodeBuilder, node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
rpcEng := rpc.New(node.Logger, rpcConf, ingestionEng, node.Storage.Blocks, events, results, txResults, node.RootChainID)
rpcEng := rpc.New(node.Logger, rpcConf, ingestionEng, node.Storage.Blocks, node.Storage.Headers, node.State, events, results, txResults, node.RootChainID)
return rpcEng, nil
}).Run()
}
Expand Down
105 changes: 53 additions & 52 deletions cmd/node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,33 +103,34 @@ type NodeBuilder interface {
// For a node running as a standalone process, the config fields will be populated from the command line params,
// while for a node running as a library, the config fields are expected to be initialized by the caller.
type BaseConfig struct {
nodeIDHex string
AdminAddr string
AdminCert string
AdminKey string
AdminClientCAs string
BindAddr string
NodeRole string
datadir string
secretsdir string
secretsDBEnabled bool
level string
metricsPort uint
BootstrapDir string
PeerUpdateInterval time.Duration
UnicastMessageTimeout time.Duration
DNSCacheTTL time.Duration
profilerEnabled bool
profilerDir string
profilerInterval time.Duration
profilerDuration time.Duration
tracerEnabled bool
tracerSensitivity uint
metricsEnabled bool
guaranteesCacheSize uint
receiptsCacheSize uint
db *badger.DB
LibP2PStreamCompression string
nodeIDHex string
AdminAddr string
AdminCert string
AdminKey string
AdminClientCAs string
BindAddr string
NodeRole string
datadir string
secretsdir string
secretsDBEnabled bool
level string
metricsPort uint
BootstrapDir string
PeerUpdateInterval time.Duration
UnicastMessageTimeout time.Duration
DNSCacheTTL time.Duration
profilerEnabled bool
profilerDir string
profilerInterval time.Duration
profilerDuration time.Duration
tracerEnabled bool
tracerSensitivity uint
metricsEnabled bool
guaranteesCacheSize uint
receiptsCacheSize uint
db *badger.DB
LibP2PStreamCompression string
NetworkReceivedMessageCacheSize int
}

// NodeConfig contains all the derived parameters such the NodeID, private keys etc. and initialized instances of
Expand Down Expand Up @@ -159,7 +160,6 @@ type NodeConfig struct {
// ID providers
IdentityProvider id.IdentityProvider
IDTranslator p2p.IDTranslator
NetworkingIdentifierProvider id.IdentifierProvider
SyncEngineIdentifierProvider id.IdentifierProvider

// root state information
Expand All @@ -176,29 +176,30 @@ func DefaultBaseConfig() *BaseConfig {
datadir := filepath.Join(homedir, ".flow", "database")

return &BaseConfig{
nodeIDHex: NotSet,
AdminAddr: NotSet,
AdminCert: NotSet,
AdminKey: NotSet,
AdminClientCAs: NotSet,
BindAddr: NotSet,
BootstrapDir: "bootstrap",
datadir: datadir,
secretsdir: NotSet,
secretsDBEnabled: true,
level: "info",
PeerUpdateInterval: p2p.DefaultPeerUpdateInterval,
UnicastMessageTimeout: p2p.DefaultUnicastTimeout,
metricsPort: 8080,
profilerEnabled: false,
profilerDir: "profiler",
profilerInterval: 15 * time.Minute,
profilerDuration: 10 * time.Second,
tracerEnabled: false,
tracerSensitivity: 4,
metricsEnabled: true,
receiptsCacheSize: bstorage.DefaultCacheSize,
guaranteesCacheSize: bstorage.DefaultCacheSize,
LibP2PStreamCompression: p2p.NoCompression,
nodeIDHex: NotSet,
AdminAddr: NotSet,
AdminCert: NotSet,
AdminKey: NotSet,
AdminClientCAs: NotSet,
BindAddr: NotSet,
BootstrapDir: "bootstrap",
datadir: datadir,
secretsdir: NotSet,
secretsDBEnabled: true,
level: "info",
PeerUpdateInterval: p2p.DefaultPeerUpdateInterval,
UnicastMessageTimeout: p2p.DefaultUnicastTimeout,
metricsPort: 8080,
profilerEnabled: false,
profilerDir: "profiler",
profilerInterval: 15 * time.Minute,
profilerDuration: 10 * time.Second,
tracerEnabled: false,
tracerSensitivity: 4,
metricsEnabled: true,
receiptsCacheSize: bstorage.DefaultCacheSize,
guaranteesCacheSize: bstorage.DefaultCacheSize,
LibP2PStreamCompression: p2p.NoCompression,
NetworkReceivedMessageCacheSize: p2p.DefaultCacheSize,
}
}
Loading

0 comments on commit 1d31060

Please sign in to comment.