Skip to content

Commit

Permalink
SSH request tracing
Browse files Browse the repository at this point in the history
Add tracing support for ssh global requests and  channels. Wrappers
for `ssh.Client`, `ssh.Channel`, and `ssh.NewChannel` provide a
mechanism for tracing context to be propagated via a `context.Context`.

In order to maintain backwards compatibility the ssh.Client wrapper
tries to open a TracingChannel when constructed. Any servers that
don't support tracing will reject the unknown channel. The client
will only provide tracing context to servers which do NOT reject
the TracingChannel request.

In order to include pass tracing context along all ssh payloads
are wrapped in an Envelope that includes the original payload
AND any tracing context. Servers now try to unmarshal all payloads
into said Envelope when processing messages. If there is an Envelope
provided, a new span will be created and the original payload will
be pass along to handlers.

Part of #12241
  • Loading branch information
rosstimothy committed Jul 6, 2022
1 parent ab8ffb2 commit e404084
Show file tree
Hide file tree
Showing 16 changed files with 1,348 additions and 215 deletions.
73 changes: 73 additions & 0 deletions api/observability/tracing/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2022 Gravitational, Inc
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tracing

import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
oteltrace "go.opentelemetry.io/otel/trace"
)

// Option applies an option value for a Config.
type Option interface {
apply(*Config)
}

// Config stores tracing related properties to customize
// creating Tracers and extracting TraceContext
type Config struct {
TracerProvider oteltrace.TracerProvider
TextMapPropagator propagation.TextMapPropagator
}

// NewConfig returns a Config configured with all the passed Option.
func NewConfig(opts []Option) *Config {
c := &Config{
TracerProvider: otel.GetTracerProvider(),
TextMapPropagator: otel.GetTextMapPropagator(),
}
for _, o := range opts {
o.apply(c)
}
return c
}

type tracerProviderOption struct{ tp oteltrace.TracerProvider }

func (o tracerProviderOption) apply(c *Config) {
if o.tp != nil {
c.TracerProvider = o.tp
}
}

// WithTracerProvider returns an Option to use the trace.TracerProvider when
// creating a trace.Tracer.
func WithTracerProvider(tp oteltrace.TracerProvider) Option {
return tracerProviderOption{tp: tp}
}

type propagatorOption struct{ p propagation.TextMapPropagator }

func (o propagatorOption) apply(c *Config) {
if o.p != nil {
c.TextMapPropagator = o.p
}
}

// WithTextMapPropagator returns an Option to use the propagation.TextMapPropagator when extracting
// and injecting trace context.
func WithTextMapPropagator(p propagation.TextMapPropagator) Option {
return propagatorOption{p: p}
}
106 changes: 106 additions & 0 deletions api/observability/tracing/ssh/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2022 Gravitational, Inc
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ssh

import (
"context"
"encoding/json"
"fmt"

"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
oteltrace "go.opentelemetry.io/otel/trace"
"golang.org/x/crypto/ssh"

"github.com/gravitational/teleport/api/observability/tracing"
)

// Channel is a wrapper around ssh.Channel that adds tracing support.
type Channel struct {
ssh.Channel
tracingSupported bool
opts []tracing.Option
}

// NewTraceChannel creates a new Channel.
func NewTraceChannel(ch ssh.Channel, opts ...tracing.Option) *Channel {
return &Channel{
Channel: ch,
opts: opts,
}
}

// SendRequest sends a global request, and returns the
// reply. If tracing is enabled, the provided payload
// is wrapped in an Envelope to forward any tracing context.
func (c *Channel) SendRequest(ctx context.Context, name string, wantReply bool, payload []byte) (bool, error) {
config := tracing.NewConfig(c.opts)
tracer := config.TracerProvider.Tracer(instrumentationName)

ctx, span := tracer.Start(
ctx,
fmt.Sprintf("ssh.ChannelRequest/%s", name),
oteltrace.WithSpanKind(oteltrace.SpanKindClient),
oteltrace.WithAttributes(
semconv.RPCServiceKey.String("ssh.Channel"),
semconv.RPCMethodKey.String("SendRequest"),
semconv.RPCSystemKey.String("ssh"),
),
)
defer span.End()

ok, err := c.Channel.SendRequest(name, wantReply, wrapPayload(ctx, c.tracingSupported, config.TextMapPropagator, payload))
if err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
}

