Skip to content

Commit

Permalink
Merge pull request #21 from kenriortega/development
Browse files Browse the repository at this point in the history
Development
  • Loading branch information
kenriortega authored Sep 24, 2021
2 parents 7188f7c + a542f46 commit c180179
Show file tree
Hide file tree
Showing 16 changed files with 1,442 additions and 23 deletions.
129 changes: 129 additions & 0 deletions cmd/cli/grpcxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package cli

import (
"context"
"fmt"
"log"
"net"
"os"
"os/signal"
"strings"
"time"

"github.com/kenriortega/ngonx/pkg/logger"
"github.com/spf13/cobra"
"github.com/talos-systems/grpc-proxy/proxy"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

var director proxy.StreamDirector
var grpcCmd = &cobra.Command{
Use: "grpc",
Short: "Run ngonx as a grpc proxy",
Run: func(cmd *cobra.Command, args []string) {
var opts []grpc.ServerOption

lis, err := net.Listen("tcp", configFromYaml.GrpcProxy.Listener)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}

logger.LogInfo(fmt.Sprintf("Proxy running at %q\n", configFromYaml.GrpcProxy.Listener))
simpleBackendGen := func(hostname string) proxy.Backend {
return &proxy.SingleBackend{
GetConn: func(ctx context.Context) (context.Context, *grpc.ClientConn, error) {
md, _ := metadata.FromIncomingContext(ctx)

outCtx := metadata.NewOutgoingContext(ctx, md.Copy())
if configFromYaml.GrpcSSL.Enable {
creds, sslErr := credentials.NewClientTLSFromFile(
configFromYaml.GrpcClientCert, "")
if sslErr != nil {
log.Fatalf("Failed to parse credentials: %v", sslErr)
}
conn, err := grpc.DialContext(
ctx,
hostname,
grpc.WithTransportCredentials(creds),
grpc.WithCodec(proxy.Codec()),
) //nolint: staticcheck
return outCtx, conn, err
}
conn, err := grpc.DialContext(
ctx,
hostname,
grpc.WithInsecure(),
grpc.WithCodec(proxy.Codec()),
) //nolint: staticcheck

return outCtx, conn, err
},
}
}

director = func(ctx context.Context, fullMethodName string) (proxy.Mode, []proxy.Backend, error) {
for _, bkd := range configFromYaml.GrpcEndpoints {
// Make sure we never forward internal services.
if !strings.HasPrefix(fullMethodName, bkd.Name) {
return proxy.One2One, nil, status.Errorf(codes.Unimplemented, "Unknown method")
}
md, ok := metadata.FromIncomingContext(ctx)
if ok {
if _, exists := md[":authority"]; exists {
return proxy.One2One, []proxy.Backend{
simpleBackendGen(bkd.HostURI),
}, nil
}
}
}
return proxy.One2One, nil, status.Errorf(codes.Unimplemented, "Unknown method")
}
opts = append(opts,
grpc.CustomCodec(proxy.Codec()),
grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
)
// SSL
if configFromYaml.GrpcSSL.Enable {

creds, sslErr := credentials.NewServerTLSFromFile(
configFromYaml.GrpcSSL.CrtFile,
configFromYaml.GrpcSSL.KeyFile,
)
if sslErr != nil {
log.Fatalf("Failed to parse credentials: %v", sslErr)
return
}
opts = append(opts, grpc.Creds(creds))
}

server := grpc.NewServer(opts...)

go gracefulShutdown(server)
if err := server.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
},
}

func init() {

rootCmd.AddCommand(grpcCmd)
}

func gracefulShutdown(server *grpc.Server) {
quit := make(chan os.Signal, 1)

signal.Notify(quit, os.Interrupt)
sig := <-quit
logger.LogInfo(fmt.Sprintf("server is shutting down %s", sig.String()))

_, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

server.GracefulStop()
logger.LogInfo("server stopped")
}
39 changes: 35 additions & 4 deletions cmd/cli/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package cli

