Skip to content

Commit

Permalink
cluser api: json rpc reverse proxy for grpc apis
Browse files Browse the repository at this point in the history
fixes: #252
refs: #254

Replace HTTP API with auto-generated API from GRPC API
  • Loading branch information
aastein authored Jun 28, 2018
1 parent cb48a13 commit dd5f759
Show file tree
Hide file tree
Showing 21 changed files with 3,385 additions and 473 deletions.
11 changes: 11 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ deps-install:
devdeps-install:
go get github.com/gogo/protobuf/protoc-gen-gogo
go get github.com/vektra/mockery/.../
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger

coverdeps-install:
go get golang.org/x/tools/cmd/cover
Expand All @@ -82,7 +84,16 @@ kubetypes:
%.pb.go: %.proto
protoc -I. \
-Ivendor -Ivendor/github.com/gogo/protobuf/protobuf \
-Ivendor/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis \
--gogo_out=plugins=grpc:. $<
protoc -I. \
-Ivendor -Ivendor/github.com/gogo/protobuf/protobuf \
-Ivendor/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis \
--grpc-gateway_out=logtostderr=true:. $<
protoc -I. \
-Ivendor -Ivendor/github.com/gogo/protobuf/protobuf \
-Ivendor/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis \
--swagger_out=logtostderr=true:. $<

