Skip to content

Commit

Permalink
Move routers to a subscription model where they subcribe to model dat…
Browse files Browse the repository at this point in the history
…a from one controller for a limited time. Fixes #2599
  • Loading branch information
plorenz committed Dec 18, 2024
1 parent 3b4d520 commit 31d90ad
Show file tree
Hide file tree
Showing 23 changed files with 1,205 additions and 466 deletions.
23 changes: 12 additions & 11 deletions common/event_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type EventCache interface {
CurrentIndex() (uint64, bool)

// ReplayFrom returns an array of events from startIndex and true if the replay may be facilitated.
// An empty slice and true is returned in cases where the requested startIndex is the current index.
// An empty slice and true is returned in cases where the requested startIndex is greater than the current index.
// An empty slice and false is returned in cases where the replay cannot be facilitated.
// This function is blocking.
ReplayFrom(startIndex uint64) ([]*edge_ctrl_pb.DataState_ChangeSet, bool)
Expand Down Expand Up @@ -211,6 +211,12 @@ func (cache *LoggingEventCache) ReplayFrom(startIndex uint64) ([]*edge_ctrl_pb.D
_, eventFound := cache.Events[startIndex]

if !eventFound {
// if we're asked to replay an index we haven't reached yet, return an empty list
headIndex := cache.Log[cache.HeadLogIndex]
if headIndex < startIndex {
return nil, true
}

return nil, false
}

Expand All @@ -228,27 +234,22 @@ func (cache *LoggingEventCache) ReplayFrom(startIndex uint64) ([]*edge_ctrl_pb.D
return nil, false
}

// no replay
if *startLogIndex == cache.HeadLogIndex {
return nil, true
}

// ez replay
if *startLogIndex < cache.HeadLogIndex {
// replay, no loop required
if *startLogIndex <= cache.HeadLogIndex {
var result []*edge_ctrl_pb.DataState_ChangeSet
for _, key := range cache.Log[*startLogIndex:cache.HeadLogIndex] {
for _, key := range cache.Log[*startLogIndex : cache.HeadLogIndex+1] {
result = append(result, cache.Events[key])
}
return result, true
}

//looping replay
// looping replay
var result []*edge_ctrl_pb.DataState_ChangeSet
for _, key := range cache.Log[*startLogIndex:] {
result = append(result, cache.Events[key])
}

for _, key := range cache.Log[0:cache.HeadLogIndex] {
for _, key := range cache.Log[:cache.HeadLogIndex+1] {
result = append(result, cache.Events[key])
}

Expand Down
Loading

0 comments on commit 31d90ad

Please sign in to comment.