diff --git a/p2p/simulations/adapters/inproc.go b/p2p/simulations/adapters/inproc.go index 3e69728d9851..ea5cafc84ae5 100644 --- a/p2p/simulations/adapters/inproc.go +++ b/p2p/simulations/adapters/inproc.go @@ -140,13 +140,14 @@ func (s *SimAdapter) GetNode(id discover.NodeID) (*SimNode, bool) { // It implements the p2p.Server interface so it can be used transparently // by the underlying service. type SimNode struct { - lock sync.RWMutex - ID discover.NodeID - config *NodeConfig - adapter *SimAdapter - node *node.Node - running map[string]node.Service - client *rpc.Client + lock sync.RWMutex + ID discover.NodeID + config *NodeConfig + adapter *SimAdapter + node *node.Node + running map[string]node.Service + client *rpc.Client + registerOnce sync.Once } // Addr returns the node's discovery address @@ -210,8 +211,6 @@ func (self *SimNode) Snapshots() (map[string][]byte, error) { // Start starts the RPC handler and the underlying service func (self *SimNode) Start(snapshots map[string][]byte) error { - self.lock.Lock() - defer self.lock.Unlock() newService := func(name string) func(ctx *node.ServiceContext) (node.Service, error) { return func(nodeCtx *node.ServiceContext) (node.Service, error) { ctx := &ServiceContext{ @@ -232,10 +231,17 @@ func (self *SimNode) Start(snapshots map[string][]byte) error { } } - for _, name := range self.config.Services { - if err := self.node.Register(newService(name)); err != nil { - return err + var regErr error + self.registerOnce.Do(func() { + for _, name := range self.config.Services { + if err := self.node.Register(newService(name)); err != nil { + regErr = err + return + } } + }) + if regErr != nil { + return regErr } if err := self.node.Start(); err != nil { @@ -247,7 +253,10 @@ func (self *SimNode) Start(snapshots map[string][]byte) error { if err != nil { return err } + + self.lock.Lock() self.client = rpc.DialInProc(handler) + self.lock.Unlock() return nil } diff --git a/p2p/simulations/cmd/p2psim/main.go b/p2p/simulations/cmd/p2psim/main.go index 012611c5ad5a..b511fb595337 100644 --- a/p2p/simulations/cmd/p2psim/main.go +++ b/p2p/simulations/cmd/p2psim/main.go @@ -3,22 +3,19 @@ // Here is an example of creating a 2 node network with the first node // connected to the second: // -// $ p2psim network create -// Created network net1 -// -// $ p2psim node create net1 +// $ p2psim node create // Created node01 // -// $ p2psim node start net1 node01 +// $ p2psim node start node01 // Started node01 // -// $ p2psim node create net1 +// $ p2psim node create // Created node02 // -// $ p2psim node start net1 node02 +// $ p2psim node start node02 // Started node02 // -// $ p2psim node connect net1 node01 node02 +// $ p2psim node connect node01 node02 // Connected node01 to node02 // package main @@ -26,7 +23,6 @@ package main import ( "context" "encoding/json" - "errors" "fmt" "io" "os" @@ -42,10 +38,7 @@ import ( "gopkg.in/urfave/cli.v1" ) -var ( - client *simulations.Client - networkID string -) +var client *simulations.Client func main() { app := cli.NewApp() @@ -64,78 +57,29 @@ func main() { } app.Commands = []cli.Command{ { - Name: "network", - Usage: "manage simulation networks", - Action: listNetworks, - Subcommands: []cli.Command{ - { - Name: "list", - Usage: "list networks", - Action: listNetworks, - }, - { - Name: "create", - Usage: "create a network", - Action: createNetwork, - Flags: []cli.Flag{ - cli.StringFlag{ - Name: "id", - Value: "", - Usage: "network ID", - }, - cli.StringFlag{ - Name: "default-service", - Value: "", - Usage: "default service", - }, - }, - }, - { - Name: "show", - ArgsUsage: "", - Usage: "show network information", - Action: showNetwork, - }, - { - Name: "events", - ArgsUsage: "", - Usage: "stream network events", - Action: streamNetwork, - }, - { - Name: "snapshot", - ArgsUsage: "", - Usage: "create a network snapshot to stdout", - Action: createSnapshot, - }, - { - Name: "load", - ArgsUsage: "", - Usage: "load a network snapshot from stdin", - Action: loadSnapshot, - }, - }, + Name: "show", + Usage: "show network information", + Action: showNetwork, + }, + { + Name: "events", + Usage: "stream network events", + Action: streamNetwork, + }, + { + Name: "snapshot", + Usage: "create a network snapshot to stdout", + Action: createSnapshot, + }, + { + Name: "load", + Usage: "load a network snapshot from stdin", + Action: loadSnapshot, }, { Name: "node", Usage: "manage simulation nodes", Action: listNodes, - Flags: []cli.Flag{ - cli.StringFlag{ - Name: "network", - Usage: "simulation network", - }, - }, - Before: func(ctx *cli.Context) error { - networkID = ctx.GlobalString("network") - if networkID == "" { - networkID = os.Getenv("P2PSIM_NETWORK") - } - if networkID == "" { - return errors.New("missing network, set with --network or P2PSIM_NETWORK") - } - return nil - }, Subcommands: []cli.Command{ { Name: "list", @@ -212,65 +156,27 @@ func main() { app.Run(os.Args) } -func listNetworks(ctx *cli.Context) error { - if len(ctx.Args()) != 0 { - return cli.ShowCommandHelp(ctx, ctx.Command.Name) - } - networks, err := client.GetNetworks() - if err != nil { - return err - } - w := tabwriter.NewWriter(ctx.App.Writer, 1, 2, 2, ' ', 0) - defer w.Flush() - fmt.Fprintf(w, "ID\tNODES\tCONNS\n") - for _, network := range networks { - fmt.Fprintf(w, "%s\t%d\t%d\n", network.ID, len(network.Nodes), len(network.Conns)) - } - return nil -} - -func createNetwork(ctx *cli.Context) error { - if len(ctx.Args()) != 0 { - return cli.ShowCommandHelp(ctx, ctx.Command.Name) - } - config := &simulations.NetworkConfig{ - ID: ctx.String("id"), - DefaultService: ctx.String("default-service"), - } - network, err := client.CreateNetwork(config) - if err != nil { - return err - } - fmt.Fprintln(ctx.App.Writer, "Created network", network.ID) - return nil -} - func showNetwork(ctx *cli.Context) error { - args := ctx.Args() - if len(args) != 1 { + if len(ctx.Args()) != 0 { return cli.ShowCommandHelp(ctx, ctx.Command.Name) } - networkID := args[0] - network, err := client.GetNetwork(networkID) + network, err := client.GetNetwork() if err != nil { return err } w := tabwriter.NewWriter(ctx.App.Writer, 1, 2, 2, ' ', 0) defer w.Flush() - fmt.Fprintf(w, "ID\t%s\n", network.ID) fmt.Fprintf(w, "NODES\t%d\n", len(network.Nodes)) fmt.Fprintf(w, "CONNS\t%d\n", len(network.Conns)) return nil } func streamNetwork(ctx *cli.Context) error { - args := ctx.Args() - if len(args) != 1 { + if len(ctx.Args()) != 0 { return cli.ShowCommandHelp(ctx, ctx.Command.Name) } - networkID := args[0] events := make(chan *simulations.Event) - sub, err := client.SubscribeNetwork(networkID, events) + sub, err := client.SubscribeNetwork(events) if err != nil { return err } @@ -289,12 +195,10 @@ func streamNetwork(ctx *cli.Context) error { } func createSnapshot(ctx *cli.Context) error { - args := ctx.Args() - if len(args) != 1 { + if len(ctx.Args()) != 0 { return cli.ShowCommandHelp(ctx, ctx.Command.Name) } - networkID := args[0] - snap, err := client.CreateSnapshot(networkID) + snap, err := client.CreateSnapshot() if err != nil { return err } @@ -302,23 +206,21 @@ func createSnapshot(ctx *cli.Context) error { } func loadSnapshot(ctx *cli.Context) error { - args := ctx.Args() - if len(args) != 1 { + if len(ctx.Args()) != 0 { return cli.ShowCommandHelp(ctx, ctx.Command.Name) } - networkID := args[0] snap := &simulations.Snapshot{} if err := json.NewDecoder(os.Stdin).Decode(snap); err != nil { return err } - return client.LoadSnapshot(networkID, snap) + return client.LoadSnapshot(snap) } func listNodes(ctx *cli.Context) error { if len(ctx.Args()) != 0 { return cli.ShowCommandHelp(ctx, ctx.Command.Name) } - nodes, err := client.GetNodes(networkID) + nodes, err := client.GetNodes() if err != nil { return err } @@ -357,7 +259,7 @@ func createNode(ctx *cli.Context) error { if services := ctx.String("services"); services != "" { config.Services = strings.Split(services, ",") } - node, err := client.CreateNode(networkID, config) + node, err := client.CreateNode(config) if err != nil { return err } @@ -371,7 +273,7 @@ func showNode(ctx *cli.Context) error { return cli.ShowCommandHelp(ctx, ctx.Command.Name) } nodeName := args[0] - node, err := client.GetNode(networkID, nodeName) + node, err := client.GetNode(nodeName) if err != nil { return err } @@ -396,7 +298,7 @@ func startNode(ctx *cli.Context) error { return cli.ShowCommandHelp(ctx, ctx.Command.Name) } nodeName := args[0] - if err := client.StartNode(networkID, nodeName); err != nil { + if err := client.StartNode(nodeName); err != nil { return err } fmt.Fprintln(ctx.App.Writer, "Started", nodeName) @@ -409,7 +311,7 @@ func stopNode(ctx *cli.Context) error { return cli.ShowCommandHelp(ctx, ctx.Command.Name) } nodeName := args[0] - if err := client.StopNode(networkID, nodeName); err != nil { + if err := client.StopNode(nodeName); err != nil { return err } fmt.Fprintln(ctx.App.Writer, "Stopped", nodeName) @@ -423,7 +325,7 @@ func connectNode(ctx *cli.Context) error { } nodeName := args[0] peerName := args[1] - if err := client.ConnectNode(networkID, nodeName, peerName); err != nil { + if err := client.ConnectNode(nodeName, peerName); err != nil { return err } fmt.Fprintln(ctx.App.Writer, "Connected", nodeName, "to", peerName) @@ -437,7 +339,7 @@ func disconnectNode(ctx *cli.Context) error { } nodeName := args[0] peerName := args[1] - if err := client.DisconnectNode(networkID, nodeName, peerName); err != nil { + if err := client.DisconnectNode(nodeName, peerName); err != nil { return err } fmt.Fprintln(ctx.App.Writer, "Disconnected", nodeName, "from", peerName) @@ -451,7 +353,7 @@ func rpcNode(ctx *cli.Context) error { } nodeName := args[0] method := args[1] - rpcClient, err := client.RPCClient(context.Background(), networkID, nodeName) + rpcClient, err := client.RPCClient(context.Background(), nodeName) if err != nil { return err } diff --git a/p2p/simulations/examples/connectivity.go b/p2p/simulations/examples/connectivity.go index 4e7421178700..3a5987c4ac4c 100644 --- a/p2p/simulations/examples/connectivity.go +++ b/p2p/simulations/examples/connectivity.go @@ -34,16 +34,16 @@ func main() { adapters.RegisterServices(services) - config := &simulations.ServerConfig{} - - adapter := flag.String("adapter", "sim", `node adapter to use (one of "sim", "exec" or "docker")`) + adapterType := flag.String("adapter", "sim", `node adapter to use (one of "sim", "exec" or "docker")`) flag.Parse() - switch *adapter { + var adapter adapters.NodeAdapter + + switch *adapterType { case "sim": log.Info("using sim adapter") - config.NewAdapter = func() adapters.NodeAdapter { return adapters.NewSimAdapter(services) } + adapter = adapters.NewSimAdapter(services) case "exec": tmpdir, err := ioutil.TempDir("", "p2p-example") @@ -52,22 +52,26 @@ func main() { } defer os.RemoveAll(tmpdir) log.Info("using exec adapter", "tmpdir", tmpdir) - config.NewAdapter = func() adapters.NodeAdapter { return adapters.NewExecAdapter(tmpdir) } + adapter = adapters.NewExecAdapter(tmpdir) case "docker": log.Info("using docker adapter") - adapter, err := adapters.NewDockerAdapter() + var err error + adapter, err = adapters.NewDockerAdapter() if err != nil { log.Crit("error creating docker adapter", "err", err) } - config.NewAdapter = func() adapters.NodeAdapter { return adapter } default: - log.Crit(fmt.Sprintf("unknown node adapter %q", *adapter)) + log.Crit(fmt.Sprintf("unknown node adapter %q", *adapterType)) } log.Info("starting simulation server on 0.0.0.0:8888...") - if err := http.ListenAndServe(":8888", simulations.NewServer(config)); err != nil { + network := simulations.NewNetwork(adapter, &simulations.NetworkConfig{ + DefaultService: "ping-pong", + }) + config := simulations.ServerConfig{} + if err := http.ListenAndServe(":8888", simulations.NewServer(network, config)); err != nil { log.Crit("error starting simulation server", "err", err) } } diff --git a/p2p/simulations/examples/p2psim.sh b/p2p/simulations/examples/p2psim.sh index 468d66d4e30b..3e164543f4d6 100755 --- a/p2p/simulations/examples/p2psim.sh +++ b/p2p/simulations/examples/p2psim.sh @@ -9,13 +9,9 @@ main() { fail "missing p2psim binary (you need to build p2p/simulations/cmd/p2psim)" fi - info "creating the example network" - export P2PSIM_NETWORK="example" - p2psim network create --id "${P2PSIM_NETWORK}" - info "creating 10 nodes" for i in $(seq 1 10); do - p2psim node create --name "$(node_name $i)" --services "ping-pong" + p2psim node create --name "$(node_name $i)" p2psim node start "$(node_name $i)" done diff --git a/p2p/simulations/http.go b/p2p/simulations/http.go index 0d1742cfaf04..c9140412e95c 100644 --- a/p2p/simulations/http.go +++ b/p2p/simulations/http.go @@ -10,7 +10,6 @@ import ( "io/ioutil" "net/http" "strings" - "sync" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/p2p" @@ -41,54 +40,37 @@ func NewClient(url string) *Client { } } -// GetNetworks returns a list of simulations networks -func (c *Client) GetNetworks() ([]*Network, error) { - var networks []*Network - return networks, c.Get("/networks", &networks) -} - -// CreateNetwork creates a new simulation network -func (c *Client) CreateNetwork(config *NetworkConfig) (*Network, error) { - network := &Network{} - return network, c.Post("/networks", config, network) -} - -// GetNetwork returns details of a network -func (c *Client) GetNetwork(networkID string) (*Network, error) { +// GetNetwork returns details of the network +func (c *Client) GetNetwork() (*Network, error) { network := &Network{} - return network, c.Get(fmt.Sprintf("/networks/%s", networkID), network) + return network, c.Get("/", network) } -// StartNetwork starts all existing nodes in a simulation network -func (c *Client) StartNetwork(networkID string) error { - return c.Post(fmt.Sprintf("/networks/%s/start", networkID), nil, nil) +// StartNetwork starts all existing nodes in the simulation network +func (c *Client) StartNetwork() error { + return c.Post("/start", nil, nil) } // StopNetwork stops all existing nodes in a simulation network -func (c *Client) StopNetwork(networkID string) error { - return c.Post(fmt.Sprintf("/networks/%s/stop", networkID), nil, nil) -} - -// DeleteNetwork stops and deletes a simulation network -func (c *Client) DeleteNetwork(networkID string) error { - return c.Delete(fmt.Sprintf("/networks/%s", networkID)) +func (c *Client) StopNetwork() error { + return c.Post("/stop", nil, nil) } // CreateSnapshot creates a network snapshot -func (c *Client) CreateSnapshot(networkID string) (*Snapshot, error) { +func (c *Client) CreateSnapshot() (*Snapshot, error) { snap := &Snapshot{} - return snap, c.Get(fmt.Sprintf("/networks/%s/snapshot", networkID), snap) + return snap, c.Get("/snapshot", snap) } -// LoadSnapshot loads a snapshot into a network -func (c *Client) LoadSnapshot(networkID string, snap *Snapshot) error { - return c.Post(fmt.Sprintf("/networks/%s/snapshot", networkID), snap, nil) +// LoadSnapshot loads a snapshot into the network +func (c *Client) LoadSnapshot(snap *Snapshot) error { + return c.Post("/snapshot", snap, nil) } // SubscribeNetwork subscribes to network events which are sent from the server // as a server-sent-events stream -func (c *Client) SubscribeNetwork(networkID string, events chan *Event) (event.Subscription, error) { - req, err := http.NewRequest("GET", fmt.Sprintf("%s/networks/%s/events", c.URL, networkID), nil) +func (c *Client) SubscribeNetwork(events chan *Event) (event.Subscription, error) { + req, err := http.NewRequest("GET", fmt.Sprintf("%s/events", c.URL), nil) if err != nil { return nil, err } @@ -154,48 +136,48 @@ func (c *Client) SubscribeNetwork(networkID string, events chan *Event) (event.S return event.NewSubscription(producer), nil } -// GetNodes returns all nodes which exist in a network -func (c *Client) GetNodes(networkID string) ([]*p2p.NodeInfo, error) { +// GetNodes returns all nodes which exist in the network +func (c *Client) GetNodes() ([]*p2p.NodeInfo, error) { var nodes []*p2p.NodeInfo - return nodes, c.Get(fmt.Sprintf("/networks/%s/nodes", networkID), &nodes) + return nodes, c.Get("/nodes", &nodes) } -// CreateNode creates a node in a network using the given configuration -func (c *Client) CreateNode(networkID string, config *adapters.NodeConfig) (*p2p.NodeInfo, error) { +// CreateNode creates a node in the network using the given configuration +func (c *Client) CreateNode(config *adapters.NodeConfig) (*p2p.NodeInfo, error) { node := &p2p.NodeInfo{} - return node, c.Post(fmt.Sprintf("/networks/%s/nodes", networkID), config, node) + return node, c.Post("/nodes", config, node) } // GetNode returns details of a node -func (c *Client) GetNode(networkID, nodeID string) (*p2p.NodeInfo, error) { +func (c *Client) GetNode(nodeID string) (*p2p.NodeInfo, error) { node := &p2p.NodeInfo{} - return node, c.Get(fmt.Sprintf("/networks/%s/nodes/%s", networkID, nodeID), node) + return node, c.Get(fmt.Sprintf("/nodes/%s", nodeID), node) } // StartNode starts a node -func (c *Client) StartNode(networkID, nodeID string) error { - return c.Post(fmt.Sprintf("/networks/%s/nodes/%s/start", networkID, nodeID), nil, nil) +func (c *Client) StartNode(nodeID string) error { + return c.Post(fmt.Sprintf("/nodes/%s/start", nodeID), nil, nil) } // StopNode stops a node -func (c *Client) StopNode(networkID, nodeID string) error { - return c.Post(fmt.Sprintf("/networks/%s/nodes/%s/stop", networkID, nodeID), nil, nil) +func (c *Client) StopNode(nodeID string) error { + return c.Post(fmt.Sprintf("/nodes/%s/stop", nodeID), nil, nil) } // ConnectNode connects a node to a peer node -func (c *Client) ConnectNode(networkID, nodeID, peerID string) error { - return c.Post(fmt.Sprintf("/networks/%s/nodes/%s/conn/%s", networkID, nodeID, peerID), nil, nil) +func (c *Client) ConnectNode(nodeID, peerID string) error { + return c.Post(fmt.Sprintf("/nodes/%s/conn/%s", nodeID, peerID), nil, nil) } // DisconnectNode disconnects a node from a peer node -func (c *Client) DisconnectNode(networkID, nodeID, peerID string) error { - return c.Delete(fmt.Sprintf("/networks/%s/nodes/%s/conn/%s", networkID, nodeID, peerID)) +func (c *Client) DisconnectNode(nodeID, peerID string) error { + return c.Delete(fmt.Sprintf("/nodes/%s/conn/%s", nodeID, peerID)) } // RPCClient returns an RPC client connected to a node -func (c *Client) RPCClient(ctx context.Context, networkID, nodeID string) (*rpc.Client, error) { +func (c *Client) RPCClient(ctx context.Context, nodeID string) (*rpc.Client, error) { baseURL := strings.Replace(c.URL, "http", "ws", 1) - return rpc.DialWebsocket(ctx, fmt.Sprintf("%s/networks/%s/nodes/%s/rpc", baseURL, networkID, nodeID), "") + return rpc.DialWebsocket(ctx, fmt.Sprintf("%s/nodes/%s/rpc", baseURL, nodeID), "") } // Get performs a HTTP GET request decoding the resulting JSON response @@ -251,17 +233,9 @@ func (c *Client) Send(method, path string, in, out interface{}) error { // ServerConfig is the configuration used to start an API server type ServerConfig struct { - // NewAdapter is called to create a new NodeAdapter for each new - // network - NewAdapter func() adapters.NodeAdapter - - // ExternalNetworks are externally defined networks to expose via the - // HTTP server - ExternalNetworks map[string]*Network - // Mocker is the function which will be called when a client sends a - // POST request to /networks//mock and is expected to - // generate some mock events in the network + // POST request to /mock and is expected to generate some mock events + // in the network Mocker func(*Network) // In case of multiple mockers, set the default here DefaultMockerID string @@ -269,108 +243,51 @@ type ServerConfig struct { Mockers map[string]*MockerConfig } -// Server is an HTTP server providing an API to create and manage simulation -// networks +// Server is an HTTP server providing an API to manage a simulation network type Server struct { ServerConfig - router *httprouter.Router - networks map[string]*Network - mtx sync.Mutex + router *httprouter.Router + network *Network } // NewServer returns a new simulation API server -func NewServer(config *ServerConfig) *Server { - if config.NewAdapter == nil { - panic("NewAdapter not set") - } - +func NewServer(network *Network, config ServerConfig) *Server { s := &Server{ - ServerConfig: *config, + ServerConfig: config, router: httprouter.New(), - networks: make(map[string]*Network), - } - for name, network := range config.ExternalNetworks { - s.networks[name] = network - } - - s.OPTIONS("/networks", s.Options) - s.POST("/networks", s.CreateNetwork) - s.GET("/networks", s.GetNetworks) - s.GET("/networks/:netid", s.GetNetwork) - s.POST("/networks/:netid/start", s.StartNetwork) - s.POST("/networks/:netid/stop", s.StopNetwork) - s.DELETE("/networks/:netid", s.DeleteNetwork) - s.GET("/networks/:netid/events", s.StreamNetworkEvents) - s.GET("/networks/:netid/snapshot", s.CreateSnapshot) - s.POST("/networks/:netid/snapshot", s.LoadSnapshot) - s.POST("/networks/:netid/mock/:mockid", s.StartMocker) - s.GET("/networks/:netid/mock", s.GetMocker) - s.POST("/networks/:netid/nodes", s.CreateNode) - s.GET("/networks/:netid/nodes", s.GetNodes) - s.GET("/networks/:netid/nodes/:nodeid", s.GetNode) - s.POST("/networks/:netid/nodes/:nodeid/start", s.StartNode) - s.POST("/networks/:netid/nodes/:nodeid/stop", s.StopNode) - s.POST("/networks/:netid/nodes/:nodeid/conn/:peerid", s.ConnectNode) - s.DELETE("/networks/:netid/nodes/:nodeid/conn/:peerid", s.DisconnectNode) - s.GET("/networks/:netid/nodes/:nodeid/rpc", s.NodeRPC) + network: network, + } + + s.OPTIONS("/", s.Options) + s.GET("/", s.GetNetwork) + s.POST("/start", s.StartNetwork) + s.POST("/stop", s.StopNetwork) + s.GET("/events", s.StreamNetworkEvents) + s.GET("/snapshot", s.CreateSnapshot) + s.POST("/snapshot", s.LoadSnapshot) + s.POST("/mock/:mockid", s.StartMocker) + s.GET("/mock", s.GetMocker) + s.POST("/nodes", s.CreateNode) + s.GET("/nodes", s.GetNodes) + s.GET("/nodes/:nodeid", s.GetNode) + s.POST("/nodes/:nodeid/start", s.StartNode) + s.POST("/nodes/:nodeid/stop", s.StopNode) + s.POST("/nodes/:nodeid/conn/:peerid", s.ConnectNode) + s.DELETE("/nodes/:nodeid/conn/:peerid", s.DisconnectNode) + s.GET("/nodes/:nodeid/rpc", s.NodeRPC) return s } -// CreateNetwork creates a new simulation network -func (s *Server) CreateNetwork(w http.ResponseWriter, req *http.Request) { - config := &NetworkConfig{} - if err := json.NewDecoder(req.Body).Decode(config); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - network, err := func() (*Network, error) { - s.mtx.Lock() - defer s.mtx.Unlock() - if config.ID == "" { - config.ID = fmt.Sprintf("net%d", len(s.networks)+1) - } - if _, exists := s.networks[config.ID]; exists { - return nil, fmt.Errorf("network exists: %s", config.ID) - } - network := NewNetwork(s.NewAdapter(), config) - s.networks[config.ID] = network - return network, nil - }() - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - s.JSON(w, http.StatusCreated, network) -} - -// GetNetworks returns a list of simulations networks -func (s *Server) GetNetworks(w http.ResponseWriter, req *http.Request) { - s.mtx.Lock() - networks := make([]*Network, 0, len(s.networks)) - for _, network := range s.networks { - networks = append(networks, network) - } - s.mtx.Unlock() - - s.JSON(w, http.StatusOK, networks) -} - -// GetNetwork returns details of a network +// GetNetwork returns details of the network func (s *Server) GetNetwork(w http.ResponseWriter, req *http.Request) { - network := req.Context().Value("network").(*Network) - - s.JSON(w, http.StatusOK, network) + s.JSON(w, http.StatusOK, s.network) } -// StartNetwork starts all nodes in a network +// StartNetwork starts all nodes in the network func (s *Server) StartNetwork(w http.ResponseWriter, req *http.Request) { - network := req.Context().Value("network").(*Network) - - if err := network.StartAll(); err != nil { + if err := s.network.StartAll(); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -378,11 +295,9 @@ func (s *Server) StartNetwork(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusOK) } -// StopNetwork stops all nodes in a network +// StopNetwork stops all nodes in the network func (s *Server) StopNetwork(w http.ResponseWriter, req *http.Request) { - network := req.Context().Value("network").(*Network) - - if err := network.StopAll(); err != nil { + if err := s.network.StopAll(); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -390,22 +305,6 @@ func (s *Server) StopNetwork(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusOK) } -// DeleteNetwork stops all nodes in a network and deletes it -func (s *Server) DeleteNetwork(w http.ResponseWriter, req *http.Request) { - network := req.Context().Value("network").(*Network) - - if err := network.StopAll(); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - s.mtx.Lock() - delete(s.networks, network.ID) - s.mtx.Unlock() - - w.WriteHeader(http.StatusOK) -} - //Get the info for a particular mocker func (s *Server) GetMocker(w http.ResponseWriter, req *http.Request) { m := make(map[string]string) @@ -418,7 +317,6 @@ func (s *Server) GetMocker(w http.ResponseWriter, req *http.Request) { } func (s *Server) StartMocker(w http.ResponseWriter, req *http.Request) { - network := req.Context().Value("network").(*Network) mockerid := req.Context().Value("mock").(string) if len(s.Mockers) == 0 { @@ -436,7 +334,7 @@ func (s *Server) StartMocker(w http.ResponseWriter, req *http.Request) { http.Error(w, "mocker not configured", http.StatusInternalServerError) return } - go mocker.Mocker(network) + go mocker.Mocker(s.network) w.WriteHeader(http.StatusOK) } else { http.Error(w, "invalid mockerid provided", http.StatusBadRequest) @@ -446,10 +344,8 @@ func (s *Server) StartMocker(w http.ResponseWriter, req *http.Request) { // StreamNetworkEvents streams network events as a server-sent-events stream func (s *Server) StreamNetworkEvents(w http.ResponseWriter, req *http.Request) { - network := req.Context().Value("network").(*Network) - events := make(chan *Event) - sub := network.events.Subscribe(events) + sub := s.network.events.Subscribe(events) defer sub.Unsubscribe() // stop the stream if the client goes away @@ -494,9 +390,7 @@ func (s *Server) StreamNetworkEvents(w http.ResponseWriter, req *http.Request) { // CreateSnapshot creates a network snapshot func (s *Server) CreateSnapshot(w http.ResponseWriter, req *http.Request) { - network := req.Context().Value("network").(*Network) - - snap, err := network.Snapshot() + snap, err := s.network.Snapshot() if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -505,28 +399,24 @@ func (s *Server) CreateSnapshot(w http.ResponseWriter, req *http.Request) { s.JSON(w, http.StatusOK, snap) } -// LoadSnapshot loads a snapshot into a network +// LoadSnapshot loads a snapshot into the network func (s *Server) LoadSnapshot(w http.ResponseWriter, req *http.Request) { - network := req.Context().Value("network").(*Network) - snap := &Snapshot{} if err := json.NewDecoder(req.Body).Decode(snap); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } - if err := network.Load(snap); err != nil { + if err := s.network.Load(snap); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - s.JSON(w, http.StatusOK, network) + s.JSON(w, http.StatusOK, s.network) } -// CreateNode creates a node in a network using the given configuration +// CreateNode creates a node in the network using the given configuration func (s *Server) CreateNode(w http.ResponseWriter, req *http.Request) { - network := req.Context().Value("network").(*Network) - config := adapters.RandomNodeConfig() err := json.NewDecoder(req.Body).Decode(config) if err != nil && err != io.EOF { @@ -534,7 +424,7 @@ func (s *Server) CreateNode(w http.ResponseWriter, req *http.Request) { return } - node, err := network.NewNodeWithConfig(config) + node, err := s.network.NewNodeWithConfig(config) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -543,11 +433,9 @@ func (s *Server) CreateNode(w http.ResponseWriter, req *http.Request) { s.JSON(w, http.StatusCreated, node.NodeInfo()) } -// GetNodes returns all nodes which exist in a network +// GetNodes returns all nodes which exist in the network func (s *Server) GetNodes(w http.ResponseWriter, req *http.Request) { - network := req.Context().Value("network").(*Network) - - nodes := network.GetNodes() + nodes := s.network.GetNodes() infos := make([]*p2p.NodeInfo, len(nodes)) for i, node := range nodes { @@ -566,10 +454,9 @@ func (s *Server) GetNode(w http.ResponseWriter, req *http.Request) { // StartNode starts a node func (s *Server) StartNode(w http.ResponseWriter, req *http.Request) { - network := req.Context().Value("network").(*Network) node := req.Context().Value("node").(*Node) - if err := network.Start(node.ID()); err != nil { + if err := s.network.Start(node.ID()); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -579,10 +466,9 @@ func (s *Server) StartNode(w http.ResponseWriter, req *http.Request) { // StopNode stops a node func (s *Server) StopNode(w http.ResponseWriter, req *http.Request) { - network := req.Context().Value("network").(*Network) node := req.Context().Value("node").(*Node) - if err := network.Stop(node.ID()); err != nil { + if err := s.network.Stop(node.ID()); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -592,11 +478,10 @@ func (s *Server) StopNode(w http.ResponseWriter, req *http.Request) { // ConnectNode connects a node to a peer node func (s *Server) ConnectNode(w http.ResponseWriter, req *http.Request) { - network := req.Context().Value("network").(*Network) node := req.Context().Value("node").(*Node) peer := req.Context().Value("peer").(*Node) - if err := network.Connect(node.ID(), peer.ID()); err != nil { + if err := s.network.Connect(node.ID(), peer.ID()); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -606,11 +491,10 @@ func (s *Server) ConnectNode(w http.ResponseWriter, req *http.Request) { // DisconnectNode disconnects a node from a peer node func (s *Server) DisconnectNode(w http.ResponseWriter, req *http.Request) { - network := req.Context().Value("network").(*Network) node := req.Context().Value("node").(*Node) peer := req.Context().Value("peer").(*Node) - if err := network.Disconnect(node.ID(), peer.ID()); err != nil { + if err := s.network.Disconnect(node.ID(), peer.ID()); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -677,29 +561,12 @@ func (s *Server) wrapHandler(handler http.HandlerFunc) httprouter.Handle { ctx := context.Background() - var network *Network - if id := params.ByName("netid"); id != "" { - s.mtx.Lock() - var ok bool - network, ok = s.networks[id] - s.mtx.Unlock() - if !ok { - http.NotFound(w, req) - return - } - ctx = context.WithValue(ctx, "network", network) - } - if id := params.ByName("nodeid"); id != "" { - if network == nil { - http.NotFound(w, req) - return - } var node *Node if nodeID, err := discover.HexID(id); err == nil { - node = network.GetNode(nodeID) + node = s.network.GetNode(nodeID) } else { - node = network.GetNodeByName(id) + node = s.network.GetNodeByName(id) } if node == nil { http.NotFound(w, req) @@ -709,15 +576,11 @@ func (s *Server) wrapHandler(handler http.HandlerFunc) httprouter.Handle { } if id := params.ByName("peerid"); id != "" { - if network == nil { - http.NotFound(w, req) - return - } var peer *Node if peerID, err := discover.HexID(id); err == nil { - peer = network.GetNode(peerID) + peer = s.network.GetNode(peerID) } else { - peer = network.GetNodeByName(id) + peer = s.network.GetNodeByName(id) } if peer == nil { http.NotFound(w, req) @@ -727,10 +590,6 @@ func (s *Server) wrapHandler(handler http.HandlerFunc) httprouter.Handle { } if id := params.ByName("mockid"); id != "" { - if network == nil { - http.NotFound(w, req) - return - } ctx = context.WithValue(ctx, "mock", id) } diff --git a/p2p/simulations/http_test.go b/p2p/simulations/http_test.go index 84ceb0f6775e..f9955db5c0fe 100644 --- a/p2p/simulations/http_test.go +++ b/p2p/simulations/http_test.go @@ -127,54 +127,32 @@ var testServices = adapters.Services{ "test": newTestService, } -func testHTTPServer(t *testing.T) *httptest.Server { - return httptest.NewServer(NewServer(&ServerConfig{ - NewAdapter: func() adapters.NodeAdapter { return adapters.NewSimAdapter(testServices) }, - })) +func testHTTPServer(t *testing.T) (*Network, *httptest.Server) { + adapter := adapters.NewSimAdapter(testServices) + network := NewNetwork(adapter, &NetworkConfig{ + DefaultService: "test", + }) + return network, httptest.NewServer(NewServer(network, ServerConfig{})) } -// TestHTTPNetwork tests creating and interacting with a simulation -// network using the HTTP API +// TestHTTPNetwork tests interacting with a simulation network using the HTTP +// API func TestHTTPNetwork(t *testing.T) { // start the server - s := testHTTPServer(t) + network, s := testHTTPServer(t) defer s.Close() - // create a network - client := NewClient(s.URL) - config := &NetworkConfig{ - DefaultService: "test", - } - network, err := client.CreateNetwork(config) - if err != nil { - t.Fatalf("error creating network: %s", err) - } - // subscribe to events so we can check them later + client := NewClient(s.URL) events := make(chan *Event, 100) - sub, err := client.SubscribeNetwork(network.ID, events) + sub, err := client.SubscribeNetwork(events) if err != nil { t.Fatalf("error subscribing to network events: %s", err) } defer sub.Unsubscribe() - // check the network has an ID - if network.ID == "" { - t.Fatal("expected network.ID to be set") - } - - // check the network exists - networks, err := client.GetNetworks() - if err != nil { - t.Fatalf("error getting networks: %s", err) - } - if len(networks) != 1 { - t.Fatalf("expected 1 network, got %d", len(networks)) - } - if networks[0].ID != network.ID { - t.Fatalf("expected network to have ID %q, got %q", network.ID, networks[0].ID) - } - gotNetwork, err := client.GetNetwork(network.ID) + // check we can retrieve details about the network + gotNetwork, err := client.GetNetwork() if err != nil { t.Fatalf("error getting network: %s", err) } @@ -185,7 +163,7 @@ func TestHTTPNetwork(t *testing.T) { // create 2 nodes nodeIDs := make([]string, 2) for i := 0; i < 2; i++ { - node, err := client.CreateNode(network.ID, nil) + node, err := client.CreateNode(nil) if err != nil { t.Fatalf("error creating node: %s", err) } @@ -193,7 +171,7 @@ func TestHTTPNetwork(t *testing.T) { } // check both nodes exist - nodes, err := client.GetNodes(network.ID) + nodes, err := client.GetNodes() if err != nil { t.Fatalf("error getting nodes: %s", err) } @@ -204,7 +182,7 @@ func TestHTTPNetwork(t *testing.T) { if nodes[i].ID != nodeID { t.Fatalf("expected node %d to have ID %q, got %q", i, nodeID, nodes[i].ID) } - node, err := client.GetNode(network.ID, nodeID) + node, err := client.GetNode(nodeID) if err != nil { t.Fatalf("error getting node %d: %s", i, err) } @@ -215,13 +193,13 @@ func TestHTTPNetwork(t *testing.T) { // start both nodes for _, nodeID := range nodeIDs { - if err := client.StartNode(network.ID, nodeID); err != nil { + if err := client.StartNode(nodeID); err != nil { t.Fatalf("error starting node %q: %s", nodeID, err) } } // connect the nodes - if err := client.ConnectNode(network.ID, nodeIDs[0], nodeIDs[1]); err != nil { + if err := client.ConnectNode(nodeIDs[0], nodeIDs[1]); err != nil { t.Fatalf("error connecting nodes: %s", err) } @@ -319,31 +297,27 @@ func (t *expectEvents) expect(events ...*Event) { // TestHTTPNodeRPC tests calling RPC methods on nodes via the HTTP API func TestHTTPNodeRPC(t *testing.T) { // start the server - s := testHTTPServer(t) + _, s := testHTTPServer(t) defer s.Close() - // start a node in a network + // start a node in the network client := NewClient(s.URL) - network, err := client.CreateNetwork(&NetworkConfig{DefaultService: "test"}) - if err != nil { - t.Fatalf("error creating network: %s", err) - } - node, err := client.CreateNode(network.ID, nil) + node, err := client.CreateNode(nil) if err != nil { t.Fatalf("error creating node: %s", err) } - if err := client.StartNode(network.ID, node.ID); err != nil { + if err := client.StartNode(node.ID); err != nil { t.Fatalf("error starting node: %s", err) } // create two RPC clients ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - rpcClient1, err := client.RPCClient(ctx, network.ID, node.ID) + rpcClient1, err := client.RPCClient(ctx, node.ID) if err != nil { t.Fatalf("error getting node RPC client: %s", err) } - rpcClient2, err := client.RPCClient(ctx, network.ID, node.ID) + rpcClient2, err := client.RPCClient(ctx, node.ID) if err != nil { t.Fatalf("error getting node RPC client: %s", err) } @@ -382,35 +356,31 @@ func TestHTTPNodeRPC(t *testing.T) { // TestHTTPSnapshot tests creating and loading network snapshots func TestHTTPSnapshot(t *testing.T) { // start the server - s := testHTTPServer(t) + _, s := testHTTPServer(t) defer s.Close() // create a two-node network client := NewClient(s.URL) - network, err := client.CreateNetwork(&NetworkConfig{DefaultService: "test"}) - if err != nil { - t.Fatalf("error creating network: %s", err) - } nodeCount := 2 nodes := make([]*p2p.NodeInfo, nodeCount) for i := 0; i < nodeCount; i++ { - node, err := client.CreateNode(network.ID, nil) + node, err := client.CreateNode(nil) if err != nil { t.Fatalf("error creating node: %s", err) } - if err := client.StartNode(network.ID, node.ID); err != nil { + if err := client.StartNode(node.ID); err != nil { t.Fatalf("error starting node: %s", err) } nodes[i] = node } - if err := client.ConnectNode(network.ID, nodes[0].ID, nodes[1].ID); err != nil { + if err := client.ConnectNode(nodes[0].ID, nodes[1].ID); err != nil { t.Fatalf("error connecting nodes: %s", err) } // store some state in the test services states := make([]string, nodeCount) for i, node := range nodes { - rpc, err := client.RPCClient(context.Background(), network.ID, node.ID) + rpc, err := client.RPCClient(context.Background(), node.ID) if err != nil { t.Fatalf("error getting RPC client: %s", err) } @@ -423,7 +393,7 @@ func TestHTTPSnapshot(t *testing.T) { } // create a snapshot - snap, err := client.CreateSnapshot(network.ID) + snap, err := client.CreateSnapshot() if err != nil { t.Fatalf("error creating snapshot: %s", err) } @@ -435,26 +405,25 @@ func TestHTTPSnapshot(t *testing.T) { } // create another network - network, err = client.CreateNetwork(&NetworkConfig{DefaultService: "test"}) - if err != nil { - t.Fatalf("error creating network: %s", err) - } + _, s = testHTTPServer(t) + defer s.Close() + client = NewClient(s.URL) // subscribe to events so we can check them later events := make(chan *Event, 100) - sub, err := client.SubscribeNetwork(network.ID, events) + sub, err := client.SubscribeNetwork(events) if err != nil { t.Fatalf("error subscribing to network events: %s", err) } defer sub.Unsubscribe() // load the snapshot - if err := client.LoadSnapshot(network.ID, snap); err != nil { + if err := client.LoadSnapshot(snap); err != nil { t.Fatalf("error loading snapshot: %s", err) } // check the nodes and connection exists - net, err := client.GetNetwork(network.ID) + net, err := client.GetNetwork() if err != nil { t.Fatalf("error getting network: %s", err) } @@ -480,7 +449,7 @@ func TestHTTPSnapshot(t *testing.T) { // check the node states were restored for i, node := range nodes { - rpc, err := client.RPCClient(context.Background(), network.ID, node.ID) + rpc, err := client.RPCClient(context.Background(), node.ID) if err != nil { t.Fatalf("error getting RPC client: %s", err) } diff --git a/swarm/network/simulations/overlay.go b/swarm/network/simulations/overlay.go index 6570d30ac09d..04f7b8be60e1 100644 --- a/swarm/network/simulations/overlay.go +++ b/swarm/network/simulations/overlay.go @@ -6,6 +6,7 @@ package main import ( + "flag" "fmt" "math/rand" "net/http" @@ -22,6 +23,8 @@ import ( "github.com/ethereum/go-ethereum/swarm/network" ) +var noDiscovery = flag.Bool("no-discovery", false, "disable discovery (useful if you want to load a snapshot)") + type Simulation struct { mtx sync.Mutex stores map[discover.NodeID]*adapters.SimStateStore @@ -57,6 +60,7 @@ func (s *Simulation) NewService(ctx *adapters.ServiceContext) (node.Service, err ticker := time.NewTicker(time.Duration(kad.PruneInterval) * time.Millisecond) kad.Prune(ticker.C) hp := network.NewHiveParams() + hp.Discovery = !*noDiscovery hp.KeepAliveInterval = 3 * time.Second config := &network.BzzConfig{ @@ -94,9 +98,6 @@ func createMockers() map[string]*simulations.MockerConfig { } func setupMocker(net *simulations.Network) []discover.NodeID { - conf := net.Config() - conf.DefaultService = "overlay" - nodeCount := 30 ids := make([]discover.NodeID, nodeCount) for i := 0; i < nodeCount; i++ { @@ -195,6 +196,8 @@ func startStopMocker(net *simulations.Network) { // var server func main() { + flag.Parse() + runtime.GOMAXPROCS(runtime.NumCPU()) log.Root().SetHandler(log.LvlFilterHandler(log.LvlDebug, log.StreamHandler(os.Stderr, log.TerminalFormat(false)))) @@ -203,17 +206,20 @@ func main() { services := adapters.Services{ "overlay": s.NewService, } - adapters.RegisterServices(services) + adapter := adapters.NewSimAdapter(services) + + network := simulations.NewNetwork(adapter, &simulations.NetworkConfig{ + DefaultService: "overlay", + }) mockers := createMockers() - config := &simulations.ServerConfig{ - NewAdapter: func() adapters.NodeAdapter { return adapters.NewSimAdapter(services) }, + config := simulations.ServerConfig{ DefaultMockerID: "randomNodes", // DefaultMockerID: "bootNet", Mockers: mockers, } log.Info("starting simulation server on 0.0.0.0:8888...") - http.ListenAndServe(":8888", simulations.NewServer(config)) + http.ListenAndServe(":8888", simulations.NewServer(network, config)) }