Skip to content

Commit

Permalink
WIP: Mutlithreaded test suite
Browse files Browse the repository at this point in the history
  • Loading branch information
Yuriy Vountesmery committed Nov 3, 2024
1 parent 55b499d commit 2e5bf9f
Show file tree
Hide file tree
Showing 14 changed files with 800 additions and 33 deletions.
17 changes: 14 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
.SUFFIXES:
.ONESHELL:
.NOTPARALLEL:
#.NOTPARALLEL:

# default is ON
NNG_WITH_MBEDTLS?=ON
Expand All @@ -12,6 +12,9 @@ DINC=nngd extern/libnng/libnng
DTESTS=$(wildcard tests/test*.d)
RUNTESTS=$(basename $(notdir $(DTESTS)))

TESTBENCH=testnng.d
BINTESTBENCH=./build/$(basename $(notdir $(TESTBENCH)))

ifeq ($(NNG_WITH_MBEDTLS),ON)
DCFLAGS=-O -d -m64 -i -debug -g -version=withtls
DLFLAGS=-Lextern/libnng/extern/nng/build/lib/ -Lextern/libnng/extern/mbedtls/build/target/lib/ -lnng -lmbedtls -lmbedcrypto -lmbedx509
Expand All @@ -21,9 +24,17 @@ else
endif


all: extern mime lib buildtest
all: extern mime lib testbench
@echo "All done!"

testbench: $(BINTESTBENCH)

$(BINTESTBENCH):
$(DC) $(DCFLAGS) -od=build/ -of=$@ ${addprefix -I,$(DINC)} ${addprefix -L,$(DLFLAGS)} $(TESTBENCH)

runtest: testbench
$(BINTESTBENCH) && echo "All passed!" || echo "Tests failed, see log"

buildtest: $(DTESTS)

extern:
Expand Down Expand Up @@ -76,4 +87,4 @@ clean-extern:
proper-extern:
$(MAKE) proper -C extern/

