You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I think I have a handle on why my attempt to measure the messaging throughput with a single contended echo (rather than one per client) resulting in deadlock. I suspect the problem is actually very serious.
The changes I made:
use a different mailBoxSpec for 'Echo', set to c;lientCount producers
create just one echo actor instance
to make testing easier I just use 900 batch size and 1 iteration
On my system, I have 8 cores, so I have 160 clients and they try to send 64000 messages in 'repeat' loops. This results in the JVM CPU usage dropping to 0.
If I increase the echo mailbox capacity to 30000 then it still fails. With 40000 it is working, but I think I'm just lucky and the echo actor got some CPU.
I believe the problem is that the CPU threads are all waiting on the actor's mailbox because send is effectively a blocking operation.
I think that's rather bad in an actor runtime.
Throughput is fine when its working, but it seems dangerous to use fixed size queues that can block a sender.
fun run() {
val clientCount = getRuntime().availableProcessors() * 20
val mailboxSpecClient = ConcurrentQueueSpec(1,1,5000, Ordering.PRODUCER_FIFO , Preference.NONE)
val mailboxSpecEcho = ConcurrentQueueSpec(clientCount,1,40000, Ordering.PRODUCER_FIFO , Preference.NONE)
val messageCount = 1_000_000
val batchSize = 400
println("Dispatcher\t\tElapsed\t\tMsg/sec")
val tps = arrayOf(/1,2,5,10,20,50,100,150,200,300, 400, 500, 600, 700, 800,/ 900)
for (t in tps) {
val d = DefaultDispatcher(throughput = t)
val echoProps =
fromProducer { EchoActor() }
.withDispatcher(d)
.withMailbox { newSpecifiedMailbox(mailboxSpecEcho) }
val latch = CountDownLatch(clientCount)
val clientProps =
fromProducer { PingActor(latch, messageCount, batchSize) }
.withDispatcher(d)
.withMailbox { newSpecifiedMailbox(mailboxSpecClient)}
val echoActor = spawn(echoProps)
val pairs = (0 until clientCount)
.map { Pair(spawn(clientProps), echoActor) }
.toTypedArray()
val sw = nanoTime()
for ((ping, pong) in pairs) {
send(ping, Start(Msg(ping,pong)))
}
latch.await()
val elapsedNanos = (nanoTime() - sw).toDouble()
val elapsedMillis = (elapsedNanos / 1_000_000).toInt()
val totalMessages = messageCount * 2 * clientCount
val x = ((totalMessages.toDouble() / elapsedNanos * 1_000_000_000.0 ).toInt())
println("$t\t\t\t\t$elapsedMillis\t\t\t$x")
for ((client, echo) in pairs) {
stop(client)
stop(echo)
}
Thread.sleep(500)
}
}
data class Msg(val ping: PID, val pong: PID)
data class Start(val msg : Msg)
class EchoActor : Actor {
suspend override fun Context.receive(msg: Any) {
//print('.')
when (msg) {
is Msg -> send(msg.ping, msg)
}
}
}
class PingActor(private val latch: CountDownLatch, private var messageCount: Int, private val batchSize: Int, private var batch: Int = 0) : Actor {
suspend override fun Context.receive(msg: Any) {
when (msg) {
is Start -> sendBatch(msg.msg)
is Msg -> {
batch--
if (batch > 0) return
if (!sendBatch(msg)) {
latch.countDown()
}
}
}
}
private fun Context.sendBatch(msg : Msg): Boolean = when (messageCount) {
0 -> false
else -> {
val n = minOf(batchSize, messageCount)
repeat(n) { send(msg.pong, msg) }
messageCount -= n
batch = n
true
}
}
}
The text was updated successfully, but these errors were encountered:
I think I have a handle on why my attempt to measure the messaging throughput with a single contended echo (rather than one per client) resulting in deadlock. I suspect the problem is actually very serious.
The changes I made:
On my system, I have 8 cores, so I have 160 clients and they try to send 64000 messages in 'repeat' loops. This results in the JVM CPU usage dropping to 0.
If I increase the echo mailbox capacity to 30000 then it still fails. With 40000 it is working, but I think I'm just lucky and the echo actor got some CPU.
I believe the problem is that the CPU threads are all waiting on the actor's mailbox because send is effectively a blocking operation.
I think that's rather bad in an actor runtime.
Throughput is fine when its working, but it seems dangerous to use fixed size queues that can block a sender.
package actor.proto.examples.inprocessbenchmark
import actor.proto.*
import actor.proto.mailbox.DefaultDispatcher
import actor.proto.mailbox.newMpscArrayMailbox
import actor.proto.mailbox.newSpecifiedMailbox
import org.jctools.queues.spec.ConcurrentQueueSpec
import org.jctools.queues.spec.Ordering
import org.jctools.queues.spec.Preference
import java.lang.Runtime.getRuntime
import java.lang.System.nanoTime
import java.util.concurrent.CountDownLatch
fun main(args: Array) {
repeat(1 /0/) {
run()
readLine()
}
}
fun run() {
val clientCount = getRuntime().availableProcessors() * 20
val mailboxSpecClient = ConcurrentQueueSpec(1,1,5000, Ordering.PRODUCER_FIFO , Preference.NONE)
val mailboxSpecEcho = ConcurrentQueueSpec(clientCount,1,40000, Ordering.PRODUCER_FIFO , Preference.NONE)
val messageCount = 1_000_000
val batchSize = 400
println("Dispatcher\t\tElapsed\t\tMsg/sec")
val tps = arrayOf(/1,2,5,10,20,50,100,150,200,300, 400, 500, 600, 700, 800,/ 900)
for (t in tps) {
val d = DefaultDispatcher(throughput = t)
}
data class Msg(val ping: PID, val pong: PID)
data class Start(val msg : Msg)
class EchoActor : Actor {
suspend override fun Context.receive(msg: Any) {
//print('.')
when (msg) {
is Msg -> send(msg.ping, msg)
}
}
}
class PingActor(private val latch: CountDownLatch, private var messageCount: Int, private val batchSize: Int, private var batch: Int = 0) : Actor {
suspend override fun Context.receive(msg: Any) {
when (msg) {
is Start -> sendBatch(msg.msg)
is Msg -> {
batch--
if (batch > 0) return
if (!sendBatch(msg)) {
latch.countDown()
}
}
}
}
}
The text was updated successfully, but these errors were encountered: