Skip to content

Commit

Permalink
Add low overhead payload protocol. Fixes #2427
Browse files Browse the repository at this point in the history
  • Loading branch information
plorenz committed Sep 20, 2024
1 parent e9b9ff6 commit 57a3448
Show file tree
Hide file tree
Showing 18 changed files with 5,437 additions and 107 deletions.
2 changes: 1 addition & 1 deletion common/inspect/circuit_inspections.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type XgressDetail struct {
XgressPointer string `json:"xgressPointer"`
LinkSendBufferPointer string `json:"linkSendBufferPointer"`
Goroutines []string `json:"goroutines"`
Sequence int32 `json:"sequence"`
Sequence uint64 `json:"sequence"`
Flags string `json:"flags"`
}

Expand Down
2 changes: 2 additions & 0 deletions router/handler_link/bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/openziti/ziti/router/env"
"github.com/openziti/ziti/router/forwarder"
metrics2 "github.com/openziti/ziti/router/metrics"
"github.com/openziti/ziti/router/xgress"
"github.com/openziti/ziti/router/xlink"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -79,6 +80,7 @@ func (self *bindHandler) BindChannel(binding channel.Binding) error {
binding.AddTypedReceiveHandler(newControlHandler(self.xlink, self.forwarder))
binding.AddPeekHandler(metrics2.NewChannelPeekHandler(self.xlink.Id(), self.forwarder.MetricsRegistry()))
binding.AddPeekHandler(trace.NewChannelPeekHandler(self.xlink.Id(), ch, self.forwarder.TraceController()))
binding.AddTransformHandler(xgress.PayloadTransformer{})
if err := self.xlink.Init(self.forwarder.MetricsRegistry()); err != nil {
return err
}
Expand Down
38 changes: 38 additions & 0 deletions router/xgress/heartbeat_transformer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
Copyright NetFoundry Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package xgress

import (
"encoding/binary"
"github.com/openziti/channel/v3"
"time"
)

type PayloadTransformer struct {
}

func (self PayloadTransformer) Rx(*channel.Message, channel.Channel) {}

func (self PayloadTransformer) Tx(m *channel.Message, ch channel.Channel) {
if m.ContentType == channel.ContentTypeRaw && len(m.Body) > 1 {
if m.Body[0]&HeartbeatFlagMask != 0 && len(m.Body) > 12 {
now := time.Now().UnixNano()
m.PutUint64Header(channel.HeartbeatHeader, uint64(now))
binary.BigEndian.PutUint64(m.Body[len(m.Body)-8:], uint64(now))
}
}
}
2 changes: 2 additions & 0 deletions router/xgress/link_receive_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ func (buffer *LinkReceiveBuffer) ReceiveUnordered(payload *Payload, maxSize uint
if payload.Sequence > buffer.maxSequence {
buffer.maxSequence = payload.Sequence
}
} else {
duplicatePayloadsMeter.Mark(1)
}
return true
}
Expand Down
68 changes: 45 additions & 23 deletions router/xgress/link_send_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"math"
"slices"
"sync/atomic"
"time"
)
Expand Down Expand Up @@ -161,7 +162,7 @@ func (buffer *LinkSendBuffer) Close() {
func (buffer *LinkSendBuffer) isBlocked() bool {
blocked := false

if buffer.windowsSize < buffer.linkRecvBufferSize {
if buffer.x.Options.TxPortalMaxSize < buffer.linkRecvBufferSize {
blocked = true
if !buffer.blockedByRemoteWindow {
buffer.blockedByRemoteWindow = true
Expand Down Expand Up @@ -202,17 +203,13 @@ func (buffer *LinkSendBuffer) run() {

for {
// bias acks, process all pending, since that should not block
processingAcks := true
for processingAcks {
select {
case ack := <-buffer.newlyReceivedAcks:
buffer.receiveAcknowledgement(ack)
case <-buffer.closeNotify:
buffer.close()
return
default:
processingAcks = false
}
select {
case ack := <-buffer.newlyReceivedAcks:
buffer.receiveAcknowledgement(ack)
case <-buffer.closeNotify:
buffer.close()
return
default:
}

// don't block when we're closing, since the only thing that should still be coming in is end-of-circuit
Expand All @@ -221,6 +218,21 @@ func (buffer *LinkSendBuffer) run() {
buffered = nil
} else {
buffered = buffer.newlyBuffered

select {
case txPayload := <-buffered:
buffer.buffer[txPayload.payload.GetSequence()] = txPayload
payloadSize := len(txPayload.payload.Data)
buffer.linkSendBufferSize += uint32(payloadSize)
atomic.AddInt64(&outstandingPayloads, 1)
atomic.AddInt64(&outstandingPayloadBytes, int64(payloadSize))
log.Tracef("buffering payload %v with size %v. payload buffer size: %v",
txPayload.payload.Sequence, len(txPayload.payload.Data), buffer.linkSendBufferSize)
case <-buffer.closeNotify:
buffer.close()
return
default:
}
}

select {
Expand Down Expand Up @@ -288,7 +300,7 @@ func (buffer *LinkSendBuffer) receiveAcknowledgement(ack *Acknowledgement) {
if buffer.windowsSize > buffer.x.Options.TxPortalMaxSize {
buffer.windowsSize = buffer.x.Options.TxPortalMaxSize
}
buffer.retxScale -= 0.02
buffer.retxScale -= 0.01
if buffer.retxScale < buffer.x.Options.RetxScale {
buffer.retxScale = buffer.x.Options.RetxScale
}
Expand Down Expand Up @@ -320,17 +332,27 @@ func (buffer *LinkSendBuffer) retransmit() {
log := pfxlog.ContextLogger(buffer.x.Label())

retransmitted := 0
var rtxList []*txPayload
for _, v := range buffer.buffer {
if v.isRetransmittable() && uint32(now-v.getAge()) >= buffer.retxThreshold {
v.markQueued()
retransmitter.queue(v)
retransmitted++
buffer.retransmits++
if buffer.retransmits >= buffer.x.Options.TxPortalRetxThresh {
buffer.accumulator = 0
buffer.retransmits = 0
buffer.scale(buffer.x.Options.TxPortalRetxScale)
}
age := v.getAge()
if age != math.MaxInt64 && v.isRetransmittable() && uint32(now-age) >= buffer.retxThreshold {
rtxList = append(rtxList, v)
}
}

slices.SortFunc(rtxList, func(a, b *txPayload) int {
return int(a.payload.Sequence - b.payload.Sequence)
})

for _, v := range rtxList {
v.markQueued()
retransmitter.queue(v)
retransmitted++
buffer.retransmits++
if buffer.retransmits >= buffer.x.Options.TxPortalRetxThresh {
buffer.accumulator = 0
buffer.retransmits = 0
buffer.scale(buffer.x.Options.TxPortalRetxScale)
}
}

Expand Down
124 changes: 77 additions & 47 deletions router/xgress/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,45 +62,80 @@ func (o Originator) String() string {
return "Terminator"
}

type Flag uint32
type PayloadFlag uint32

const (
PayloadFlagCircuitEnd Flag = 1
PayloadFlagOriginator Flag = 2
PayloadFlagCircuitStart Flag = 4
PayloadFlagChunk Flag = 8
PayloadFlagCircuitEnd PayloadFlag = 1
PayloadFlagOriginator PayloadFlag = 2
PayloadFlagCircuitStart PayloadFlag = 4
PayloadFlagChunk PayloadFlag = 8
)

func NewAcknowledgement(circuitId string, originator Originator) *Acknowledgement {
return &Acknowledgement{
CircuitId: circuitId,
Flags: SetOriginatorFlag(0, originator),
}
}

type Acknowledgement struct {
type Header struct {
CircuitId string
Flags uint32
RecvBufferSize uint32
RTT uint16
Sequence []int32
}

func (ack *Acknowledgement) GetCircuitId() string {
return ack.CircuitId
func (header *Header) GetCircuitId() string {
return header.CircuitId
}

func (ack *Acknowledgement) GetFlags() uint32 {
return ack.Flags
func (header *Header) GetFlags() uint32 {
return header.Flags
}

func (ack *Acknowledgement) GetOriginator() Originator {
if isFlagSet(ack.Flags, PayloadFlagOriginator) {
func (header *Header) GetOriginator() Originator {
if isPayloadFlagSet(header.Flags, PayloadFlagOriginator) {
return Terminator
}
return Initiator
}

func (header *Header) unmarshallHeader(msg *channel.Message) error {
circuitId, ok := msg.Headers[HeaderKeyCircuitId]
if !ok {
return fmt.Errorf("no circuitId found in xgress payload message")
}

// If no flags are present, it just means no flags have been set
flags, _ := msg.GetUint32Header(HeaderKeyFlags)

header.CircuitId = string(circuitId)
header.Flags = flags
if header.RecvBufferSize, ok = msg.GetUint32Header(HeaderKeyRecvBufferSize); !ok {
header.RecvBufferSize = math.MaxUint32
}

header.RTT, _ = msg.GetUint16Header(HeaderKeyRTT)

return nil
}

func (header *Header) marshallHeader(msg *channel.Message) {
msg.Headers[HeaderKeyCircuitId] = []byte(header.CircuitId)
if header.Flags != 0 {
msg.PutUint32Header(HeaderKeyFlags, header.Flags)
}

msg.PutUint32Header(HeaderKeyRecvBufferSize, header.RecvBufferSize)
}

func NewAcknowledgement(circuitId string, originator Originator) *Acknowledgement {
return &Acknowledgement{
Header: Header{
CircuitId: circuitId,
Flags: SetOriginatorFlag(0, originator),
},
}
}

type Acknowledgement struct {
Header
Sequence []int32
}

func (ack *Acknowledgement) GetSequence() []int32 {
return ack.Sequence
}
Expand Down Expand Up @@ -139,33 +174,16 @@ func (ack *Acknowledgement) unmarshallSequence(data []byte) error {
func (ack *Acknowledgement) Marshall() *channel.Message {
msg := channel.NewMessage(ContentTypeAcknowledgementType, ack.marshallSequence())
msg.PutUint16Header(HeaderKeyRTT, ack.RTT)
msg.Headers[HeaderKeyCircuitId] = []byte(ack.CircuitId)
if ack.Flags != 0 {
msg.PutUint32Header(HeaderKeyFlags, ack.Flags)
}
msg.PutUint32Header(HeaderKeyRecvBufferSize, ack.RecvBufferSize)
ack.marshallHeader(msg)
return msg
}

func UnmarshallAcknowledgement(msg *channel.Message) (*Acknowledgement, error) {
ack := &Acknowledgement{}

circuitId, ok := msg.Headers[HeaderKeyCircuitId]
if !ok {
return nil, fmt.Errorf("no circuitId found in xgress payload message")
}

// If no flags are present, it just means no flags have been set
flags, _ := msg.GetUint32Header(HeaderKeyFlags)

ack.CircuitId = string(circuitId)
ack.Flags = flags
if ack.RecvBufferSize, ok = msg.GetUint32Header(HeaderKeyRecvBufferSize); !ok {
ack.RecvBufferSize = math.MaxUint32
if err := ack.unmarshallHeader(msg); err != nil {
return nil, err
}

ack.RTT, _ = msg.GetUint16Header(HeaderKeyRTT)

if err := ack.unmarshallSequence(msg.Body); err != nil {
return nil, err
}
Expand Down Expand Up @@ -197,13 +215,25 @@ type Payload struct {
Sequence int32
Headers map[uint8][]byte
Data []byte
raw []byte
}

func (payload *Payload) GetSequence() int32 {
return payload.Sequence
}

func (payload *Payload) Marshall() *channel.Message {
if payload.raw != nil {
if payload.raw[0]&RttFlagMask != 0 {
rtt := uint16(info.NowInMilliseconds())
b0 := byte(rtt)
b1 := byte(rtt >> 8)
payload.raw[2] = b0
payload.raw[3] = b1
}
return channel.NewMessage(channel.ContentTypeRaw, payload.raw)
}

msg := channel.NewMessage(ContentTypePayloadType, payload.Data)
addPayloadHeadersToMsg(msg, payload.Headers)
msg.Headers[HeaderKeyCircuitId] = []byte(payload.CircuitId)
Expand Down Expand Up @@ -263,12 +293,12 @@ func UnmarshallPayload(msg *channel.Message) (*Payload, error) {
return payload, nil
}

func isFlagSet(flags uint32, flag Flag) bool {
return Flag(flags)&flag == flag
func isPayloadFlagSet(flags uint32, flag PayloadFlag) bool {
return PayloadFlag(flags)&flag == flag
}

func setPayloadFlag(flags uint32, flag Flag) uint32 {
return uint32(Flag(flags) | flag)
func setPayloadFlag(flags uint32, flag PayloadFlag) uint32 {
return uint32(PayloadFlag(flags) | flag)
}

func (payload *Payload) GetCircuitId() string {
Expand All @@ -280,15 +310,15 @@ func (payload *Payload) GetFlags() uint32 {
}

func (payload *Payload) IsCircuitEndFlagSet() bool {
return isFlagSet(payload.Flags, PayloadFlagCircuitEnd)
return isPayloadFlagSet(payload.Flags, PayloadFlagCircuitEnd)
}

func (payload *Payload) IsCircuitStartFlagSet() bool {
return isFlagSet(payload.Flags, PayloadFlagCircuitStart)
return isPayloadFlagSet(payload.Flags, PayloadFlagCircuitStart)
}

func (payload *Payload) GetOriginator() Originator {
if isFlagSet(payload.Flags, PayloadFlagOriginator) {
if isPayloadFlagSet(payload.Flags, PayloadFlagOriginator) {
return Terminator
}
return Initiator
Expand Down
Loading

0 comments on commit 57a3448

Please sign in to comment.