Skip to content

Commit

Permalink
Merge pull request #461 from uber/dev
Browse files Browse the repository at this point in the history
Release v1.0.9
  • Loading branch information
prashantv authored Jul 21, 2016
2 parents 21ec325 + 67b744f commit cddba8c
Show file tree
Hide file tree
Showing 18 changed files with 179 additions and 32 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
Changelog
=========

# 1.0.9

* Expose meta endpoints on the "tchannel" service name. (#459)
* Add Go version and tchannel-go library version to introspection. (#457)
* Better handling of peers where dialed host:port doesn't match the remote
connection's reported host:port. (#452)
* Expose the number of connections on a channel. (#451)

# 1.0.8

* Remove dependency on "testing" from "tchannel-go" introduced in v1.0.7.
Expand Down
20 changes: 12 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ endif

PATH := $(GOPATH)/bin:$(PATH)
EXAMPLES=./examples/bench/server ./examples/bench/client ./examples/ping ./examples/thrift ./examples/hyperbahn/echo-server
PKGS := . ./json ./hyperbahn ./thrift ./typed ./trace $(EXAMPLES)
ALL_PKGS := $(shell glide nv)
PROD_PKGS := . ./http ./hyperbahn ./json ./pprof ./raw ./relay ./stats ./thrift $(EXAMPLES)
TEST_ARG ?= -race -v -timeout 5m
BUILD := ./build
THRIFT_GEN_RELEASE := ./thrift-gen-release
Expand Down Expand Up @@ -74,7 +75,7 @@ install_ci: install_glide install_lint get_thrift install
GOPATH=$(OLD_GOPATH) go get -u github.com/mattn/goveralls

install_test:
go test -i $(TEST_ARG) $(shell glide nv)
go test -i $(TEST_ARG) $(ALL_PKGS)

help:
@egrep "^# target:" [Mm]akefile | sort -
Expand All @@ -87,20 +88,23 @@ clean:

fmt format:
echo Formatting Packages...
go fmt $(PKGS)
go fmt $(ALL_PKGS)
echo

test_ci: test

test: clean setup install_test
test: clean setup install_test check_no_test_deps
@echo Testing packages:
go test -parallel=4 $(TEST_ARG) $(shell glide nv)
go test -parallel=4 $(TEST_ARG) $(ALL_PKGS)
@echo Running frame pool tests
go test -run TestFramesReleased -stressTest $(TEST_ARG)

check_no_test_deps:
! go list -json $(PROD_PKGS) | jq -r .Deps[] | grep -e test -e mock

benchmark: clean setup
echo Running benchmarks:
go test $(PKGS) -bench=. -cpu=1 -benchmem -run NONE
go test $(ALL_PKGS) -bench=. -cpu=1 -benchmem -run NONE

cover_profile: clean setup
@echo Testing packages:
Expand Down Expand Up @@ -156,7 +160,7 @@ thrift_gen:
go build -o $(BUILD)/thrift-gen ./thrift/thrift-gen
$(BUILD)/thrift-gen --generateThrift --inputFile thrift/test.thrift --outputDir thrift/gen-go/
$(BUILD)/thrift-gen --generateThrift --inputFile examples/keyvalue/keyvalue.thrift --outputDir examples/keyvalue/gen-go
$(BUILD)/thrift-gen --generateThrift --inputFile examples/thrift/test.thrift --outputDir examples/thrift/gen-go
$(BUILD)/thrift-gen --generateThrift --inputFile examples/thrift/example.thrift --outputDir examples/thrift/gen-go
$(BUILD)/thrift-gen --generateThrift --inputFile hyperbahn/hyperbahn.thrift --outputDir hyperbahn/gen-go
rm -rf trace/thrift/gen-go/tcollector && $(BUILD)/thrift-gen --generateThrift --inputFile trace/tcollector.thrift --outputDir trace/thrift/gen-go/

Expand All @@ -166,5 +170,5 @@ release_thrift_gen: clean setup
tar -czf thrift-gen-release.tar.gz $(THRIFT_GEN_RELEASE)
mv thrift-gen-release.tar.gz $(THRIFT_GEN_RELEASE)/

.PHONY: all help clean fmt format get_thrift install install_ci install_lint install_glide release_thrift_gen packages_test test test_ci lint
.PHONY: all help clean fmt format get_thrift install install_ci install_lint install_glide release_thrift_gen packages_test check_no_test_deps test test_ci lint
.SILENT: all help clean fmt format test lint
27 changes: 25 additions & 2 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,19 @@ func (ch *Channel) Connect(ctx context.Context, hostPort string) (*Connection, e
return nil, err
}

return c, err
// It's possible that the connection we just created responds with a host:port
// that is not what we tried to connect to. E.g., we may have connected to
// 127.0.0.1:1234, but the returned host:port may be 10.0.0.1:1234.
// In this case, the connection won't be added to 127.0.0.1:1234 peer
// and so future calls to that peer may end up creating new connections. To
// avoid this issue, and to avoid clients being aware of any TCP relays, we
// add the connection to the intended peer.
if hostPort != c.remotePeerInfo.HostPort {
c.log.Debugf("Outbound connection host:port mismatch, adding to peer %v", c.remotePeerInfo.HostPort)
ch.addConnectionToPeer(hostPort, c, outbound)
}

return c, nil
}

// exchangeUpdated updates the peer heap.
Expand Down Expand Up @@ -564,7 +576,11 @@ func (ch *Channel) connectionActive(c *Connection, direction connectionDirection
return
}

p := ch.rootPeers().GetOrAdd(c.remotePeerInfo.HostPort)
ch.addConnectionToPeer(c.remotePeerInfo.HostPort, c, direction)
}

func (ch *Channel) addConnectionToPeer(hostPort string, c *Connection, direction connectionDirection) {
p := ch.rootPeers().GetOrAdd(hostPort)
if err := p.addConnection(c, direction); err != nil {
c.log.WithFields(
LogField{"remoteHostPort", c.remotePeerInfo.HostPort},
Expand Down Expand Up @@ -612,6 +628,13 @@ func (ch *Channel) connectionCloseStateChange(c *Connection) {
peer.connectionCloseStateChange(c)
ch.updatePeer(peer)
}
if c.outboundHP != "" && c.outboundHP != c.remotePeerInfo.HostPort {
// Outbound connections may be in multiple peers.
if peer, ok := ch.rootPeers().Get(c.outboundHP); ok {
peer.connectionCloseStateChange(c)
ch.updatePeer(peer)
}
}

chState := ch.State()
if chState != ChannelStartClose && chState != ChannelInboundClosed {
Expand Down
12 changes: 9 additions & 3 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ type Connection struct {
commonStatsTags map[string]string
relay *Relayer

// outboundHP is the host:port we used to create this outbound connection.
// It may not match remotePeerInfo.HostPort, in which case the connection is
// added to peers for both host:ports. For inbound connections, this is empty.
outboundHP string

// closeNetworkCalled is used to avoid errors from being logged
// when this side closes a connection.
closeNetworkCalled atomic.Int32
Expand Down Expand Up @@ -213,16 +218,16 @@ func (ch *Channel) newOutboundConnection(timeout time.Duration, hostPort string,
return nil, err
}

return ch.newConnection(conn, connectionWaitingToSendInitReq, events), nil
return ch.newConnection(conn, hostPort, connectionWaitingToSendInitReq, events), nil
}

// Creates a new Connection based on an incoming connection from a peer
func (ch *Channel) newInboundConnection(conn net.Conn, events connectionEvents) (*Connection, error) {
return ch.newConnection(conn, connectionWaitingToRecvInitReq, events), nil
return ch.newConnection(conn, "" /* outboundHP */, connectionWaitingToRecvInitReq, events), nil
}

// Creates a new connection in a given initial state
func (ch *Channel) newConnection(conn net.Conn, initialState connectionState, events connectionEvents) *Connection {
func (ch *Channel) newConnection(conn net.Conn, outboundHP string, initialState connectionState, events connectionEvents) *Connection {
opts := &ch.connectionOptions

checksumType := opts.ChecksumType
Expand Down Expand Up @@ -264,6 +269,7 @@ func (ch *Channel) newConnection(conn net.Conn, initialState connectionState, ev
sendCh: make(chan *Frame, sendBufferSize),
stopCh: make(chan struct{}),
localPeerInfo: peerInfo,
outboundHP: outboundHP,
checksumType: checksumType,
inbound: newMessageExchangeSet(log, messageExchangeSetInbound),
outbound: newMessageExchangeSet(log, messageExchangeSetOutbound),
Expand Down
File renamed without changes.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// @generated Code generated by thrift-gen. Do not modify.

// Package test is generated code used to make or handle TChannel calls using Thrift.
package test
// Package example is generated code used to make or handle TChannel calls using Thrift.
package example

import (
"fmt"
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion examples/thrift/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"time"

tchannel "github.com/uber/tchannel-go"
gen "github.com/uber/tchannel-go/examples/thrift/gen-go/test"
gen "github.com/uber/tchannel-go/examples/thrift/gen-go/example"
"github.com/uber/tchannel-go/thrift"
)

Expand Down
31 changes: 31 additions & 0 deletions introspection.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ type IntrospectionOptions struct {
IncludeTombstones bool `json:"includeTombstones"`
}

// RuntimeVersion includes version information about the runtime and
// the tchannel library.
type RuntimeVersion struct {
GoVersion string `json:"goVersion"`
LibraryVersion string `json:"tchannelVersion"`
}

// RuntimeState is a snapshot of the runtime state for a channel.
type RuntimeState struct {
// CreatedStack is the stack for how this channel was created.
Expand All @@ -66,6 +73,9 @@ type RuntimeState struct {

// OtherChannels is information about any other channels running in this process.
OtherChannels map[string][]ChannelInfo `json:"otherChannels,omitEmpty"`

// RuntimeVersion is the version information about the runtime and the library.
RuntimeVersion RuntimeVersion `json:"runtimeVersion"`
}

// GoRuntimeStateOptions are the options used when getting Go runtime state.
Expand Down Expand Up @@ -125,6 +135,7 @@ type ConnectionRuntimeState struct {
ConnectionState string `json:"connectionState"`
LocalHostPort string `json:"localHostPort"`
RemoteHostPort string `json:"remoteHostPort"`
OutboundHostPort string `json:"outboundHostPort"`
IsEphemeral bool `json:"isEphemeral"`
InboundExchange ExchangeSetRuntimeState `json:"inboundExchange"`
OutboundExchange ExchangeSetRuntimeState `json:"outboundExchange"`
Expand Down Expand Up @@ -200,6 +211,7 @@ func (ch *Channel) IntrospectState(opts *IntrospectionOptions) *RuntimeState {
NumConnections: numConns,
Connections: connIDs,
OtherChannels: ch.IntrospectOthers(opts),
RuntimeVersion: introspectRuntimeVersion(),
}
}

Expand Down Expand Up @@ -314,6 +326,7 @@ func (c *Connection) IntrospectState(opts *IntrospectionOptions) ConnectionRunti
ConnectionState: c.state.String(),
LocalHostPort: c.conn.LocalAddr().String(),
RemoteHostPort: c.conn.RemoteAddr().String(),
OutboundHostPort: c.outboundHP,
IsEphemeral: c.remotePeerInfo.IsEphemeral,
InboundExchange: c.inbound.IntrospectState(opts),
OutboundExchange: c.outbound.IntrospectState(opts),
Expand Down Expand Up @@ -419,6 +432,15 @@ func (l *PeerList) IntrospectList(opts *IntrospectionOptions) []SubPeerScore {
return peers
}

// IntrospectNumConnections returns the number of connections returns the number
// of connections. Note: like other introspection APIs, this is not a stable API.
func (ch *Channel) IntrospectNumConnections() int {
ch.mutable.RLock()
numConns := len(ch.mutable.conns)
ch.mutable.RUnlock()
return numConns
}

func handleInternalRuntime(arg3 []byte) interface{} {
var opts GoRuntimeStateOptions
json.Unmarshal(arg3, &opts)
Expand All @@ -436,6 +458,13 @@ func handleInternalRuntime(arg3 []byte) interface{} {
return state
}

func introspectRuntimeVersion() RuntimeVersion {
return RuntimeVersion{
GoVersion: runtime.Version(),
LibraryVersion: VersionInfo,
}
}

// registerInternal registers the following internal handlers which return runtime state:
// _gometa_introspect: TChannel internal state.
// _gometa_runtime: Golang runtime stats.
Expand All @@ -448,6 +477,7 @@ func (ch *Channel) registerInternal() {
{"_gometa_runtime", handleInternalRuntime},
}

tchanSC := ch.GetSubChannel("tchannel")
for _, ep := range endpoints {
// We need ep in our closure.
ep := ep
Expand All @@ -465,5 +495,6 @@ func (ch *Channel) registerInternal() {
NewArgWriter(call.Response().Arg3Writer()).WriteJSON(ep.handler(arg3))
}
ch.Register(HandlerFunc(handler), ep.name)
tchanSC.Register(HandlerFunc(handler), ep.name)
}
}
44 changes: 43 additions & 1 deletion introspection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@ import (
"testing"
"time"

. "github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/json"
"github.com/uber/tchannel-go/testutils"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// Purpose of this test is to ensure introspection doesn't cause any panics
// and we have coverage of the introspection code.
// TODO: Add introspection for the relay as well.
func TestIntrospection(t *testing.T) {
testutils.WithTestServer(t, nil, func(ts *testutils.TestServer) {
client := testutils.NewClient(t, nil)
Expand All @@ -54,5 +55,46 @@ func TestIntrospection(t *testing.T) {
"includeGoStacks": true,
}, &resp)
require.NoError(t, err, "Call _gometa_runtime failed")

if !ts.HasRelay() {
// Try making the call on the "tchannel" service which is where meta handlers
// are registered. This will only work when we call it directly as the relay
// will not forward the tchannel service.
err = json.CallPeer(ctx, peer, "tchannel", "_gometa_runtime", map[string]interface{}{
"includeGoStacks": true,
}, &resp)
require.NoError(t, err, "Call _gometa_runtime failed")
}
})
}

func TestIntrospectNumConnections(t *testing.T) {
// Disable the relay, since the relay does not maintain a 1:1 mapping betewen
// incoming connections vs outgoing connections.
opts := testutils.NewOpts().NoRelay()
testutils.WithTestServer(t, opts, func(ts *testutils.TestServer) {
ctx, cancel := NewContext(time.Second)
defer cancel()

assert.Equal(t, 0, ts.Server().IntrospectNumConnections(), "Expected no connection on new server")

for i := 0; i < 10; i++ {
client := ts.NewClient(nil)
defer client.Close()

require.NoError(t, client.Ping(ctx, ts.HostPort()), "Ping from new client failed")
assert.Equal(t, 1, client.IntrospectNumConnections(), "Client should have single connection")
assert.Equal(t, i+1, ts.Server().IntrospectNumConnections(), "Incorrect number of server connections")
}

// Make sure that a closed connection will reduce NumConnections.
client := ts.NewClient(nil)
require.NoError(t, client.Ping(ctx, ts.HostPort()), "Ping from new client failed")
assert.Equal(t, 11, ts.Server().IntrospectNumConnections(), "Number of connections expected to increase")

client.Close()
require.True(t, testutils.WaitFor(100*time.Millisecond, func() bool {
return ts.Server().IntrospectNumConnections() == 10
}), "Closed connection did not get removed, num connections is %v", ts.Server().IntrospectNumConnections())
})
}
Loading

0 comments on commit cddba8c

Please sign in to comment.