Skip to content

Commit

Permalink
Register http (#16)
Browse files Browse the repository at this point in the history
* register http new impl

* start ws thorugh console on same port as http
  • Loading branch information
renaynay committed Jul 7, 2020
1 parent d8c9669 commit ecde8cd
Show file tree
Hide file tree
Showing 6 changed files with 326 additions and 258 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ require (
github.com/go-ole/go-ole v1.2.1 // indirect
github.com/go-sourcemap/sourcemap v2.1.2+incompatible // indirect
github.com/go-stack/stack v1.8.0
github.com/golang/protobuf v1.3.2-0.20190517061210-b285ee9cfc6c
github.com/golang/protobuf v1.3.3
github.com/golang/snappy v0.0.2-0.20190904063534-ff6b7dc882cf
github.com/google/go-cmp v0.3.1 // indirect
github.com/gorilla/websocket v1.4.1-0.20190629185528-ae1634f6a989
Expand Down
7 changes: 5 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,11 @@ github.com/go-sourcemap/sourcemap v2.1.2+incompatible h1:0b/xya7BKGhXuqFESKM4oIi
github.com/go-sourcemap/sourcemap v2.1.2+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg=
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2-0.20190517061210-b285ee9cfc6c h1:zqAKixg3cTcIasAMJV+EcfVbWwLpOZ7LeoWJvcuD/5Q=
github.com/golang/protobuf v1.3.2-0.20190517061210-b285ee9cfc6c/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.3 h1:gyjaxf+svBWX08ZjK86iN9geUJF0H6gp2IRKX6Nf6/I=
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.2-0.20190904063534-ff6b7dc882cf h1:gFVkHXmVAhEbxZVDln5V9GKrLaluNoFHDbrZwAWZgws=
Expand Down Expand Up @@ -204,6 +205,8 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f h1:Bl/8QSvNqXvPGPGXa2z5xUTmV7VDcZyvRZ+QQXkXTZQ=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
91 changes: 40 additions & 51 deletions graphql/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,12 @@
package graphql

import (
"context"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/les"
"net"
"net/http"

"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
"github.com/graph-gophers/graphql-go"
Expand All @@ -41,14 +37,7 @@ type Service struct {

// New constructs a new GraphQL service instance.
func New(stack *node.Node, ethBackend *eth.Ethereum, lesBackend *les.LightEthereum, endpoint string, cors, vhosts []string, timeouts rpc.HTTPTimeouts) error {
service := &Service{
graphqlServer: &node.HTTPServer{
Timeouts: timeouts,
Vhosts: vhosts,
CorsAllowedOrigins: cors,
GQLAllowed: true,
},
}
service := new(Service)
// add backend
if ethBackend != nil {
service.backend = ethBackend.APIBackend
Expand All @@ -58,26 +47,52 @@ func New(stack *node.Node, ethBackend *eth.Ethereum, lesBackend *les.LightEthere
return errors.New("no Ethereum service")
}

service.graphqlServer.SetEndpoint(endpoint)
// create handler
handler, err := service.CreateHandler()
// check if http server with given endpoint exists and enable graphQL on it
server := stack.ExistingHTTPServer(endpoint)
if server != nil {
server.GQLAllowed = true
server.Vhosts = append(server.Vhosts, vhosts...)
server.CorsAllowedOrigins = append(server.CorsAllowedOrigins, cors...)
server.Timeouts = timeouts
// create handler
handler, err := createHandler(service.backend, cors, vhosts)
if err != nil {
return err
}
server.GQLHandler = handler
// don't register lifecycle if registering on existing http server
return nil
}
// otherwise create a new server
handler, err := createHandler(service.backend, cors, vhosts)
if err != nil {
return err
}
service.graphqlServer.SetHandler(handler)
// create the http server
gqlServer := &node.HTTPServer{
Vhosts: vhosts,
CorsAllowedOrigins: cors,
Timeouts: timeouts,
GQLAllowed: true,
GQLHandler: handler,
Srv: rpc.NewServer(),
}
gqlServer.SetEndpoint(endpoint)
stack.RegisterHTTPServer(gqlServer)

service.graphqlServer = gqlServer

// TODO register http
return stack.RegisterLifecycle(service)
return nil
}

func (s *Service) CreateHandler() (http.Handler, error) {
func createHandler(backend ethapi.Backend, cors, vhosts []string) (http.Handler, error) {
// create handler stack and wrap the graphql handler
handler, err := newHandler(s.backend)
handler, err := newHandler(backend)
if err != nil {
return nil, err
}
handler = node.NewHTTPHandlerStack(handler, cors, vhosts)

handler = node.NewHTTPHandlerStack(handler, s.graphqlServer.CorsAllowedOrigins, s.graphqlServer.Vhosts)
return handler, nil
}

Expand All @@ -102,43 +117,17 @@ func newHandler(backend ethapi.Backend) (http.Handler, error) {
// Start is called after all services have been constructed and the networking
// layer was also initialized to spawn any goroutines required by the service.
func (s *Service) Start() error {
handler, err := s.CreateHandler()
if err != nil {
return err
}
s.graphqlServer.SetHandler(handler)

// start listening on given endpoint
listener, err := net.Listen("tcp", s.graphqlServer.Endpoint())
if err != nil {
return err
}
// make sure timeout values are meaningful
node.CheckTimeouts(&s.graphqlServer.Timeouts)
// create http server
httpSrv := &http.Server{
Handler: handler,
ReadTimeout: s.graphqlServer.Timeouts.ReadTimeout,
WriteTimeout: s.graphqlServer.Timeouts.WriteTimeout,
IdleTimeout: s.graphqlServer.Timeouts.IdleTimeout,
}
go httpSrv.Serve(listener)
log.Info("GraphQL endpoint opened", "url", fmt.Sprintf("http://%v", listener.Addr()))
// add information to graphql http server
s.graphqlServer.Server = httpSrv
s.graphqlServer.ListenerAddr = listener.Addr()
s.graphqlServer.SetHandler(handler)

return nil
}

// Stop terminates all goroutines belonging to the service, blocking until they
// are all terminated.
func (s *Service) Stop() error {
if s.graphqlServer.Server != nil {
s.graphqlServer.Server.Shutdown(context.Background())
log.Info("GraphQL endpoint closed", "url", fmt.Sprintf("http://%v", s.graphqlServer.ListenerAddr))
}
//if s.graphqlServer.Server != nil {
// s.graphqlServer.Server.Shutdown(context.Background())
// log.Info("GraphQL endpoint closed", "url", fmt.Sprintf("http://%v", s.graphqlServer.ListenerAddr))
//}
return nil
}

Expand Down
121 changes: 73 additions & 48 deletions node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,11 @@ func (api *PrivateAdminAPI) PeerEvents(ctx context.Context) (*rpc.Subscription,
func (api *PrivateAdminAPI) StartRPC(host *string, port *int, cors *string, apis *string, vhosts *string) (bool, error) {
api.node.lock.Lock()
defer api.node.lock.Unlock()

if api.node.http.Server != nil {
return false, fmt.Errorf("HTTP RPC already running on %v", api.node.http.ListenerAddr)
// check if HTTP server already exists
for _, httpServer := range api.node.httpServers {
if httpServer.RPCAllowed {
return false, fmt.Errorf("HTTP RPC already running on %v", httpServer.ListenerAddr)
}
}

if host == nil {
Expand All @@ -161,9 +163,7 @@ func (api *PrivateAdminAPI) StartRPC(host *string, port *int, cors *string, apis
if port == nil {
port = &api.node.config.HTTPPort
}
api.node.http.host = *host
api.node.http.port = *port
api.node.http.endpoint = fmt.Sprintf("%s:%d", *host, *port)
endpoint := fmt.Sprintf("%s:%d", *host, *port)

allowedOrigins := api.node.config.HTTPCors
if cors != nil {
Expand All @@ -172,7 +172,6 @@ func (api *PrivateAdminAPI) StartRPC(host *string, port *int, cors *string, apis
allowedOrigins = append(allowedOrigins, strings.TrimSpace(origin))
}
}
api.node.http.CorsAllowedOrigins = allowedOrigins

allowedVHosts := api.node.config.HTTPVirtualHosts
if vhosts != nil {
Expand All @@ -181,27 +180,37 @@ func (api *PrivateAdminAPI) StartRPC(host *string, port *int, cors *string, apis
allowedVHosts = append(allowedVHosts, strings.TrimSpace(vhost))
}
}
api.node.http.Vhosts = allowedVHosts

modules := api.node.http.Whitelist
modules := api.node.config.HTTPModules
if apis != nil {
modules = nil
for _, m := range strings.Split(*apis, ",") {
modules = append(modules, strings.TrimSpace(m))
}
}
api.node.http.Whitelist = modules
// configure http server
httpServer := &HTTPServer{
host: *host,
port: *port,
endpoint: endpoint,
Srv: rpc.NewServer(),
CorsAllowedOrigins: allowedOrigins,
Vhosts: allowedVHosts,
Whitelist: modules,
}
// create handler
api.node.http.handler = NewHTTPHandlerStack(api.node.http.Srv, api.node.http.CorsAllowedOrigins, api.node.http.Vhosts)
httpServer.handler = NewHTTPHandlerStack(httpServer.Srv, httpServer.CorsAllowedOrigins, httpServer.Vhosts)
// create HTTP server
if err := api.node.CreateHTTPServer(api.node.http, false); err != nil {
if err := api.node.CreateHTTPServer(httpServer, false); err != nil {
return false, err
}
// start the HTTP server
api.node.http.Start()
api.node.log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%v/", api.node.http.ListenerAddr),
"cors", strings.Join(api.node.http.CorsAllowedOrigins, ","),
"vhosts", strings.Join(api.node.http.Vhosts, ","))
httpServer.Start()
api.node.log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%v/", httpServer.ListenerAddr),
"cors", strings.Join(httpServer.CorsAllowedOrigins, ","),
"vhosts", strings.Join(httpServer.Vhosts, ","))

api.node.RegisterHTTPServer(httpServer)
return true, nil
}

Expand All @@ -210,22 +219,25 @@ func (api *PrivateAdminAPI) StopRPC() (bool, error) {
api.node.lock.Lock()
defer api.node.lock.Unlock()

if api.node.http == nil {
return false, fmt.Errorf("HTTP RPC not running")
for _, httpServer := range api.node.httpServers {
if httpServer.RPCAllowed {
api.node.stopServer(httpServer)
return true, nil
}
}
api.node.stopHTTP()
return true, nil

return false, fmt.Errorf("HTTP RPC not running")
}

// StartWS starts the websocket RPC API server.
func (api *PrivateAdminAPI) StartWS(host *string, port *int, allowedOrigins *string, apis *string) (bool, error) {
api.node.lock.Lock()
defer api.node.lock.Unlock()

if api.node.ws.Server != nil {
return false, fmt.Errorf("WebSocket RPC already running on %v", api.node.ws.ListenerAddr)
} else if api.node.http.WSAllowed {
return false, fmt.Errorf("WebSocket RPC already running on %v", api.node.http.ListenerAddr)
// check if an existing HTTP server already handles websocket
for _, httpServer := range api.node.httpServers {
if httpServer.WSAllowed {
return false, fmt.Errorf("WebSocket RPC already running on %v", httpServer.ListenerAddr)
}
}
// set host, port and endpoint
if host == nil {
Expand All @@ -238,15 +250,7 @@ func (api *PrivateAdminAPI) StartWS(host *string, port *int, allowedOrigins *str
if port == nil {
port = &api.node.config.WSPort
}
api.node.ws.host = *host
api.node.ws.port = *port
api.node.ws.endpoint = fmt.Sprintf("%s:%d", *host, *port)

if api.node.ws.endpoint == api.node.http.endpoint && api.node.http.Server != nil {
api.node.http.WSAllowed = true
api.node.log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%v", api.node.http.ListenerAddr))
return true, nil
}
endpoint := fmt.Sprintf("%s:%d", *host, *port)

origins := api.node.config.WSOrigins
if allowedOrigins != nil {
Expand All @@ -255,7 +259,14 @@ func (api *PrivateAdminAPI) StartWS(host *string, port *int, allowedOrigins *str
origins = append(origins, strings.TrimSpace(origin))
}
}
api.node.ws.WsOrigins = origins
// check if an HTTP server exists on the given endpoint, and if so, enable websocket on that HTTP server
existingServer := api.node.ExistingHTTPServer(endpoint)
if existingServer != nil {
existingServer.WSAllowed = true
existingServer.WsOrigins = origins
api.node.log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%v", existingServer.ListenerAddr))
return true, nil
}

modules := api.node.config.WSModules
if apis != nil {
Expand All @@ -264,15 +275,26 @@ func (api *PrivateAdminAPI) StartWS(host *string, port *int, allowedOrigins *str
modules = append(modules, strings.TrimSpace(m))
}
}
api.node.ws.Whitelist = modules

api.node.ws.handler = api.node.ws.Srv.WebsocketHandler(api.node.ws.WsOrigins)
if err := api.node.CreateHTTPServer(api.node.ws, api.node.config.WSExposeAll); err != nil {
wsServer := &HTTPServer{
Srv: rpc.NewServer(),
endpoint: endpoint,
host: *host,
port: *port,
Whitelist: modules,
WsOrigins: origins,
WSAllowed: true,
}

wsServer.handler = wsServer.Srv.WebsocketHandler(wsServer.WsOrigins)
if err := api.node.CreateHTTPServer(wsServer, api.node.config.WSExposeAll); err != nil {
return false, err
}

api.node.ws.Start()
api.node.log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%v", api.node.ws.ListenerAddr))
wsServer.Start()
api.node.log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%v", wsServer.ListenerAddr))

api.node.RegisterHTTPServer(wsServer)
return true, nil
}

Expand All @@ -281,16 +303,19 @@ func (api *PrivateAdminAPI) StopWS() (bool, error) {
api.node.lock.Lock()
defer api.node.lock.Unlock()

if api.node.ws.Server == nil && !api.node.http.WSAllowed {
return false, fmt.Errorf("WebSocket RPC not running")
}
if api.node.http.WSAllowed {
api.node.http.WSAllowed = false
return true, nil
for _, httpServer := range api.node.httpServers {
if httpServer.WSAllowed {
httpServer.WSAllowed = false
// if RPC is not enabled on the WS http server, shut it down
if !httpServer.RPCAllowed {
api.node.stopServer(httpServer)
}

return true, nil
}
}

api.node.stopWS()
return true, nil
return false, fmt.Errorf("WebSocket RPC not running")
}

// PublicAdminAPI is the collection of administrative API methods exposed over
Expand Down
Loading

0 comments on commit ecde8cd

Please sign in to comment.