return ok, err
}

// NewChannel is a wrapper around ssh.NewChannel that allows an
// Envelope to be provided to new channels.
type NewChannel struct {
ssh.NewChannel
Envelope Envelope
}

// NewTraceNewChannel wraps the ssh.NewChannel in a new NewChannel
//
// The provided ssh.NewChannel will have any Envelope provided
// via ExtraData extracted so that the original payload can be
// provided to callers of NewCh.ExtraData.
func NewTraceNewChannel(nch ssh.NewChannel) *NewChannel {
ch := &NewChannel{
NewChannel: nch,
}

data := nch.ExtraData()

var envelope Envelope
if err := json.Unmarshal(data, &envelope); err == nil {
ch.Envelope = envelope
} else {
ch.Envelope.Payload = data
}

return ch
}

// ExtraData returns the arbitrary payload for this channel, as supplied
// by the client. This data is specific to the channel type.
func (n NewChannel) ExtraData() []byte {
return n.Envelope.Payload
}
212 changes: 212 additions & 0 deletions api/observability/tracing/ssh/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
// Copyright 2022 Gravitational, Inc
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ssh

import (
"context"
"errors"
"fmt"
"io"
"net"

"github.com/gravitational/trace"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
oteltrace "go.opentelemetry.io/otel/trace"
"golang.org/x/crypto/ssh"

"github.com/gravitational/teleport/api/observability/tracing"
)

// Client is a wrapper around ssh.Client that adds tracing support.
type Client struct {
*ssh.Client
opts []tracing.Option
tracingSupported bool
rejectedError error
}

// NewClient creates a new Client.
//
// The server being connected to is probed to determine if it supports
// ssh tracing. This is done by attempting to open a TracingChannel channel.
// If the channel is successfully opened then all payloads delivered to the
// server will be wrapped in an Envelope with tracing context. All Session
// and Channel created from the returned Client will honor the clients view
// of whether they should provide tracing context.
//
// Note: a channel is used instead of a global request in order prevent blocking
// forever in the event that the connection is rejected. In that case, the server
// doesn't service any global requests and writes the error to the first opened
// channel.
func NewClient(c ssh.Conn, chans <-chan ssh.NewChannel, reqs <-chan *ssh.Request, opts ...tracing.Option) *Client {
clt := &Client{
Client: ssh.NewClient(c, chans, reqs),
opts: opts,
}

// Check if the server supports tracing
ch, _, err := clt.Client.OpenChannel(TracingChannel, nil)
if err != nil {
var openError *ssh.OpenChannelError
if errors.As(err, &openError) {
switch openError.Reason {
case ssh.Prohibited:
// prohibited errors due to locks and session control are expected by callers of NewSession
clt.rejectedError = err
default:
}

return clt
}

return clt
}

_ = ch.Close()
clt.tracingSupported = true

return clt
}

// DialContext initiates a connection to the addr from the remote host.
// The resulting connection has a zero LocalAddr() and RemoteAddr().
func (c *Client) DialContext(ctx context.Context, n, addr string) (net.Conn, error) {
tracer := tracing.NewConfig(c.opts).TracerProvider.Tracer(instrumentationName)

_, span := tracer.Start(
ctx,
"ssh.DialContext",
oteltrace.WithSpanKind(oteltrace.SpanKindClient),
oteltrace.WithAttributes(
append(
peerAttr(c.Conn.RemoteAddr()),
attribute.String("network", n),
attribute.String("address", addr),
semconv.RPCServiceKey.String("ssh.Client"),
semconv.RPCMethodKey.String("Dial"),
semconv.RPCSystemKey.String("ssh"),
)...,
),
)
defer span.End()

conn, err := c.Client.Dial(n, addr)
if err != nil {
return nil, trace.Wrap(err)
}

return conn, nil
}

