-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Yuriy Vountesmery
committed
Nov 12, 2024
1 parent
2e5bf9f
commit d8c3de6
Showing
14 changed files
with
710 additions
and
33 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
module nngd.nngtests.test05; | ||
|
||
import std.stdio; | ||
import std.concurrency; | ||
import std.exception; | ||
import std.json; | ||
import std.format; | ||
import std.conv; | ||
import std.regex; | ||
import core.thread; | ||
import core.thread.osthread; | ||
import nngd; | ||
|
||
const _testclass = "nngd.nngtests.nng_test05_reqrep"; | ||
|
||
@trusted class nng_test05_reqrep : NNGTest { | ||
|
||
this(Args...)(auto ref Args args) { super(args); } | ||
|
||
override string[] run(){ | ||
log("NNG test 05: request-reply"); | ||
this.uri = "tcp://127.0.0.1:31005"; | ||
immutable string[] tags = ["TAG0", "TAG1", "TAG2", "TAG3"]; | ||
workers ~= new Thread(&(this.server_worker)).start(); | ||
foreach(t; tags){ | ||
this.tag = t; | ||
workers ~= new Thread(&(this.client_worker)).start(); | ||
} | ||
foreach(w; workers) | ||
w.join(); | ||
log(_testclass ~ ": Bye!"); | ||
return []; | ||
} | ||
|
||
void server_worker() @trusted { | ||
const NTAGS = 4; | ||
const NMSGS = 31; | ||
auto ctr = regex(r" ([0-9]+)$"); | ||
uint k = 0, p=0; | ||
int rc; | ||
try{ | ||
thread_attachThis(); | ||
rt_moduleTlsCtor(); | ||
log("REP: Listening at " ~ uri); | ||
NNGSocket s = NNGSocket(nng_socket_type.NNG_SOCKET_REP); | ||
s.sendtimeout = msecs(1000); | ||
s.recvtimeout = msecs(1000); | ||
s.sendbuf = 4096; | ||
rc = s.listen(uri); | ||
enforce(rc == 0); | ||
while( p < NTAGS ){ | ||
auto line = s.receive!string(); | ||
if(s.errno != 0){ | ||
error("REP: RECV ERROR: " ~ nng_errstr(s.errno)); | ||
continue; | ||
} | ||
k++; | ||
log("REP: RECV: " ~ line); | ||
auto rres = matchFirst(line, ctr); | ||
line = format("REPLY(%d) = %s",k,line); | ||
if(!rres.empty){ | ||
auto i = to!int(rres[1]); | ||
if(i>NMSGS){ | ||
line = "END"; | ||
p++; | ||
} | ||
} | ||
rc = s.send!string(line); | ||
if(rc != 0){ | ||
error("REP: SEND ERROR: " ~ nng_errstr(rc)); | ||
}else{ | ||
log("REP: SENT: " ~ line); | ||
} | ||
} | ||
log("REP: bye!"); | ||
} catch(Throwable e) { | ||
error(dump_exception_recursive(e, "SS: Sender worker")); | ||
} | ||
} | ||
|
||
void client_worker() @trusted { | ||
const NDIALS = 32; | ||
uint k = 0; | ||
int rc; | ||
bool _ok = false; | ||
string tag = this.tag.dup; | ||
Thread.sleep(msecs(10)); | ||
try{ | ||
thread_attachThis(); | ||
rt_moduleTlsCtor(); | ||
NNGSocket s = NNGSocket(nng_socket_type.NNG_SOCKET_REQ); | ||
s.recvtimeout = msecs(1000); | ||
while(++k < NDIALS){ | ||
log("REQ("~tag~"): to dial..."); | ||
rc = s.dial(uri); | ||
if(rc == 0) | ||
break; | ||
if(rc == nng_errno.NNG_ECONNREFUSED){ | ||
nng_sleep(msecs(100)); | ||
continue; | ||
} | ||
error("REQ("~tag~"): Dial error: ",nng_errstr(rc)); | ||
enforce(rc == 0); | ||
} | ||
if(s.state is nng_socket_state.NNG_STATE_CONNECTED){ | ||
log("REQ: connected with : " ~ nng_errstr(s.errno)); | ||
}else{ | ||
enforce(false, "SS: connection timed out"); | ||
} | ||
k = 0; | ||
while(true){ | ||
k++; | ||
auto line = format("Client(%s) request %d", tag, k); | ||
rc = s.send!string(line); | ||
enforce(rc == 0); | ||
log("REQ("~tag~"): SENT: " ~ line); | ||
auto str = s.receive!string(); | ||
if(s.errno == 0){ | ||
log(format("REQ("~tag~") RECV [%03d]: %s", str.length, str)); | ||
}else{ | ||
error("REQ("~tag~"): Error string: " ~ nng_errstr(s.errno)); | ||
} | ||
if(str == "END"){ | ||
_ok = true; | ||
break; | ||
} | ||
} | ||
if(!_ok){ | ||
error("Test stopped without normal end."); | ||
} | ||
log("REQ("~tag~"): bye!"); | ||
} catch(Throwable e) { | ||
error(dump_exception_recursive(e, "RR: Receiver worker")); | ||
} | ||
} | ||
|
||
private: | ||
Thread[] workers; | ||
string uri; | ||
string tag; | ||
|
||
} | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
module nngd.nngtests.test06; | ||
|
||
import std.stdio; | ||
import std.concurrency; | ||
import std.exception; | ||
import std.json; | ||
import std.format; | ||
import std.conv; | ||
import std.regex; | ||
import std.file; | ||
import std.random; | ||
import core.thread; | ||
import core.thread.osthread; | ||
import nngd; | ||
|
||
const _testclass = "nngd.nngtests.nng_test06_message"; | ||
|
||
@trusted class nng_test06_message : NNGTest { | ||
|
||
this(Args...)(auto ref Args args) { super(args); } | ||
|
||
override string[] run(){ | ||
int rc; | ||
string s = "AbAbAgAlAmAgA"; | ||
log("NNG test 06: nng message manupulation"); | ||
|
||
log("NNGMessage test 1: assembly-disassembly"); | ||
try{ | ||
NNGMessage msg1 = NNGMessage(0); | ||
|
||
rc = msg1.body_append!ushort(11); enforce(rc == 0); | ||
rc = msg1.body_append!uint(12); enforce(rc == 0); | ||
rc = msg1.body_append!ulong(13); enforce(rc == 0); | ||
rc = msg1.body_prepend(cast(ubyte[])s); enforce(rc == 0); | ||
|
||
enforce( msg1.length == 27 && msg1.header_length == 0 ); | ||
|
||
auto x1 = msg1.body_chop!ulong(); | ||
enforce(x1 == 13); | ||
auto x2 = msg1.body_chop!uint(); | ||
enforce(x2 == 12); | ||
auto x3 = msg1.body_chop!ushort(); | ||
enforce(x3 == 11); | ||
auto x4 = msg1.body_trim!(ubyte[])(); | ||
string x5 = cast(string)x4; | ||
enforce(x5 == s); | ||
|
||
enforce( msg1.length == 0 && msg1.header_length == 0 ); | ||
} catch(Throwable e) { | ||
error(dump_exception_recursive(e, "test 1: assembly-disassembly")); | ||
} | ||
log("...passed"); | ||
|
||
log("NNGMessage test 2: send-receive"); | ||
|
||
string url = "tcp://127.0.0.1:13006"; | ||
try{ | ||
NNGSocket sr = NNGSocket(nng_socket_type.NNG_SOCKET_PULL); | ||
sr.recvtimeout = msecs(1000); | ||
rc = sr.listen(url); | ||
enforce(rc == 0); | ||
NNGSocket ss = NNGSocket(nng_socket_type.NNG_SOCKET_PUSH); | ||
ss.sendtimeout = msecs(1000); | ||
ss.sendbuf = 4096; | ||
rc = ss.dial(url); | ||
enforce(rc == 0); | ||
|
||
NNGMessage msg2 = NNGMessage(0); | ||
rc = msg2.body_append!ushort(11); enforce(rc == 0); | ||
rc = msg2.body_append!uint(12); enforce(rc == 0); | ||
rc = msg2.body_append!ulong(13); enforce(rc == 0); | ||
rc = msg2.body_prepend(cast(ubyte[])s); enforce(rc == 0); | ||
NNGMessage msg3 = NNGMessage(0); | ||
|
||
rc = ss.sendmsg(msg2); | ||
enforce(rc == 0); | ||
|
||
rc = sr.receivemsg(&msg3); | ||
enforce(rc == 0); | ||
|
||
enforce( msg3.length == 27 && msg3.header_length == 0 ); | ||
|
||
auto x6 = msg3.body_trim!string(s.length); | ||
enforce(x6 == s); | ||
} catch(Throwable e) { | ||
error(dump_exception_recursive(e, "test 2: send-receive")); | ||
} | ||
|
||
log("NNGMessage test 3: realloc"); | ||
try{ | ||
NNGMessage *msg4 = new NNGMessage(0); | ||
|
||
ubyte[] data = cast(ubyte[])("/dev/urandom".read(8192)); | ||
auto rnd = Random(to!int(timestamp)); | ||
ulong k; | ||
for(auto i=0; i<128; i++){ | ||
k = uniform(0,8192,rnd); | ||
msg4.clear(); | ||
msg4.body_append(data[0..k]); | ||
enforce(msg4.length == k); | ||
} | ||
} catch(Throwable e) { | ||
error(dump_exception_recursive(e, "test 3: realloc")); | ||
} | ||
|
||
log("...passed"); | ||
log(_testclass ~ ": Bye!"); | ||
return []; | ||
} | ||
|
||
|
||
} | ||
|
||
|
||
|
Oops, something went wrong.