Skip to content

Commit

Permalink
address pr comments
Browse files Browse the repository at this point in the history
Signed-off-by: Raguideau <[email protected]>
  • Loading branch information
Raguideau committed Apr 18, 2024
1 parent 1927a95 commit ceb1dbb
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 133 deletions.
195 changes: 97 additions & 98 deletions internal/scenario/scenarioManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type ScenarioManager struct {
reportChan chan report
registrationQueue chan order
gnbWg *sync.WaitGroup
taskExecuterWg *sync.WaitGroup
taskExecutorWg *sync.WaitGroup
sigStop chan os.Signal
ueScenarios []UEScenario
stop bool
Expand Down Expand Up @@ -65,12 +65,17 @@ func Start(gnbConfs []config.GNodeB, amfConfs []*config.AMF, ueScenarios []UESce
func initManager(ueScenarios []UEScenario, timeBetweenRegistration int, timeout int) *ScenarioManager {
log.Debug("Init manager with timeBetweenRegistration = ", timeBetweenRegistration)
s := &ScenarioManager{}
s.taskExecuterWg = &sync.WaitGroup{}

s.ues = make(map[int]*ueTasksCtx)

s.taskExecutorWg = &sync.WaitGroup{}
s.gnbWg = &sync.WaitGroup{}

s.reportChan = make(chan report, 10)

s.registrationQueue = make(chan order, len(ueScenarios))
go startOrderRateController(s.registrationQueue, timeBetweenRegistration)
s.reportChan = make(chan report, 10)
s.ues = make(map[int]*ueTasksCtx)

s.sigStop = make(chan os.Signal, 1)
signal.Notify(s.sigStop, os.Interrupt)
if timeout > 0 {
Expand All @@ -82,22 +87,87 @@ func initManager(ueScenarios []UEScenario, timeBetweenRegistration int, timeout
return s
}

func startOrderRateController(orderChan <-chan order, timeBetweenRegistration int) {
waitTime := time.Duration(timeBetweenRegistration) * time.Millisecond
orderTicker := time.NewTicker(waitTime)
var order order
var open bool
for {
<-orderTicker.C
orderTicker.Stop()
order, open = <-orderChan
if open {
order.workerChan <- order.task
orderTicker.Reset(waitTime)
} else {
return
}
}
}

func (s *ScenarioManager) setupGnbs(gnbConfs []config.GNodeB, amfConfs []*config.AMF) {
s.gnbs = make(map[string]*context.GNBContext)

for gnbConf := range gnbConfs {
s.gnbs[gnbConfs[gnbConf].PlmnList.GnbId] = InitGnb(gnbConfs[gnbConf], amfConfs, s.gnbWg)
s.gnbWg.Add(1)
}

// Wait for gNB to be connected before registering UEs
// TODO: We should wait for NGSetupResponse instead
time.Sleep(2 * time.Second)
}

func (s *ScenarioManager) setupScenarios(ueScenarios []UEScenario) {
s.stop = false
s.ueScenarios = ueScenarios
for ueId := 1; ueId <= len(ueScenarios); ueId++ {
if err := s.setupUeTaskExecuter(ueId, ueScenarios[ueId-1]); err != nil {
if err := s.setupUeTaskExecutor(ueId, ueScenarios[ueId-1]); err != nil {
log.Errorf("scenario for UE %d could not be started: %v", ueId, err)
}
select {
case <-s.sigStop:
s.stop = true
ueId = len(ueScenarios) + 1 // exit loop
default:
}
}
}

func (s *ScenarioManager) setupUeTaskExecutor(ueId int, ueScenario UEScenario) error {
if ueId < 1 {
return errors.New("ueId must be at least 1")
}
_, exist := s.ues[ueId]
if exist {
return errors.New("this ue already have a task executor")
}
if ueScenario.Tasks == nil {
return errors.New("tasks list is nil")
}
if ueScenario.Config == (config.Ue{}) {
return errors.New("config is empty")
}
if s.reportChan == nil {
return errors.New("scenario manager's report channel is not set")
}
if s.gnbs == nil {
return errors.New("scenario manager's gnb list is not set")
}
ue := ueTasksCtx{
done: false,
nextTasks: ueScenario.Tasks,
workerChan: make(chan Task, 1),
}

worker := ueTaskExecutor{
UeId: ueId,
UeCfg: ueScenario.Config,
TaskChan: ue.workerChan,
ReportChan: s.reportChan,
Gnbs: s.gnbs,
Wg: s.taskExecutorWg,
}
worker.Run()

s.ues[ueId] = &ue
return nil
}

func (s *ScenarioManager) executeScenarios() {
for !s.stop {
select {
Expand All @@ -121,28 +191,6 @@ func (s *ScenarioManager) executeScenarios() {
}
}

func (s *ScenarioManager) cleanup() {
drainAndCloseOrderRateController(s.registrationQueue)

go func() {
open := true
var report report
for open {
report, open = <-s.reportChan
log.Debug("Got report from UE ", report.ueId, " during cleanup: ", report.reason)
}
}()

for _, ue := range s.ues {
ue.workerChan <- Task{
TaskType: Terminate,
}
}

s.taskExecuterWg.Wait()
close(s.reportChan)
}

func (s *ScenarioManager) handleReport(report report) {
if !report.success {
s.ues[report.ueId].nextTasks = []Task{}
Expand Down Expand Up @@ -194,75 +242,26 @@ func (s *ScenarioManager) restartUeScenario(ueId int, ueScenario UEScenario) err
return nil
}

func (s *ScenarioManager) setupGnbs(gnbConfs []config.GNodeB, amfConfs []*config.AMF) {
s.gnbs = make(map[string]*context.GNBContext)

for gnbConf := range gnbConfs {
s.gnbs[gnbConfs[gnbConf].PlmnList.GnbId] = InitGnb(gnbConfs[gnbConf], amfConfs, s.gnbWg)
s.gnbWg.Add(1)
}

// Wait for gNB to be connected before registering UEs
// TODO: We should wait for NGSetupResponse instead
time.Sleep(2 * time.Second)
}

func (s *ScenarioManager) setupUeTaskExecuter(ueId int, ueScenario UEScenario) error {
if ueId < 1 {
return errors.New("ueId must be at least 1")
}
_, exist := s.ues[ueId]
if exist {
return errors.New("this ue already have a task executer")
}
if ueScenario.Tasks == nil {
return errors.New("tasks list is nil")
}
if ueScenario.Config == (config.Ue{}) {
return errors.New("config is empty")
}
if s.reportChan == nil {
return errors.New("scenario manager's report channel is not set")
}
if s.gnbs == nil {
return errors.New("scenario manager's gnb list is not set")
}
ue := ueTasksCtx{
done: false,
nextTasks: ueScenario.Tasks,
workerChan: make(chan Task, 1),
}

worker := ueTaskExecuter{
UeId: ueId,
UeCfg: ueScenario.Config,
TaskChan: ue.workerChan,
ReportChan: s.reportChan,
Gnbs: s.gnbs,
Wg: s.taskExecuterWg,
}
worker.Run()
func (s *ScenarioManager) cleanup() {
drainAndCloseOrderRateController(s.registrationQueue)

s.ues[ueId] = &ue
return nil
}
go func() {
open := true
var report report
for open {
report, open = <-s.reportChan
log.Debug("Got report from UE ", report.ueId, " during cleanup: ", report.reason)
}
}()

func startOrderRateController(orderChan <-chan order, timeBetweenRegistration int) {
waitTime := time.Duration(timeBetweenRegistration) * time.Millisecond
orderTicker := time.NewTicker(waitTime)
var order order
var open bool
for {
<-orderTicker.C
orderTicker.Stop()
order, open = <-orderChan
if open {
order.workerChan <- order.task
orderTicker.Reset(waitTime)
} else {
return
for _, ue := range s.ues {
ue.workerChan <- Task{
TaskType: Terminate,
}
}

s.taskExecutorWg.Wait()
close(s.reportChan)
}

func drainAndCloseOrderRateController(orderChan chan order) {
Expand Down
26 changes: 13 additions & 13 deletions internal/scenario/scenarioManager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func getDefaultManager() ScenarioManager {
ue.workerChan = make(chan Task, 5)
ue.nextTasks = []Task{}
s.ues[defaultUeId] = ue
s.taskExecuterWg = &sync.WaitGroup{}
s.taskExecutorWg = &sync.WaitGroup{}
s.gnbWg = &sync.WaitGroup{}
return s
}
Expand Down Expand Up @@ -623,7 +623,7 @@ func TestSetupGnbs(t *testing.T) {
}
}

func TestSetupUeTaskExecuter(t *testing.T) {
func TestSetupUeTaskExecutor(t *testing.T) {
type input struct {
manager ScenarioManager
ueScenario UEScenario
Expand Down Expand Up @@ -714,8 +714,8 @@ func TestSetupUeTaskExecuter(t *testing.T) {
s := scenario.input.manager
s.reportChan = make(chan report, 1)
s.gnbs = map[string]*context.GNBContext{scenario.input.gnbs.GetGnbId(): scenario.input.gnbs}
s.taskExecuterWg = &sync.WaitGroup{}
s.setupUeTaskExecuter(scenario.input.ueId, scenario.input.ueScenario)
s.taskExecutorWg = &sync.WaitGroup{}
s.setupUeTaskExecutor(scenario.input.ueId, scenario.input.ueScenario)

time.Sleep(time.Duration(100) * time.Millisecond)

Expand All @@ -733,7 +733,7 @@ func TestSetupUeTaskExecuter(t *testing.T) {

}

func TestSetupUeTaskExecuterHasError(t *testing.T) {
func TestSetupUeTaskExecutorHasError(t *testing.T) {

gnbConf := config.GNodeB{
ControlIF: config.ControlIF{
Expand Down Expand Up @@ -785,19 +785,19 @@ func TestSetupUeTaskExecuterHasError(t *testing.T) {
s := getDefaultManager()
s.reportChan = make(chan report, 1)
s.gnbs = map[string]*context.GNBContext{gnb.GetGnbId(): gnb}
s.taskExecuterWg = &sync.WaitGroup{}
s.taskExecutorWg = &sync.WaitGroup{}
ueid := 0
assert.Error(t, s.setupUeTaskExecuter(ueid, ueScenario))
assert.Error(t, s.setupUeTaskExecutor(ueid, ueScenario))
})
t.Run("Task Executer already exists", func(t *testing.T) {
t.Run("Task Executor already exists", func(t *testing.T) {

s := getDefaultManager()
s.reportChan = make(chan report, 1)
s.gnbs = map[string]*context.GNBContext{gnb.GetGnbId(): gnb}
s.taskExecuterWg = &sync.WaitGroup{}
s.taskExecutorWg = &sync.WaitGroup{}
ueid := 0
s.setupUeTaskExecuter(ueid, ueScenario)
assert.Error(t, s.setupUeTaskExecuter(ueid, ueScenario))
s.setupUeTaskExecutor(ueid, ueScenario)
assert.Error(t, s.setupUeTaskExecutor(ueid, ueScenario))
})
}

Expand Down Expand Up @@ -841,8 +841,8 @@ func TestStartOrderRateController(t *testing.T) {
func TestCleanup(t *testing.T) {

s := getDefaultManager()
s.taskExecuterWg.Add(1)
time.AfterFunc(time.Duration(1)*time.Second, func() { s.taskExecuterWg.Done() })
s.taskExecutorWg.Add(1)
time.AfterFunc(time.Duration(1)*time.Second, func() { s.taskExecutorWg.Done() })
s.reportChan = make(chan report, 2)
s.reportChan <- report{ueId: 1, reason: "test"}
s.reportChan <- report{ueId: 2, reason: "test"}
Expand Down
Loading

0 comments on commit ceb1dbb

Please sign in to comment.