Skip to content

Commit

Permalink
This is just ridiculous now
Browse files Browse the repository at this point in the history
  • Loading branch information
uNetworkingAB committed May 12, 2023
1 parent 42fc265 commit 726cedf
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 54 deletions.
3 changes: 3 additions & 0 deletions src/io_uring/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ struct us_timer_t {

struct us_loop_t {
struct io_uring ring;
struct io_uring_buf_ring *buf_ring;

struct us_timer_t *timer;


struct us_socket_context_t *head;
struct us_socket_context_t *iterator;
Expand Down
2 changes: 1 addition & 1 deletion src/io_uring/io_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ struct us_socket_t *us_socket_context_connect(int ssl, struct us_socket_context_


struct iovec iovecs = {s->sendBuf, 16 * 1024};
printf("register: %d\n", io_uring_register_buffers_update_tag(&context->loop->ring, s->dd, &iovecs, 0, 1));
//printf("register: %d\n", io_uring_register_buffers_update_tag(&context->loop->ring, s->dd, &iovecs, 0, 1));


io_uring_sqe_set_data(sqe, (char *)s + SOCKET_CONNECT);
Expand Down
77 changes: 26 additions & 51 deletions src/io_uring/io_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,6 @@ char bufs[BUFFERS_COUNT][MAX_MESSAGE_LEN] = {0};
int group_id = 1337;



void add_provide_buf(struct io_uring *ring, __u16 bid, unsigned gid) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_provide_buffers(sqe, bufs[bid], MAX_MESSAGE_LEN, 1, gid, bid);
io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS);

io_uring_sqe_set_data64(sqe, 6);
}

/* This functions should never run recursively */
void us_internal_timer_sweep(struct us_loop_t *loop) {
struct us_loop_t *loop_data = loop;
Expand Down Expand Up @@ -158,7 +149,7 @@ void us_loop_run(struct us_loop_t *loop) {

// register this send buffer as registered buffer (using the DD of the socket as index!)
struct iovec iovecs = {s->sendBuf, 16 * 1024};
printf("register: %d\n", io_uring_register_buffers_update_tag(&loop->ring, s->dd, &iovecs, 0, 1));
//printf("register: %d\n", io_uring_register_buffers_update_tag(&loop->ring, s->dd, &iovecs, 0, 1));

int sock_conn_fd = cqe->res;
// only read when there is no error, >= 0
Expand All @@ -184,7 +175,13 @@ void us_loop_run(struct us_loop_t *loop) {
int bid = cqe->flags >> 16;
if (cqe->res <= 0) {
// read failed, re-add the buffer
add_provide_buf(&loop->ring, bid, group_id);

//add_provide_buf(&loop->ring, bid, group_id);

io_uring_buf_ring_add(loop->buf_ring, bufs[bid], MAX_MESSAGE_LEN, bid, io_uring_buf_ring_mask(4096), 0);
loop->buf_ring->tail++;


// connection closed or error
//close(conn_i.fd);
struct io_uring_sqe *sqe = io_uring_get_sqe(&loop->ring);
Expand All @@ -207,11 +204,9 @@ void us_loop_run(struct us_loop_t *loop) {

s->context->on_data(s, bufs[bid], bytes_read);

struct io_uring_sqe *sqe = io_uring_get_sqe(&loop->ring);
io_uring_prep_provide_buffers(sqe, bufs[bid], MAX_MESSAGE_LEN, 1, group_id, bid);
io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS);

io_uring_sqe_set_data64(sqe, 6); // nothing we handle
io_uring_buf_ring_add(loop->buf_ring, bufs[bid], MAX_MESSAGE_LEN, bid, io_uring_buf_ring_mask(4096), 0);
loop->buf_ring->tail++;


//add_socket_write(&loop->ring, s->dd, bid, bytes_read, IOSQE_FIXED_FILE);
Expand All @@ -222,7 +217,7 @@ void us_loop_run(struct us_loop_t *loop) {
// add a new read for the existing connection

} else if (type == SOCKET_CONNECT) {
printf("we are connectred: %d\n", cqe->res);
//printf("we are connectred: %d\n", cqe->res);

struct us_socket_t *s = object;

Expand Down Expand Up @@ -329,14 +324,9 @@ struct us_loop_t *us_create_loop(void *hint, void (*wakeup_cb)(struct us_loop_t
//loop->post_cb = post_cb;
//loop->iteration_nr = 0;

// initialize io_uring
struct io_uring_params params;
//struct io_uring ring;
memset(&params, 0, sizeof(params));

params.flags = IORING_SETUP_COOP_TASKRUN | IORING_SETUP_SINGLE_ISSUER;
//params.sq_thread_idle = 10000;

if (io_uring_queue_init_params(2048, &loop->ring, &params) < 0) {
perror("io_uring_init_failed...\n");
exit(1);
Expand All @@ -346,41 +336,26 @@ struct us_loop_t *us_create_loop(void *hint, void (*wakeup_cb)(struct us_loop_t
exit(1);
}

if (io_uring_register_buffers_sparse(&loop->ring, 1024)) {
exit(1);
}
io_uring_register_ring_fd(&loop->ring);

// check if IORING_FEAT_FAST_POLL is supported
if (!(params.features & IORING_FEAT_FAST_POLL)) {
printf("IORING_FEAT_FAST_POLL not available in the kernel, quiting...\n");
exit(0);
}
// create buffer ring here
struct io_uring_buf_reg reg = {0};
posix_memalign(&reg.ring_addr, 1024 * 4, sizeof(struct io_uring_buf) * 4096);
reg.ring_entries = 4096;
reg.bgid = 1337;
loop->buf_ring = reg.ring_addr;

// check if buffer selection is supported
struct io_uring_probe *probe;
probe = io_uring_get_probe_ring(&loop->ring);
if (!probe || !io_uring_opcode_supported(probe, IORING_OP_PROVIDE_BUFFERS)) {
printf("Buffer select not supported, skipping...\n");
exit(0);
// registrera buffer ring bvuffer
if (io_uring_register_buf_ring(&loop->ring, &reg, 0)) {
printf("Failed to register ring\n");
exit(1);
}
io_uring_free_probe(probe);
io_uring_buf_ring_init(loop->buf_ring);

// register buffers for buffer selection
struct io_uring_sqe *sqe;
struct io_uring_cqe *cqe;

sqe = io_uring_get_sqe(&loop->ring);
io_uring_prep_provide_buffers(sqe, bufs, MAX_MESSAGE_LEN, BUFFERS_COUNT, group_id, 0);

// also register these buffers as fixed

io_uring_submit(&loop->ring);
io_uring_wait_cqe(&loop->ring, &cqe);
if (cqe->res < 0) {
printf("cqe->res = %d\n", cqe->res);
exit(1);
for (int i = 0; i < 4096; i++) {
io_uring_buf_ring_add(loop->buf_ring, bufs[i], MAX_MESSAGE_LEN, i, io_uring_buf_ring_mask(4096), i);
}
io_uring_cqe_seen(&loop->ring, cqe);
io_uring_buf_ring_advance(loop->buf_ring, 4096);

return loop;
}
Expand Down
4 changes: 2 additions & 2 deletions src/io_uring/io_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ int us_socket_write(int ssl, struct us_socket_t *s, const char *data, int length

//printf("writing on socket now\n");

if (data != s->sendBuf) {
//if (data != s->sendBuf) {
//printf("WHAT THE HECK!\n");
memcpy(s->sendBuf, data, length);
}
//}


struct io_uring_sqe *sqe = io_uring_get_sqe(&s->context->loop->ring);
Expand Down

0 comments on commit 726cedf

Please sign in to comment.