.PHONY: all extern mime lib clean $(DTESTS) $(RUNTESTS)
.PHONY: all extern mime lib clean $(TESTBENCH) testbench runtest $(DTESTS) $(RUNTESTS)
2 changes: 1 addition & 1 deletion nngd/mime.d
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ const string[string] nng_mime_map = [
,".sslkeylogfile": "application/sslkeylogfile"
,".stratum": "application/stratum"
,".tnauthlist": "application/tnauthlist"
,".toml": "application/toml"
,".trig": "application/trig"
,".tzif": "application/tzif"
,".ulpfec": "application/ulpfec"
Expand Down Expand Up @@ -304,7 +305,6 @@ const string[string] nng_mime_map = [
,".heic": "image/heic"
,".heif": "image/heif"
,".hej2k": "image/hej2k"
,".hsj2": "image/hsj2"
,".ief": "image/ief"
,".j2c": "image/j2c"
,".jls": "image/jls"
Expand Down
31 changes: 15 additions & 16 deletions nngd/nngd.d
Original file line number Diff line number Diff line change
Expand Up @@ -3392,10 +3392,9 @@ unittest{
struct WebSocketClient {

@disable this();




private:

ws_state localstate;
ubyte[4] masking_key;
ubyte[] rxbuf;
Expand Down Expand Up @@ -3428,7 +3427,6 @@ struct WebSocketClient {
}

int openstate(string url, string origin){

int rc;
char[1024] buf;

Expand All @@ -3449,8 +3447,8 @@ struct WebSocketClient {
string hello = format("GET /%s HTTP/1.1\r\n", join(u.path,"/"))
~ "Upgrade: websocket\r\n"
~ "Connection: upgrade\r\n"
~ "Host: localhost\r\n"
;
hello ~= (u.port == "80") ? format("Host: %s\r\n",u.host) : format("Host: %s:%s\r\n",u.host,u.port);
if(origin !is null)
hello ~= format("Origin: %s\r\n", origin);
hello ~= "Pragma: no-cache\r\n"
Expand Down Expand Up @@ -3581,6 +3579,7 @@ struct WebSocketClient {
for(int j=0; j < ws.N; ++j)
rxbuf[i+ws.header_size] ^= ws.masking_key[j & 0x03];
send_data(ws_opcode.PONG,rxbuf[ws.header_size .. ws.header_size + ws.N]);
rxbuf = rxbuf[ws.header_size + ws.N .. $];
break;
case ws_opcode.PONG:
break;
Expand All @@ -3597,6 +3596,7 @@ struct WebSocketClient {
if(ws.fin){
cb(received_data);
received_data.length = 0;
rxbuf = rxbuf[ws.header_size + ws.N .. $];
}
break;
default:
Expand All @@ -3607,7 +3607,6 @@ struct WebSocketClient {
}
}


public:

ws_options opt;
Expand Down Expand Up @@ -3689,27 +3688,27 @@ struct WebSocketClient {
}
}

void send( string msg ){
send_data(ws_opcode.TEXT_FRAME, cast(ubyte[])msg.dup);
void send(T)( T msg ) if(is(T == string))
{
send_data(ws_opcode.TEXT_FRAME, cast(ubyte[])msg);
}

void send_b( ubyte[] msg ){

void send(T)( T msg ) if(is(T == ubyte[]))
{
send_data(ws_opcode.BINARY_FRAME, msg);
}

void send_b( string msg ){
send_data(ws_opcode.BINARY_FRAME, cast(ubyte[])msg.dup);
}

void send_ping(){
send_data(ws_opcode.PING, null);
}

void dispatch(ws_client_handler cb){
void dispatch(F)(F cb) if(is(F == ws_client_handler))
{
dispatch_data((ubyte[] message){cb((cast(string)(message)[0..$]));});
}

void dispatch_b(ws_client_handler_b cb){
void dispatch(F)(F cb) if(is(F == ws_client_handler_b))
{
dispatch_data((ubyte[] message){cb(message);});
}

Expand Down
23 changes: 23 additions & 0 deletions nngd/nngtests/package.d
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
module nngd.nngtests;

public {

import nngd.nngtests.test00;
import nngd.nngtests.test01;
import nngd.nngtests.test02;
import nngd.nngtests.test03;
import nngd.nngtests.test04;

static immutable string[] testlist = (){
return [
nngd.nngtests.test00._testclass,
nngd.nngtests.test01._testclass,
nngd.nngtests.test02._testclass,
nngd.nngtests.test03._testclass,
nngd.nngtests.test04._testclass
];
}();


}

24 changes: 24 additions & 0 deletions nngd/nngtests/test00.d
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
module nngd.nngtests.test00;

import std.stdio;

import nngd;

const _testclass = "nngd.nngtests.nng_test00_template";


class nng_test00_template : NNGTest {

this(Args...)(auto ref Args args) { super(args); }

override string[] run(){
log("NNG test template");
return [];
}


}




120 changes: 120 additions & 0 deletions nngd/nngtests/test01.d
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
module nngd.nngtests.test01;

import std.stdio;
import std.concurrency;
import std.exception;
import std.json;
import std.format;
import core.thread;
import core.thread.osthread;
import nngd;

const _testclass = "nngd.nngtests.nng_test01_pushpull_buffer";

class nng_test01_pushpull_buffer : NNGTest {

this(Args...)(auto ref Args args) { super(args); }

override string[] run(){
log("NNG test 01: pushpuli send buffer");
this.uri = "tcp://127.0.0.1:31000";
workers ~= new Thread(&(this.receiver_worker)).start();
workers ~= new Thread(&(this.sender_worker)).start();
foreach(w; workers)
w.join();
return [];
}


void sender_worker() @trusted {
const NDIALS = 32;
const NMSGS = 32;
uint k = 0;
int rc;
try{
thread_attachThis();
rt_moduleTlsCtor();
log("SS: Conncting to " ~ uri);
NNGSocket s = NNGSocket(nng_socket_type.NNG_SOCKET_PUSH);
s.sendtimeout = msecs(1000);
s.sendbuf = 4096;
while( ++k < NDIALS ){
rc = s.dial(uri);
if(rc == 0) break;
if(rc == nng_errno.NNG_ECONNREFUSED){
log("SS: Connection refused attempt %d", k);
nng_sleep(msecs(100));
continue;
}
error("SS: Dial error after %d attempts: %d", NDIALS, rc);
enforce(rc == 0);
}
if(s.state is nng_socket_state.NNG_STATE_CONNECTED){
log("SS: connected with : " ~ nng_errstr(s.errno));
}else{
enforce(false, "SS: connection timed out");
}
k = 0;
while(++k < NMSGS){
auto line = format(`{"msg": %d, "check": %d, "time": %f}`,
k, mkrot3(k), timestamp());
log(line);
auto sbuf = cast(ubyte[])line.dup;
rc = s.send!(ubyte[])(sbuf);
enforce(rc == 0);
log("SS: sent: " ~ line);
nng_sleep(msecs(100));
}
nng_sleep(msecs(100));
} catch(Throwable e) {
error(dump_exception_recursive(e, "SS: Sender worker"));
}
}

void receiver_worker() @trusted {
const NDIALS = 32;
const NMSGS = 32;
const BSIZE = 4096;
ubyte[BSIZE] rbuf;
uint k = 0;
int rc;
size_t sz = rbuf.length;
try{
thread_attachThis();
rt_moduleTlsCtor();
NNGSocket s = NNGSocket(nng_socket_type.NNG_SOCKET_PULL);
s.recvtimeout = msecs(1000);
rc = s.listen(uri);
enforce(rc == 0, "RR: Error listening socket");
while(1){
sz = s.receivebuf(rbuf, rbuf.length);
if(sz < 0 || sz == size_t.max){
error("REcv error: " ~ nng_errstr(s.errno));
continue;
}
auto line = cast(string)rbuf[0..sz];
log("RR: received: " ~ line);
auto jdata = parseJSON(line);
k = cast(uint)(jdata["msg"].integer);
auto c = cast(uint)(jdata["check"].integer);
if(!chkrot3(k,c)){
error("Invalid message data: " ~ line);
continue;
}
if(k >= NMSGS-1)
break;
}
} catch(Throwable e) {
error(dump_exception_recursive(e, "RR: Receiver worker"));
}
}

private:
Thread[] workers;
string uri;

}




Loading

0 comments on commit 2e5bf9f

Please sign in to comment.