Skip to content

Commit

Permalink
Use a channel to poll for incoming messages.
Browse files Browse the repository at this point in the history
  • Loading branch information
ovalenti committed Apr 25, 2024
1 parent c34826e commit c3113f4
Showing 1 changed file with 21 additions and 15 deletions.
36 changes: 21 additions & 15 deletions integration-tests/pkg/mock_sensor/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type MockSensor struct {

OnCollectorRuntimeControlServiceConnect func()
collectorRuntimeControlService sensorAPI.CollectorService_CommunicateServer
runtimeFiltersAckReceived bool
receivedRuntimeMessages chan *sensorAPI.MsgFromCollector

// every event will be forwarded to these channels, to allow
// tests to look directly at the incoming data without
Expand Down Expand Up @@ -260,6 +260,8 @@ func (m *MockSensor) Start() {
m.connectionChannel = NewRingChan[*sensorAPI.NetworkConnection](gDefaultRingSize)
m.endpointChannel = NewRingChan[*sensorAPI.NetworkEndpoint](gDefaultRingSize)

m.receivedRuntimeMessages = make(chan *sensorAPI.MsgFromCollector)

go func() {
if err := m.grpcServer.Serve(m.listener); err != nil {
log.Fatalf("failed to serve: %v", err)
Expand All @@ -282,7 +284,13 @@ func (m *MockSensor) Stop() {

m.OnCollectorRuntimeControlServiceConnect = nil
m.collectorRuntimeControlService = nil
m.runtimeFiltersAckReceived = false

// unblock the communication thread
select {
case <-m.receivedRuntimeMessages:
default:
}
m.receivedRuntimeMessages = nil

m.processChannel.Stop()
m.lineageChannel.Stop()
Expand Down Expand Up @@ -519,28 +527,26 @@ func (m *MockSensor) Communicate(stream sensorAPI.CollectorService_CommunicateSe
break
}

switch v := message.GetMsg().(type) {
case *sensorAPI.MsgFromCollector_RuntimeFiltersAck:
m.logger.Printf("RuntimeFiltersAck")
m.runtimeFiltersAckReceived = true
default:
m.logger.Printf("Unknown object received through CollectorService with type %q", v)
}
m.receivedRuntimeMessages <- message
}
m.collectorRuntimeControlService = nil

return nil
}

func (m *MockSensor) WaitForRuntimeFiltersAck(timeout int) bool {
for timeout > 0 {
timeout--
if m.runtimeFiltersAckReceived {
break
t := time.After(time.Duration(timeout) * time.Second)

for {
select {
case message := <-m.receivedRuntimeMessages:
if message.GetRuntimeFiltersAck() != nil {
return true
}
case <-t:
return false
}
time.Sleep(time.Second)
}
return m.runtimeFiltersAckReceived
}

func (m *MockSensor) SendRuntimeFilters(filters *storage.RuntimeFilteringConfiguration) {
Expand Down

0 comments on commit c3113f4

Please sign in to comment.