diff --git a/internal/pfcpsim/helpers.go b/internal/pfcpsim/helpers.go index 892c05d..f0f5cae 100644 --- a/internal/pfcpsim/helpers.go +++ b/internal/pfcpsim/helpers.go @@ -4,7 +4,6 @@ package pfcpsim import ( - "context" "errors" "fmt" "net" @@ -39,15 +38,11 @@ func ConnectPFCPSim() error { sim = pfcpsim.NewPFCPClient(localAddr.String()) } - ctx, cancel := context.WithCancel(context.Background()) - - err := sim.ConnectN4(ctx, remotePeerAddress) + err := sim.ConnectN4(remotePeerAddress) if err != nil { - cancel() return err } - cancelFunc = cancel remotePeerConnected = true return nil @@ -58,9 +53,7 @@ func DisconnectPFCPSim() error { return notInit } - cancelFunc() - - return nil + return sim.TeardownAssociation() } func isConfigured() bool { diff --git a/internal/pfcpsim/state.go b/internal/pfcpsim/state.go index 34f5d08..14d0ce1 100644 --- a/internal/pfcpsim/state.go +++ b/internal/pfcpsim/state.go @@ -4,8 +4,6 @@ package pfcpsim import ( - "context" - "github.com/omec-project/pfcpsim/pkg/pfcpsim" ) @@ -18,5 +16,4 @@ var ( // Emulates 5G SMF/ 4G SGW sim *pfcpsim.PFCPClient remotePeerConnected bool - cancelFunc context.CancelFunc ) diff --git a/pkg/pfcpsim/pfcpsim.go b/pkg/pfcpsim/pfcpsim.go index 4a87ee2..1a92ff7 100644 --- a/pkg/pfcpsim/pfcpsim.go +++ b/pkg/pfcpsim/pfcpsim.go @@ -85,8 +85,8 @@ type PFCPClient struct { aliveLock sync.Mutex isAssociationActive bool - ctx context.Context - cancelHeartbeats context.CancelFunc + ctx context.Context + cancel context.CancelFunc heartbeatsChan chan *message.HeartbeatResponse recvChan chan message.Message @@ -109,7 +109,9 @@ func NewPFCPClient(localAddr string) *PFCPClient { responseTimeout: DefaultResponseTimeout, } - client.ctx = context.Background() + ctx, cancelFunc := context.WithCancel(context.Background()) + client.ctx = ctx + client.cancel = cancelFunc client.heartbeatsChan = make(chan *message.HeartbeatResponse) client.recvChan = make(chan message.Message) @@ -166,21 +168,12 @@ func (c *PFCPClient) sendMsg(msg message.Message) error { return nil } -func (c *PFCPClient) receiveFromN4(ctx context.Context) { +func (c *PFCPClient) receiveFromN4() { buf := make([]byte, 3000) for { select { - case <-ctx.Done(): - if c.cancelHeartbeats != nil { - c.cancelHeartbeats() - } - - err := c.conn.Close() - if err != nil { - fmt.Println(err) - } - + case <-c.ctx.Done(): return default: n, _, err := c.conn.ReadFrom(buf) @@ -210,7 +203,7 @@ func (c *PFCPClient) receiveFromN4(ctx context.Context) { } } -func (c *PFCPClient) ConnectN4(ctx context.Context, remoteAddr string) error { +func (c *PFCPClient) ConnectN4(remoteAddr string) error { addr := fmt.Sprintf("%s:%d", remoteAddr, PFCPStandardPort) if host, port, err := net.SplitHostPort(remoteAddr); err == nil { @@ -232,14 +225,15 @@ func (c *PFCPClient) ConnectN4(ctx context.Context, remoteAddr string) error { c.conn = rxconn - go c.receiveFromN4(ctx) + go c.receiveFromN4() return nil } func (c *PFCPClient) DisconnectN4() { - if c.cancelHeartbeats != nil { - c.cancelHeartbeats() + if c.cancel != nil { + c.cancel() + c.cancel = nil } err := c.conn.Close() @@ -330,8 +324,13 @@ func (c *PFCPClient) SendAssociationSetupRequest(ie ...*ieLib.IE) error { // SendAssociationTeardownRequest sends PFCP Teardown Request towards a peer. // A caller should make sure that the PFCP connection is established before invoking this function. func (c *PFCPClient) SendAssociationTeardownRequest(ie ...*ieLib.IE) error { + raddr, err := net.ResolveUDPAddr("udp", c.remoteAddr) + if err != nil { + return err + } + teardownReq := message.NewAssociationReleaseRequest(0, - ieLib.NewNodeID(c.conn.RemoteAddr().String(), "", ""), + ieLib.NewNodeID(raddr.String(), "", ""), ) teardownReq.IEs = append(teardownReq.IEs, ie...) @@ -400,12 +399,12 @@ func (c *PFCPClient) SendSessionDeletionRequest(localSEID uint64, remoteSEID uin return c.sendMsg(delReq) } -func (c *PFCPClient) StartHeartbeats(stopCtx context.Context) { +func (c *PFCPClient) StartHeartbeats() { ticker := time.NewTicker(DefaultHeartbeatPeriod * time.Second) for { select { - case <-stopCtx.Done(): + case <-c.ctx.Done(): return case <-ticker.C: err := c.SendAndRecvHeartbeat() @@ -460,12 +459,9 @@ func (c *PFCPClient) SetupAssociation() error { return NewInvalidResponseError(assocFailed) } - ctx, cancelFunc := context.WithCancel(c.ctx) - c.cancelHeartbeats = cancelFunc - c.setAssociationStatus(true) - go c.StartHeartbeats(ctx) + go c.StartHeartbeats() return nil } @@ -498,8 +494,9 @@ func (c *PFCPClient) TeardownAssociation() error { return NewInvalidResponseError() } - if c.cancelHeartbeats != nil { - c.cancelHeartbeats() + if c.cancel != nil { + c.cancel() + c.cancel = nil } c.setAssociationStatus(false)