Skip to content

Commit

Permalink
add support for peerschangedpatches in new poll
Browse files Browse the repository at this point in the history
Signed-off-by: Kristoffer Dalby <[email protected]>
  • Loading branch information
kradalby committed Mar 4, 2024
1 parent 385bb5d commit a612def
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 6 deletions.
16 changes: 16 additions & 0 deletions hscontrol/mapper/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ func (m *Mapper) PeerChangedResponse(
mapRequest tailcfg.MapRequest,
node *types.Node,
changed map[types.NodeID]bool,
patches []*tailcfg.PeerChange,
pol *policy.ACLPolicy,
messages ...string,
) ([]byte, error) {
Expand Down Expand Up @@ -319,6 +320,21 @@ func (m *Mapper) PeerChangedResponse(

resp.PeersRemoved = removedIDs

// Sending patches as a part of a PeersChanged response
// is technically not suppose to be done, but they are
// applied after the PeersChanged. The patch list
// should _only_ contain Nodes that are not in the
// PeersChanged or PeersRemoved list and the caller
// should filter them out.
//
// From tailcfg docs:
// These are applied after Peers* above, but in practice the
// control server should only send these on their own, without
// the Peers* fields also set.
if patches != nil {
resp.PeersChangedPatch = patches
}

return m.marshalMapResponse(mapRequest, &resp, node, mapRequest.Compress, messages...)
}

Expand Down
60 changes: 54 additions & 6 deletions hscontrol/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func (m *mapSession) serve() {

// true means changed, false means removed
var changed map[types.NodeID]bool
var patches []*tailcfg.PeerChange
var derp bool

// Set full to true to immediatly send a full mapresponse
Expand All @@ -209,8 +210,8 @@ func (m *mapSession) serve() {
var err error

// If a full update has been requested, then send it immediately
// otherwise wait for the "batching"
if full || (changed != nil && time.Since(prev) > wait) {
// otherwise wait for the "batching" of changes or patches
if full || ((changed != nil || patches != nil) && time.Since(prev) > wait) {
// Ensure the node object is updated, for example, there
// might have been a hostinfo update in a sidechannel
// which contains data needed to generate a map response.
Expand All @@ -221,19 +222,66 @@ func (m *mapSession) serve() {
return
}

// If there are patches _and_ fully changed nodes, filter the
// patches and remove all patches that are present for the full
// changes updates. This allows us to send them as part of the
// PeerChange update, but only for nodes that are not fully changed.
// The fully changed nodes will be updated from the database and
// have all the updates needed.
// This means that the patches left are for nodes that has no
// updates that requires a full update.
// Patches are not suppose to be mixed in, but can be.
//
// From tailcfg docs:
// These are applied after Peers* above, but in practice the
// control server should only send these on their own, without
//
// Currently, there is no effort to merge patch updates, they
// are all sent, and the client will apply them in order.
// TODO(kradalby): Merge Patches for the same IDs to send less
// data and give the client less work.
if patches != nil && changed != nil {
var filteredPatches []*tailcfg.PeerChange

for _, patch := range patches {
if _, ok := changed[types.NodeID(patch.NodeID)]; !ok {
filteredPatches = append(filteredPatches, patch)
}
}

patches = filteredPatches
}

// When deciding what update to send, the following is considered,
// Full is a superset of all updates, when a full update is requested,
// send only that and move on, all other updates will be present in
// a full map response.
//
// If a map of changed nodes exists, prefer sending that as it will
// contain all the updates for the node, including patches, as it
// is fetched freshly from the database when building the response.
//
// If there is full changes registered, but we have patches for individual
// nodes, send them.
//
// Finally, if a DERP map is the only request, send that alone.
if full {
m.tracef("Sending Full MapResponse")
data, err = m.mapper.FullMapResponse(m.req, m.node, m.h.ACLPolicy)
} else if changed != nil {
m.tracef(fmt.Sprintf("Sending Changed MapResponse: %v", lastMessage))
data, err = m.mapper.PeerChangedResponse(m.req, m.node, changed, patches, m.h.ACLPolicy, lastMessage)
} else if patches != nil {
m.tracef(fmt.Sprintf("Sending Changed Patch MapResponse: %v", lastMessage))
data, err = m.mapper.PeerChangedPatchResponse(m.req, m.node, patches, m.h.ACLPolicy)
} else if derp {
m.tracef("Sending DERPUpdate MapResponse")
data, err = m.mapper.DERPMapResponse(m.req, m.node, m.h.DERPMap)
} else {
m.tracef(fmt.Sprintf("Sending Changed MapResponse: %v", lastMessage))
data, err = m.mapper.PeerChangedResponse(m.req, m.node, changed, m.h.ACLPolicy, lastMessage)
}

// reset
changed = nil
patches = nil
lastMessage = ""
full = false
derp = false
Expand Down Expand Up @@ -298,7 +346,7 @@ func (m *mapSession) serve() {

lastMessage = update.Message
case types.StatePeerChangedPatch:
// TODO(kradalby):
patches = append(patches, update.ChangePatches...)
case types.StatePeerRemoved:
if changed == nil {
changed = make(map[types.NodeID]bool)
Expand Down

0 comments on commit a612def

Please sign in to comment.