From f83bb9194e718df9e3eb3a664d17ec491072cdc0 Mon Sep 17 00:00:00 2001 From: Mohsin Kazmi Date: Thu, 11 Jul 2024 14:57:55 +0100 Subject: [PATCH] gomemif: Improves gomemif library (#216) This patch fixes support for multiqueue. It improves interrupt handling. It also implement interface details to string function. It improves the icmp interrupt example. Signed-off-by: Mohsin Kazmi --- .../icmp_responder_cb/icmp_responder_cb.go | 49 ++++++++++------- extras/gomemif/memif/control_channel.go | 40 ++++++++------ extras/gomemif/memif/interface.go | 55 +++++++++++++++---- extras/gomemif/memif/memif.go | 43 ++++++++------- extras/gomemif/memif/packet_reader.go | 26 ++++----- extras/gomemif/memif/packet_writer.go | 32 +++++------ 6 files changed, 147 insertions(+), 98 deletions(-) diff --git a/extras/gomemif/examples/icmp_responder_cb/icmp_responder_cb.go b/extras/gomemif/examples/icmp_responder_cb/icmp_responder_cb.go index b8038c51..ebb0a792 100644 --- a/extras/gomemif/examples/icmp_responder_cb/icmp_responder_cb.go +++ b/extras/gomemif/examples/icmp_responder_cb/icmp_responder_cb.go @@ -47,8 +47,8 @@ func Disconnected(i *memif.Interface) error { return nil } -func Responder(i *memif.Interface) error { - data, ok := i.GetPrivateData().(*interfaceData) +func Responder(itf *memif.Interface, rx_qid int) error { + data, ok := itf.GetPrivateData().(*interfaceData) if !ok { return fmt.Errorf("Invalid private data") } @@ -57,41 +57,47 @@ func Responder(i *memif.Interface) error { data.wg.Add(1) // allocate packet buffers - pkt := i.Pkt + pkt := itf.Pkt var tx_bufs []memif.MemifPacketBuffer for i := range pkt { pkt[i].Buf = make([]byte, 2048) pkt[i].Buflen = 2048 } + // get rx queue - rxq0, err := i.GetRxQueue(0) + rxq, err := itf.GetRxQueue(rx_qid) if err != nil { return err } + // As this is an example, we will use the same queue id for transmit. + // i.e. if rx_queue id is 1, we will use tx_queue id 1. // get tx queue - txq0, err := i.GetTxQueue(0) + txq, err := itf.GetTxQueue(rx_qid) if err != nil { return err } - _ = txq0 + _ = txq - nPackets, err := rxq0.Rx_burst(pkt) + nPackets, err := rxq.Rx_burst(pkt) if err != nil { return err } fmt.Println(nPackets) - rxq0.Refill(int(nPackets)) + + rxq.Refill(int(nPackets)) _ = err for i := 0; i < int(nPackets); i++ { gopkt := gopacket.NewPacket(pkt[i].Buf[:pkt[i].Buflen], layers.LayerTypeEthernet, gopacket.NoCopy) etherLayer := gopkt.Layer(layers.LayerTypeEthernet) + // received frame src mac address will become trasmit frame dst mac address. + tx_dstMAC := etherLayer.(*layers.Ethernet).SrcMAC if etherLayer.(*layers.Ethernet).EthernetType == layers.EthernetTypeARP { rEth := layers.Ethernet{ SrcMAC: net.HardwareAddr{0xaa, 0xaa, 0xaa, 0xaa, 0xaa, 0xaa}, - DstMAC: net.HardwareAddr{0xff, 0xff, 0xff, 0xff, 0xff, 0xff}, + DstMAC: tx_dstMAC, EthernetType: layers.EthernetTypeARP, } @@ -103,7 +109,7 @@ func Responder(i *memif.Interface) error { Operation: layers.ARPReply, SourceHwAddress: []byte(net.HardwareAddr{0xaa, 0xaa, 0xaa, 0xaa, 0xaa, 0xaa}), SourceProtAddress: []byte("\xc0\xa8\x01\x01"), - DstHwAddress: []byte(net.HardwareAddr{0x02, 0xfe, 0x08, 0x88, 0x45, 0x7f}), + DstHwAddress: []byte(tx_dstMAC), DstProtAddress: []byte("\xc0\xa8\x01\x02"), } buf := gopacket.NewSerializeBuffer() @@ -113,7 +119,7 @@ func Responder(i *memif.Interface) error { } gopacket.SerializeLayers(buf, opts, &rEth, &rArp) // write packet to shared memory - txq0.WritePacket(buf.Bytes()) + txq.WritePacket(buf.Bytes()) } if etherLayer.(*layers.Ethernet).EthernetType == layers.EthernetTypeIPv4 { ipLayer := gopkt.Layer(layers.LayerTypeIPv4) @@ -136,8 +142,8 @@ func Responder(i *memif.Interface) error { // Build packet layers. ethResp := layers.Ethernet{ - DstMAC: net.HardwareAddr{0xaa, 0xaa, 0xaa, 0xaa, 0xaa, 0xaa}, - SrcMAC: []byte(net.HardwareAddr{0x02, 0xfe, 0x08, 0x88, 0x45, 0x7f}), + SrcMAC: net.HardwareAddr{0xaa, 0xaa, 0xaa, 0xaa, 0xaa, 0xaa}, + DstMAC: []byte(tx_dstMAC), EthernetType: layers.EthernetTypeIPv4, } @@ -172,10 +178,11 @@ func Responder(i *memif.Interface) error { } } - txq0.Tx_burst(tx_bufs) - return nil + txq.Tx_burst(tx_bufs) + return nil } + func Connected(i *memif.Interface) error { data, ok := i.GetPrivateData().(*interfaceData) _ = data @@ -186,9 +193,10 @@ func Connected(i *memif.Interface) error { i.Pkt = make([]memif.MemifPacketBuffer, 64) // get rx queue - rxq0, err := i.GetRxQueue(0) - _ = err - rxq0.Refill(0) + for j := 0; j < int(i.GetMemoryConfig().NumQueuePairs); j++ { + rxq, _ := i.GetRxQueue(j) + rxq.Refill(0) + } return nil } @@ -246,6 +254,7 @@ func main() { } data := interfaceData{} + MemoryConfig := memif.MemoryConfig{NumQueuePairs: 2, Log2RingSize: 11} args := &memif.Arguments{ IsMaster: isMaster, ConnectedFunc: Connected, @@ -253,6 +262,7 @@ func main() { PrivateData: &data, Name: *name, InterruptFunc: Responder, + MemoryConfig: MemoryConfig, } i, err := socket.NewInterface(args) @@ -293,8 +303,7 @@ func main() { // start polling for events on this socket socket.StartPolling(memifErrChan) case "show": - fmt.Println("remote: ", i.GetRemoteName()) - fmt.Println("peer: ", i.GetPeerName()) + fmt.Print(i.String()) case "exit": err = socket.StopPolling() if err != nil { diff --git a/extras/gomemif/memif/control_channel.go b/extras/gomemif/memif/control_channel.go index c7aa3c31..041f40bc 100644 --- a/extras/gomemif/memif/control_channel.go +++ b/extras/gomemif/memif/control_channel.go @@ -131,10 +131,10 @@ func (socket *Socket) StartPolling(errChan chan<- error) { } for ev := 0; ev < num; ev++ { - if events[0].Fd == socket.wakeEvent.Fd { + if events[ev].Fd == socket.wakeEvent.Fd { continue } - err = socket.handleEvent(&events[0]) + err = socket.handleEvent(&events[ev]) if err != nil { errChan <- fmt.Errorf("handleEvent: ", err) } @@ -236,11 +236,17 @@ func (socket *Socket) handleEvent(event *syscall.EpollEvent) error { if socket.listener != nil && socket.listener.event.Fd == event.Fd { return socket.listener.handleEvent(event) } - intf := socket.interfaceList.Back().Value.(*Interface) - if intf.args.InterruptFunc != nil { - if int(event.Fd) == int(intf.args.InterruptFd) { - intf.onInterrupt(intf) - return nil + for elt := socket.interfaceList.Front(); elt != nil; elt = elt.Next() { + intf := elt.Value.(*Interface) + if intf.args.InterruptFunc != nil { + for rx_qid := 0; rx_qid < int(intf.GetMemoryConfig().NumQueuePairs); rx_qid++ { + queue, _ := intf.GetRxQueue(rx_qid) + interruptFd, _ := queue.GetEventFd() + if int(event.Fd) == interruptFd { + intf.onInterrupt(intf, rx_qid) + return nil + } + } } } @@ -767,18 +773,18 @@ func (cc *controlChannel) parseConnect() (err error) { if err != nil { return err } - q, err := cc.i.GetRxQueue(0) i := cc.i - if err != nil { - return err - } - if i.args.IsMaster { - i.args.InterruptFd = uint16(q.interruptFd) - } - err = i.socket.addInterrupt(q.interruptFd) - if err != nil { - return err + for j := 0; j < int(i.run.NumQueuePairs); j++ { + q, err := cc.i.GetRxQueue(j) + if err != nil { + return err + } + + err = i.socket.addInterrupt(q.interruptFd) + if err != nil { + return err + } } cc.isConnected = true diff --git a/extras/gomemif/memif/interface.go b/extras/gomemif/memif/interface.go index fa3eb732..9db2cf42 100644 --- a/extras/gomemif/memif/interface.go +++ b/extras/gomemif/memif/interface.go @@ -61,7 +61,7 @@ type ConnectedFunc func(i *Interface) error // DisconnectedFunc is a callback called when an interface is disconnected type DisconnectedFunc func(i *Interface) error -type InterruptFunc func(i *Interface) error +type InterruptFunc func(i *Interface, rx_qid int) error // MemoryConfig represents shared memory configuration type MemoryConfig struct { @@ -383,7 +383,7 @@ func (i *Interface) addRegion(hasPacketBuffers bool, hasRings bool) (err error) var r memoryRegion if hasRings { - r.packetBufferOffset = uint32((i.run.NumQueuePairs + i.run.NumQueuePairs) * (ringSize + descSize*(1< 0; nSlots-- { - q.setDescLength(head&mask, int(q.i.run.PacketBufferSize)) + q.setDescLength(head&int(mask), int(q.i.run.PacketBufferSize)) head++ } q.writeHead(head) @@ -101,11 +101,11 @@ refill: // ReadPacket reads one packet form the shared memory and // returns the number of packets func (q *Queue) Rx_burst(pkt []MemifPacketBuffer) (uint16, error) { - var mask int = q.ring.size - 1 + var mask uint32 = uint32(q.ring.size - 1) var slot int var lastSlot int - var length int - var offset int + var length uint32 + var offset uint32 var nSlots uint16 var desc descBuf = newDescBuf() @@ -127,11 +127,11 @@ func (q *Queue) Rx_burst(pkt []MemifPacketBuffer) (uint16, error) { rx := 0 for nSlots > 0 { // copy descriptor from shm - q.getDescBuf(slot&mask, desc) + q.getDescBuf(slot&int(mask), desc) length = desc.getLength() offset = desc.getOffset() copy(pkt[rx].Buf[:], q.i.regions[desc.getRegion()].data[offset:offset+length]) - pkt[rx].Buflen = length + pkt[rx].Buflen = int(length) rx++ nSlots-- slot++ diff --git a/extras/gomemif/memif/packet_writer.go b/extras/gomemif/memif/packet_writer.go index fb87a53e..02179715 100644 --- a/extras/gomemif/memif/packet_writer.go +++ b/extras/gomemif/memif/packet_writer.go @@ -20,10 +20,10 @@ package memif // WritePacket writes one packet to the shared memory and // returns the number of bytes written func (q *Queue) WritePacket(pkt []byte) int { - var mask int = q.ring.size - 1 + var mask uint32 = uint32(q.ring.size - 1) var slot int var nFree uint16 - var packetBufferSize int = int(q.i.run.PacketBufferSize) + var packetBufferSize uint32 = q.i.run.PacketBufferSize if q.i.args.IsMaster { slot = q.readTail() @@ -40,7 +40,7 @@ func (q *Queue) WritePacket(pkt []byte) int { // copy descriptor from shm desc := newDescBuf() - q.getDescBuf(slot&mask, desc) + q.getDescBuf(slot&int(mask), desc) // reset flags desc.setFlags(0) // reset length @@ -52,7 +52,7 @@ func (q *Queue) WritePacket(pkt []byte) int { // write packet into memif buffer n := copy(q.i.regions[desc.getRegion()].data[offset:offset+packetBufferSize], pkt[:]) - desc.setLength(n) + desc.setLength(uint32(n)) for n < len(pkt) { nFree-- if nFree == 0 { @@ -60,11 +60,11 @@ func (q *Queue) WritePacket(pkt []byte) int { return 0 } desc.setFlags(descFlagNext) - q.putDescBuf(slot&mask, desc) + q.putDescBuf(slot&int(mask), desc) slot++ // copy descriptor from shm - q.getDescBuf(slot&mask, desc) + q.getDescBuf(slot&int(mask), desc) // reset flags desc.setFlags(0) // reset length @@ -75,12 +75,12 @@ func (q *Queue) WritePacket(pkt []byte) int { offset := desc.getOffset() tmp := copy(q.i.regions[desc.getRegion()].data[offset:offset+packetBufferSize], pkt[:]) - desc.setLength(tmp) + desc.setLength(uint32(tmp)) n += tmp } // copy descriptor to shm - q.putDescBuf(slot&mask, desc) + q.putDescBuf(slot&int(mask), desc) slot++ if q.i.args.IsMaster { @@ -95,10 +95,10 @@ func (q *Queue) WritePacket(pkt []byte) int { } func (q *Queue) Tx_burst(pkt []MemifPacketBuffer) int { - var mask int = q.ring.size - 1 + var mask uint32 = uint32(q.ring.size - 1) var slot int var nFree uint16 - var packetBufferSize int = int(q.i.run.PacketBufferSize) + var packetBufferSize uint32 = q.i.run.PacketBufferSize if q.i.args.IsMaster { slot = q.readTail() @@ -117,7 +117,7 @@ func (q *Queue) Tx_burst(pkt []MemifPacketBuffer) int { for i := 0; i < len(pkt); i++ { // copy descriptor from shm desc := newDescBuf() - q.getDescBuf(slot&mask, desc) + q.getDescBuf(slot&int(mask), desc) // reset flags desc.setFlags(0) // reset length @@ -128,7 +128,7 @@ func (q *Queue) Tx_burst(pkt []MemifPacketBuffer) int { offset := desc.getOffset() // write packet into memif buffer n = copy(q.i.regions[desc.getRegion()].data[offset:offset+packetBufferSize], pkt[i].Buf[:]) - desc.setLength(n) + desc.setLength(uint32(n)) for n < len(pkt[i].Buf) { nFree-- if nFree == 0 { @@ -136,11 +136,11 @@ func (q *Queue) Tx_burst(pkt []MemifPacketBuffer) int { return 0 } desc.setFlags(descFlagNext) - q.putDescBuf(slot&mask, desc) + q.putDescBuf(slot&int(mask), desc) slot++ // copy descriptor from shm - q.getDescBuf(slot&mask, desc) + q.getDescBuf(slot&int(mask), desc) // reset flags desc.setFlags(0) // reset length @@ -151,12 +151,12 @@ func (q *Queue) Tx_burst(pkt []MemifPacketBuffer) int { offset := desc.getOffset() tmp := copy(q.i.regions[desc.getRegion()].data[offset:offset+packetBufferSize], pkt[i].Buf[:]) - desc.setLength(tmp) + desc.setLength(uint32(tmp)) n += tmp } // copy descriptor to shm - q.putDescBuf(slot&mask, desc) + q.putDescBuf(slot&int(mask), desc) slot++ } if q.i.args.IsMaster {