Skip to content

Commit

Permalink
coroutines: use photon work_pool when nr_jobs > 0, and use photon lib…
Browse files Browse the repository at this point in the history
…c fn wrappers (vlang#19711)
  • Loading branch information
joe-conigliaro authored Oct 31, 2023
1 parent 57a7db1 commit a63f3e6
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 12 deletions.
1 change: 1 addition & 0 deletions cmd/tools/modules/testing/common.v
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ pub fn new_test_session(_vargs string, will_compile bool) TestSession {
// is only available on macos for now, and it is not yet trivial enough to
// build/install on the CI:
skip_files << 'examples/coroutines/simple_coroutines.v'
skip_files << 'examples/coroutines/coroutines_bench.v'
$if msvc {
skip_files << 'vlib/v/tests/const_comptime_eval_before_vinit_test.v' // _constructor used
skip_files << 'vlib/v/tests/project_with_cpp_code/compiling_cpp_files_with_a_cplusplus_compiler_test.v'
Expand Down
37 changes: 37 additions & 0 deletions examples/coroutines/coroutines_bench.v
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Build with (-gc none, until GC bug is fixed)
// v -gc none -use-coroutines coroutine_benchs.v
//
import coroutines
import time
import net.http
import sync

const run_time = 10 * time.second

fn request(mut mu sync.Mutex, count &int) {
for {
http.get('http://vlang.io/utc_now') or { panic(err) }
mu.@lock()
unsafe {
(*count)++
}
mu.unlock()
}
}

fn main() {
mut mu := sync.new_mutex()
mut count := 0

for _ in 0 .. 8 {
go request(mut mu, &count)
}
$if is_coroutine ? {
println('IS COROUTINE=true')
coroutines.sleep(run_time)
} $else {
println('IS COROUTINE=false')
time.sleep(run_time)
}
println('${count} requests made.')
}
25 changes: 23 additions & 2 deletions thirdparty/photon/photonwrapper.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#ifndef C_PHOTONWRAPPER_H_
#define C_PHOTONWRAPPER_H_

#include <sys/socket.h>


#ifdef __cplusplus

Expand All @@ -12,23 +14,42 @@
#include <photon/common/iovector.h>
#include <photon/fs/localfs.h>
#include <photon/net/socket.h>

#include <photon/net/basic_socket.h>
#include <photon/thread/workerpool.h>
#include <iostream>

extern "C" {
// using namespace photon;
// WorkPool* work_pool;
// WorkPool* new_photon_work_pool();
photon::WorkPool* work_pool;
#else

#endif

// custom v functions
void init_photon_work_pool(size_t);
void photon_thread_create_and_migrate_to_work_pool(void* (* f)(void*), void* arg);
// direct wrappers to photon functions
int photon_init_default();
void photon_thread_create(void* (* f)(void*), void* arg);
void photon_sleep_s(int n);
void photon_sleep_ms(int n);

// void* default_photon_thread_stack_alloc(void*, size_t size);
// void default_photon_thread_stack_dealloc(void*, void* ptr, size_t size);
void set_photon_thread_stack_allocator(
void* (*alloc_func)(void*, size_t),
void (*dealloc_func)(void*, void*, size_t)
);

int photon_socket(int domain, int type, int protocol);
int photon_connect(int fd, const struct sockaddr *addr, socklen_t addrlen, uint64_t timeout);
int photon_accept(int fd, struct sockaddr *addr, socklen_t *addrlen, uint64_t timeout);
ssize_t photon_send(int fd, const void* buf, size_t len, int flags, uint64_t timeout);
// ssize_t photon_sendmsg(int fd, const struct msghdr* msg, int flags, uint64_t timeout);
ssize_t photon_recv(int fd, void* buf, size_t count, int flags, uint64_t timeout);
// ssize_t photon_recvmsg(int fd, struct msghdr* msg, int flags, uint64_t timeout);

#ifdef __cplusplus
}
#endif
Expand Down
11 changes: 10 additions & 1 deletion vlib/coroutines/coroutines.v
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@
// that can be found in the LICENSE file.
module coroutines

import v.util
import time

#flag -I @VEXEROOT/thirdparty/photon
#flag @VEXEROOT/thirdparty/photon/photonwrapper.so

#include "photonwrapper.h"

fn C.photon_init_default() int
// struct C.WorkPool {}
// fn C.new_photon_work_pool) C.WorkPool
fn C.init_photon_work_pool(int)

// fn C.photon_thread_create_and_migrate_to_work_pool(f voidptr, arg voidptr)
fn C.photon_thread_create(f voidptr, arg voidptr)
fn C.photon_init_default() int
fn C.photon_sleep_s(n int)
fn C.photon_sleep_ms(n int)
fn C.set_photon_thread_stack_allocator(fn (voidptr, int) voidptr, fn (voidptr, voidptr, int))
Expand Down Expand Up @@ -41,6 +47,9 @@ fn init() {
}
C.set_photon_thread_stack_allocator(alloc, dealloc)
ret := C.photon_init_default()
if util.nr_jobs > 0 {
C.init_photon_work_pool(util.nr_jobs)
}
if ret < 0 {
panic('failed to initialize coroutines via photon (ret=${ret})')
}
Expand Down
6 changes: 6 additions & 0 deletions vlib/net/aasocket.c.v
Original file line number Diff line number Diff line change
Expand Up @@ -113,5 +113,11 @@ fn C.FD_ISSET(fd int, fdset &C.fd_set) int

fn C.inet_pton(family AddrFamily, saddr &char, addr voidptr) int

fn C.photon_socket(domain AddrFamily, typ SocketType, protocol int) int
fn C.photon_connect(int, &Addr, u32, timeout u64) int
fn C.photon_accept(int, voidptr, int, timeout u64) int
fn C.photon_send(int, voidptr, int, int, timeout u64) int
fn C.photon_recv(int, voidptr, int, int, timeout u64) int

[typedef]
pub struct C.fd_set {}
48 changes: 40 additions & 8 deletions vlib/net/tcp.v
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,11 @@ pub fn (mut c TcpConn) close() ! {
}

pub fn (c TcpConn) read_ptr(buf_ptr &u8, len int) !int {
mut res := wrap_read_result(C.recv(c.sock.handle, voidptr(buf_ptr), len, 0))!
mut res := $if is_coroutine ? {
wrap_read_result(C.photon_recv(c.sock.handle, voidptr(buf_ptr), len, 0, c.read_timeout))!
} $else {
wrap_read_result(C.recv(c.sock.handle, voidptr(buf_ptr), len, 0))!
}
$if trace_tcp ? {
eprintln('<<< TcpConn.read_ptr | c.sock.handle: ${c.sock.handle} | buf_ptr: ${ptr_str(buf_ptr)} len: ${len} | res: ${res}')
}
Expand All @@ -127,7 +131,11 @@ pub fn (c TcpConn) read_ptr(buf_ptr &u8, len int) !int {
code := error_code()
if code == int(error_ewouldblock) {
c.wait_for_read()!
res = wrap_read_result(C.recv(c.sock.handle, voidptr(buf_ptr), len, 0))!
res = $if is_coroutine ? {
wrap_read_result(C.photon_recv(c.sock.handle, voidptr(buf_ptr), len, 0, c.read_timeout))!
} $else {
wrap_read_result(C.recv(c.sock.handle, voidptr(buf_ptr), len, 0))!
}
$if trace_tcp ? {
eprintln('<<< TcpConn.read_ptr | c.sock.handle: ${c.sock.handle} | buf_ptr: ${ptr_str(buf_ptr)} len: ${len} | res: ${res}')
}
Expand Down Expand Up @@ -177,7 +185,11 @@ pub fn (mut c TcpConn) write_ptr(b &u8, len int) !int {
for total_sent < len {
ptr := ptr_base + total_sent
remaining := len - total_sent
mut sent := C.send(c.sock.handle, ptr, remaining, msg_nosignal)
mut sent := $if is_coroutine ? {
C.photon_send(c.sock.handle, ptr, remaining, msg_nosignal, c.write_timeout)
} $else {
C.send(c.sock.handle, ptr, remaining, msg_nosignal)
}
$if trace_tcp_data_write ? {
eprintln('>>> TcpConn.write_ptr | data chunk, total_sent: ${total_sent:6}, remaining: ${remaining:6}, ptr: ${voidptr(ptr):x} => sent: ${sent:6}')
}
Expand Down Expand Up @@ -337,10 +349,18 @@ pub fn (mut l TcpListener) accept_only() !&TcpConn {
eprintln(' TcpListener.accept | l.sock.handle: ${l.sock.handle:6}')
}

mut new_handle := C.accept(l.sock.handle, 0, 0)
mut new_handle := $if is_coroutine ? {
C.photon_accept(l.sock.handle, 0, 0, net.tcp_default_read_timeout)
} $else {
C.accept(l.sock.handle, 0, 0)
}
if new_handle <= 0 {
l.wait_for_accept()!
new_handle = C.accept(l.sock.handle, 0, 0)
new_handle = $if is_coroutine ? {
C.photon_accept(l.sock.handle, 0, 0, net.tcp_default_read_timeout)
} $else {
C.accept(l.sock.handle, 0, 0)
}
if new_handle == -1 || new_handle == 0 {
return error('accept failed')
}
Expand Down Expand Up @@ -389,7 +409,11 @@ struct TcpSocket {
}

fn new_tcp_socket(family AddrFamily) !TcpSocket {
handle := socket_error(C.socket(family, SocketType.tcp, 0))!
handle := $if is_coroutine ? {
socket_error(C.photon_socket(family, SocketType.tcp, 0))!
} $else {
socket_error(C.socket(family, SocketType.tcp, 0))!
}
mut s := TcpSocket{
handle: handle
}
Expand Down Expand Up @@ -519,7 +543,11 @@ const (

fn (mut s TcpSocket) connect(a Addr) ! {
$if !net_blocking_sockets ? {
res := C.connect(s.handle, voidptr(&a), a.len())
res := $if is_coroutine ? {
C.photon_connect(s.handle, voidptr(&a), a.len(), net.tcp_default_read_timeout)
} $else {
C.connect(s.handle, voidptr(&a), a.len())
}
if res == 0 {
return
}
Expand Down Expand Up @@ -555,7 +583,11 @@ fn (mut s TcpSocket) connect(a Addr) ! {
wrap_error(ecode)!
return
} $else {
x := C.connect(s.handle, voidptr(&a), a.len())
x := $if is_coroutine ? {
C.photon_connect(s.handle, voidptr(&a), a.len(), net.tcp_default_read_timeout)
} $else {
C.connect(s.handle, voidptr(&a), a.len())
}
socket_error(x)!
}
}
6 changes: 5 additions & 1 deletion vlib/v/gen/c/spawn_and_go.v
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,11 @@ fn (mut g Gen) spawn_and_go_expr(node ast.SpawnExpr, mode SpawnGoMode) {
}
}
} else if is_go {
g.writeln('photon_thread_create((void*)${wrapper_fn_name}, &${arg_tmp_var});')
if util.nr_jobs > 0 {
g.writeln('photon_thread_create_and_migrate_to_work_pool((void*)${wrapper_fn_name}, &${arg_tmp_var});')
} else {
g.writeln('photon_thread_create((void*)${wrapper_fn_name}, &${arg_tmp_var});')
}
}
g.writeln('// end go')
if node.is_expr {
Expand Down

0 comments on commit a63f3e6

Please sign in to comment.