Skip to content

Commit

Permalink
feat: Serve gRPC and REST requests on the same external port (#489)
Browse files Browse the repository at this point in the history
This PR uses the cmux package to expose a single external port that selects incoming connections to route to the gRPC endpoint or the REST endpoint.
(ref: https://medium.com/@drgarcia1986/listen-grpc-and-http-requests-on-the-same-port-263c40cb45ff and https://github.com/soheilhy/cmux)

I define an Endpoint interface wrapping both the gRPC and the REST endpoints, as well as the cmux multiplexer that does the redirection. This infrastructure will also allow future endpoints to be easily added when we need them.

This also sets up the infrastructure for graceful shutdown upon receipt of a signal, though we are waiting for a PR in the cmux dependency for that to work fully. (The issue is described in soheilhy/cmux#69 (comment).)

Implementation/review notes:
- the gRPC server set-up code is now in `cmd/gapic-showcase/endpoint.go`. It is split between setting up the server and running it.
- the REST server (also in `endpoint.go`) is currently a stub that simply responds to `/hello` requests. It will be expanded with auto-generated handlers.
  • Loading branch information
vchudnov-g authored Oct 22, 2020
1 parent 4b94fba commit d2ce150
Show file tree
Hide file tree
Showing 4 changed files with 363 additions and 94 deletions.
327 changes: 327 additions & 0 deletions cmd/gapic-showcase/endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,327 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"strings"
"sync"

"github.com/googleapis/gapic-showcase/server"
pb "github.com/googleapis/gapic-showcase/server/genproto"
"github.com/googleapis/gapic-showcase/server/services"
fallback "github.com/googleapis/grpc-fallback-go/server"
"github.com/soheilhy/cmux"
"golang.org/x/sync/errgroup"
lropb "google.golang.org/genproto/googleapis/longrunning"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/reflection"
)

// RuntimeConfig has the run-time settings necessary to run the
// Showcase servers.
type RuntimeConfig struct {
port string
fallbackPort string
tlsCaCert string
tlsCert string
tlsKey string
}

// Endpoint defines common operations for any of the various types of
// transport-specific network endpoints Showcase supports
type Endpoint interface {
// Serve beings the listen-and-serve loop for this
// Endpoint. It typically blocks until the server is shut
// down. The error it returns depends on the underlying
// implementation.
Serve() error

// Shutdown causes the currently running Endpoint to
// terminate. The error it returns depends on the underlying
// implementation.
Shutdown() error
}

// CreateAllEndpoints returns an Endpoint that can serve gRPC and
// HTTP/REST connections (on config.port) and gRPC-fallback
// connections (on config.fallbackPort)
func CreateAllEndpoints(config RuntimeConfig) Endpoint {
// Ensure port is of the right form.
if !strings.HasPrefix(config.port, ":") {
config.port = ":" + config.port
}

// Start listening.
lis, err := net.Listen("tcp", config.port)
if err != nil {
log.Fatalf("Showcase failed to listen on port '%s': %v", config.port, err)
}
stdLog.Printf("Showcase listening on port: %s", config.port)

m := cmux.New(lis)
grpcListener := m.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
httpListener := m.Match(cmux.HTTP1Fast())

gRPCServer := newEndpointGRPC(grpcListener, config)
restServer := newEndpointREST(httpListener)
cmuxServer := newEndpointMux(m, gRPCServer, restServer)
return cmuxServer
}

// endpointMux is an Endpoint for cmux, the connection multiplexer
// allowing different types of connections on the same port.
//
// We choose not to use grpc.Server.ServeHTTP because it is
// experimental and does not support some gRPC features available
// through grpc.Server.Serve. (cf
// https://godoc.org/google.golang.org/grpc#Server.ServeHTTP)
type endpointMux struct {
endpoints []Endpoint
cmux cmux.CMux
mux sync.Mutex
}

func newEndpointMux(cmuxEndpoint cmux.CMux, endpoints ...Endpoint) Endpoint {
return &endpointMux{
endpoints: endpoints,
cmux: cmuxEndpoint,
}
}

func (em *endpointMux) String() string {
return "endpoint multiplexer"
}

func (em *endpointMux) Serve() error {
g := new(errgroup.Group)
for idx, endpt := range em.endpoints {
if endpt != nil {
stdLog.Printf("Starting endpoint %d: %s", idx, endpt)
endpoint := endpt
g.Go(func() error {
err := endpoint.Serve()
err2 := em.Shutdown()
if err != nil {
return err
}
return err2
})
}
}
if em.cmux != nil {
stdLog.Printf("Starting %s", em)

g.Go(func() error {
err := em.cmux.Serve()
err2 := em.Shutdown()
if err != nil {
return err
}
return err2

})
}
return g.Wait()
}

func (em *endpointMux) Shutdown() error {
em.mux.Lock()
defer em.mux.Unlock()

var err error
if em.cmux != nil {
// TODO: Wait for https://github.com/soheilhy/cmux/pull/69 (due to
// https://github.com/soheilhy/cmux/pull/69#issuecomment-712928041.)
//
// err = em.mux.Close()
em.cmux = nil
}

for idx, endpt := range em.endpoints {
if endpt != nil {
// TODO: Wait for https://github.com/soheilhy/cmux/pull/69
// newErr := endpt.Shutdown()
// if err==nil {
// err = newErr
// }
em.endpoints[idx] = nil
}
}
return err
}

// endpointGRPC is an Endpoint for gRPC connections to the Showcase
// server.
type endpointGRPC struct {
server *grpc.Server
fallbackServer *fallback.FallbackServer
listener net.Listener
mux sync.Mutex
}

func newEndpointGRPC(lis net.Listener, config RuntimeConfig) Endpoint {
// Set up server.
logger := &loggerObserver{}
observerRegistry := server.ShowcaseObserverRegistry()
observerRegistry.RegisterUnaryObserver(logger)
observerRegistry.RegisterStreamRequestObserver(logger)
observerRegistry.RegisterStreamResponseObserver(logger)

opts := []grpc.ServerOption{
grpc.StreamInterceptor(observerRegistry.StreamInterceptor),
grpc.UnaryInterceptor(observerRegistry.UnaryInterceptor),
}

// load mutual TLS cert/key and root CA cert
if config.tlsCaCert != "" && config.tlsCert != "" && config.tlsKey != "" {
keyPair, err := tls.LoadX509KeyPair(config.tlsCert, config.tlsKey)
if err != nil {
log.Fatalf("Failed to load server TLS cert/key with error:%v", err)
}

cert, err := ioutil.ReadFile(config.tlsCaCert)
if err != nil {
log.Fatalf("Failed to load root CA cert file with error:%v", err)
}

pool := x509.NewCertPool()
pool.AppendCertsFromPEM(cert)

ta := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{keyPair},
ClientCAs: pool,
ClientAuth: tls.RequireAndVerifyClientCert,
})

opts = append(opts, grpc.Creds(ta))
}
s := grpc.NewServer(opts...)

// Creates services used by both the gRPC and REST servers
echoServer := services.NewEchoServer()

// Register Services to the server.
pb.RegisterEchoServer(s, echoServer)
pb.RegisterSequenceServiceServer(s, services.NewSequenceServer())
identityServer := services.NewIdentityServer()
pb.RegisterIdentityServer(s, identityServer)
messagingServer := services.NewMessagingServer(identityServer)
pb.RegisterMessagingServer(s, messagingServer)
operationsServer := services.NewOperationsServer(messagingServer)
pb.RegisterTestingServer(s, services.NewTestingServer(observerRegistry))
lropb.RegisterOperationsServer(s, operationsServer)

fb := fallback.NewServer(config.fallbackPort, "localhost"+config.port)

// Register reflection service on gRPC server.
reflection.Register(s)
return &endpointGRPC{
server: s,
fallbackServer: fb,
listener: lis,
}
}

