Skip to content

Commit

Permalink
gomemif: Improves gomemif library (#216)
Browse files Browse the repository at this point in the history
This patch fixes support for multiqueue.
It improves interrupt handling.
It also implement interface details to string function.
It improves the icmp interrupt example.

Signed-off-by: Mohsin Kazmi <[email protected]>
  • Loading branch information
mohsinkazmi authored Jul 11, 2024
1 parent 38d7d39 commit f83bb91
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 98 deletions.
49 changes: 29 additions & 20 deletions extras/gomemif/examples/icmp_responder_cb/icmp_responder_cb.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ func Disconnected(i *memif.Interface) error {
return nil
}

func Responder(i *memif.Interface) error {
data, ok := i.GetPrivateData().(*interfaceData)
func Responder(itf *memif.Interface, rx_qid int) error {
data, ok := itf.GetPrivateData().(*interfaceData)
if !ok {
return fmt.Errorf("Invalid private data")
}
Expand All @@ -57,41 +57,47 @@ func Responder(i *memif.Interface) error {
data.wg.Add(1)

// allocate packet buffers
pkt := i.Pkt
pkt := itf.Pkt
var tx_bufs []memif.MemifPacketBuffer
for i := range pkt {
pkt[i].Buf = make([]byte, 2048)
pkt[i].Buflen = 2048
}

// get rx queue
rxq0, err := i.GetRxQueue(0)
rxq, err := itf.GetRxQueue(rx_qid)
if err != nil {
return err
}
// As this is an example, we will use the same queue id for transmit.
// i.e. if rx_queue id is 1, we will use tx_queue id 1.
// get tx queue
txq0, err := i.GetTxQueue(0)
txq, err := itf.GetTxQueue(rx_qid)
if err != nil {
return err
}
_ = txq0
_ = txq

nPackets, err := rxq0.Rx_burst(pkt)
nPackets, err := rxq.Rx_burst(pkt)
if err != nil {
return err
}

fmt.Println(nPackets)
rxq0.Refill(int(nPackets))

rxq.Refill(int(nPackets))
_ = err

for i := 0; i < int(nPackets); i++ {
gopkt := gopacket.NewPacket(pkt[i].Buf[:pkt[i].Buflen], layers.LayerTypeEthernet, gopacket.NoCopy)
etherLayer := gopkt.Layer(layers.LayerTypeEthernet)

// received frame src mac address will become trasmit frame dst mac address.
tx_dstMAC := etherLayer.(*layers.Ethernet).SrcMAC
if etherLayer.(*layers.Ethernet).EthernetType == layers.EthernetTypeARP {
rEth := layers.Ethernet{
SrcMAC: net.HardwareAddr{0xaa, 0xaa, 0xaa, 0xaa, 0xaa, 0xaa},
DstMAC: net.HardwareAddr{0xff, 0xff, 0xff, 0xff, 0xff, 0xff},
DstMAC: tx_dstMAC,

EthernetType: layers.EthernetTypeARP,
}
Expand All @@ -103,7 +109,7 @@ func Responder(i *memif.Interface) error {
Operation: layers.ARPReply,
SourceHwAddress: []byte(net.HardwareAddr{0xaa, 0xaa, 0xaa, 0xaa, 0xaa, 0xaa}),
SourceProtAddress: []byte("\xc0\xa8\x01\x01"),
DstHwAddress: []byte(net.HardwareAddr{0x02, 0xfe, 0x08, 0x88, 0x45, 0x7f}),
DstHwAddress: []byte(tx_dstMAC),
DstProtAddress: []byte("\xc0\xa8\x01\x02"),
}
buf := gopacket.NewSerializeBuffer()
Expand All @@ -113,7 +119,7 @@ func Responder(i *memif.Interface) error {
}
gopacket.SerializeLayers(buf, opts, &rEth, &rArp)
// write packet to shared memory
txq0.WritePacket(buf.Bytes())
txq.WritePacket(buf.Bytes())
}
if etherLayer.(*layers.Ethernet).EthernetType == layers.EthernetTypeIPv4 {
ipLayer := gopkt.Layer(layers.LayerTypeIPv4)
Expand All @@ -136,8 +142,8 @@ func Responder(i *memif.Interface) error {

// Build packet layers.
ethResp := layers.Ethernet{
DstMAC: net.HardwareAddr{0xaa, 0xaa, 0xaa, 0xaa, 0xaa, 0xaa},
SrcMAC: []byte(net.HardwareAddr{0x02, 0xfe, 0x08, 0x88, 0x45, 0x7f}),
SrcMAC: net.HardwareAddr{0xaa, 0xaa, 0xaa, 0xaa, 0xaa, 0xaa},
DstMAC: []byte(tx_dstMAC),

EthernetType: layers.EthernetTypeIPv4,
}
Expand Down Expand Up @@ -172,10 +178,11 @@ func Responder(i *memif.Interface) error {

}
}
txq0.Tx_burst(tx_bufs)
return nil
txq.Tx_burst(tx_bufs)

return nil
}

func Connected(i *memif.Interface) error {
data, ok := i.GetPrivateData().(*interfaceData)
_ = data
Expand All @@ -186,9 +193,10 @@ func Connected(i *memif.Interface) error {
i.Pkt = make([]memif.MemifPacketBuffer, 64)

// get rx queue
rxq0, err := i.GetRxQueue(0)
_ = err
rxq0.Refill(0)
for j := 0; j < int(i.GetMemoryConfig().NumQueuePairs); j++ {
rxq, _ := i.GetRxQueue(j)
rxq.Refill(0)
}

return nil
}
Expand Down Expand Up @@ -246,13 +254,15 @@ func main() {
}

data := interfaceData{}
MemoryConfig := memif.MemoryConfig{NumQueuePairs: 2, Log2RingSize: 11}
args := &memif.Arguments{
IsMaster: isMaster,
ConnectedFunc: Connected,
DisconnectedFunc: Disconnected,
PrivateData: &data,
Name: *name,
InterruptFunc: Responder,
MemoryConfig: MemoryConfig,
}

i, err := socket.NewInterface(args)
Expand Down Expand Up @@ -293,8 +303,7 @@ func main() {
// start polling for events on this socket
socket.StartPolling(memifErrChan)
case "show":
fmt.Println("remote: ", i.GetRemoteName())
fmt.Println("peer: ", i.GetPeerName())
fmt.Print(i.String())
case "exit":
err = socket.StopPolling()
if err != nil {
Expand Down
40 changes: 23 additions & 17 deletions extras/gomemif/memif/control_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,10 @@ func (socket *Socket) StartPolling(errChan chan<- error) {
}

for ev := 0; ev < num; ev++ {
if events[0].Fd == socket.wakeEvent.Fd {
if events[ev].Fd == socket.wakeEvent.Fd {
continue
}
err = socket.handleEvent(&events[0])
err = socket.handleEvent(&events[ev])
if err != nil {
errChan <- fmt.Errorf("handleEvent: ", err)
}
Expand Down Expand Up @@ -236,11 +236,17 @@ func (socket *Socket) handleEvent(event *syscall.EpollEvent) error {
if socket.listener != nil && socket.listener.event.Fd == event.Fd {
return socket.listener.handleEvent(event)
}
intf := socket.interfaceList.Back().Value.(*Interface)
if intf.args.InterruptFunc != nil {
if int(event.Fd) == int(intf.args.InterruptFd) {
intf.onInterrupt(intf)
return nil
for elt := socket.interfaceList.Front(); elt != nil; elt = elt.Next() {
intf := elt.Value.(*Interface)
if intf.args.InterruptFunc != nil {
for rx_qid := 0; rx_qid < int(intf.GetMemoryConfig().NumQueuePairs); rx_qid++ {
queue, _ := intf.GetRxQueue(rx_qid)
interruptFd, _ := queue.GetEventFd()
if int(event.Fd) == interruptFd {
intf.onInterrupt(intf, rx_qid)
return nil
}
}
}
}

Expand Down Expand Up @@ -767,18 +773,18 @@ func (cc *controlChannel) parseConnect() (err error) {
if err != nil {
return err
}
q, err := cc.i.GetRxQueue(0)
i := cc.i
if err != nil {
return err
}
if i.args.IsMaster {
i.args.InterruptFd = uint16(q.interruptFd)

}
err = i.socket.addInterrupt(q.interruptFd)
if err != nil {
return err
for j := 0; j < int(i.run.NumQueuePairs); j++ {
q, err := cc.i.GetRxQueue(j)
if err != nil {
return err
}

err = i.socket.addInterrupt(q.interruptFd)
if err != nil {
return err
}
}
cc.isConnected = true

Expand Down
55 changes: 43 additions & 12 deletions extras/gomemif/memif/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type ConnectedFunc func(i *Interface) error
// DisconnectedFunc is a callback called when an interface is disconnected
type DisconnectedFunc func(i *Interface) error

type InterruptFunc func(i *Interface) error
type InterruptFunc func(i *Interface, rx_qid int) error

// MemoryConfig represents shared memory configuration
type MemoryConfig struct {
Expand Down Expand Up @@ -383,7 +383,7 @@ func (i *Interface) addRegion(hasPacketBuffers bool, hasRings bool) (err error)
var r memoryRegion

if hasRings {
r.packetBufferOffset = uint32((i.run.NumQueuePairs + i.run.NumQueuePairs) * (ringSize + descSize*(1<<i.run.Log2RingSize)))
r.packetBufferOffset = uint32(uint32(i.run.NumQueuePairs+i.run.NumQueuePairs) * (ringSize + descSize*(1<<i.run.Log2RingSize)))
} else {
r.packetBufferOffset = 0
}
Expand Down Expand Up @@ -442,7 +442,7 @@ func (i *Interface) initializeQueues() (err error) {
desc = newDescBuf()
desc.setFlags(0)
desc.setRegion(0)
desc.setLength(int(i.run.PacketBufferSize))
desc.setLength(i.run.PacketBufferSize)

for qid := 0; qid < int(i.run.NumQueuePairs); qid++ {
/* TX */
Expand All @@ -459,15 +459,13 @@ func (i *Interface) initializeQueues() (err error) {
return err
}

i.args.InterruptFd = uint16(q.interruptFd)

q.putRing()
i.txQueues = append(i.txQueues, *q)

for j := 0; j < q.ring.size; j++ {
slot = qid*q.ring.size + j
desc.setOffset(int(i.regions[0].packetBufferOffset + uint32(slot)*i.run.PacketBufferSize))
q.putDescBuf(slot, desc)
desc.setOffset(i.regions[q.ring.region].packetBufferOffset + uint32(slot)*i.run.PacketBufferSize)
q.putDescBuf(j, desc)
}
}

Expand All @@ -490,17 +488,14 @@ func (i *Interface) initializeQueues() (err error) {
if err != nil {
return err
}
if !i.args.IsMaster {
i.args.InterruptFd = uint16(q.interruptFd)
}
i.socket.addInterrupt(q.interruptFd)
q.putRing()
i.rxQueues = append(i.rxQueues, *q)

for j := 0; j < q.ring.size; j++ {
slot = qid*q.ring.size + j
desc.setOffset(int(i.regions[0].packetBufferOffset + txPacketBufOffset + uint32(slot)*i.run.PacketBufferSize))
q.putDescBuf(slot, desc)
desc.setOffset(i.regions[q.ring.region].packetBufferOffset + txPacketBufOffset + uint32(slot)*i.run.PacketBufferSize)
q.putDescBuf(j, desc)
}
}

Expand Down Expand Up @@ -543,3 +538,39 @@ func (i *Interface) connect() (err error) {

return i.args.ConnectedFunc(i)
}

func (i *Interface) String() string {
socketFileName := i.GetSocket().GetFilename()
role := RoleToString(i.IsMaster())
id := i.GetId()
remoteName := i.GetRemoteName()
peerName := i.GetPeerName()
interfaceName := i.GetName()
memoryConfig := i.GetMemoryConfig()
isConnected := i.IsConnected()

result := fmt.Sprintf("Interface Name: %s\n", interfaceName)
result += fmt.Sprintf(" Socket: %s\n", socketFileName)
result += fmt.Sprintf(" Id: %v\n", id)
result += fmt.Sprintf(" Role: %s\n", role)
result += fmt.Sprintf(" Connected: %v\n", isConnected)
result += fmt.Sprintf(" Remote: %s\n", remoteName)
result += fmt.Sprintf(" Peer Interface Name: %s\n", peerName)
result += fmt.Sprintf(" Number of Queue Pairs: %v\n", memoryConfig.NumQueuePairs)
result += fmt.Sprintf(" Buffer Size: %v\n", memoryConfig.PacketBufferSize)

result += fmt.Sprintf("\n\tTX Queues\n")
for qid := 0; qid < int(memoryConfig.NumQueuePairs); qid++ {
txq, _ := i.GetTxQueue(qid)

result += fmt.Sprintf("\t\tqueue:%v Region:%v Type:%v Size:%v Offset:%v\n", qid, txq.ring.region, txq.ring.ringType, txq.ring.size, txq.ring.offset)
result += fmt.Sprintf("\t\tHead:%v Tail:%v Interrupt Fd:%v\n", txq.lastHead, txq.lastTail, txq.interruptFd)
}
result += fmt.Sprintf("\n\tRX Queues\n")
for qid := 0; qid < int(memoryConfig.NumQueuePairs); qid++ {
rxq, _ := i.GetRxQueue(qid)
result += fmt.Sprintf("\t\tqueue:%v Region:%v Type:%v Size:%v Offset:%v\n", qid, rxq.ring.region, rxq.ring.ringType, rxq.ring.size, rxq.ring.offset)
result += fmt.Sprintf("\t\tHead:%v Tail:%v Interrupt Fd:%v\n", rxq.lastHead, rxq.lastTail, rxq.interruptFd)
}
return result
}
Loading

0 comments on commit f83bb91

Please sign in to comment.