Skip to content

Commit

Permalink
Merge pull request #58 from wenhuwang/dubbo_protocol
Browse files Browse the repository at this point in the history
Support l7 Dubbo2 protocol.
  • Loading branch information
def authored Jan 16, 2024
2 parents 7ac7ff1 + 45e7e6a commit cffd56a
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 8 deletions.
2 changes: 2 additions & 0 deletions containers/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,8 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R
stats.observe(r.Status.String(), "", r.Duration)
case l7.ProtocolRabbitmq, l7.ProtocolNats:
stats.observe(r.Status.String(), r.Method.String(), 0)
case l7.ProtocolDubbo2:
stats.observe(r.Status.String(), "", r.Duration)
}
}

Expand Down
2 changes: 2 additions & 0 deletions containers/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ var (
l7.ProtocolCassandra: {Name: "container_cassandra_queries_total", Help: "Total number of outbound Cassandra requests"},
l7.ProtocolRabbitmq: {Name: "container_rabbitmq_messages_total", Help: "Total number of Rabbitmq messages produced or consumed by the container"},
l7.ProtocolNats: {Name: "container_nats_messages_total", Help: "Total number of NATS messages produced or consumed by the container"},
l7.ProtocolDubbo2: {Name: "container_dubbo_requests_total", Help: "Total number of outbound DUBBO requests"},
}
L7Latency = map[l7.Protocol]prometheus.HistogramOpts{
l7.ProtocolHTTP: {Name: "container_http_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound HTTP request"},
Expand All @@ -110,6 +111,7 @@ var (
l7.ProtocolMongo: {Name: "container_mongo_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mongo query"},
l7.ProtocolKafka: {Name: "container_kafka_requests_duration_seconds_total", Help: "Histogram of the execution time for each outbound Kafka request"},
l7.ProtocolCassandra: {Name: "container_cassandra_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Cassandra request"},
l7.ProtocolDubbo2: {Name: "container_dubbo_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound DUBBO request"},
}
)

Expand Down
16 changes: 8 additions & 8 deletions ebpftracer/ebpf.go

Large diffs are not rendered by default.

71 changes: 71 additions & 0 deletions ebpftracer/ebpf/l7/dubbo2.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// https://cn.dubbo.apache.org/zh-cn/overview/reference/protocols/tcp/
// https://github.com/apache/dubbo
#define DUBBO_HEADER_LENGTH 16
#define DUBBO_MAGIC_HIGH 0xda
#define DUBBO_MAGIC_LOW 0xbb

#define DUBBO_SERIALMASK 0x1f
#define DUBBO_FLAG_REQUEST 0x80
#define DUBBO_TWO_WAY 0x40
#define DUBBO_HEARTBEAT_EVENT 0x20

#define DUBBO_RESPONSE_OK 20
#define DUBBO_RESPONSE_CLIENT_TIMEOUT 30
#define DUBBO_RESPONSE_SERVER_TIMEOUT 31
#define DUBBO_RESPONSE_BAD_REQUEST 40
#define DUBBO_RESPONSE_BAD_RESPONSE 50
#define DUBBO_RESPONSE_SERVICE_NOT_FOUND 60
#define DUBBO_RESPONSE_SERVICE_ERROR 70
#define DUBBO_RESPONSE_SERVER_ERROR 80
#define DUBBO_RESPONSE_CLIENT_ERROR 90
#define DUBBO_RESPONSE_SERVER_THREADPOOL_EXHAUSTED_ERROR 100

static __always_inline
int is_dubbo2_request(char *buf, __u64 buf_size) {
if (buf_size < DUBBO_HEADER_LENGTH) {
return 0;
}
__u8 b[16];
bpf_read(buf, b);
if (b[0] != DUBBO_MAGIC_HIGH || b[1] != DUBBO_MAGIC_LOW) {
return 0;
}

if ((b[2] & DUBBO_SERIALMASK) == 0 || (b[2] & DUBBO_FLAG_REQUEST) == 0 || (b[2] & DUBBO_TWO_WAY) == 0 || (b[2] & DUBBO_HEARTBEAT_EVENT) != 0) {
return 0;
}

return 1;
}


static __always_inline
int is_dubbo2_response(char *buf, __u32 *status) {
__u8 b[16];
bpf_read(buf, b);
if (b[0] != DUBBO_MAGIC_HIGH || b[1] != DUBBO_MAGIC_LOW) {
return 0;
}

if ((b[2] & DUBBO_SERIALMASK) == 0 || (b[2] & DUBBO_FLAG_REQUEST) != 0 || (b[2] & DUBBO_HEARTBEAT_EVENT) != 0) {
return 0;
}

if (b[3] == DUBBO_RESPONSE_OK) {
*status = STATUS_OK;
return 1;
} else if (b[3] == DUBBO_RESPONSE_CLIENT_TIMEOUT || b[3] == DUBBO_RESPONSE_SERVER_TIMEOUT) {
*status = STATUS_FAILED;
return 1;
} else if (b[3] == DUBBO_RESPONSE_BAD_REQUEST || b[3] == DUBBO_RESPONSE_CLIENT_ERROR || b[3] == DUBBO_RESPONSE_SERVICE_NOT_FOUND) {
*status = STATUS_FAILED;
return 1;
} else if (b[3] == DUBBO_RESPONSE_BAD_RESPONSE || b[3] == DUBBO_RESPONSE_SERVICE_ERROR || b[3] == DUBBO_RESPONSE_SERVER_ERROR || b[3] == DUBBO_RESPONSE_SERVER_THREADPOOL_EXHAUSTED_ERROR) {
*status = STATUS_FAILED;
return 1;
} else {
*status = STATUS_UNKNOWN;
return 1;
}
return 0;
}
6 changes: 6 additions & 0 deletions ebpftracer/ebpf/l7/l7.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#define PROTOCOL_RABBITMQ 9
#define PROTOCOL_NATS 10
#define PROTOCOL_HTTP2 11
#define PROTOCOL_DUBBO2 12

#define STATUS_UNKNOWN 0
#define STATUS_OK 200
Expand Down Expand Up @@ -49,6 +50,7 @@
#include "rabbitmq.c"
#include "nats.c"
#include "http2.c"
#include "dubbo2.c"

struct l7_event {
__u64 fd;
Expand Down Expand Up @@ -319,6 +321,8 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
COPY_PAYLOAD(e->payload, size, payload);
bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
return 0;
} else if (is_dubbo2_request(payload, size)) {
req->protocol = PROTOCOL_DUBBO2;
}

if (req->protocol == PROTOCOL_UNKNOWN) {
Expand Down Expand Up @@ -472,6 +476,8 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
}
} else if (e->protocol == PROTOCOL_KAFKA) {
response = is_kafka_response(payload, req->request_id);
} else if (e->protocol == PROTOCOL_DUBBO2) {
response = is_dubbo2_response(payload, &e->status);
}

if (!response) {
Expand Down
3 changes: 3 additions & 0 deletions ebpftracer/l7/l7.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
ProtocolRabbitmq Protocol = 9
ProtocolNats Protocol = 10
ProtocolHTTP2 Protocol = 11
ProtocolDubbo2 Protocol = 12
)

func (p Protocol) String() string {
Expand All @@ -45,6 +46,8 @@ func (p Protocol) String() string {
return "NATS"
case ProtocolHTTP2:
return "HTTP2"
case ProtocolDubbo2:
return "Dubbo2"
}
return "UNKNOWN:" + strconv.Itoa(int(p))
}
Expand Down

0 comments on commit cffd56a

Please sign in to comment.