Skip to content

Commit

Permalink
Merge #85031 #85131
Browse files Browse the repository at this point in the history
85031: sqlproxyccl: track which side broke the connection r=JeffSwenson a=JeffSwenson

Previously, the sqlproxy considered an error when copying from the
client->server as a client error and an error when copying from the
server->client as a server error. This logic is incorrect as either side
could be the source of the broken connection.

Now, a wrapper is applied to the connection, and the errors returned by
the wrapper are used to properly account for which side broke the
connection.

Release note: None

85131: sql: add columns to node_execution_insights r=matthewtodd a=j82w

This commit adds the following new columns
to crdb_internal.node_execution_insights table.

```
txn_id                     UUID NOT NULL,
txn_fingerprint_id         BYTES NOT NULL,
query                      STRING NOT NULL,
status                     STRING NOT NULL,
start_time                 TIMESTAMP NOT NULL,
end_time                   TIMESTAMP NOT NULL,
full_scan                  BOOL NOT NULL,
user_name                  STRING NOT NULL,
application_name           STRING NOT NULL,
database_name              STRING NOT NULL,
plan_gist                  STRING NOT NULL,
rows_read                  INT8 NOT NULL,
rows_written               INT8 NOT NULL,
priority                   FLOAT NOT NULL,
retries                    INT8 NOT NULL
```

Part of #81024

Release note (sql change): Adding txn_id,
  txn_fingerprint_id, query, status, start_time,
  end_time, full_scan, user_name, application_name,
  database_name, plan_gist, rows_read, rows_written,
  priority, and retries columns to
  crdb_internal.node_execution_insights

Co-authored-by: Jeff <[email protected]>
Co-authored-by: j82w <[email protected]>
  • Loading branch information
3 people committed Jul 29, 2022
3 parents 22d2ba2 + 11106af + 1b970af commit 933df73
Show file tree
Hide file tree
Showing 21 changed files with 489 additions and 36 deletions.
3 changes: 3 additions & 0 deletions pkg/ccl/sqlproxyccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"conn_migration.go",
"connector.go",
"error.go",
"error_source.go",
"forwarder.go",
"frontend_admitter.go",
"metrics.go",
Expand Down Expand Up @@ -61,9 +62,11 @@ go_test(
"backend_dialer_test.go",
"conn_migration_test.go",
"connector_test.go",
"error_source_test.go",
"forwarder_test.go",
"frontend_admitter_test.go",
"main_test.go",
"metrics_test.go",
"proxy_handler_test.go",
"server_test.go",
],
Expand Down
16 changes: 14 additions & 2 deletions pkg/ccl/sqlproxyccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,10 +362,22 @@ func (c *connector) dialSQLServer(
return nil, err
}

return &onConnectionClose{
// Add a connection wrapper that annotates errors as belonging to the sql
// server.
conn = &errorSourceConn{
Conn: conn,
readErrMarker: errServerRead,
writeErrMarker: errServerWrite,
}

// Add a connection wrapper that lets the balancer know the connection is
// closed.
conn = &onConnectionClose{
Conn: conn,
closerFn: serverAssignment.Close,
}, nil
}

return conn, nil
}

