Skip to content

Commit

Permalink
add units to bandwidth and latency
Browse files Browse the repository at this point in the history
  • Loading branch information
pfandzelter committed Apr 30, 2024
1 parent 50acd9a commit d7d7d0d
Show file tree
Hide file tree
Showing 23 changed files with 233 additions and 256 deletions.
6 changes: 4 additions & 2 deletions celestial/proto_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,10 @@ def make_update_request_iter(
group=_machineID_group(target),
id=_machineID_id(target),
),
latency=typing.cast(int, celestial.types.Link_latency_us(link)),
bandwidth=typing.cast(
latency_us=typing.cast(
int, celestial.types.Link_latency_us(link)
),
bandwidth_kbits=typing.cast(
int, celestial.types.Link_bandwidth_kbits(link)
),
blocked=False,
Expand Down
14 changes: 7 additions & 7 deletions docs/runtime/http.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ Returns:
"shell": 1,
"id": 10,
},
"delay": 10000,
"bandwidth": 10000,
"delay_us": 10000,
"bandwidth_kbits": 10000,
"blocked": false,
"segments": [
{
Expand All @@ -223,8 +223,8 @@ Returns:
"shell": 1,
"id": 9,
},
"delay": 4000,
"bandwidth": 20000,
"delay_us": 4000,
"bandwidth_kbits": 20000,
},
{
"source":{
Expand All @@ -235,11 +235,11 @@ Returns:
"shell": 1,
"id": 10,
},
"delay": 6000,
"bandwidth": 10000,
"delay_us": 6000,
"bandwidth_kbits": 10000,
},
],
}
```

Note that `delay` is in microseconds and `bandwidth` in kbit/s.
Note that `delay_us` is in microseconds and `bandwidth_kbits` in kbit/s.
19 changes: 8 additions & 11 deletions pkg/ebpfem/ebpf/net.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ struct

static inline int throttle_flow(struct __sk_buff *skb, __u32 ip_address, uint32_t *throttle_rate_kbps)
{
// use ip as key in map
int key = ip_address;

// find out if the packet should be dropped (i.e. if the rate is 0)
if (*throttle_rate_kbps == 0)
Expand All @@ -68,6 +66,9 @@ static inline int throttle_flow(struct __sk_buff *skb, __u32 ip_address, uint32_
// return TC_ACT_OK;
}

// use ip as key in map
uint32_t key = ip_address;

// when was the last packet sent?
uint64_t *last_tstamp = bpf_map_lookup_elem(&flow_map, &key);
// calculate delay between packets based on bandwidth and packet size (kbps = byte/1000/second)
Expand All @@ -92,7 +93,6 @@ static inline int throttle_flow(struct __sk_buff *skb, __u32 ip_address, uint32_
// if it does not work, drop the packet
if (bpf_map_update_elem(&flow_map, &key, &tstamp, BPF_ANY))
return TC_ACT_SHOT;
// return TC_ACT_OK;

return TC_ACT_OK;
}
Expand All @@ -108,7 +108,6 @@ static inline int throttle_flow(struct __sk_buff *skb, __u32 ip_address, uint32_
// update last timestamp in map
if (bpf_map_update_elem(&flow_map, &key, &next_tstamp, BPF_EXIST))
return TC_ACT_SHOT;
// return TC_ACT_OK;

// set delayed timestamp for packet
skb->tstamp = next_tstamp;
Expand All @@ -119,20 +118,18 @@ static inline int throttle_flow(struct __sk_buff *skb, __u32 ip_address, uint32_

static inline int inject_delay(struct __sk_buff *skb, uint32_t *delay_us)
{
uint64_t delay_ns;
uint64_t now = bpf_ktime_get_ns();
delay_ns = (*delay_us) * NS_PER_US;
uint64_t ts = skb->tstamp;
uint64_t new_ts = ((uint64_t)skb->tstamp) + delay_ns;
uint64_t delay_ns = (*delay_us) * NS_PER_US;

// sometimes skb-tstamp is reset to 0
// https://patchwork.kernel.org/project/netdevbpf/patch/[email protected]/
// check if skb->tstamp == 0
if (ts == 0)
if (skb->tstamp == 0)
{
skb->tstamp = now + delay_ns;
skb->tstamp = bpf_ktime_get_ns() + delay_ns;
return TC_ACT_OK;
}

uint64_t new_ts = ((uint64_t)skb->tstamp) + delay_ns;
// otherwise add additional delay to packets
skb->tstamp = new_ts;

Expand Down
6 changes: 3 additions & 3 deletions pkg/ebpfem/ebpfem.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (v *vm) getHBD(target net.IPNet) *handleKbpsDelay {
return hbd
}

func (e *EBPFem) SetBandwidth(source orchestrator.MachineID, target net.IPNet, bandwidth uint64) error {
func (e *EBPFem) SetBandwidth(source orchestrator.MachineID, target net.IPNet, bandwidthKbits uint64) error {
e.RLock()
v, ok := e.vms[source]
e.RUnlock()
Expand All @@ -140,7 +140,7 @@ func (e *EBPFem) SetBandwidth(source orchestrator.MachineID, target net.IPNet, b

hbd := v.getHBD(target)

hbd.throttleRateKbps = uint32(bandwidth)
hbd.throttleRateKbps = uint32(bandwidthKbits)

ips, err := parseNetToLongs(target)

Expand All @@ -149,7 +149,7 @@ func (e *EBPFem) SetBandwidth(source orchestrator.MachineID, target net.IPNet, b
}

for _, ip := range ips {
log.Tracef("updating bandwidth for %d to %d", ip, bandwidth)
log.Tracef("updating bandwidth for %d to %d", ip, bandwidthKbits)
err = v.objs.IP_HANDLE_KBPS_DELAY.Put(ip, hbd)
if err != nil {
return errors.WithStack(err)
Expand Down
Binary file modified pkg/ebpfem/edt_x86_bpfel.o
Binary file not shown.
4 changes: 2 additions & 2 deletions pkg/ebpfem/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func getIface(name string) (netlink.Link, error) {
return iface, nil
}

// CreateClsactQdisc creates clsact qdisc. This is necessary for the tc egress hook. The packets are forwarded to network_simulation.c.
// CreateClsactQdisc creates clsact qdisc. This is necessary for the tc egress hook. The packets are forwarded to net.c.
// The eBPF-program adjusts the departure time of the packets based on the values in the eBPF-map.
func createClsactQdisc(iface netlink.Link) (*netlink.GenericQdisc, error) {
attrs := netlink.QdiscAttrs{
Expand All @@ -119,7 +119,7 @@ func createClsactQdisc(iface netlink.Link) (*netlink.GenericQdisc, error) {
}

/*
Creates fq qdisc which uses a timing wheel and then releases the pakets to the network interface based on their
Creates fq qdisc which uses a timing wheel and then releases the packets to the network interface based on their
time stamp.
*/
func createFQdisc(iface netlink.Link) (*netlink.Fq, error) {
Expand Down
20 changes: 10 additions & 10 deletions pkg/info/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,18 @@ type Constellation struct {
}

type Segment struct {
Source Identifier `json:"source"`
Target Identifier `json:"target"`
Delay uint32 `json:"delay,omitempty"`
Bandwidth uint64 `json:"bandwidth,omitempty"`
Source Identifier `json:"source"`
Target Identifier `json:"target"`
DelayUs uint32 `json:"delay_us,omitempty"`
BandwidthKbps uint64 `json:"bandwidth_kbps,omitempty"`
}

// Path is returned by `/path/{source_group}/{source_id}/{target_group}/{target_id}`.
type Path struct {
Source Identifier `json:"source"`
Target Identifier `json:"target"`
Delay uint32 `json:"delay,omitempty"`
Bandwidth uint64 `json:"bandwidth,omitempty"`
Blocked bool `json:"blocked,omitempty"`
Segments []Segment `json:"segments"`
Source Identifier `json:"source"`
Target Identifier `json:"target"`
DelayUs uint32 `json:"delay_us,omitempty"`
BandwidthKbps uint64 `json:"bandwidth_kbits,omitempty"`
Blocked bool `json:"blocked,omitempty"`
Segments []Segment `json:"segments"`
}
8 changes: 4 additions & 4 deletions pkg/info/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,8 +406,8 @@ func (i *infoserver) getPath(w http.ResponseWriter, r *http.Request) {
}

if !p.Blocked {
s.Delay = p.Latency
s.Bandwidth = p.Bandwidth
s.DelayUs = p.LatencyUs
s.BandwidthKbps = p.BandwidthKbps
s.Segments = make([]Segment, len(p.Segments))

for j, seg := range p.Segments {
Expand All @@ -425,8 +425,8 @@ func (i *infoserver) getPath(w http.ResponseWriter, r *http.Request) {
ID: seg.Target.Id,
Name: targetName,
},
Delay: seg.Latency,
Bandwidth: seg.Bandwidth,
DelayUs: seg.LatencyUs,
BandwidthKbps: seg.BandwidthKbps,
}
}
} else {
Expand Down
18 changes: 9 additions & 9 deletions pkg/netem/netem.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ import (
)

type link struct {
blocked bool
latency uint32
bandwidth uint64
blocked bool
latencyUs uint32
bandwidthKbps uint64

// tc specific
tcIndex uint16
Expand Down Expand Up @@ -191,7 +191,7 @@ func (n *Netem) checkLink(source orchestrator.MachineID, target net.IPNet) error
return nil
}

func (n *Netem) SetBandwidth(source orchestrator.MachineID, target net.IPNet, bandwidth uint64) error {
func (n *Netem) SetBandwidth(source orchestrator.MachineID, target net.IPNet, bandwidthKbps uint64) error {
v, ok := n.vms[source]

if !ok {
Expand All @@ -207,18 +207,18 @@ func (n *Netem) SetBandwidth(source orchestrator.MachineID, target net.IPNet, ba
return err
}

err = v.updateBandwidth(target, bandwidth)
err = v.updateBandwidth(target, bandwidthKbps)

if err != nil {
return err
}

n.vms[source].links[fromIPNet(target)].bandwidth = bandwidth
n.vms[source].links[fromIPNet(target)].bandwidthKbps = bandwidthKbps

return nil
}

func (n *Netem) SetLatency(source orchestrator.MachineID, target net.IPNet, latency uint32) error {
func (n *Netem) SetLatency(source orchestrator.MachineID, target net.IPNet, latencyUs uint32) error {
v, ok := n.vms[source]

if !ok {
Expand All @@ -234,13 +234,13 @@ func (n *Netem) SetLatency(source orchestrator.MachineID, target net.IPNet, late
return err
}

err = v.updateDelay(target, latency)
err = v.updateDelay(target, latencyUs)

if err != nil {
return err
}

n.vms[source].links[fromIPNet(target)].latency = latency
n.vms[source].links[fromIPNet(target)].latencyUs = latencyUs

return nil
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/netem/tc.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ func (v *vm) createQDisc(target net.IPNet) (uint16, error) {
return v.handle, nil
}

func (v *vm) updateDelay(target net.IPNet, delay uint32) error {
func (v *vm) updateDelay(target net.IPNet, delayUs uint32) error {

log.Tracef("updating delay on %s for %s to %d", v.netIf, target.String(), delay)
log.Tracef("updating delay on %s for %s to %d", v.netIf, target.String(), delayUs)

// get the index
l, ok := v.links[fromIPNet(target)]
Expand All @@ -120,8 +120,8 @@ func (v *vm) updateDelay(target net.IPNet, delay uint32) error {

// convert to milliseconds
// x.y ms
x := delay / 1000
y := delay % 1000 / 10 // ignore the last digit, netem is not that accurate anyway
x := delayUs / 1000
y := delayUs % 1000 / 10 // ignore the last digit, netem is not that accurate anyway

// tc qdisc change dev [TAP_NAME] parent 1:[INDEX] handle [INDEX]: netem delay [DELAY].0ms limit 1000000
cmd := exec.Command(TC_BIN, "qdisc", "change", "dev", v.netIf, "parent", fmt.Sprintf("1:%d", l.tcIndex), "handle", fmt.Sprintf("%d:", l.tcIndex), "netem", "delay", fmt.Sprintf("%d.%dms", x, y), "limit", "1000000")
Expand All @@ -133,9 +133,9 @@ func (v *vm) updateDelay(target net.IPNet, delay uint32) error {
return nil
}

func (v *vm) updateBandwidth(target net.IPNet, bandwidth uint64) error {
func (v *vm) updateBandwidth(target net.IPNet, bandwidthKbps uint64) error {

log.Tracef("updating bandwidth on %s for %s to %d", v.netIf, target.String(), bandwidth)
log.Tracef("updating bandwidth on %s for %s to %d", v.netIf, target.String(), bandwidthKbps)

// get the index
l, ok := v.links[fromIPNet(target)]
Expand All @@ -144,7 +144,7 @@ func (v *vm) updateBandwidth(target net.IPNet, bandwidth uint64) error {
return errors.Errorf("unknown link %s", target.String())
}

rate := fmt.Sprintf("%d.0kbit", bandwidth)
rate := fmt.Sprintf("%d.0kbit", bandwidthKbps)

// tc class change dev [TAP_NAME] parent 1: classid 1:[INDEX] htb rate [RATE] quantum 1514
cmd := exec.Command(TC_BIN, "class", "change", "dev", v.netIf, "parent", "1:", "classid", fmt.Sprintf("1:%d", l.tcIndex), "htb", "rate", rate, "quantum", "1514")
Expand Down
20 changes: 10 additions & 10 deletions pkg/orchestrator/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,19 @@ type ConstellationInfo struct {
}

type SegmentInfo struct {
Source MachineID
Target MachineID
Latency uint32
Bandwidth uint64
Source MachineID
Target MachineID
LatencyUs uint32
BandwidthKbps uint64
}

type PathInfo struct {
Source MachineID
Target MachineID
Latency uint32
Bandwidth uint64
Segments []SegmentInfo
Blocked bool
Source MachineID
Target MachineID
LatencyUs uint32
BandwidthKbps uint64
Segments []SegmentInfo
Blocked bool
}

func (o *Orchestrator) InfoGetIPAddressByID(id MachineID) (net.IP, error) {
Expand Down
14 changes: 7 additions & 7 deletions pkg/orchestrator/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (l Link) String() string {
return "blocked"
}

return fmt.Sprintf("%dus %dbps (next: %s)", l.Latency, l.Bandwidth, l.Next.String())
return fmt.Sprintf("%dus %dkbps (next: %s)", l.LatencyUs, l.BandwidthKbps, l.Next.String())
}

func path(a, b MachineID, n NetworkState) (PathInfo, error) {
Expand All @@ -67,8 +67,8 @@ func path(a, b MachineID, n NetworkState) (PathInfo, error) {
return p, nil
}

p.Latency = n[a][b].Latency
p.Bandwidth = n[a][b].Bandwidth
p.LatencyUs = n[a][b].LatencyUs
p.BandwidthKbps = n[a][b].BandwidthKbps
p.Segments = make([]SegmentInfo, 0)

for a != b {
Expand All @@ -80,10 +80,10 @@ func path(a, b MachineID, n NetworkState) (PathInfo, error) {
}

s := SegmentInfo{
Source: a,
Target: hop.Next,
Latency: n[a][hop.Next].Latency,
Bandwidth: n[a][hop.Next].Bandwidth,
Source: a,
Target: hop.Next,
LatencyUs: n[a][hop.Next].LatencyUs,
BandwidthKbps: n[a][hop.Next].BandwidthKbps,
}

p.Segments = append(p.Segments, s)
Expand Down
Loading

0 comments on commit d7d7d0d

Please sign in to comment.