Skip to content

Commit

Permalink
refactor signal client sync func (#147)
Browse files Browse the repository at this point in the history
* refactor: move goroutine that runs Signal Client Receive to the engine for better control

* chore: fix comments typo

* test: fix golint

* chore: comments update

* chore: consider connection state=READY in signal and management clients

* chore: fix typos

* test: fix signal ping-pong test

* chore: add wait condition to signal client

* refactor: add stream status to the Signal client

* refactor: defer mutex unlock
  • Loading branch information
braginini authored Nov 6, 2021
1 parent 4d34fb4 commit ed1e4df
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 124 deletions.
118 changes: 64 additions & 54 deletions client/internal/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"fmt"
"github.com/cenkalti/backoff/v4"
ice "github.com/pion/ice/v2"
"github.com/pion/ice/v2"
log "github.com/sirupsen/logrus"
"github.com/wiretrustee/wiretrustee/iface"
mgm "github.com/wiretrustee/wiretrustee/management/client"
Expand Down Expand Up @@ -142,7 +142,7 @@ func (e *Engine) initializePeer(peer Peer) {
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: 5 * time.Second,
MaxElapsedTime: time.Duration(0), //never stop
MaxElapsedTime: 0, //never stop
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, e.ctx)
Expand All @@ -157,8 +157,7 @@ func (e *Engine) initializePeer(peer Peer) {
}

if err != nil {
log.Warnln(err)
log.Debugf("retrying connection because of error: %s", err.Error())
log.Infof("retrying connection because of error: %s", err.Error())
return err
}
return nil
Expand Down Expand Up @@ -332,6 +331,8 @@ func (e *Engine) receiveManagementEvents() {
return nil
})
if err != nil {
// happens if management is unavailable for a long time.
// We want to cancel the operation of the whole client
e.cancel()
return
}
Expand Down Expand Up @@ -414,68 +415,77 @@ func (e *Engine) updatePeers(remotePeers []*mgmProto.RemotePeerConfig) error {

// receiveSignalEvents connects to the Signal Service event stream to negotiate connection with remote peers
func (e *Engine) receiveSignalEvents() {
// connect to a stream of messages coming from the signal server
e.signal.Receive(func(msg *sProto.Message) error {

e.syncMsgMux.Lock()
defer e.syncMsgMux.Unlock()

conn := e.conns[msg.Key]
if conn == nil {
return fmt.Errorf("wrongly addressed message %s", msg.Key)
}
go func() {
// connect to a stream of messages coming from the signal server
err := e.signal.Receive(func(msg *sProto.Message) error {

if conn.Config.RemoteWgKey.String() != msg.Key {
return fmt.Errorf("unknown peer %s", msg.Key)
}
e.syncMsgMux.Lock()
defer e.syncMsgMux.Unlock()

switch msg.GetBody().Type {
case sProto.Body_OFFER:
remoteCred, err := signal.UnMarshalCredential(msg)
if err != nil {
return err
conn := e.conns[msg.Key]
if conn == nil {
return fmt.Errorf("wrongly addressed message %s", msg.Key)
}
err = conn.OnOffer(IceCredentials{
uFrag: remoteCred.UFrag,
pwd: remoteCred.Pwd,
})

if err != nil {
return err
if conn.Config.RemoteWgKey.String() != msg.Key {
return fmt.Errorf("unknown peer %s", msg.Key)
}

return nil
case sProto.Body_ANSWER:
remoteCred, err := signal.UnMarshalCredential(msg)
if err != nil {
return err
}
err = conn.OnAnswer(IceCredentials{
uFrag: remoteCred.UFrag,
pwd: remoteCred.Pwd,
})
switch msg.GetBody().Type {
case sProto.Body_OFFER:
remoteCred, err := signal.UnMarshalCredential(msg)
if err != nil {
return err
}
err = conn.OnOffer(IceCredentials{
uFrag: remoteCred.UFrag,
pwd: remoteCred.Pwd,
})

if err != nil {
return err
}
if err != nil {
return err
}

case sProto.Body_CANDIDATE:
return nil
case sProto.Body_ANSWER:
remoteCred, err := signal.UnMarshalCredential(msg)
if err != nil {
return err
}
err = conn.OnAnswer(IceCredentials{
uFrag: remoteCred.UFrag,
pwd: remoteCred.Pwd,
})

candidate, err := ice.UnmarshalCandidate(msg.GetBody().Payload)
if err != nil {
log.Errorf("failed on parsing remote candidate %s -> %s", candidate, err)
return err
}
if err != nil {
return err
}

err = conn.OnRemoteCandidate(candidate)
if err != nil {
log.Errorf("error handling CANDIATE from %s", msg.Key)
return err
case sProto.Body_CANDIDATE:

candidate, err := ice.UnmarshalCandidate(msg.GetBody().Payload)
if err != nil {
log.Errorf("failed on parsing remote candidate %s -> %s", candidate, err)
return err
}

err = conn.OnRemoteCandidate(candidate)
if err != nil {
log.Errorf("error handling CANDIATE from %s", msg.Key)
return err
}
}
}

return nil
})
return nil
})
if err != nil {
// happens if signal is unavailable for a long time.
// We want to cancel the operation of the whole client
e.cancel()
return
}
}()

e.signal.WaitConnected()
e.signal.WaitStreamConnected()
}
27 changes: 24 additions & 3 deletions management/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package client
import (
"context"
"crypto/tls"
"fmt"
"github.com/cenkalti/backoff/v4"
log "github.com/sirupsen/logrus"
"github.com/wiretrustee/wiretrustee/client/system"
"github.com/wiretrustee/wiretrustee/encryption"
"github.com/wiretrustee/wiretrustee/management/proto"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"io"
Expand Down Expand Up @@ -71,12 +73,18 @@ func defaultBackoff(ctx context.Context) backoff.BackOff {
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: 10 * time.Second,
MaxElapsedTime: 30 * time.Minute, //stop after an 30 min of trying, the error will be propagated to the general retry of the client
MaxElapsedTime: 12 * time.Hour, //stop after 12 hours of trying, the error will be propagated to the general retry of the client
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)
}

// ready indicates whether the client is okay and ready to be used
// for now it just checks whether gRPC connection to the service is ready
func (c *Client) ready() bool {
return c.conn.GetState() == connectivity.Ready
}

// Sync wraps the real client's Sync endpoint call and takes care of retries and encryption/decryption of messages
// Blocking request. The result will be sent via msgHandler callback function
func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error {
Expand All @@ -85,6 +93,12 @@ func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error {

operation := func() error {

log.Debugf("management connection state %v", c.conn.GetState())

if !c.ready() {
return fmt.Errorf("no connection to management")
}

// todo we already have it since we did the Login, maybe cache it locally?
serverPubKey, err := c.GetServerPublicKey()
if err != nil {
Expand All @@ -98,7 +112,7 @@ func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error {
return err
}

log.Infof("connected to the Management Service Stream")
log.Infof("connected to the Management Service stream")

// blocking until error
err = c.receiveEvents(stream, *serverPubKey, msgHandler)
Expand Down Expand Up @@ -139,7 +153,7 @@ func (c *Client) receiveEvents(stream proto.ManagementService_SyncClient, server
for {
update, err := stream.Recv()
if err == io.EOF {
log.Errorf("managment stream was closed: %s", err)
log.Errorf("Management stream has been closed by server: %s", err)
return err
}
if err != nil {
Expand All @@ -165,6 +179,10 @@ func (c *Client) receiveEvents(stream proto.ManagementService_SyncClient, server

// GetServerPublicKey returns server Wireguard public key (used later for encrypting messages sent to the server)
func (c *Client) GetServerPublicKey() (*wgtypes.Key, error) {
if !c.ready() {
return nil, fmt.Errorf("no connection to management")
}

mgmCtx, cancel := context.WithTimeout(c.ctx, 5*time.Second) //todo make a general setting
defer cancel()
resp, err := c.realClient.GetServerKey(mgmCtx, &proto.Empty{})
Expand All @@ -181,6 +199,9 @@ func (c *Client) GetServerPublicKey() (*wgtypes.Key, error) {
}

func (c *Client) login(serverKey wgtypes.Key, req *proto.LoginRequest) (*proto.LoginResponse, error) {
if !c.ready() {
return nil, fmt.Errorf("no connection to management")
}
loginReq, err := encryption.EncryptMessage(serverKey, c.key, req)
if err != nil {
log.Errorf("failed to encrypt message: %s", err)
Expand Down
Loading

0 comments on commit ed1e4df

Please sign in to comment.