// SendRequest sends a global request, and returns the
// reply. If tracing is enabled, the provided payload
// is wrapped in an Envelope to forward any tracing context.
func (c *Client) SendRequest(ctx context.Context, name string, wantReply bool, payload []byte) (bool, []byte, error) {
config := tracing.NewConfig(c.opts)
tracer := config.TracerProvider.Tracer(instrumentationName)

ctx, span := tracer.Start(
ctx,
fmt.Sprintf("ssh.GlobalRequest/%s", name),
oteltrace.WithSpanKind(oteltrace.SpanKindClient),
oteltrace.WithAttributes(
append(
peerAttr(c.Conn.RemoteAddr()),
attribute.Bool("want_reply", wantReply),
semconv.RPCServiceKey.String("ssh.Client"),
semconv.RPCMethodKey.String("SendRequest"),
semconv.RPCSystemKey.String("ssh"),
)...,
),
)
defer span.End()

ok, resp, err := c.Client.SendRequest(name, wantReply, wrapPayload(ctx, c.tracingSupported, config.TextMapPropagator, payload))
if err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
}

return ok, resp, err
}

// OpenChannel tries to open a channel. If tracing is enabled,
// the provided payload is wrapped in an Envelope to forward
// any tracing context.
func (c *Client) OpenChannel(ctx context.Context, name string, data []byte) (*Channel, <-chan *ssh.Request, error) {
config := tracing.NewConfig(c.opts)
tracer := config.TracerProvider.Tracer(instrumentationName)
ctx, span := tracer.Start(
ctx,
fmt.Sprintf("ssh.OpenChannel/%s", name),
oteltrace.WithSpanKind(oteltrace.SpanKindClient),
oteltrace.WithAttributes(
append(
peerAttr(c.Conn.RemoteAddr()),
semconv.RPCServiceKey.String("ssh.Client"),
semconv.RPCMethodKey.String("OpenChannel"),
semconv.RPCSystemKey.String("ssh"),
)...,
),
)
defer span.End()

ch, reqs, err := c.Client.OpenChannel(name, wrapPayload(ctx, c.tracingSupported, config.TextMapPropagator, data))
if err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
}

return &Channel{
Channel: ch,
opts: c.opts,
}, reqs, err
}

// NewSession creates a new SSH session that is passed tracing context
// so that spans may be correlated properly over the ssh connection.
func (c *Client) NewSession(ctx context.Context) (*ssh.Session, error) {
tracer := tracing.NewConfig(c.opts).TracerProvider.Tracer(instrumentationName)

_, span := tracer.Start(
ctx,
"ssh.NewSession",
oteltrace.WithSpanKind(oteltrace.SpanKindClient),
oteltrace.WithAttributes(
append(
peerAttr(c.Conn.RemoteAddr()),
semconv.RPCServiceKey.String("ssh.Client"),
semconv.RPCMethodKey.String("NewSession"),
semconv.RPCSystemKey.String("ssh"),
)...,
),
)
defer span.End()

session, err := c.Client.NewSession()
// An EOF here means that the server closed our connection. In
// the event that the rejectedError was populated when opening the
// TracingChannel channel, the connection was prohibited due to a lock or
// session control. Callers to NewSession are expecting to receive
// the reason the session was rejected, so we need to propagate the
// rejectedError here instead of returning the EOF.
if errors.Is(err, io.EOF) && c.rejectedError != nil {
return nil, trace.Wrap(c.rejectedError)
}

return session, trace.Wrap(err)
}
Loading

0 comments on commit e404084

Please sign in to comment.