Skip to content

Commit

Permalink
Merge pull request #69 from coroot/invalidate_conn_timestamp
Browse files Browse the repository at this point in the history
Outbound connection invalidation
  • Loading branch information
def authored Feb 20, 2024
2 parents 09a9659 + adb3b7d commit ff1e357
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 42 deletions.
16 changes: 8 additions & 8 deletions ebpftracer/ebpf.go

Large diffs are not rendered by default.

53 changes: 19 additions & 34 deletions ebpftracer/ebpf/l7/l7.c
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,22 @@ struct user_msghdr {
};

static inline __attribute__((__always_inline__))
__u64 get_connection_timestamp(__u32 pid, __u64 fd) {
void send_event(void *ctx, struct l7_event *e, __u32 pid, __u64 fd) {
struct sk_info sk = {};
sk.pid = pid;
sk.fd = fd;
__u64 *timestamp = bpf_map_lookup_elem(&connection_timestamps, &sk);
if (timestamp) {
return *timestamp;
if (*timestamp == 0) {
return;
}
e->connection_timestamp = *timestamp;
} else {
e->connection_timestamp = 0;
}
return 0;
e->fd = fd;
e->pid = pid;
bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
}

static inline __attribute__((__always_inline__))
Expand Down Expand Up @@ -241,13 +248,10 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
return 0;
}
e->protocol = PROTOCOL_POSTGRES;
e->fd = k.fd;
e->pid = k.pid;
e->method = METHOD_STATEMENT_CLOSE;
e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
e->payload_size = size;
COPY_PAYLOAD(e->payload, size, payload);
bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
send_event(ctx, e, k.pid, k.fd);
return 0;
}
req->protocol = PROTOCOL_POSTGRES;
Expand All @@ -262,13 +266,10 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
return 0;
}
e->protocol = PROTOCOL_MYSQL;
e->fd = k.fd;
e->pid = k.pid;
e->method = METHOD_STATEMENT_CLOSE;
e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
e->payload_size = size;
COPY_PAYLOAD(e->payload, size, payload);
bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
send_event(ctx, e, k.pid, k.fd);
return 0;
}
req->protocol = PROTOCOL_MYSQL;
Expand All @@ -280,23 +281,17 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
return 0;
}
e->protocol = PROTOCOL_RABBITMQ;
e->fd = k.fd;
e->pid = k.pid;
e->method = METHOD_PRODUCE;
e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
send_event(ctx, e, k.pid, k.fd);
return 0;
} else if (nats_method(payload, size) == METHOD_PRODUCE) {
struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
if (!e) {
return 0;
}
e->protocol = PROTOCOL_NATS;
e->fd = k.fd;
e->pid = k.pid;
e->method = METHOD_PRODUCE;
e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
send_event(ctx, e, k.pid, k.fd);
return 0;
} else if (is_cassandra_request(payload, size, &k.stream_id)) {
req->protocol = PROTOCOL_CASSANDRA;
Expand All @@ -312,14 +307,11 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
return 0;
}
e->protocol = PROTOCOL_HTTP2;
e->fd = k.fd;
e->pid = k.pid;
e->method = METHOD_HTTP2_CLIENT_FRAMES;
e->duration = bpf_ktime_get_ns();
e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
e->payload_size = size;
COPY_PAYLOAD(e->payload, size, payload);
bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
send_event(ctx, e, k.pid, k.fd);
return 0;
} else if (is_dubbo2_request(payload, size)) {
req->protocol = PROTOCOL_DUBBO2;
Expand Down Expand Up @@ -391,10 +383,7 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
if (!e) {
return 0;
}
e->fd = k.fd;
e->pid = k.pid;
e->protocol = PROTOCOL_UNKNOWN;
e->connection_timestamp = 0;
e->status = STATUS_UNKNOWN;
e->method = METHOD_UNKNOWN;
e->statement_id = 0;
Expand All @@ -403,15 +392,13 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
if (is_rabbitmq_consume(payload, ret)) {
e->protocol = PROTOCOL_RABBITMQ;
e->method = METHOD_CONSUME;
e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
send_event(ctx, e, k.pid, k.fd);
return 0;
}
if (nats_method(payload, ret) == METHOD_CONSUME) {
e->protocol = PROTOCOL_NATS;
e->method = METHOD_CONSUME;
e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
send_event(ctx, e, k.pid, k.fd);
return 0;
}

Expand All @@ -428,10 +415,9 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
e->protocol = PROTOCOL_HTTP2;
e->method = METHOD_HTTP2_SERVER_FRAMES;
e->duration = bpf_ktime_get_ns();
e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
e->payload_size = ret;
COPY_PAYLOAD(e->payload, ret, payload);
bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
send_event(ctx, e, k.pid, k.fd);
return 0;
} else {
return 0;
Expand Down Expand Up @@ -484,8 +470,7 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
return 0;
}
e->duration = bpf_ktime_get_ns() - req->ns;
e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
send_event(ctx, e, k.pid, k.fd);
return 0;
}

Expand Down
26 changes: 26 additions & 0 deletions ebpftracer/ebpf/tcp/state.c
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,29 @@ int sys_exit_connect(void *ctx) {
return 0;
}

static inline __attribute__((__always_inline__))
int trace_exit_accept(struct trace_event_raw_sys_exit__stub* ctx) {
if (ctx->ret < 0) {
return 0;
}
__u64 id = bpf_get_current_pid_tgid();
struct sk_info k = {};
k.pid = id >> 32;
k.fd = ctx->ret;
__u64 invalid_timestamp = 0;
bpf_map_update_elem(&connection_timestamps, &k, &invalid_timestamp, BPF_ANY);
return 0;
}

SEC("tracepoint/syscalls/sys_exit_accept")
int sys_exit_accept(struct trace_event_raw_sys_exit__stub* ctx) {
return trace_exit_accept(ctx);
}

SEC("tracepoint/syscalls/sys_exit_accept4")
int sys_exit_accept4(struct trace_event_raw_sys_exit__stub* ctx) {
return trace_exit_accept(ctx);
}



0 comments on commit ff1e357

Please sign in to comment.