Skip to content

Commit

Permalink
channelz: stage 4 - add security and socket option info (#2098)
Browse files Browse the repository at this point in the history
  • Loading branch information
lyuxuan authored Jun 5, 2018
1 parent 4344c20 commit c1a21e2
Show file tree
Hide file tree
Showing 24 changed files with 1,121 additions and 73 deletions.
103 changes: 103 additions & 0 deletions channelz/service/func_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package service

import (
"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/channelz"
channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1"
)

func sockoptToProto(skopts *channelz.SocketOptionData) []*channelzpb.SocketOption {
var opts []*channelzpb.SocketOption
if skopts.Linger != nil {
additional, err := ptypes.MarshalAny(&channelzpb.SocketOptionLinger{
Active: skopts.Linger.Onoff != 0,
Duration: convertToPtypesDuration(int64(skopts.Linger.Linger), 0),
})
if err == nil {
opts = append(opts, &channelzpb.SocketOption{
Name: "SO_LINGER",
Additional: additional,
})
}
}
if skopts.RecvTimeout != nil {
additional, err := ptypes.MarshalAny(&channelzpb.SocketOptionTimeout{
Duration: convertToPtypesDuration(int64(skopts.RecvTimeout.Sec), int64(skopts.RecvTimeout.Usec)),
})
if err == nil {
opts = append(opts, &channelzpb.SocketOption{
Name: "SO_RCVTIMEO",
Additional: additional,
})
}
}
if skopts.SendTimeout != nil {
additional, err := ptypes.MarshalAny(&channelzpb.SocketOptionTimeout{
Duration: convertToPtypesDuration(int64(skopts.SendTimeout.Sec), int64(skopts.SendTimeout.Usec)),
})
if err == nil {
opts = append(opts, &channelzpb.SocketOption{
Name: "SO_SNDTIMEO",
Additional: additional,
})
}
}
if skopts.TCPInfo != nil {
additional, err := ptypes.MarshalAny(&channelzpb.SocketOptionTcpInfo{
TcpiState: uint32(skopts.TCPInfo.State),
TcpiCaState: uint32(skopts.TCPInfo.Ca_state),
TcpiRetransmits: uint32(skopts.TCPInfo.Retransmits),
TcpiProbes: uint32(skopts.TCPInfo.Probes),
TcpiBackoff: uint32(skopts.TCPInfo.Backoff),
TcpiOptions: uint32(skopts.TCPInfo.Options),
// https://golang.org/pkg/syscall/#TCPInfo
// TCPInfo struct does not contain info about TcpiSndWscale and TcpiRcvWscale.
TcpiRto: skopts.TCPInfo.Rto,
TcpiAto: skopts.TCPInfo.Ato,
TcpiSndMss: skopts.TCPInfo.Snd_mss,
TcpiRcvMss: skopts.TCPInfo.Rcv_mss,
TcpiUnacked: skopts.TCPInfo.Unacked,
TcpiSacked: skopts.TCPInfo.Sacked,
TcpiLost: skopts.TCPInfo.Lost,
TcpiRetrans: skopts.TCPInfo.Retrans,
TcpiFackets: skopts.TCPInfo.Fackets,
TcpiLastDataSent: skopts.TCPInfo.Last_data_sent,
TcpiLastAckSent: skopts.TCPInfo.Last_ack_sent,
TcpiLastDataRecv: skopts.TCPInfo.Last_data_recv,
TcpiLastAckRecv: skopts.TCPInfo.Last_ack_recv,
TcpiPmtu: skopts.TCPInfo.Pmtu,
TcpiRcvSsthresh: skopts.TCPInfo.Rcv_ssthresh,
TcpiRtt: skopts.TCPInfo.Rtt,
TcpiRttvar: skopts.TCPInfo.Rttvar,
TcpiSndSsthresh: skopts.TCPInfo.Snd_ssthresh,
TcpiSndCwnd: skopts.TCPInfo.Snd_cwnd,
TcpiAdvmss: skopts.TCPInfo.Advmss,
TcpiReordering: skopts.TCPInfo.Reordering,
})
if err == nil {
opts = append(opts, &channelzpb.SocketOption{
Name: "TCP_INFO",
Additional: additional,
})
}
}
return opts
}
30 changes: 30 additions & 0 deletions channelz/service/func_nonlinux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// +build !linux

/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package service

import (
"google.golang.org/grpc/channelz"
channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1"
)

func sockoptToProto(skopts *channelz.SocketOptionData) []*channelzpb.SocketOption {
return nil
}
34 changes: 34 additions & 0 deletions channelz/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,24 @@ package service

import (
"net"
"time"

"github.com/golang/protobuf/ptypes"
durpb "github.com/golang/protobuf/ptypes/duration"
wrpb "github.com/golang/protobuf/ptypes/wrappers"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/channelz"
channelzgrpc "google.golang.org/grpc/channelz/grpc_channelz_v1"
channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
)

func convertToPtypesDuration(sec int64, usec int64) *durpb.Duration {
return ptypes.DurationProto(time.Duration(sec*1e9 + usec*1e3))
}

// RegisterChannelzServiceToServer registers the channelz service to the given server.
func RegisterChannelzServiceToServer(s *grpc.Server) {
channelzgrpc.RegisterChannelzServer(s, &serverImpl{})
Expand Down Expand Up @@ -128,6 +135,26 @@ func subChannelMetricToProto(cm *channelz.SubChannelMetric) *channelzpb.Subchann
return sc
}

func securityToProto(se credentials.ChannelzSecurityValue) *channelzpb.Security {
switch v := se.(type) {
case *credentials.TLSChannelzSecurityValue:
return &channelzpb.Security{Model: &channelzpb.Security_Tls_{Tls: &channelzpb.Security_Tls{
CipherSuite: &channelzpb.Security_Tls_StandardName{StandardName: v.StandardName},
LocalCertificate: v.LocalCertificate,
RemoteCertificate: v.RemoteCertificate,
}}}
case *credentials.OtherChannelzSecurityValue:
otherSecurity := &channelzpb.Security_OtherSecurity{
Name: v.Name,
}
if anyval, err := ptypes.MarshalAny(v.Value); err == nil {
otherSecurity.Value = anyval
}
return &channelzpb.Security{Model: &channelzpb.Security_Other{Other: otherSecurity}}
}
return nil
}

func addrToProto(a net.Addr) *channelzpb.Address {
switch a.Network() {
case "udp":
Expand Down Expand Up @@ -175,6 +202,13 @@ func socketMetricToProto(sm *channelz.SocketMetric) *channelzpb.Socket {
s.Data.LocalFlowControlWindow = &wrpb.Int64Value{Value: sm.SocketData.LocalFlowControlWindow}
s.Data.RemoteFlowControlWindow = &wrpb.Int64Value{Value: sm.SocketData.RemoteFlowControlWindow}

if sm.SocketData.SocketOptions != nil {
s.Data.Option = sockoptToProto(sm.SocketData.SocketOptions)
}
if sm.SocketData.Security != nil {
s.Security = securityToProto(sm.SocketData.Security)
}

if sm.SocketData.LocalAddr != nil {
s.Local = addrToProto(sm.SocketData.LocalAddr)
}
Expand Down
196 changes: 196 additions & 0 deletions channelz/service/service_linux_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
// +build linux

/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// SocketOptions is only supported on linux system. The functions defined in
// this file are to parse the socket option field and the test is specifically
// to verify the behavior of socket option parsing.

package service

import (
"reflect"
"strconv"
"testing"

"github.com/golang/protobuf/ptypes"
pdur "github.com/golang/protobuf/ptypes/duration"
"golang.org/x/net/context"
"golang.org/x/sys/unix"
"google.golang.org/grpc/channelz"
channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1"
)

func convertToDuration(d *pdur.Duration) (sec int64, usec int64) {
if d != nil {
if dur, err := ptypes.Duration(d); err == nil {
sec = int64(int64(dur) / 1e9)
usec = (int64(dur) - sec*1e9) / 1e3
}
}
return
}

func protoToLinger(protoLinger *channelzpb.SocketOptionLinger) *unix.Linger {
linger := &unix.Linger{}
if protoLinger.GetActive() {
linger.Onoff = 1
}
lv, _ := convertToDuration(protoLinger.GetDuration())
linger.Linger = int32(lv)
return linger
}

func protoToSocketOption(skopts []*channelzpb.SocketOption) *channelz.SocketOptionData {
skdata := &channelz.SocketOptionData{}
for _, opt := range skopts {
switch opt.GetName() {
case "SO_LINGER":
protoLinger := &channelzpb.SocketOptionLinger{}
err := ptypes.UnmarshalAny(opt.GetAdditional(), protoLinger)
if err == nil {
skdata.Linger = protoToLinger(protoLinger)
}
case "SO_RCVTIMEO":
protoTimeout := &channelzpb.SocketOptionTimeout{}
err := ptypes.UnmarshalAny(opt.GetAdditional(), protoTimeout)
if err == nil {
skdata.RecvTimeout = protoToTime(protoTimeout)
}
case "SO_SNDTIMEO":
protoTimeout := &channelzpb.SocketOptionTimeout{}
err := ptypes.UnmarshalAny(opt.GetAdditional(), protoTimeout)
if err == nil {
skdata.SendTimeout = protoToTime(protoTimeout)
}
case "TCP_INFO":
tcpi := &channelzpb.SocketOptionTcpInfo{}
err := ptypes.UnmarshalAny(opt.GetAdditional(), tcpi)
if err == nil {
skdata.TCPInfo = &unix.TCPInfo{
State: uint8(tcpi.TcpiState),
Ca_state: uint8(tcpi.TcpiCaState),
Retransmits: uint8(tcpi.TcpiRetransmits),
Probes: uint8(tcpi.TcpiProbes),
Backoff: uint8(tcpi.TcpiBackoff),
Options: uint8(tcpi.TcpiOptions),
Rto: tcpi.TcpiRto,
Ato: tcpi.TcpiAto,
Snd_mss: tcpi.TcpiSndMss,
Rcv_mss: tcpi.TcpiRcvMss,
Unacked: tcpi.TcpiUnacked,
Sacked: tcpi.TcpiSacked,
Lost: tcpi.TcpiLost,
Retrans: tcpi.TcpiRetrans,
Fackets: tcpi.TcpiFackets,
Last_data_sent: tcpi.TcpiLastDataSent,
Last_ack_sent: tcpi.TcpiLastAckSent,
Last_data_recv: tcpi.TcpiLastDataRecv,
Last_ack_recv: tcpi.TcpiLastAckRecv,
Pmtu: tcpi.TcpiPmtu,
Rcv_ssthresh: tcpi.TcpiRcvSsthresh,
Rtt: tcpi.TcpiRtt,
Rttvar: tcpi.TcpiRttvar,
Snd_ssthresh: tcpi.TcpiSndSsthresh,
Snd_cwnd: tcpi.TcpiSndCwnd,
Advmss: tcpi.TcpiAdvmss,
Reordering: tcpi.TcpiReordering}
}
}
}
return skdata
}

func socketProtoToStruct(s *channelzpb.Socket) *dummySocket {
ds := &dummySocket{}
pdata := s.GetData()
ds.streamsStarted = pdata.GetStreamsStarted()
ds.streamsSucceeded = pdata.GetStreamsSucceeded()
ds.streamsFailed = pdata.GetStreamsFailed()
ds.messagesSent = pdata.GetMessagesSent()
ds.messagesReceived = pdata.GetMessagesReceived()
ds.keepAlivesSent = pdata.GetKeepAlivesSent()
if t, err := ptypes.Timestamp(pdata.GetLastLocalStreamCreatedTimestamp()); err == nil {
if !t.Equal(emptyTime) {
ds.lastLocalStreamCreatedTimestamp = t
}
}
if t, err := ptypes.Timestamp(pdata.GetLastRemoteStreamCreatedTimestamp()); err == nil {
if !t.Equal(emptyTime) {
ds.lastRemoteStreamCreatedTimestamp = t
}
}
if t, err := ptypes.Timestamp(pdata.GetLastMessageSentTimestamp()); err == nil {
if !t.Equal(emptyTime) {
ds.lastMessageSentTimestamp = t
}
}
if t, err := ptypes.Timestamp(pdata.GetLastMessageReceivedTimestamp()); err == nil {
if !t.Equal(emptyTime) {
ds.lastMessageReceivedTimestamp = t
}
}
if v := pdata.GetLocalFlowControlWindow(); v != nil {
ds.localFlowControlWindow = v.Value
}
if v := pdata.GetRemoteFlowControlWindow(); v != nil {
ds.remoteFlowControlWindow = v.Value
}
if v := pdata.GetOption(); v != nil {
ds.SocketOptions = protoToSocketOption(v)
}
if v := s.GetSecurity(); v != nil {
ds.Security = protoToSecurity(v)
}
if local := s.GetLocal(); local != nil {
ds.localAddr = protoToAddr(local)
}
if remote := s.GetRemote(); remote != nil {
ds.remoteAddr = protoToAddr(remote)
}
ds.remoteName = s.GetRemoteName()
return ds
}

func TestGetSocketOptions(t *testing.T) {
channelz.NewChannelzStorage()
ss := []*dummySocket{
{
SocketOptions: &channelz.SocketOptionData{
Linger: &unix.Linger{Onoff: 1, Linger: 2},
RecvTimeout: &unix.Timeval{Sec: 10, Usec: 1},
SendTimeout: &unix.Timeval{},
TCPInfo: &unix.TCPInfo{State: 1},
},
},
}
svr := newCZServer()
ids := make([]int64, len(ss))
svrID := channelz.RegisterServer(&dummyServer{}, "")
for i, s := range ss {
ids[i] = channelz.RegisterNormalSocket(s, svrID, strconv.Itoa(i))
}
for i, s := range ss {
resp, _ := svr.GetSocket(context.Background(), &channelzpb.GetSocketRequest{SocketId: ids[i]})
metrics := resp.GetSocket()
if !reflect.DeepEqual(metrics.GetRef(), &channelzpb.SocketRef{SocketId: ids[i], Name: strconv.Itoa(i)}) || !reflect.DeepEqual(socketProtoToStruct(metrics), s) {
t.Fatalf("resp.GetSocket() want: metrics.GetRef() = %#v and %#v, got: metrics.GetRef() = %#v and %#v", &channelzpb.SocketRef{SocketId: ids[i], Name: strconv.Itoa(i)}, s, metrics.GetRef(), socketProtoToStruct(metrics))
}
}
}
Loading

0 comments on commit c1a21e2

Please sign in to comment.