import (
"net/http"
"net/url"
"os"
"time"

"github.com/gorilla/mux"
domain "github.com/kenriortega/ngonx/internal/mngt/domain"
handlers "github.com/kenriortega/ngonx/internal/mngt/handlers"
services "github.com/kenriortega/ngonx/internal/mngt/services"
"github.com/kenriortega/ngonx/pkg/config"
"github.com/kenriortega/ngonx/pkg/healthcheck"
"github.com/kenriortega/ngonx/pkg/httpsrv"
"github.com/kenriortega/ngonx/pkg/logger"
"github.com/spf13/cobra"
Expand All @@ -27,7 +30,6 @@ func Execute() {
logger.LogError(err.Error())
os.Exit(1)
}
go StartMngt(configFromYaml)

}
func init() {
Expand All @@ -43,6 +45,8 @@ func initConfig() {
if errConfig != nil {
logger.LogError("Yaml file not found please run command setup " + errConfig.Error())
}
go StartMngt(configFromYaml)

}

func StartMngt(config config.Config) {
Expand All @@ -54,11 +58,12 @@ func StartMngt(config config.Config) {
// populate data from config file list of services

for _, endpoints := range config.ProxyGateway.EnpointsProxy {
for _, endpoint := range endpoints.Endpoints {
hostUri := endpoints.HostURI
for _, it := range endpoints.Endpoints {
endpointMap := make(map[string]interface{})
endpointMap["path_url"] = endpoint.PathEndpoint
endpointMap["path_url"] = hostUri + it.PathEndpoint
endpointMap["status"] = "down"
mh.RegisterEnpoint(endpointMap)
mh.RegisterEndpoint(endpointMap)
}
}
// Routes...
Expand All @@ -70,5 +75,31 @@ func StartMngt(config config.Config) {
port,
r,
)

go func() {
t := time.NewTicker(time.Second * 30)
for range t.C {
logger.LogInfo("Starting health check...")
endpoints, err := service.ListEndpoints()
if err != nil {
logger.LogError(err.Error())
}
for _, it := range endpoints {
u, err := url.Parse(it.PathUrl)
if err != nil {
logger.LogError(err.Error())
}
status := healthcheck.IsBackendAlive(u)
if status {
it.Status = "up"
} else {
it.Status = "down"
}
mh.UpdateEndpoint(it)
}
logger.LogInfo("Health check completed")
}
}()

server.Start()
}
154 changes: 154 additions & 0 deletions examples/calculator/calc_client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package main

import (
"context"
"fmt"
"io"
"log"
"time"

"github.com/kenriortega/ngonx/examples/calculator/calculatorpb"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// #nosec
func main() {
fmt.Println("Client calculator")

cc, err := grpc.DialContext(context.TODO(), "localhost:50000", grpc.WithInsecure())
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}

defer cc.Close()

c := calculatorpb.NewCalculatorServiceClient(cc)

doUnary(c)
doStreaming(c)
doClientStreaming(c)
doBiDiStreaming(c)
doSqrt(c)
}

// #nosec
func doSqrt(c calculatorpb.CalculatorServiceClient) {

num := -12
// correct call
res, err := c.SquareRoot(context.Background(), &calculatorpb.SquareRootRequest{Number: int32(num)})
if err != nil {
respErr, ok := status.FromError(err)
if ok {
fmt.Println(respErr.Code())
fmt.Println(respErr.Message())
if respErr.Code() == codes.InvalidArgument {
fmt.Println("We probably sent a negative number")
return
}
} else {
log.Fatalf("Big error calling SquareRoot: %v", respErr)
return
}
}
fmt.Println(res.GetNumberRoot())
}

// #nosec
func doUnary(c calculatorpb.CalculatorServiceClient) {
req := &calculatorpb.SumRequest{
FirstNumber: 5,
SecondNumber: 40,
}
res, err := c.Sum(context.Background(), req)
if err != nil {
log.Fatalf("error while calling sum rpc: %v", err)
}
log.Println("response: ", res)
}

// #nosec
func doStreaming(c calculatorpb.CalculatorServiceClient) {
req := &calculatorpb.PrimeNumberDecompositionRequest{
Number: 12,
}

stream, err := c.PrimeNumberDecomposition(context.Background(), req)
if err != nil {
log.Fatalf("error while calling PrimeNumberDecomposition %v", err)
}

for {
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("error: %v", err)
}

log.Printf("Recv msg %v\n", res.GetPrimerFactor())
}
}

// #nosec
func doClientStreaming(c calculatorpb.CalculatorServiceClient) {
stream, err := c.ComputeAverage(context.Background())
if err != nil {
log.Fatalf("err %v", err)
}

numbers := []int32{1, 23, 4, 5, 6, 677, 2}

for _, number := range numbers {
stream.Send(&calculatorpb.ComputeAverageRequest{
Number: number,
})
}

res, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("err: %v", err)
}
fmt.Println("Result ", res.GetAverage())
}

// #nosec
func doBiDiStreaming(c calculatorpb.CalculatorServiceClient) {

stream, err := c.FindMaximun(context.Background())
if err != nil {
log.Fatalf("err %v", err)
}

waitc := make(chan struct{})
go func() {
numbers := []int32{3, 4, 55, 67, 8, 23}
for _, v := range numbers {
stream.Send(&calculatorpb.FindMaximunRequest{
Number: v,
})
time.Sleep(1000 * time.Millisecond)
}
stream.CloseSend()
}()

go func() {
for {
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("err: %v", err)
break
}
maximun := res.GetMaximun()
fmt.Println(maximun)
}
close(waitc)
}()
<-waitc
}
Loading

0 comments on commit c180179

Please sign in to comment.