Skip to content

Commit

Permalink
grpc based transport
Browse files Browse the repository at this point in the history
  • Loading branch information
purnesh42H committed Feb 5, 2025
1 parent ee3e8d9 commit f7e191b
Show file tree
Hide file tree
Showing 2 changed files with 245 additions and 0 deletions.
140 changes: 140 additions & 0 deletions xds/internal/clients/grpctransport/grpc_transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
*
* Copyright 2025 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package grpctransport provides an implementation of the
// [clients.TransportBuilder] interface using gRPC.
package grpctransport

import (
"context"
"fmt"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/xds/internal/clients"
)

// ServerConfigExtension extends the [clients.ServerConfig] for the gRPC-based
// transport builder. Any implementation of this must implement ServerConfig()
// method.
type ServerConfigExtension interface {
ServerConfig() *ServerConfig
}

// ServerConfig holds the settings for connecting to an xDS management server
// using gRPC.
type ServerConfig struct {
// Credentials is the credential bundle containing the gRPC credentials for
// connecting to the xDS management server.
Credentials credentials.Bundle
}

// ServerConfig returns the ServerConfig itself. This method is designed
// to satisfy [ServerConfigExtension] interface requirement.
func (s *ServerConfig) ServerConfig() *ServerConfig {
return s
}

// Builder provides a way to build a gRPC-based transport to an xDS management
// server.
type Builder struct{}

// Build creates a new gRPC-based transport to an xDS management server using
// the provided [clients.ServerConfig]. This involves creating a
// grpc.ClientConn to the server using the provided credentials and server URI.
//
// If any of ServerURI or Extensions of `sc` are not present, Build() will return
// an error.
func (b *Builder) Build(sc clients.ServerConfig) (clients.Transport, error) {
if sc.ServerURI == "" {
return nil, fmt.Errorf("xds: ServerConfig's ServerURI field cannot be empty")
}
if sc.Extensions == nil {
return nil, fmt.Errorf("xds: ServerConfig's Extensions field cannot be nil for gRPC transport")
}
gtsce, ok := sc.Extensions.(ServerConfigExtension)
if !ok {
return nil, fmt.Errorf("xds: ServerConfig's Extensions field cannot be anything other than grpctransport.ServerConfigExtension for gRPC transport")
}
gtsc := gtsce.ServerConfig()
if gtsc.Credentials == nil {
return nil, fmt.Errorf("xsd: ServerConfigExtensions's Credentials field cannot be nil for gRPC transport")
}

// TODO: The `build()` method currently creates a new grpc channel every
// time. However, in future we will incorporate reference count map for
// existing transports and deduplicate transports based on server URI and
// credentials so that transport channel to same server can be shared
// between xDS and LRS client.

// Dial the xDS management server with the provided credentials, server URI,
// and a static keepalive configuration that is common across gRPC language
// implementations.
kpCfg := grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 5 * time.Minute,
Timeout: 20 * time.Second,
})
cc, err := grpc.NewClient(sc.ServerURI, kpCfg, grpc.WithCredentialsBundle(gtsc.Credentials))
if err != nil {
return nil, fmt.Errorf("error creating grpc client for server uri %s, %v", sc.ServerURI, err)
}
cc.Connect()

return &grpcTransport{cc: cc}, nil
}

type grpcTransport struct {
cc *grpc.ClientConn
}

// NewStream creates a new gRPC stream to the xDS management server for the
// specified method. The returned Stream interface can be used to send and
// receive messages on the stream.
func (g *grpcTransport) NewStream(ctx context.Context, method string) (clients.Stream, error) {
s, err := g.cc.NewStream(ctx, &grpc.StreamDesc{StreamName: method, ClientStreams: true, ServerStreams: true}, method)
if err != nil {
return nil, err
}
return &stream{stream: s}, nil
}

type stream struct {
stream grpc.ClientStream
}

// Send sends a message to the xDS management server.
func (s *stream) Send(msg []byte) error {
return s.stream.SendMsg(msg)
}

// Recv receives a message from the xDS management server.
func (s *stream) Recv() ([]byte, error) {
var typedRes []byte
err := s.stream.RecvMsg(&typedRes)
if err != nil {
return typedRes, err
}
return typedRes, nil
}

// Close closes the gRPC stream to the xDS management server.
func (g *grpcTransport) Close() error {
return g.cc.Close()
}
105 changes: 105 additions & 0 deletions xds/internal/clients/grpctransport/grpc_transport_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
*
* Copyright 2025 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package grpctransport

import (
"testing"

"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/xds/internal/clients"
)

type s struct {
grpctest.Tester
}

func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

// TestBuild verifies that the grpctransport.Builder creates a new
// grpc.ClientConn every time Build() is called.
//
// It covers the following scenarios:
// - ServerURI is empty.
// - Extensions is nil.
// - Extensions is not ServerConfigExtension.
// - Credentials are nil.
// - Success cases.
func (s) TestBuild(t *testing.T) {
tests := []struct {
name string
serverCfg clients.ServerConfig
wantErr bool
}{
{
name: "ServerURI_empty",
serverCfg: clients.ServerConfig{
ServerURI: "",
Extensions: &ServerConfig{Credentials: insecure.NewBundle()},
},
wantErr: true,
},
{
name: "Extensions_nil",
serverCfg: clients.ServerConfig{ServerURI: "server-address"},
wantErr: true,
},
{
name: "Extensions_not_ServerConfigExtension",
serverCfg: clients.ServerConfig{
ServerURI: "server-address",
Extensions: 1,
},
wantErr: true,
},
{
name: "ServerConfigExtension_Credentials_nil",
serverCfg: clients.ServerConfig{
ServerURI: "server-address",
Extensions: &ServerConfig{},
},
wantErr: true,
},
{
name: "success",
serverCfg: clients.ServerConfig{
ServerURI: "server-address",
Extensions: &ServerConfig{Credentials: insecure.NewBundle()},
},
wantErr: false,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
b := &Builder{}
tr, err := b.Build(test.serverCfg)
if (err != nil) != test.wantErr {
t.Fatalf("Build() error = %v, wantErr %v", err, test.wantErr)
}
if tr != nil {
defer tr.Close()
}
if !test.wantErr && tr == nil {
t.Fatalf("got non-nil transport from Build(), want nil")
}
})
}
}

0 comments on commit f7e191b

Please sign in to comment.