func (eg *endpointGRPC) String() string {
return "gRPC endpoint"
}

func (eg *endpointGRPC) Serve() error {
defer eg.Shutdown()
if eg.fallbackServer != nil {
stdLog.Printf("Listening for gRPC-fallback connections")
eg.fallbackServer.StartBackground()
}
if eg.server != nil {
stdLog.Printf("Listening for gRPC connections")
return eg.server.Serve(eg.listener)
}
return fmt.Errorf("gRPC server not set up")
}

func (eg *endpointGRPC) Shutdown() error {
eg.mux.Lock()
defer eg.mux.Unlock()

if eg.fallbackServer != nil {
stdLog.Printf("Stopping gRPC-fallback connections")
eg.fallbackServer.Shutdown()
eg.fallbackServer = nil
}

if eg.server != nil {
stdLog.Printf("Stopping gRPC connections")
eg.server.GracefulStop()
eg.server = nil
}
stdLog.Printf("Stopped gRPC")
return nil
}

// endpointREST is an Endpoint for HTTP/REST connections to the Showcase
// server.
type endpointREST struct {
server *http.Server
listener net.Listener
mux sync.Mutex
}

func newEndpointREST(lis net.Listener) Endpoint {
mux := http.NewServeMux()
mux.HandleFunc("/hello", func(w http.ResponseWriter, _ *http.Request) {
w.Write([]byte("GAPIC Showcase: HTTP/REST endpoint\n"))
})

return &endpointREST{
server: &http.Server{Handler: mux},
listener: lis,
}
}

func (er *endpointREST) String() string {
return "HTTP/REST endpoint"
}

func (er *endpointREST) Serve() error {
defer er.Shutdown()
if er.server != nil {
stdLog.Printf("Listening for REST connections")
return er.server.Serve(er.listener)
}
return fmt.Errorf("REST server not set up")
}

func (er *endpointREST) Shutdown() error {
er.mux.Lock()
defer er.mux.Unlock()
var err error
if er.server != nil {
stdLog.Printf("Stopping REST connections")
err = er.server.Shutdown(context.Background())
er.server = nil
}
stdLog.Printf("Stopped REST")
return err
}
Loading

0 comments on commit d2ce150

Please sign in to comment.