Skip to content

Commit

Permalink
Merge pull request #98 from viartemev/add-prefetchSize-to-func
Browse files Browse the repository at this point in the history
Add prefetchSize to a channel function
  • Loading branch information
viartemev authored Feb 18, 2019
2 parents 1b25630 + 572fc8e commit 8749661
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ import com.rabbitmq.client.Channel
import com.rabbitmq.client.Connection
import com.viartemev.thewhiterabbit.consumer.ConfirmConsumer

fun Channel.consumer(queue: String) = ConfirmConsumer(this, queue)
fun Channel.consumer(queue: String, prefetchSize: Int) = ConfirmConsumer(this, queue, prefetchSize)

suspend fun Connection.channel(block: suspend Channel.() -> Unit): Channel {
val channel = this.createChannel()
channel.use { block(it) }
return channel
}

suspend fun Channel.consume(queue: String, block: suspend ConfirmConsumer.() -> Unit) {
val consumer = this.consumer(queue)
suspend fun Channel.consume(queue: String, prefetchSize: Int = 0, block: suspend ConfirmConsumer.() -> Unit) {
val consumer = this.consumer(queue, prefetchSize)
try {
block(consumer)
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import kotlinx.coroutines.channels.Channel as KChannel

private val logger = KotlinLogging.logger {}

class ConfirmConsumer internal constructor(private val amqpChannel: Channel, amqpQueue: String, prefetchSize: Int = 0) {
class ConfirmConsumer internal constructor(private val amqpChannel: Channel, amqpQueue: String, prefetchSize: Int) {
private val deliveries = KChannel<Delivery>()
private val consTag: String

Expand All @@ -26,11 +26,11 @@ class ConfirmConsumer internal constructor(private val amqpChannel: Channel, amq
try {
deliveries.sendBlocking(message)
} catch (e: Exception) {
logger.info { "Consumer $consumerTag has been cancelled" }
logger.debug { "Can't send a message. Consumer $consumerTag has been cancelled" }
}
},
{ consumerTag ->
logger.info { "Consumer $consumerTag has been cancelled" }
logger.debug { "Consumer $consumerTag has been cancelled" }
deliveries.cancel()
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class ConfirmConsumerTest {
runBlocking {
connection.channel {
declareQueue(QueueSpecification(QUEUE_NAME))
consume(QUEUE_NAME) {
consume(QUEUE_NAME, 2) {
for (i in 1..3) consumeWithConfirm({ handleDelivery(it) })
}
}
Expand Down

0 comments on commit 8749661

Please sign in to comment.