Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Paging netlink #23

Merged
merged 5 commits into from
Jan 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ go 1.14
require (
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d
github.com/davecgh/go-spew v1.1.1
github.com/free5gc/go-gtp5gnl v1.4.3
github.com/free5gc/go-gtp5gnl v1.4.4-0.20230105034620-3f4d02946ec0
github.com/free5gc/util v1.0.4-0.20221214003641-2f381be59a5a
github.com/hashicorp/go-version v1.6.0
github.com/khirono/go-genl v1.0.1
github.com/khirono/go-nl v1.0.5
github.com/khirono/go-rtnllink v1.1.1
github.com/khirono/go-rtnlroute v1.0.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ=
github.com/free5gc/go-gtp5gnl v1.4.3 h1:rNTyqNCE8sc1M6D32WFlqJhBaVDk1SG1oh5F6Jc+Q8Q=
github.com/free5gc/go-gtp5gnl v1.4.3/go.mod h1:lf7uoxlFbwgqcpleFBOj51RxETXfLWtGda5wE7cXFsc=
github.com/free5gc/go-gtp5gnl v1.4.4-0.20230105034620-3f4d02946ec0 h1:sPsn6/bN6SnziY0ExRF+au9Zxl7ERDzOkgY/Lr9VQko=
github.com/free5gc/go-gtp5gnl v1.4.4-0.20230105034620-3f4d02946ec0/go.mod h1:TT5aXB90NuSPMehuIK9lV2yJFnq6Qjw37ZqNB1QAKh0=
github.com/free5gc/util v1.0.4-0.20221214003641-2f381be59a5a h1:m17BhPzT6V2ug4ArQAgvsBB20nwR6M5cIK0kYEsayX0=
github.com/free5gc/util v1.0.4-0.20221214003641-2f381be59a5a/go.mod h1:H0DjCCEcHsdokJC7+/fowtZ85QgT6UndyaiA/NSH/LM=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
Expand Down
104 changes: 104 additions & 0 deletions internal/forwarder/buffnetlink/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package buffnetlink

import (
"encoding/binary"
"sync"
"syscall"

"github.com/khirono/go-genl"
"github.com/khirono/go-nl"
"github.com/pkg/errors"

"github.com/free5gc/go-gtp5gnl"
"github.com/free5gc/go-upf/internal/report"
)

type Server struct {
client *nl.Client
mux *nl.Mux
conn *nl.Conn
handler report.Handler
}

var native binary.ByteOrder = gtp5gnl.NativeEndian()

func OpenServer(wg *sync.WaitGroup, client *nl.Client, mux *nl.Mux) (*Server, error) {
s := &Server{
client: client,
mux: mux,
}

f, err := genl.GetFamily(s.client, "gtp5g")
if err != nil {
return nil, errors.Wrap(err, "get family")
}

s.conn, err = nl.Open(syscall.NETLINK_GENERIC, int(f.Groups[gtp5gnl.GENL_MCGRP].ID))
if err != nil {
return nil, errors.Wrap(err, "open netlink")
}

err = s.mux.PushHandler(s.conn, s)
if err != nil {
return nil, errors.Wrap(err, "push handler")
}

// wg.Add(1)
return s, nil
}

func (s *Server) Close() {
s.mux.PopHandler(s.conn)
s.conn.Close()
}

func (s *Server) Handle(handler report.Handler) {
s.handler = handler
}

func (s *Server) ServeMsg(msg *nl.Msg) bool {
b := msg.Body[genl.SizeofHeader:]

var pkt []byte
var seid uint64
var pdrid uint16
var action uint16

for len(b) > 0 {
hdr, n, err := nl.DecodeAttrHdr(b)
if err != nil {
return false
}
switch hdr.MaskedType() {
Copy link
Collaborator

@tim-ywliu tim-ywliu Dec 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ShouheiNishi

The latest report format is changed.
Please rebase your banch to the latest commit and refer to the latest internal/forwarder/buff to modify it to support DLDR & USAR report.
Thanks.

case gtp5gnl.BUFFER_ID:
pdrid = native.Uint16(b[n:])
case gtp5gnl.BUFFER_ACTION:
action = native.Uint16(b[n:])
case gtp5gnl.BUFFER_SEID:
seid = native.Uint64(b[n:])
case gtp5gnl.BUFFER_PACKET:
pkt = b[n:int(hdr.Len)]
}
b = b[hdr.Len.Align():]
}

if s.handler != nil && pkt != nil {
dldr := report.DLDReport{
PDRID: pdrid,
Action: action,
BufPkt: pkt,
}
s.handler.NotifySessReport(
report.SessReport{
SEID: seid,
Reports: []report.Report{dldr},
},
)
}

return true
}

func (s *Server) Pop(seid uint64, pdrid uint16) ([]byte, bool) {
return s.handler.PopBufPkt(seid, pdrid)
}
182 changes: 182 additions & 0 deletions internal/forwarder/buffnetlink/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package buffnetlink

import (
"bytes"
"encoding/binary"
"fmt"
"os"
"sync"
"syscall"
"testing"
"time"

"github.com/khirono/go-genl"
"github.com/khirono/go-nl"

"github.com/free5gc/go-upf/internal/report"
)

type testHandler struct {
q map[uint64]map[uint16]chan []byte
}

func NewTestHandler() *testHandler {
return &testHandler{q: make(map[uint64]map[uint16]chan []byte)}
}

func (h *testHandler) Close() {
for _, s := range h.q {
for _, q := range s {
close(q)
}
}
}

func (h *testHandler) NotifySessReport(sr report.SessReport) {
s, ok := h.q[sr.SEID]
if !ok {
return
}
for _, rep := range sr.Reports {
switch r := rep.(type) {
case report.DLDReport:
if r.Action&report.BUFF != 0 && len(r.BufPkt) > 0 {
q, ok := s[r.PDRID]
if !ok {
qlen := 10
s[r.PDRID] = make(chan []byte, qlen)
q = s[r.PDRID]
}
q <- r.BufPkt
}
default:
}
}
}

func (h *testHandler) PopBufPkt(seid uint64, pdrid uint16) ([]byte, bool) {
s, ok := h.q[seid]
if !ok {
return nil, false
}
q, ok := s[pdrid]
if !ok {
return nil, false
}
select {
case pkt := <-q:
return pkt, true
default:
return nil, false
}
}

func TestServer(t *testing.T) {
if testing.Short() {
t.Skip("skipping testing in short mode")
}

var wg sync.WaitGroup

mux, err := nl.NewMux()
if err != nil {
t.Fatal(err)
}
wg.Add(1)
go func() {
defer wg.Done()
err = mux.Serve()
if err != nil {
fmt.Fprintf(os.Stderr, "%v", err)
}
}()

conn, err := nl.Open(syscall.NETLINK_GENERIC)
if err != nil {
t.Fatal(err)
}

c := nl.NewClient(conn, mux)
s, err := OpenServer(&wg, c, mux)
if err != nil {
t.Fatal(err)
}

f, err := genl.GetFamily(c, "gtp5g")
if err != nil {
t.Fatal(err)
}
conn.Close()

h := NewTestHandler()
defer func() {
h.Close()
s.Close()
mux.Close()
wg.Wait()
}()

fd, err := syscall.Socket(syscall.AF_NETLINK, syscall.SOCK_RAW|syscall.SOCK_CLOEXEC, syscall.NETLINK_GENERIC)
if err != nil {
t.Fatal(err)
}
defer func() {
errClose := syscall.Close(fd)
if errClose != nil {
t.Fatal(errClose)
}
}()

seid := uint64(6)
h.q[seid] = make(map[uint16]chan []byte)
s.Handle(h)

pkt := []byte{
0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x10,
0x00,
0x00, 0x00,
0x0c, 0x00, 0x06, 0x00, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x03, 0x00,
0x06, 0x00, 0x05, 0x00, 0x03, 0x00, 0x00, 0x00,
0x06, 0x00, 0x07, 0x00, 0x0c, 0x00, 0x00, 0x00,
0x08, 0x00, 0x04, 0x00,
0xee, 0xbb,
0xdd, 0xcc,
}

binary.LittleEndian.PutUint16(pkt[4:6], f.ID)
binary.LittleEndian.PutUint32(pkt[0:4], uint32(len(pkt)))

N := 10
for i := 0; i < N; i++ {
addr := syscall.SockaddrNetlink{
Family: syscall.AF_NETLINK,
Groups: 1 << (f.Groups[0].ID - 1),
}
err = syscall.Sendmsg(fd, pkt, nil, &addr, 0)
if err != nil {
t.Fatal(err)
}
time.Sleep(100 * time.Millisecond)

pdrid := uint16(3)
pkt, ok := s.Pop(seid, pdrid)
if !ok {
t.Fatal("not found")
}

want := []byte{0xee, 0xbb, 0xdd, 0xcc}
if !bytes.Equal(pkt, want) {
t.Errorf("want %x; but got %x\n", want, pkt)
}

_, ok = s.Pop(seid, pdrid)
if ok {
t.Fatal("found")
}
}
}
19 changes: 16 additions & 3 deletions internal/forwarder/gtp5g.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/free5gc/go-gtp5gnl"
"github.com/free5gc/go-upf/internal/forwarder/buff"
"github.com/free5gc/go-upf/internal/forwarder/buffnetlink"
"github.com/free5gc/go-upf/internal/forwarder/perio"
"github.com/free5gc/go-upf/internal/gtpv1"
"github.com/free5gc/go-upf/internal/logger"
Expand All @@ -36,6 +37,7 @@ type Gtp5g struct {
conn *nl.Conn
client *gtp5gnl.Client
bs *buff.Server
bsnl *buffnetlink.Server
ps *perio.Server
log *logrus.Entry
}
Expand Down Expand Up @@ -93,6 +95,13 @@ func OpenGtp5g(wg *sync.WaitGroup, addr string, mtu uint32) (*Gtp5g, error) {
}
g.bs = bs

bsnl, err := buffnetlink.OpenServer(wg, c.Client, mux)
if err != nil {
g.Close()
return nil, errors.Wrap(err, "open buff(netlink) server")
}
g.bsnl = bsnl

ps, err := perio.OpenServer(wg)
if err != nil {
g.Close()
Expand All @@ -117,6 +126,9 @@ func (g *Gtp5g) Close() {
if g.bs != nil {
g.bs.Close()
}
if g.bsnl != nil {
g.bsnl.Close()
}
if g.ps != nil {
g.ps.Close()
}
Expand Down Expand Up @@ -427,7 +439,7 @@ func (g *Gtp5g) CreatePDR(lSeid uint64, req *ie.IE) error {
// Not in 3GPP spec, just used for buffering
attrs = append(attrs, nl.Attr{
Type: gtp5gnl.PDR_UNIX_SOCKET_PATH,
Value: nl.AttrString(SOCKPATH),
Value: nl.AttrString(gtp5gnl.PdrAddrForNetlink),
})

oid := gtp5gnl.OID{lSeid, pdrid}
Expand Down Expand Up @@ -1450,6 +1462,7 @@ func (g *Gtp5g) QueryURR(lSeid uint64, urrid uint32) ([]report.USAReport, error)

func (g *Gtp5g) HandleReport(handler report.Handler) {
g.bs.Handle(handler)
g.bsnl.Handle(handler)
g.ps.Handle(handler, g.QueryURR)
}

Expand All @@ -1468,7 +1481,7 @@ func (g *Gtp5g) applyAction(lSeid uint64, farid int, action uint16) {
// BUFF -> DROP
for _, pdrid := range far.PDRIDs {
for {
_, ok := g.bs.Pop(lSeid, pdrid)
_, ok := g.bsnl.Pop(lSeid, pdrid)
if !ok {
break
}
Expand Down Expand Up @@ -1497,7 +1510,7 @@ func (g *Gtp5g) applyAction(lSeid uint64, farid int, action uint16) {
}
}
for {
pkt, ok := g.bs.Pop(lSeid, pdrid)
pkt, ok := g.bsnl.Pop(lSeid, pdrid)
if !ok {
break
}
Expand Down