From 5d8e64541a4deb9153673f862085449421b68d82 Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Fri, 15 May 2020 16:16:24 +0200 Subject: [PATCH] Register http (#16) * register http new impl * start ws thorugh console on same port as http --- go.mod | 4 +- go.sum | 7 +- graphql/service.go | 91 +++++------- node/api.go | 121 +++++++++------ node/node.go | 361 +++++++++++++++++++++++++-------------------- node/rpcstack.go | 2 + 6 files changed, 327 insertions(+), 259 deletions(-) diff --git a/go.mod b/go.mod index 24ee0f438a87..af663e715b3c 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,7 @@ require ( github.com/go-ole/go-ole v1.2.1 // indirect github.com/go-sourcemap/sourcemap v2.1.2+incompatible // indirect github.com/go-stack/stack v1.8.0 - github.com/golang/protobuf v1.3.2-0.20190517061210-b285ee9cfc6c + github.com/golang/protobuf v1.3.3 github.com/golang/snappy v0.0.1 github.com/google/go-cmp v0.3.1 // indirect github.com/gorilla/websocket v1.4.1-0.20190629185528-ae1634f6a989 @@ -59,7 +59,7 @@ require ( github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208 golang.org/x/crypto v0.0.0-20200311171314-f7b00557c8c4 golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 // indirect - golang.org/x/sync v0.0.0-20181108010431-42b317875d0f + golang.org/x/sync v0.0.0-20190423024810-112230192c58 golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd golang.org/x/text v0.3.2 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 diff --git a/go.sum b/go.sum index 23b89d464473..c141c0173593 100644 --- a/go.sum +++ b/go.sum @@ -81,10 +81,11 @@ github.com/go-sourcemap/sourcemap v2.1.2+incompatible h1:0b/xya7BKGhXuqFESKM4oIi github.com/go-sourcemap/sourcemap v2.1.2+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg= github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.2-0.20190517061210-b285ee9cfc6c h1:zqAKixg3cTcIasAMJV+EcfVbWwLpOZ7LeoWJvcuD/5Q= -github.com/golang/protobuf v1.3.2-0.20190517061210-b285ee9cfc6c/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.3 h1:gyjaxf+svBWX08ZjK86iN9geUJF0H6gp2IRKX6Nf6/I= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg= @@ -198,6 +199,8 @@ golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f h1:Bl/8QSvNqXvPGPGXa2z5xUTmV7VDcZyvRZ+QQXkXTZQ= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/graphql/service.go b/graphql/service.go index 68e41fde4689..559bb2852643 100644 --- a/graphql/service.go +++ b/graphql/service.go @@ -17,16 +17,12 @@ package graphql import ( - "context" "errors" - "fmt" "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/les" - "net" "net/http" "github.com/ethereum/go-ethereum/internal/ethapi" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/rpc" "github.com/graph-gophers/graphql-go" @@ -41,14 +37,7 @@ type Service struct { // New constructs a new GraphQL service instance. func New(stack *node.Node, ethBackend *eth.Ethereum, lesBackend *les.LightEthereum, endpoint string, cors, vhosts []string, timeouts rpc.HTTPTimeouts) error { - service := &Service{ - graphqlServer: &node.HTTPServer{ - Timeouts: timeouts, - Vhosts: vhosts, - CorsAllowedOrigins: cors, - GQLAllowed: true, - }, - } + service := new(Service) // add backend if ethBackend != nil { service.backend = ethBackend.APIBackend @@ -58,26 +47,52 @@ func New(stack *node.Node, ethBackend *eth.Ethereum, lesBackend *les.LightEthere return errors.New("no Ethereum service") } - service.graphqlServer.SetEndpoint(endpoint) - // create handler - handler, err := service.CreateHandler() + // check if http server with given endpoint exists and enable graphQL on it + server := stack.ExistingHTTPServer(endpoint) + if server != nil { + server.GQLAllowed = true + server.Vhosts = append(server.Vhosts, vhosts...) + server.CorsAllowedOrigins = append(server.CorsAllowedOrigins, cors...) + server.Timeouts = timeouts + // create handler + handler, err := createHandler(service.backend, cors, vhosts) + if err != nil { + return err + } + server.GQLHandler = handler + // don't register lifecycle if registering on existing http server + return nil + } + // otherwise create a new server + handler, err := createHandler(service.backend, cors, vhosts) if err != nil { return err } - service.graphqlServer.SetHandler(handler) + // create the http server + gqlServer := &node.HTTPServer{ + Vhosts: vhosts, + CorsAllowedOrigins: cors, + Timeouts: timeouts, + GQLAllowed: true, + GQLHandler: handler, + Srv: rpc.NewServer(), + } + gqlServer.SetEndpoint(endpoint) + stack.RegisterHTTPServer(gqlServer) + + service.graphqlServer = gqlServer - // TODO register http - return stack.RegisterLifecycle(service) + return nil } -func (s *Service) CreateHandler() (http.Handler, error) { +func createHandler(backend ethapi.Backend, cors, vhosts []string) (http.Handler, error) { // create handler stack and wrap the graphql handler - handler, err := newHandler(s.backend) + handler, err := newHandler(backend) if err != nil { return nil, err } + handler = node.NewHTTPHandlerStack(handler, cors, vhosts) - handler = node.NewHTTPHandlerStack(handler, s.graphqlServer.CorsAllowedOrigins, s.graphqlServer.Vhosts) return handler, nil } @@ -102,32 +117,6 @@ func newHandler(backend ethapi.Backend) (http.Handler, error) { // Start is called after all services have been constructed and the networking // layer was also initialized to spawn any goroutines required by the service. func (s *Service) Start() error { - handler, err := s.CreateHandler() - if err != nil { - return err - } - s.graphqlServer.SetHandler(handler) - - // start listening on given endpoint - listener, err := net.Listen("tcp", s.graphqlServer.Endpoint()) - if err != nil { - return err - } - // make sure timeout values are meaningful - node.CheckTimeouts(&s.graphqlServer.Timeouts) - // create http server - httpSrv := &http.Server{ - Handler: handler, - ReadTimeout: s.graphqlServer.Timeouts.ReadTimeout, - WriteTimeout: s.graphqlServer.Timeouts.WriteTimeout, - IdleTimeout: s.graphqlServer.Timeouts.IdleTimeout, - } - go httpSrv.Serve(listener) - log.Info("GraphQL endpoint opened", "url", fmt.Sprintf("http://%v", listener.Addr())) - // add information to graphql http server - s.graphqlServer.Server = httpSrv - s.graphqlServer.ListenerAddr = listener.Addr() - s.graphqlServer.SetHandler(handler) return nil } @@ -135,10 +124,10 @@ func (s *Service) Start() error { // Stop terminates all goroutines belonging to the service, blocking until they // are all terminated. func (s *Service) Stop() error { - if s.graphqlServer.Server != nil { - s.graphqlServer.Server.Shutdown(context.Background()) - log.Info("GraphQL endpoint closed", "url", fmt.Sprintf("http://%v", s.graphqlServer.ListenerAddr)) - } + //if s.graphqlServer.Server != nil { + // s.graphqlServer.Server.Shutdown(context.Background()) + // log.Info("GraphQL endpoint closed", "url", fmt.Sprintf("http://%v", s.graphqlServer.ListenerAddr)) + //} return nil } diff --git a/node/api.go b/node/api.go index 437807f6dc53..0ae40cc208c6 100644 --- a/node/api.go +++ b/node/api.go @@ -146,9 +146,11 @@ func (api *PrivateAdminAPI) PeerEvents(ctx context.Context) (*rpc.Subscription, func (api *PrivateAdminAPI) StartRPC(host *string, port *int, cors *string, apis *string, vhosts *string) (bool, error) { api.node.lock.Lock() defer api.node.lock.Unlock() - - if api.node.http.Server != nil { - return false, fmt.Errorf("HTTP RPC already running on %v", api.node.http.ListenerAddr) + // check if HTTP server already exists + for _, httpServer := range api.node.httpServers { + if httpServer.RPCAllowed { + return false, fmt.Errorf("HTTP RPC already running on %v", httpServer.ListenerAddr) + } } if host == nil { @@ -161,9 +163,7 @@ func (api *PrivateAdminAPI) StartRPC(host *string, port *int, cors *string, apis if port == nil { port = &api.node.config.HTTPPort } - api.node.http.host = *host - api.node.http.port = *port - api.node.http.endpoint = fmt.Sprintf("%s:%d", *host, *port) + endpoint := fmt.Sprintf("%s:%d", *host, *port) allowedOrigins := api.node.config.HTTPCors if cors != nil { @@ -172,7 +172,6 @@ func (api *PrivateAdminAPI) StartRPC(host *string, port *int, cors *string, apis allowedOrigins = append(allowedOrigins, strings.TrimSpace(origin)) } } - api.node.http.CorsAllowedOrigins = allowedOrigins allowedVHosts := api.node.config.HTTPVirtualHosts if vhosts != nil { @@ -181,27 +180,37 @@ func (api *PrivateAdminAPI) StartRPC(host *string, port *int, cors *string, apis allowedVHosts = append(allowedVHosts, strings.TrimSpace(vhost)) } } - api.node.http.Vhosts = allowedVHosts - modules := api.node.http.Whitelist + modules := api.node.config.HTTPModules if apis != nil { modules = nil for _, m := range strings.Split(*apis, ",") { modules = append(modules, strings.TrimSpace(m)) } } - api.node.http.Whitelist = modules + // configure http server + httpServer := &HTTPServer{ + host: *host, + port: *port, + endpoint: endpoint, + Srv: rpc.NewServer(), + CorsAllowedOrigins: allowedOrigins, + Vhosts: allowedVHosts, + Whitelist: modules, + } // create handler - api.node.http.handler = NewHTTPHandlerStack(api.node.http.Srv, api.node.http.CorsAllowedOrigins, api.node.http.Vhosts) + httpServer.handler = NewHTTPHandlerStack(httpServer.Srv, httpServer.CorsAllowedOrigins, httpServer.Vhosts) // create HTTP server - if err := api.node.CreateHTTPServer(api.node.http, false); err != nil { + if err := api.node.CreateHTTPServer(httpServer, false); err != nil { return false, err } // start the HTTP server - api.node.http.Start() - api.node.log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%v/", api.node.http.ListenerAddr), - "cors", strings.Join(api.node.http.CorsAllowedOrigins, ","), - "vhosts", strings.Join(api.node.http.Vhosts, ",")) + httpServer.Start() + api.node.log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%v/", httpServer.ListenerAddr), + "cors", strings.Join(httpServer.CorsAllowedOrigins, ","), + "vhosts", strings.Join(httpServer.Vhosts, ",")) + + api.node.RegisterHTTPServer(httpServer) return true, nil } @@ -210,22 +219,25 @@ func (api *PrivateAdminAPI) StopRPC() (bool, error) { api.node.lock.Lock() defer api.node.lock.Unlock() - if api.node.http == nil { - return false, fmt.Errorf("HTTP RPC not running") + for _, httpServer := range api.node.httpServers { + if httpServer.RPCAllowed { + api.node.stopServer(httpServer) + return true, nil + } } - api.node.stopHTTP() - return true, nil + + return false, fmt.Errorf("HTTP RPC not running") } // StartWS starts the websocket RPC API server. func (api *PrivateAdminAPI) StartWS(host *string, port *int, allowedOrigins *string, apis *string) (bool, error) { api.node.lock.Lock() defer api.node.lock.Unlock() - - if api.node.ws.Server != nil { - return false, fmt.Errorf("WebSocket RPC already running on %v", api.node.ws.ListenerAddr) - } else if api.node.http.WSAllowed { - return false, fmt.Errorf("WebSocket RPC already running on %v", api.node.http.ListenerAddr) + // check if an existing HTTP server already handles websocket + for _, httpServer := range api.node.httpServers { + if httpServer.WSAllowed { + return false, fmt.Errorf("WebSocket RPC already running on %v", httpServer.ListenerAddr) + } } // set host, port and endpoint if host == nil { @@ -238,15 +250,7 @@ func (api *PrivateAdminAPI) StartWS(host *string, port *int, allowedOrigins *str if port == nil { port = &api.node.config.WSPort } - api.node.ws.host = *host - api.node.ws.port = *port - api.node.ws.endpoint = fmt.Sprintf("%s:%d", *host, *port) - - if api.node.ws.endpoint == api.node.http.endpoint && api.node.http.Server != nil { - api.node.http.WSAllowed = true - api.node.log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%v", api.node.http.ListenerAddr)) - return true, nil - } + endpoint := fmt.Sprintf("%s:%d", *host, *port) origins := api.node.config.WSOrigins if allowedOrigins != nil { @@ -255,7 +259,14 @@ func (api *PrivateAdminAPI) StartWS(host *string, port *int, allowedOrigins *str origins = append(origins, strings.TrimSpace(origin)) } } - api.node.ws.WsOrigins = origins + // check if an HTTP server exists on the given endpoint, and if so, enable websocket on that HTTP server + existingServer := api.node.ExistingHTTPServer(endpoint) + if existingServer != nil { + existingServer.WSAllowed = true + existingServer.WsOrigins = origins + api.node.log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%v", existingServer.ListenerAddr)) + return true, nil + } modules := api.node.config.WSModules if apis != nil { @@ -264,15 +275,26 @@ func (api *PrivateAdminAPI) StartWS(host *string, port *int, allowedOrigins *str modules = append(modules, strings.TrimSpace(m)) } } - api.node.ws.Whitelist = modules - api.node.ws.handler = api.node.ws.Srv.WebsocketHandler(api.node.ws.WsOrigins) - if err := api.node.CreateHTTPServer(api.node.ws, api.node.config.WSExposeAll); err != nil { + wsServer := &HTTPServer{ + Srv: rpc.NewServer(), + endpoint: endpoint, + host: *host, + port: *port, + Whitelist: modules, + WsOrigins: origins, + WSAllowed: true, + } + + wsServer.handler = wsServer.Srv.WebsocketHandler(wsServer.WsOrigins) + if err := api.node.CreateHTTPServer(wsServer, api.node.config.WSExposeAll); err != nil { return false, err } - api.node.ws.Start() - api.node.log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%v", api.node.ws.ListenerAddr)) + wsServer.Start() + api.node.log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%v", wsServer.ListenerAddr)) + + api.node.RegisterHTTPServer(wsServer) return true, nil } @@ -281,16 +303,19 @@ func (api *PrivateAdminAPI) StopWS() (bool, error) { api.node.lock.Lock() defer api.node.lock.Unlock() - if api.node.ws.Server == nil && !api.node.http.WSAllowed { - return false, fmt.Errorf("WebSocket RPC not running") - } - if api.node.http.WSAllowed { - api.node.http.WSAllowed = false - return true, nil + for _, httpServer := range api.node.httpServers { + if httpServer.WSAllowed { + httpServer.WSAllowed = false + // if RPC is not enabled on the WS http server, shut it down + if !httpServer.RPCAllowed { + api.node.stopServer(httpServer) + } + + return true, nil + } } - api.node.stopWS() - return true, nil + return false, fmt.Errorf("WebSocket RPC not running") } // PublicAdminAPI is the collection of administrative API methods exposed over diff --git a/node/node.go b/node/node.go index 7f7fe4e044d8..11ebea4d32bd 100644 --- a/node/node.go +++ b/node/node.go @@ -58,9 +58,9 @@ type Node struct { rpcAPIs []rpc.API // List of APIs currently provided by the node inprocHandler *rpc.Server // In-process RPC request handler to process the API requests + httpServers []*HTTPServer // Stores information about all http servers (if any), including http, ws, and graphql + ipc *HTTPServer // TODO - http *HTTPServer // TODO - ws *HTTPServer // TODO stop chan struct{} // Channel to wait for termination notifications lock sync.RWMutex @@ -110,36 +110,13 @@ func New(conf *Config) (*Node, error) { ServiceContext: &ServiceContext{ Config: *conf, }, + httpServers: make([]*HTTPServer, 0), ipc: &HTTPServer{ endpoint: conf.IPCEndpoint(), }, - http: &HTTPServer{ - CorsAllowedOrigins: conf.HTTPCors, - Vhosts: conf.HTTPVirtualHosts, - Whitelist: conf.HTTPModules, - Timeouts: conf.HTTPTimeouts, - Srv: rpc.NewServer(), - endpoint: conf.HTTPEndpoint(), - host: conf.HTTPHost, - port: conf.HTTPPort, - }, - ws: &HTTPServer{ - CorsAllowedOrigins: conf.WSOrigins, - Whitelist: conf.WSModules, - Srv: rpc.NewServer(), - endpoint: conf.WSEndpoint(), - host: conf.WSHost, - port: conf.WSPort, - }, eventmux: new(event.TypeMux), log: conf.Logger, } - if conf.HTTPHost != "" { - node.http.RPCAllowed = true - } - if conf.WSHost != "" { - node.ws.WSAllowed = true - } // Initialize the p2p server. This creates the node key and // discovery databases. @@ -156,9 +133,44 @@ func New(conf *Config) (*Node, error) { if node.server.Config.NodeDatabase == "" { node.server.Config.NodeDatabase = node.config.NodeDB() } - + // Configure service context node.ServiceContext.EventMux = node.eventmux node.ServiceContext.AccountManager = node.accman + // Configure HTTP server(s) + if conf.HTTPHost != "" { + httpServ := &HTTPServer{ + CorsAllowedOrigins: conf.HTTPCors, + Vhosts: conf.HTTPVirtualHosts, + Whitelist: conf.HTTPModules, + Timeouts: conf.HTTPTimeouts, + Srv: rpc.NewServer(), + endpoint: conf.HTTPEndpoint(), + host: conf.HTTPHost, + port: conf.HTTPPort, + RPCAllowed: true, + } + // check if ws is enabled and if ws port is the same as http port + if conf.WSHost != "" && conf.WSPort == conf.HTTPPort { + httpServ.WSAllowed = true + httpServ.WsOrigins = conf.WSOrigins + httpServ.Whitelist = append(httpServ.Whitelist, conf.WSModules...) + node.httpServers = append(node.httpServers, httpServ) + return node, nil + } + node.httpServers = append(node.httpServers, httpServ) + } + if conf.WSHost != "" { + node.httpServers = append(node.httpServers, &HTTPServer{ + WsOrigins: conf.WSOrigins, + Whitelist: conf.WSModules, + Srv: rpc.NewServer(), + endpoint: conf.WSEndpoint(), + host: conf.WSHost, + port: conf.WSPort, + WSAllowed: true, + }) + } + return node, nil } @@ -209,6 +221,20 @@ func (n *Node) RegisterAPIs(apis []rpc.API) { n.rpcAPIs = append(n.rpcAPIs, apis...) } +func (n *Node) ExistingHTTPServer(endpoint string) *HTTPServer { + for _, httpServer := range n.httpServers { + if endpoint == httpServer.endpoint { + return httpServer + } + } + + return nil +} + +func (n *Node) RegisterHTTPServer(server *HTTPServer) { + n.httpServers = append(n.httpServers, server) +} + // TODO document func (n *Node) RegisterHTTP(dest *HTTPServer, toRegister *HTTPServer) { // takes in default existing http server @@ -247,6 +273,7 @@ func (n *Node) CreateHTTPServer(h *HTTPServer, exposeAll bool) error { httpSrv.WriteTimeout = h.Timeouts.WriteTimeout httpSrv.IdleTimeout = h.Timeouts.IdleTimeout } + // complete the HTTPServer h.Listener = listener h.ListenerAddr = listener.Addr() @@ -350,46 +377,36 @@ func (n *Node) startRPC() error { n.stopInProc() return err } - // create and start ws server if the endpoint exists - if n.ws.endpoint != "" && n.http.endpoint != n.ws.endpoint { - n.ws.handler = n.ws.Srv.WebsocketHandler(n.ws.CorsAllowedOrigins) - if err := n.CreateHTTPServer(n.ws, n.config.WSExposeAll); err != nil { - n.stopIPC() - n.stopInProc() - return err + + for _, server := range n.httpServers { + // configure the handlers + if server.RPCAllowed { + server.handler = NewHTTPHandlerStack(server.Srv, server.CorsAllowedOrigins, server.Vhosts) + // wrap ws handler just in case ws is enabled through the console after start-up + wsHandler := server.Srv.WebsocketHandler(server.WsOrigins) + server.handler = server.NewWebsocketUpgradeHandler(server.handler, wsHandler) + + n.log.Info("HTTP configured on endpoint ", "endpoint", server.endpoint) } - n.ws.Start() - n.ws.WSAllowed = true - n.log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%s", n.ws.ListenerAddr)) - } - // create and start http server if the endpoint exists // TODO CLEAN THIS UP!!!!!!!!! - if n.http.endpoint != "" { - // wrap handler in websocket handler only if websocket port is the same as http rpc - n.http.handler = NewHTTPHandlerStack(n.http.Srv, n.http.CorsAllowedOrigins, n.http.Vhosts) - // if websocket server is not already started, or is specified on the same endpoint as http, - // register websocket on the http server - if n.ws.Server == nil { - if n.http.endpoint == n.ws.endpoint { - n.http.WSAllowed = true - } - n.ws.handler = n.ws.Srv.WebsocketHandler(n.ws.CorsAllowedOrigins) - n.RegisterHTTP(n.http, n.ws) + if server.WSAllowed && server.handler == nil { + server.handler = server.Srv.WebsocketHandler(server.WsOrigins) + n.log.Info("Websocket configured on endpoint ", "endpoint", server.endpoint) } - // only set exposeAll if websocket is enabled - var exposeAll bool - if n.http.WSAllowed { - exposeAll = n.config.WSExposeAll + if server.GQLAllowed { + if server.handler == nil { + server.handler = server.GQLHandler + } else { + server.handler = NewGQLUpgradeHandler(server.handler, server.GQLHandler) + } + n.log.Info("GraphQL configured on endpoint ", "endpoint", server.endpoint) } - // create the HTTP Server - if err := n.CreateHTTPServer(n.http, exposeAll); err != nil { - n.stopIPC() - n.stopInProc() + // create the HTTP server + if err := n.CreateHTTPServer(server, false); err != nil { return err } - n.http.Start() - n.log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%v/", n.http.ListenerAddr), - "cors", strings.Join(n.http.CorsAllowedOrigins, ","), - "vhosts", strings.Join(n.http.Vhosts, ",")) + // start HTTP server + server.Start() + n.log.Info("HTTP endpoint successfully opened", "url", fmt.Sprintf("http://%v/", server.ListenerAddr)) } // All API endpoints started successfully return nil @@ -446,96 +463,115 @@ func (n *Node) stopIPC() { } } -// startHTTP initializes and starts the HTTP RPC endpoint. -func (n *Node) startHTTP(endpoint string, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts, wsOrigins []string) error { - // Short circuit if the HTTP endpoint isn't being exposed - if endpoint == "" { - return nil - } - // register apis and create handler stack - srv := rpc.NewServer() - err := RegisterApisFromWhitelist(n.rpcAPIs, modules, srv, false) - if err != nil { - return err - } - handler := NewHTTPHandlerStack(srv, cors, vhosts) - // wrap handler in websocket handler only if websocket port is the same as http rpc - if n.http.Endpoint() == n.ws.Endpoint() { - handler = n.http.NewWebsocketUpgradeHandler(handler, srv.WebsocketHandler(wsOrigins)) - } - httpServer, addr, err := StartHTTPEndpoint(endpoint, timeouts, handler) - if err != nil { - return err - } - n.log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%v/", addr), - "cors", strings.Join(cors, ","), - "vhosts", strings.Join(vhosts, ",")) - if n.http.Endpoint() == n.ws.Endpoint() { - n.log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%v", addr)) - } - // All listeners booted successfully - n.http.endpoint = endpoint - n.http.Server = httpServer - n.http.ListenerAddr = addr - n.http.Srv = srv - - return nil -} - -// stopHTTP terminates the HTTP RPC endpoint. -func (n *Node) stopHTTP() { - if n.http.Server != nil { - url := fmt.Sprintf("http://%v/", n.http.ListenerAddr) - // Don't bother imposing a timeout here. - n.http.Server.Shutdown(context.Background()) - n.log.Info("HTTP Endpoint closed", "url", url) - } - if n.http.Srv != nil { - n.http.Srv.Stop() - n.http.Srv = nil - } -} - -// startWS initializes and starts the websocket RPC endpoint. -func (n *Node) startWS(endpoint string, modules []string, wsOrigins []string, exposeAll bool) error { - // Short circuit if the WS endpoint isn't being exposed - if endpoint == "" { - return nil - } - - srv := rpc.NewServer() - handler := srv.WebsocketHandler(wsOrigins) - err := RegisterApisFromWhitelist(n.rpcAPIs, modules, srv, exposeAll) - if err != nil { - return err - } - httpServer, addr, err := startWSEndpoint(endpoint, handler) - if err != nil { - return err - } - n.log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%s", addr)) - // All listeners booted successfully - n.ws.endpoint = endpoint - n.ws.ListenerAddr = addr - n.ws.Server = httpServer - n.ws.Srv = srv - - return nil -} +//// startHTTP initializes and starts the HTTP RPC endpoint. +//func (n *Node) startHTTP(endpoint string, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts, wsOrigins []string) error { +// // Short circuit if the HTTP endpoint isn't being exposed +// if endpoint == "" { +// return nil +// } +// // register apis and create handler stack +// srv := rpc.NewServer() +// err := RegisterApisFromWhitelist(n.rpcAPIs, modules, srv, false) +// if err != nil { +// return err +// } +// handler := NewHTTPHandlerStack(srv, cors, vhosts) +// // wrap handler in websocket handler only if websocket port is the same as http rpc +// if n.http.Endpoint() == n.ws.Endpoint() { +// handler = n.http.NewWebsocketUpgradeHandler(handler, srv.WebsocketHandler(wsOrigins)) +// } +// httpServer, addr, err := StartHTTPEndpoint(endpoint, timeouts, handler) +// if err != nil { +// return err +// } +// n.log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%v/", addr), +// "cors", strings.Join(cors, ","), +// "vhosts", strings.Join(vhosts, ",")) +// if n.http.Endpoint() == n.ws.Endpoint() { +// n.log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%v", addr)) +// } +// // All listeners booted successfully +// n.http.endpoint = endpoint +// n.http.Server = httpServer +// n.http.ListenerAddr = addr +// n.http.Srv = srv +// +// return nil +//} -// stopWS terminates the websocket RPC endpoint. -func (n *Node) stopWS() { - if n.ws.Server != nil { - url := fmt.Sprintf("http://%v/", n.ws.ListenerAddr) +// stopServers terminates the given HTTP servers' endpoints +func (n *Node) stopServer(server *HTTPServer) { + if server.Server != nil { + url := fmt.Sprintf("http://%v/", server.ListenerAddr) // Don't bother imposing a timeout here. - n.ws.Server.Shutdown(context.Background()) + server.Server.Shutdown(context.Background()) n.log.Info("HTTP Endpoint closed", "url", url) } - if n.ws.Srv != nil { - n.ws.Srv.Stop() - n.ws.Srv = nil - } -} + if server.Srv != nil { + server.Srv.Stop() + server.Srv = nil + } +} + + +//// stopHTTP terminates the HTTP RPC endpoint. +//func (n *Node) stopHTTP() { +// for _, server := range n.httpServers { +// if server.RPCAllowed { +// if server.Server != nil { +// url := fmt.Sprintf("http://%v/", server.ListenerAddr) +// // Don't bother imposing a timeout here. +// server.Server.Shutdown(context.Background()) +// n.log.Info("HTTP Endpoint closed", "url", url) +// } +// if server.Srv != nil { +// server.Srv.Stop() +// server.Srv = nil +// } +// } +// } +//} + +//// startWS initializes and starts the websocket RPC endpoint. +//func (n *Node) startWS(endpoint string, modules []string, wsOrigins []string, exposeAll bool) error { +// // Short circuit if the WS endpoint isn't being exposed +// if endpoint == "" { +// return nil +// } +// +// srv := rpc.NewServer() +// handler := srv.WebsocketHandler(wsOrigins) +// err := RegisterApisFromWhitelist(n.rpcAPIs, modules, srv, exposeAll) +// if err != nil { +// return err +// } +// httpServer, addr, err := startWSEndpoint(endpoint, handler) +// if err != nil { +// return err +// } +// n.log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%s", addr)) +// // All listeners booted successfully +// n.ws.endpoint = endpoint +// n.ws.ListenerAddr = addr +// n.ws.Server = httpServer +// n.ws.Srv = srv +// +// return nil +//} + +//// stopWS terminates the websocket RPC endpoint. +//func (n *Node) stopWS() { +// if n.ws.Server != nil { +// url := fmt.Sprintf("http://%v/", n.ws.ListenerAddr) +// // Don't bother imposing a timeout here. +// n.ws.Server.Shutdown(context.Background()) +// n.log.Info("HTTP Endpoint closed", "url", url) +// } +// if n.ws.Srv != nil { +// n.ws.Srv.Stop() +// n.ws.Srv = nil +// } +//} // Stop terminates a running node along with all it's services. In the node was // not started, an error is returned. @@ -549,8 +585,9 @@ func (n *Node) Stop() error { } // Terminate the API, services and the p2p server. - n.stopWS() - n.stopHTTP() + for _, httpServer := range n.httpServers { + n.stopServer(httpServer) + } n.stopIPC() n.rpcAPIs = nil failure := &StopError{ @@ -674,10 +711,16 @@ func (n *Node) HTTPEndpoint() string { n.lock.Lock() defer n.lock.Unlock() - if n.http.Server != nil { - return n.http.ListenerAddr.String() + for _, httpServer := range n.httpServers { + if httpServer.RPCAllowed { + if httpServer.Listener != nil { + return httpServer.ListenerAddr.String() + } + return httpServer.endpoint + } } - return n.http.Endpoint() + + return "" // TODO should return an empty string if http server not configured? } // WSEndpoint retrieves the current WS endpoint @@ -686,10 +729,16 @@ func (n *Node) WSEndpoint() string { n.lock.Lock() defer n.lock.Unlock() - if n.ws.Server != nil { - return n.ws.ListenerAddr.String() + for _, httpServer := range n.httpServers { + if httpServer.WSAllowed { + if httpServer.Listener != nil { + return httpServer.ListenerAddr.String() + } + return httpServer.endpoint + } } - return n.ws.Endpoint() + + return "" // TODO should return an empty string if ws server not configured? } // EventMux retrieves the event multiplexer used by all the network services in diff --git a/node/rpcstack.go b/node/rpcstack.go index 4d214d80e45d..3903762c88c2 100644 --- a/node/rpcstack.go +++ b/node/rpcstack.go @@ -53,6 +53,8 @@ type HTTPServer struct { RPCAllowed bool WSAllowed bool GQLAllowed bool + + GQLHandler http.Handler } // TODO document