Skip to content

Commit

Permalink
Expose #idle_duration - for experimental load calculation. (#101)
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix authored Mar 4, 2024
1 parent 031e90c commit c61bb2b
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 2 deletions.
24 changes: 23 additions & 1 deletion ext/io/event/selector/epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ struct IO_Event_Selector_EPoll
int descriptor;
int blocked;

struct timespec idle_duration;

struct IO_Event_Interrupt interrupt;
struct IO_Event_Array descriptors;
};
Expand Down Expand Up @@ -384,6 +386,15 @@ VALUE IO_Event_Selector_EPoll_loop(VALUE self) {
return selector->backend.loop;
}

VALUE IO_Event_Selector_EPoll_idle_duration(VALUE self) {
struct IO_Event_Selector_EPoll *selector = NULL;
TypedData_Get_Struct(self, struct IO_Event_Selector_EPoll, &IO_Event_Selector_EPoll_Type, selector);

double duration = selector->idle_duration.tv_sec + (selector->idle_duration.tv_nsec / 1000000000.0);

return DBL2NUM(duration);
}

VALUE IO_Event_Selector_EPoll_close(VALUE self) {
struct IO_Event_Selector_EPoll *selector = NULL;
TypedData_Get_Struct(self, struct IO_Event_Selector_EPoll, &IO_Event_Selector_EPoll_Type, selector);
Expand Down Expand Up @@ -966,6 +977,9 @@ VALUE IO_Event_Selector_EPoll_select(VALUE self, VALUE duration) {
struct IO_Event_Selector_EPoll *selector = NULL;
TypedData_Get_Struct(self, struct IO_Event_Selector_EPoll, &IO_Event_Selector_EPoll_Type, selector);

selector->idle_duration.tv_sec = 0;
selector->idle_duration.tv_nsec = 0;

int ready = IO_Event_Selector_queue_flush(&selector->backend);

struct select_arguments arguments = {
Expand All @@ -991,8 +1005,15 @@ VALUE IO_Event_Selector_EPoll_select(VALUE self, VALUE duration) {
arguments.timeout = make_timeout(duration, &arguments.storage);

if (!timeout_nonblocking(arguments.timeout)) {
// Wait for events to occur
struct timespec start_time;
IO_Event_Selector_current_time(&start_time);

// Wait for events to occur:
select_internal_without_gvl(&arguments);

struct timespec end_time;
IO_Event_Selector_current_time(&end_time);
IO_Event_Selector_elapsed_time(&start_time, &end_time, &selector->idle_duration);
}
}

Expand Down Expand Up @@ -1025,6 +1046,7 @@ void Init_IO_Event_Selector_EPoll(VALUE IO_Event_Selector) {
rb_define_method(IO_Event_Selector_EPoll, "initialize", IO_Event_Selector_EPoll_initialize, 1);

rb_define_method(IO_Event_Selector_EPoll, "loop", IO_Event_Selector_EPoll_loop, 0);
rb_define_method(IO_Event_Selector_EPoll, "idle_duration", IO_Event_Selector_EPoll_idle_duration, 0);

rb_define_method(IO_Event_Selector_EPoll, "transfer", IO_Event_Selector_EPoll_transfer, 0);
rb_define_method(IO_Event_Selector_EPoll, "resume", IO_Event_Selector_EPoll_resume, -1);
Expand Down
22 changes: 22 additions & 0 deletions ext/io/event/selector/kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ struct IO_Event_Selector_KQueue
int descriptor;
int blocked;

struct timespec idle_duration;

#ifdef IO_EVENT_SELECTOR_KQUEUE_USE_INTERRUPT
struct IO_Event_Interrupt interrupt;
#endif
Expand Down Expand Up @@ -367,6 +369,15 @@ VALUE IO_Event_Selector_KQueue_loop(VALUE self) {
return selector->backend.loop;
}

VALUE IO_Event_Selector_KQueue_idle_duration(VALUE self) {
struct IO_Event_Selector_KQueue *selector = NULL;
TypedData_Get_Struct(self, struct IO_Event_Selector_EPoll, &IO_Event_Selector_KQueue_Type, selector);

double duration = selector->idle_duration.tv_sec + (selector->idle_duration.tv_nsec / 1000000000.0);

return DBL2NUM(duration);
}

VALUE IO_Event_Selector_KQueue_close(VALUE self) {
struct IO_Event_Selector_KQueue *selector = NULL;
TypedData_Get_Struct(self, struct IO_Event_Selector_KQueue, &IO_Event_Selector_KQueue_Type, selector);
Expand Down Expand Up @@ -949,6 +960,9 @@ VALUE IO_Event_Selector_KQueue_select(VALUE self, VALUE duration) {
struct IO_Event_Selector_KQueue *selector = NULL;
TypedData_Get_Struct(self, struct IO_Event_Selector_KQueue, &IO_Event_Selector_KQueue_Type, selector);

selector->idle_duration.tv_sec = 0;
selector->idle_duration.tv_nsec = 0;

int ready = IO_Event_Selector_queue_flush(&selector->backend);

struct select_arguments arguments = {
Expand Down Expand Up @@ -985,8 +999,15 @@ VALUE IO_Event_Selector_KQueue_select(VALUE self, VALUE duration) {
if (!timeout_nonblocking(arguments.timeout)) {
arguments.count = KQUEUE_MAX_EVENTS;

struct timespec start_time;
IO_Event_Selector_current_time(&start_time);

if (DEBUG) fprintf(stderr, "IO_Event_Selector_KQueue_select timeout=" PRINTF_TIMESPEC "\n", PRINTF_TIMESPEC_ARGS(arguments.storage));
select_internal_without_gvl(&arguments);

struct timespec end_time;
IO_Event_Selector_current_time(&end_time);
IO_Event_Selector_elapsed_time(&start_time, &end_time, &selector->idle_duration);
}
}

Expand Down Expand Up @@ -1041,6 +1062,7 @@ void Init_IO_Event_Selector_KQueue(VALUE IO_Event_Selector) {
rb_define_method(IO_Event_Selector_KQueue, "initialize", IO_Event_Selector_KQueue_initialize, 1);

rb_define_method(IO_Event_Selector_KQueue, "loop", IO_Event_Selector_KQueue_loop, 0);
rb_define_method(IO_Event_Selector_KQueue, "idle_duration", IO_Event_Selector_KQueue_idle_duration, 0);

rb_define_method(IO_Event_Selector_KQueue, "transfer", IO_Event_Selector_KQueue_transfer, 0);
rb_define_method(IO_Event_Selector_KQueue, "resume", IO_Event_Selector_KQueue_resume, -1);
Expand Down
22 changes: 22 additions & 0 deletions ext/io/event/selector/uring.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ struct IO_Event_Selector_URing
size_t pending;
int blocked;

struct timespec idle_duration;

struct IO_Event_Array completions;
struct IO_Event_List free_list;
};
Expand Down Expand Up @@ -266,6 +268,15 @@ VALUE IO_Event_Selector_URing_loop(VALUE self) {
return selector->backend.loop;
}

VALUE IO_Event_Selector_URing_idle_duration(VALUE self) {
struct IO_Event_Selector_URing *selector = NULL;
TypedData_Get_Struct(self, struct IO_Event_Selector_URing, &IO_Event_Selector_URing_Type, selector);

double duration = selector->idle_duration.tv_sec + (selector->idle_duration.tv_nsec / 1000000000.0);

return DBL2NUM(duration);
}

VALUE IO_Event_Selector_URing_close(VALUE self) {
struct IO_Event_Selector_URing *selector = NULL;
TypedData_Get_Struct(self, struct IO_Event_Selector_URing, &IO_Event_Selector_URing_Type, selector);
Expand Down Expand Up @@ -1009,6 +1020,9 @@ VALUE IO_Event_Selector_URing_select(VALUE self, VALUE duration) {
struct IO_Event_Selector_URing *selector = NULL;
TypedData_Get_Struct(self, struct IO_Event_Selector_URing, &IO_Event_Selector_URing_Type, selector);

selector->idle_duration.tv_sec = 0;
selector->idle_duration.tv_nsec = 0;

// Flush any pending events:
io_uring_submit_flush(selector);

Expand All @@ -1031,9 +1045,16 @@ VALUE IO_Event_Selector_URing_select(VALUE self, VALUE duration) {
arguments.timeout = make_timeout(duration, &arguments.storage);

if (!selector->backend.ready && !timeout_nonblocking(arguments.timeout)) {
struct timespec start_time;
IO_Event_Selector_current_time(&start_time);

// This is a blocking operation, we wait for events:
result = select_internal_without_gvl(&arguments);

struct timespec end_time;
IO_Event_Selector_current_time(&end_time);
IO_Event_Selector_elapsed_time(&start_time, &end_time, &selector->idle_duration);

// After waiting/flushing the SQ, check if there are any completions:
if (result > 0) {
result = select_process_completions(selector);
Expand Down Expand Up @@ -1083,6 +1104,7 @@ void Init_IO_Event_Selector_URing(VALUE IO_Event_Selector) {
rb_define_method(IO_Event_Selector_URing, "initialize", IO_Event_Selector_URing_initialize, 1);

rb_define_method(IO_Event_Selector_URing, "loop", IO_Event_Selector_URing_loop, 0);
rb_define_method(IO_Event_Selector_URing, "idle_duration", IO_Event_Selector_URing_idle_duration, 0);

rb_define_method(IO_Event_Selector_URing, "transfer", IO_Event_Selector_URing_transfer, 0);
rb_define_method(IO_Event_Selector_URing, "resume", IO_Event_Selector_URing_resume, -1);
Expand Down
4 changes: 4 additions & 0 deletions lib/io/event/debug/selector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ def initialize(selector, log: nil)
@log = log
end

def idle_duration
@selector.idle_duration
end

def now
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
Expand Down
13 changes: 13 additions & 0 deletions lib/io/event/selector/select.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@ def initialize(loop)

@ready = Queue.new
@interrupt = Interrupt.attach(self)

@idle_duration = 0.0
end

attr :loop

# This is the amount of time the event loop was idle during the last select call.
attr :idle_duration

# If the event loop is currently sleeping, wake it up.
def wakeup
if @blocked
Expand Down Expand Up @@ -415,6 +420,10 @@ def select(duration = nil)
duration = 0 unless @ready.empty?
error = nil

if duration && duration > 0.0
start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# We need to handle interrupts on blocking IO. Every other implementation uses EINTR, but that doesn't work with `::IO.select` as it will retry the call on EINTR.
Thread.handle_interrupt(::Exception => :on_blocking) do
@blocked = true
Expand All @@ -423,6 +432,10 @@ def select(duration = nil)
# Requeue below...
ensure
@blocked = false
if start_time
end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
@idle_duration = end_time - start_time
end
end

if error
Expand Down
9 changes: 8 additions & 1 deletion test/io/event/selector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def transfer
end

Selector = Sus::Shared("a selector") do
with '.select' do
with '#select' do
let(:quantum) {0.2}

it "can select with 0s timeout" do
Expand All @@ -47,6 +47,13 @@ def transfer
end
end

with '#idle_duration' do
it 'can report idle duration' do
selector.select(0.01)
expect(selector.idle_duration).to be > 0.0
end
end

with '#wakeup' do
it "can wakeup selector from different thread" do
thread = Thread.new do
Expand Down

0 comments on commit c61bb2b

Please sign in to comment.