mocks:
mockery -case=underscore -dir query -output query/mocks -name Client
Expand Down
2 changes: 1 addition & 1 deletion cmd/akash/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/ovrclk/akash/keys"
"github.com/ovrclk/akash/manifest"
"github.com/ovrclk/akash/marketplace"
http "github.com/ovrclk/akash/provider/http"
"github.com/ovrclk/akash/provider/http"
"github.com/ovrclk/akash/sdl"
"github.com/ovrclk/akash/types"
. "github.com/ovrclk/akash/util"
Expand Down
10 changes: 8 additions & 2 deletions cmd/akash/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
"github.com/ovrclk/akash/provider/cluster"
"github.com/ovrclk/akash/provider/cluster/kube"
"github.com/ovrclk/akash/provider/event"
"github.com/ovrclk/akash/provider/http"
"github.com/ovrclk/akash/provider/grpc"
"github.com/ovrclk/akash/provider/grpc/json"
psession "github.com/ovrclk/akash/provider/session"
"github.com/ovrclk/akash/types"
ptype "github.com/ovrclk/akash/types/provider"
Expand Down Expand Up @@ -198,7 +199,12 @@ func doProviderRunCommand(session session.Session, cmd *cobra.Command, args []st

go func() {
defer cancel()
errch <- http.RunServer(ctx, session.Log(), "3001", service.ManifestHandler(), cclient)
errch <- grpc.RunServer(ctx, session.Log(), "tcp", "9090", service.ManifestHandler(), cclient)
}()

go func() {
defer cancel()
errch <- json.Run(ctx, session.Log(), ":3001", "localhost:9090")
}()

var reterr error
Expand Down
2 changes: 2 additions & 0 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,6 @@ import:
- package: github.com/juju/errors
- package: k8s.io/code-generator
- package: k8s.io/gengo
- package: github.com/hashicorp/golang-lru
- package: github.com/hashicorp/golang-lru
- package: github.com/grpc-ecosystem/grpc-gateway
version: ^1.4.1
49 changes: 49 additions & 0 deletions grpc/requests.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package grpc

import (
"bytes"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
)

func Marshal(obj proto.Message) ([]byte, error) {
buf := bytes.Buffer{}
marshaler := jsonpb.Marshaler{}
if err := marshaler.Marshal(&buf, obj); err != nil {
return nil, err
}
return buf.Bytes(), nil
}

// func VerifySignature(request *types.GRPCRequest) (crypto.Address, error) {
// buf := bytes.Buffer{}
// marshaler := jsonpb.Marshaler{}
// // switch v := request.Payload.(type) {
// // case *types.GRPCRequest_ManifestRequest:
// // if err := marshaler.Marshal(&buf, v.ManifestRequest); err != nil {
// // return nil, err
// // }
// // default:
// // return nil, types.ErrInvalidPayload{Message: "invalid payload"}
// // }

// if err := marshaler.Marshal(&buf, request.ManifestRequest); err != nil {
// return nil, err
// }

// key, err := crypto.PubKeyFromBytes(request.Key)
// if err != nil {
// return nil, err
// }

// sig, err := crypto.SignatureFromBytes(request.Signature)
// if err != nil {
// return nil, err
// }

// if !key.VerifyBytes(buf.Bytes(), sig) {
// return nil, types.ErrInvalidSignature{"invalud signature"}
// }
// return key.Address(), err
// }
33 changes: 33 additions & 0 deletions provider/grpc/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package grpc

import (
"github.com/ovrclk/akash/types"
"golang.org/x/net/context"
"google.golang.org/grpc"
)

type client struct {
target string
}

type Client interface {
Deploy(context.Context, *types.ManifestRequest) (*types.DeployRespone, error)
}

func NewClient(target string) (Client, error) {
return &client{target: target}, nil
}

func (c *client) Deploy(ctx context.Context, manifestRequest *types.ManifestRequest) (*types.DeployRespone, error) {
conn, err := grpc.Dial(c.target, grpc.WithInsecure())
if err != nil {
return nil, err
}
defer conn.Close()
client := types.NewClusterClient(conn)
resp, err := client.Deploy(ctx, manifestRequest)
if err != nil {
return nil, err
}
return resp, nil
}
53 changes: 53 additions & 0 deletions provider/grpc/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package grpc

import (
"context"
"os"
"testing"
"time"

"github.com/ovrclk/akash/manifest"
kmocks "github.com/ovrclk/akash/provider/cluster/kube/mocks"
"github.com/ovrclk/akash/provider/manifest/mocks"
"github.com/ovrclk/akash/sdl"
"github.com/ovrclk/akash/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/tendermint/tmlibs/log"
)

func TestSendManifest(t *testing.T) {
c, err := NewClient("localhost:3001")
assert.NoError(t, err)

sdl, err := sdl.ReadFile("../../_docs/deployment.yml")
require.NoError(t, err)

mani, err := sdl.Manifest()
require.NoError(t, err)

_, kmgr := testutil.NewNamedKey(t)
signer := testutil.Signer(t, kmgr)

deployment := testutil.DeploymentAddress(t)

req, _, err := manifest.SignManifest(mani, signer, deployment)
assert.NoError(t, err)

handler := &mocks.Handler{}
handler.On("HandleManifest", mock.Anything).Return(nil)

client := &kmocks.Client{}

server := newServer(log.NewTMLogger(os.Stdout), "tcp", ":3001", handler, client)
go func() {
err := server.listenAndServe()
require.NoError(t, err)
}()

time.Sleep(1 * time.Second)

_, err = c.Deploy(context.TODO(), req)
assert.NoError(t, err)
}
63 changes: 63 additions & 0 deletions provider/grpc/json/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package json

import (
"net/http"

"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/ovrclk/akash/types"
"github.com/tendermint/tmlibs/log"
"golang.org/x/net/context"
"google.golang.org/grpc"
)

type proxy struct {
endpoint string
addr string
log log.Logger
mux *runtime.ServeMux
}

func new(ctx context.Context, log log.Logger, addr, endpoint string) (*proxy, error) {
mux := runtime.NewServeMux()
opts := []grpc.DialOption{grpc.WithInsecure()}
err := types.RegisterClusterHandlerFromEndpoint(ctx, mux, endpoint, opts)
if err != nil {
return nil, err
}
return &proxy{
endpoint: endpoint,
addr: addr,
log: log,
mux: mux,
}, nil
}

func (p *proxy) listenAndServe() error {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()

mux := runtime.NewServeMux()
opts := []grpc.DialOption{grpc.WithInsecure()}
err := types.RegisterClusterHandlerFromEndpoint(ctx, mux, p.endpoint, opts)
if err != nil {
return err
}
return http.ListenAndServe(p.addr, mux)
}

func Run(ctx context.Context, log log.Logger, address, endpoint string) error {
ctx, cancel := context.WithCancel(ctx)
proxy, err := new(ctx, log, address, endpoint)
if err != nil {
return err
}

log.Info("Starting GRPC JSON proxy server", "address", proxy.addr, "endpoint", proxy.endpoint)
err = proxy.listenAndServe()
cancel()

log.Info("GRPC server shutdown")

return err
}
Loading

0 comments on commit dd5f759

Please sign in to comment.