// onConnectionClose is a net.Conn wrapper to ensure that our custom closerFn
Expand Down
12 changes: 8 additions & 4 deletions pkg/ccl/sqlproxyccl/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,9 +667,11 @@ func TestConnector_dialSQLServer(t *testing.T) {
require.NoError(t, err)
defer conn.Close()

wrappedConn, ok := conn.(*onConnectionClose)
onCloseWrapper, ok := conn.(*onConnectionClose)
require.True(t, ok)
require.Equal(t, crdbConn, wrappedConn.Conn)
onErrorWrapper, ok := onCloseWrapper.Conn.(*errorSourceConn)
require.True(t, ok)
require.Equal(t, crdbConn, onErrorWrapper.Conn)

conn.Close()
conns := tracker.GetConnsMap(tenantID)
Expand Down Expand Up @@ -710,9 +712,11 @@ func TestConnector_dialSQLServer(t *testing.T) {
require.NoError(t, err)
defer conn.Close()

wrappedConn, ok := conn.(*onConnectionClose)
onCloseWrapper, ok := conn.(*onConnectionClose)
require.True(t, ok)
onErrorWrapper, ok := onCloseWrapper.Conn.(*errorSourceConn)
require.True(t, ok)
require.Equal(t, crdbConn, wrappedConn.Conn)
require.Equal(t, crdbConn, onErrorWrapper.Conn)

conn.Close()
conns := tracker.GetConnsMap(tenantID)
Expand Down
74 changes: 74 additions & 0 deletions pkg/ccl/sqlproxyccl/error_source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package sqlproxyccl

import (
"net"

"github.com/cockroachdb/errors"
)

// errorSourceConn is used to annotate errors returned by the connection. The
// errors make it possible to determine which side of an io.Copy broke.
type errorSourceConn struct {
net.Conn
readErrMarker error
writeErrMarker error
}

// errClientWrite indicates the error occured when attempting to write to the
// client connection.
var errClientWrite = errors.New("client write error")

// errClientRead indicates the error occured when attempting to read from the
// client connection.
var errClientRead = errors.New("client read error")

// errServerWrite indicates the error occured when attempting to write to the
// sql server.
var errServerWrite = errors.New("server write error")

// errServerRead indicates the error occured when attempting to read from the
// sql server.
var errServerRead = errors.New("server read error")

// wrapConnectionError wraps the error with newErrorf and the appropriate code
// if it is a known connection error. nil is returned if the error is not
// recognized.
func wrapConnectionError(err error) error {
switch {
case errors.Is(err, errClientRead):
return newErrorf(codeClientReadFailed, "unable to read from client: %s", err)
case errors.Is(err, errClientWrite):
return newErrorf(codeClientWriteFailed, "unable to write to client: %s", err)
case errors.Is(err, errServerRead):
return newErrorf(codeBackendReadFailed, "unable to read from sql server: %s", err)
case errors.Is(err, errServerWrite):
return newErrorf(codeBackendWriteFailed, "unable to write to sql server: %s", err)
}
return nil
}

// Read wraps net.Conn.Read and annotates the returned error.
func (conn *errorSourceConn) Read(b []byte) (n int, err error) {
n, err = conn.Conn.Read(b)
if err != nil {
err = errors.Mark(err, conn.readErrMarker)
}
return n, err
}

// Write wraps net.Conn.Read and annotates the returned error.
func (conn *errorSourceConn) Write(b []byte) (n int, err error) {
n, err = conn.Conn.Write(b)
if err != nil {
err = errors.Mark(err, conn.writeErrMarker)
}
return n, err
}
109 changes: 109 additions & 0 deletions pkg/ccl/sqlproxyccl/error_source_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package sqlproxyccl

import (
"net"
"testing"

"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

type fakeConn struct {
net.Conn
size int
readError error
writeError error
}

func (conn *fakeConn) Read(b []byte) (n int, err error) {
return conn.size, conn.readError
}

func (conn *fakeConn) Write(b []byte) (n int, err error) {
return conn.size, conn.writeError
}

func TestWrapConnectionError(t *testing.T) {
defer leaktest.AfterTest(t)()
type testCase struct {
marker error
code errorCode
}
tests := []testCase{
{errClientRead, codeClientReadFailed},
{errClientWrite, codeClientWriteFailed},
{errServerRead, codeBackendReadFailed},
{errServerWrite, codeBackendWriteFailed},
{errors.New("some random error"), 0},
}
for _, tc := range tests {
var code errorCode
err := wrapConnectionError(errors.Mark(errors.New("some inner error"), tc.marker))
if err != nil {
codeErr := &codeError{}
require.True(t, errors.As(err, &codeErr))
code = codeErr.code
}
require.Equal(t, code, tc.code)
}
}

func TestErrorSourceConn(t *testing.T) {
defer leaktest.AfterTest(t)()
newConn := func(size int, readError error, writeError error) net.Conn {
return &errorSourceConn{
Conn: &fakeConn{
size: size,
readError: readError,
writeError: writeError,
},
readErrMarker: errClientRead,
writeErrMarker: errClientWrite,
}
}

t.Run("WrapReadError", func(t *testing.T) {
internalErr := errors.New("some connection error")
conn := newConn(4, internalErr, nil)

size, err := conn.Read([]byte{})
require.Equal(t, size, 4)
require.True(t, errors.Is(err, internalErr))
require.True(t, errors.Is(err, errClientRead))
})

t.Run("WrapWriteError", func(t *testing.T) {
internalErr := errors.New("some connection error")
conn := newConn(4, nil, internalErr)

size, err := conn.Write([]byte{})
require.Equal(t, size, 4)
require.True(t, errors.Is(err, internalErr))
require.True(t, errors.Is(err, errClientWrite))
})

t.Run("OkayRead", func(t *testing.T) {
conn := newConn(4, nil, nil)

size, err := conn.Read([]byte{})
require.Equal(t, size, 4)
require.NoError(t, err)
})

t.Run("OkayWrite", func(t *testing.T) {
conn := newConn(4, nil, nil)

size, err := conn.Read([]byte{})
require.Equal(t, size, 4)
require.NoError(t, err)
})
}
10 changes: 8 additions & 2 deletions pkg/ccl/sqlproxyccl/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,10 @@ func wrapClientToServerError(err error) error {
errors.IsAny(err, context.Canceled, context.DeadlineExceeded) {
return nil
}
return newErrorf(codeClientDisconnected, "copying from client to target server: %v", err)
if err := wrapConnectionError(err); err != nil {
return err
}
return newErrorf(codeClientDisconnected, "unexpected error copying from client to target server: %v", err)
}

// wrapServerToClientError overrides server to client errors for external
Expand All @@ -371,7 +374,10 @@ func wrapServerToClientError(err error) error {
errors.IsAny(err, context.Canceled, context.DeadlineExceeded) {
return nil
}
return newErrorf(codeBackendDisconnected, "copying from target server to client: %s", err)
if err := wrapConnectionError(err); err != nil {
return err
}
return newErrorf(codeBackendDisconnected, "unexpected error copying from target server to client: %s", err)
}

// makeLogicalClockFn returns a function that implements a simple logical clock.
Expand Down
12 changes: 10 additions & 2 deletions pkg/ccl/sqlproxyccl/forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,11 @@ func TestWrapClientToServerError(t *testing.T) {
// Forwarding errors.
{errors.New("foo"), newErrorf(
codeClientDisconnected,
"copying from client to target server: foo",
"unexpected error copying from client to target server: foo",
)},
{errors.Mark(errors.New("some write error"), errClientWrite), newErrorf(
codeClientWriteFailed,
"unable to write to client: some write error",
)},
} {
err := wrapClientToServerError(tc.input)
Expand Down Expand Up @@ -466,7 +470,11 @@ func TestWrapServerToClientError(t *testing.T) {
// Forwarding errors.
{errors.New("foo"), newErrorf(
codeBackendDisconnected,
"copying from target server to client: foo",
"unexpected error copying from target server to client: foo",
)},
{errors.Mark(errors.New("some read error"), errServerRead), newErrorf(
codeBackendReadFailed,
"unable to read from sql server: some read error",
)},
} {
err := wrapServerToClientError(tc.input)
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/sqlproxyccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,9 @@ func (metrics *metrics) updateForError(err error) {
switch codeErr.code {
case codeExpiredClientConnection:
metrics.ExpiredClientConnCount.Inc(1)
case codeBackendDisconnected:
case codeBackendDisconnected, codeBackendReadFailed, codeBackendWriteFailed:
metrics.BackendDisconnectCount.Inc(1)
case codeClientDisconnected:
case codeClientDisconnected, codeClientWriteFailed, codeClientReadFailed:
metrics.ClientDisconnectCount.Inc(1)
case codeProxyRefusedConnection:
metrics.RefusedConnCount.Inc(1)
Expand Down
59 changes: 59 additions & 0 deletions pkg/ccl/sqlproxyccl/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package sqlproxyccl

import (
"testing"

"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/stretchr/testify/require"
)

func TestMetricsUpdateForError(t *testing.T) {
defer leaktest.AfterTest(t)()
m := makeProxyMetrics()
type testCase struct {
code errorCode
counters []*metric.Counter
}
tests := []testCase{
{codeClientReadFailed, []*metric.Counter{m.ClientDisconnectCount}},
{codeClientWriteFailed, []*metric.Counter{m.ClientDisconnectCount}},
{codeClientDisconnected, []*metric.Counter{m.ClientDisconnectCount}},

{codeBackendDisconnected, []*metric.Counter{m.BackendDisconnectCount}},
{codeBackendReadFailed, []*metric.Counter{m.BackendDisconnectCount}},
{codeBackendWriteFailed, []*metric.Counter{m.BackendDisconnectCount}},

{codeExpiredClientConnection, []*metric.Counter{m.ExpiredClientConnCount}},

{codeProxyRefusedConnection, []*metric.Counter{m.RefusedConnCount, m.BackendDownCount}},

{codeParamsRoutingFailed, []*metric.Counter{m.RoutingErrCount, m.BackendDownCount}},
{codeUnavailable, []*metric.Counter{m.RoutingErrCount, m.BackendDownCount}},

{codeBackendDown, []*metric.Counter{m.BackendDownCount}},

{codeAuthFailed, []*metric.Counter{m.AuthFailedCount}},
}

for _, tc := range tests {
t.Run(tc.code.String(), func(t *testing.T) {
var before []int64
for _, counter := range tc.counters {
before = append(before, counter.Count())
}
m.updateForError(newErrorf(tc.code, "test error"))
for i, counter := range tc.counters {
require.Equal(t, counter.Count(), before[i]+1)
}
})
}
}
12 changes: 11 additions & 1 deletion pkg/ccl/sqlproxyccl/proxy_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,18 @@ func (handler *proxyHandler) handle(ctx context.Context, incomingConn net.Conn)
log.Infof(ctx, "closing after %.2fs", timeutil.Since(connBegin).Seconds())
}()

// Wrap the client connection with an error annotater. WARNING: The TLS
// wrapper must be inside the errorSourceConn and not the other way around.
// The TLS connection attempts to cast errors to a net.Err and will behave
// incorrectly if handed a marked error.
clientConn := &errorSourceConn{
Conn: fe.Conn,
readErrMarker: errClientRead,
writeErrMarker: errClientWrite,
}

// Pass ownership of conn and crdbConn to the forwarder.
if err := f.run(fe.Conn, crdbConn); err != nil {
if err := f.run(clientConn, crdbConn); err != nil {
// Don't send to the client here for the same reason below.
handler.metrics.updateForError(err)
return errors.Wrap(err, "running forwarder")
Expand Down
Loading

0 comments on commit 933df73

Please sign in to comment.