Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add options to allow overriding SendBufferSize pre process name prefix #772

Merged
merged 1 commit into from
Mar 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"fmt"
"io"
"net"
"strings"
"sync"
"syscall"
"time"
Expand All @@ -48,9 +49,9 @@ const (
// is specified in the context.
DefaultConnectTimeout = 5 * time.Second

// defaultConnectionBufferSize is the default size for the connection's
// read and write channels.
defaultConnectionBufferSize = 512
// DefaultConnectionBufferSize is the default size for the connection's read
//and write channels.
DefaultConnectionBufferSize = 512
)

// PeerVersion contains version related information for a specific peer.
Expand Down Expand Up @@ -124,6 +125,13 @@ func (e errConnectionUnknownState) Error() string {
return fmt.Sprintf("connection is in unknown state: %v at %v", e.state, e.site)
}

// SendBufferSizeOverride is used for overriding per-process send buffer channel size for a
// connection, using process name prefix matching.
type SendBufferSizeOverride struct {
ProcessNamePrefix string
SendBufferSize int
}

// ConnectionOptions are options that control the behavior of a Connection
type ConnectionOptions struct {
// The frame pool, allowing better management of frame buffers. Defaults to using raw heap.
Expand All @@ -135,6 +143,10 @@ type ConnectionOptions struct {
// The size of send channel buffers. Defaults to 512.
SendBufferSize int

// Per-process name prefix override for SendBufferSize
// Note that order matters, if there are multiple matches, the first one is used.
SendBufferSizeOverrides []SendBufferSizeOverride

// The type of checksum to use when sending messages.
ChecksumType ChecksumType

Expand Down Expand Up @@ -258,12 +270,21 @@ func (co ConnectionOptions) withDefaults() ConnectionOptions {
co.FramePool = DefaultFramePool
}
if co.SendBufferSize <= 0 {
co.SendBufferSize = defaultConnectionBufferSize
co.SendBufferSize = DefaultConnectionBufferSize
}
co.HealthChecks = co.HealthChecks.withDefaults()
return co
}

func (co ConnectionOptions) getSendBufferSize(processName string) int {
for _, override := range co.SendBufferSizeOverrides {
if strings.HasPrefix(processName, override.ProcessNamePrefix) {
return override.SendBufferSize
}
}
return co.SendBufferSize
}

func (ch *Channel) setConnectionTosPriority(tosPriority tos.ToS, c net.Conn) error {
tcpAddr, isTCP := c.RemoteAddr().(*net.TCPAddr)
if !isTCP {
Expand Down Expand Up @@ -311,7 +332,7 @@ func (ch *Channel) newConnection(conn net.Conn, initialID uint32, outboundHP str
connDirection: connDirection,
opts: opts,
state: connectionActive,
sendCh: make(chan *Frame, opts.SendBufferSize),
sendCh: make(chan *Frame, opts.getSendBufferSize(remotePeer.ProcessName)),
stopCh: make(chan struct{}),
localPeerInfo: peerInfo,
remotePeerInfo: remotePeer,
Expand Down
60 changes: 60 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1260,6 +1260,66 @@ func TestLastActivityTimePings(t *testing.T) {
})
}

func TestSendBufferSize(t *testing.T) {
opts := testutils.NewOpts().SetSendBufferSize(512).SetSendBufferSizeOverrides([]SendBufferSizeOverride{
{"abc", 1024},
{"abcd", 2048}, // This should never match, since we match the list in order.
{"xyz", 3072},
})
tests := []struct {
processName string
expectSendChCapacity int
}{
{
processName: "abc",
expectSendChCapacity: 1024,
},
{
processName: "abcd",
expectSendChCapacity: 1024,
},
{
processName: "bcd",
expectSendChCapacity: DefaultConnectionBufferSize,
},
lingyuan2014 marked this conversation as resolved.
Show resolved Hide resolved
{
processName: "dabc",
expectSendChCapacity: DefaultConnectionBufferSize,
},
{
processName: "dabcd",
expectSendChCapacity: DefaultConnectionBufferSize,
},
{
processName: "abcde",
expectSendChCapacity: 1024,
},
{
processName: "xyzabc",
expectSendChCapacity: 3072,
},
}
lingyuan2014 marked this conversation as resolved.
Show resolved Hide resolved

for _, tt := range tests {
t.Run(tt.processName, func(t *testing.T) {
testutils.WithTestServer(t, opts, func(tb testing.TB, ts *testutils.TestServer) {
client := ts.NewClient(opts.SetProcessName(tt.processName))

// Send an 'echo' to establish the connection.
testutils.RegisterEcho(ts.Server(), nil)
require.NoError(t, testutils.CallEcho(client, ts.HostPort(), ts.ServiceName(), nil))

// WithTestSever will test with and without relay.
if ts.HasRelay() {
assert.Equal(t, tt.expectSendChCapacity, getConnection(t, ts.Relay(), inbound).SendChCapacity)
} else {
assert.Equal(t, tt.expectSendChCapacity, getConnection(t, ts.Server(), inbound).SendChCapacity)
}
})
})
}
}

func TestInvalidTransportHeaders(t *testing.T) {
long100 := strings.Repeat("0123456789", 10)
long300 := strings.Repeat("0123456789", 30)
Expand Down
6 changes: 6 additions & 0 deletions testutils/channel_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ func (o *ChannelOpts) SetSendBufferSize(bufSize int) *ChannelOpts {
return o
}

// SetSendBufferSizeOverrides sets the SendBufferOverrides in DefaultConnectionOptions.
func (o *ChannelOpts) SetSendBufferSizeOverrides(overrides []tchannel.SendBufferSizeOverride) *ChannelOpts {
o.DefaultConnectionOptions.SendBufferSizeOverrides = overrides
alxn marked this conversation as resolved.
Show resolved Hide resolved
return o
}

// SetTosPriority set TosPriority in DefaultConnectionOptions.
func (o *ChannelOpts) SetTosPriority(tosPriority tos.ToS) *ChannelOpts {
o.DefaultConnectionOptions.TosPriority = tosPriority
Expand Down