Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
Merge branch 'master' of github.com:robey/kestrel
Browse files Browse the repository at this point in the history
  • Loading branch information
Robey Pointer committed Mar 2, 2012
2 parents 8bd2821 + 5b6e0b2 commit 86540ab
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 8 deletions.
16 changes: 12 additions & 4 deletions docs/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,13 @@ expired at the same time, `maxExpireSweep` limits the number of items that
will be removed by the background thread in a single round. This is primarily
useful as a throttling mechanism when using a queue as a way to delay work.

Queue expiration
----------------

Whole queues can be configured to expire as well. If `maxQueueAge` is set
`expirationTimerFrequency` is used to check the queue age. If the queue is
empty, and it has been longer than `maxQueueAge` since it was created then
the queue will be deleted.

Fanout Queues
-------------
Expand All @@ -168,7 +175,6 @@ is created, and it will start receiving new items written to the parent queue.
Existing items are not copied over. A fanout queue can be deleted to stop it
from receiving new items.


Memcache commands
-----------------

Expand Down Expand Up @@ -356,7 +362,8 @@ Global stats reported by kestrel are:
- `bytes_read` - total bytes read from clients
- `bytes_written` - total bytes written to clients
- `queue_creates` - total number of queues created
- `queue_deletes` - total number of queues deleted
- `queue_deletes` - total number of queues deleted (includes expires)
- `queue_expires` - total number of queues expires

For each queue, the following stats are also reported:

Expand All @@ -378,8 +385,9 @@ For each queue, the following stats are also reported:
- `waiters` - number of clients waiting for an item from this queue (using
`GET/t`)
- `open_transactions` - items read with `/open` but not yet confirmed
- `total_flushes` total number of times this queue has been flushed

- `total_flushes` - total number of times this queue has been flushed
- `age_msec` - age of the last item read from the queue
- `create_time` - the time that the queue was created (in milliseconds since epoch)

Kestrel as a library
--------------------
Expand Down
3 changes: 3 additions & 0 deletions src/main/scala/net/lag/kestrel/Kestrel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder],
if (expired > 0) {
log.info("Expired %d item(s) from queues automatically.", expired)
}
// Now that we've cleaned out the queue, lets see if any of them are
// ready to be expired.
Kestrel.this.queueCollection.deleteExpiredQueues()
}
}.start()
}
Expand Down
1 change: 1 addition & 0 deletions src/main/scala/net/lag/kestrel/MemcacheHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ extends NettyHandler[MemcacheRequest](channelGroup, queueCollection, maxOpenTran
report += (("bytes_written", Stats.getCounter("bytes_written")().toString))
report += (("queue_creates", Stats.getCounter("queue_creates")().toString))
report += (("queue_deletes", Stats.getCounter("queue_deletes")().toString))
report += (("queue_expires", Stats.getCounter("queue_expires")().toString))

for (qName <- queues.queueNames) {
report ++= queues.stats(qName).map { case (k, v) => ("queue_" + qName + "_" + k, v) }
Expand Down
18 changes: 18 additions & 0 deletions src/main/scala/net/lag/kestrel/PersistentQueue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c

// age of the last item read from the queue:
private var _currentAge: Duration = 0.milliseconds

// time the queue was created
private var _createTime = Time.now

def statNamed(statName: String) = "q/" + name + "/" + statName

Expand Down Expand Up @@ -92,6 +95,7 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
def currentAge: Duration = synchronized { if (queueSize == 0) 0.milliseconds else _currentAge }
def waiterCount: Long = synchronized { waiters.size }
def isClosed: Boolean = synchronized { closed || paused }
def createTime: Long = synchronized { _createTime }

// mostly for unit tests.
def memoryLength: Long = synchronized { queue.size }
Expand Down Expand Up @@ -129,6 +133,7 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
gauge("age_msec", currentAge.inMilliseconds)
gauge("waiters", waiterCount)
gauge("open_transactions", openTransactionCount)
gauge("create_time", createTime)

private final def adjustExpiry(startingTime: Time, expiry: Option[Time]): Option[Time] = {
if (config.maxAge.isDefined) {
Expand All @@ -139,6 +144,19 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
}
}

/**
* Check if this Queue is eligible for expiration by way of it being empty
* and it's age being greater than or equal to maxQueueAge
*/
def isReadyForExpiration: Boolean = {
// Don't even bother if the maxQueueAge is None
if (config.maxQueueAge.isDefined && queue.isEmpty && Time.now > _createTime + config.maxQueueAge.get) {
true
} else {
false
}
}

// you are holding the lock, and config.keepJournal is true.
private def checkRotateJournal() {
/*
Expand Down
17 changes: 17 additions & 0 deletions src/main/scala/net/lag/kestrel/QueueCollection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,27 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
queue(name) map { q => q.discardExpired(q.config.maxExpireSweep) } getOrElse(0)
}
}

def expireQueue(name: String): Unit = {
if (!shuttingDown) {
queues.get(name) map { q =>
if (q.isReadyForExpiration) {
delete(name)
Stats.incr("queue_expires")
log.info("Expired queue %s", name)
}
}
}
return 1
}

def flushAllExpired(): Int = {
queueNames.foldLeft(0) { (sum, qName) => sum + flushExpired(qName) }
}

def deleteExpiredQueues(): Unit = {
queueNames.map { qName => expireQueue(qName) }
}

def stats(key: String): Array[(String, String)] = queue(key) match {
case None => Array[(String, String)]()
Expand Down
15 changes: 11 additions & 4 deletions src/main/scala/net/lag/kestrel/config/KestrelConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,15 @@ case class QueueConfig(
syncJournal: Duration,
expireToQueue: Option[String],
maxExpireSweep: Int,
fanoutOnly: Boolean
fanoutOnly: Boolean,
maxQueueAge: Option[Duration]
) {
override def toString() = {
("maxItems=%d maxSize=%s maxItemSize=%s maxAge=%s defaultJournalSize=%s maxMemorySize=%s " +
"maxJournalSize=%s discardOldWhenFull=%s keepJournal=%s syncJournal=%s " +
"expireToQueue=%s maxExpireSweep=%d fanoutOnly=%s").format(maxItems, maxSize,
"expireToQueue=%s maxExpireSweep=%d fanoutOnly=%s maxQueueAge=%s").format(maxItems, maxSize,
maxItemSize, maxAge, defaultJournalSize, maxMemorySize, maxJournalSize, discardOldWhenFull,
keepJournal, syncJournal, expireToQueue, maxExpireSweep, fanoutOnly)
keepJournal, syncJournal, expireToQueue, maxExpireSweep, fanoutOnly, maxQueueAge)
}
}

Expand Down Expand Up @@ -140,10 +141,16 @@ class QueueBuilder extends Config[QueueConfig] {
*/
var fanoutOnly: Boolean = false

/**
* Expiration time for the queue itself. If the queue is empty and older
* than this value then we should delete it.
*/
var maxQueueAge: Option[Duration] = None

def apply() = {
QueueConfig(maxItems, maxSize, maxItemSize, maxAge, defaultJournalSize, maxMemorySize,
maxJournalSize, discardOldWhenFull, keepJournal, syncJournal,
expireToQueue, maxExpireSweep, fanoutOnly)
expireToQueue, maxExpireSweep, fanoutOnly, maxQueueAge)
}
}

Expand Down
38 changes: 38 additions & 0 deletions src/test/scala/net/lag/kestrel/PersistentQueueSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,44 @@ class PersistentQueueSpec extends Specification
}
}

"PersistentQueue with expiry" should {
val timer = new FakeTimer()
val scheduler = new ScheduledThreadPoolExecutor(1)

"expire queue" in {
withTempFolder {
Time.withCurrentTimeFrozen { time =>
val config = new QueueBuilder {
keepJournal = false
maxQueueAge = 90.seconds
}.apply()
val q = new PersistentQueue("wu_tang", folderName, config, timer, scheduler)
q.setup()

// Not ready, we just got started!
q.isReadyForExpiration mustEqual false

q.add("method man".getBytes, None) mustEqual true

time.advance(30.seconds)
// We aren't ready to expire yet, as it's not been long enough
q.isReadyForExpiration mustEqual false

time.advance(61.seconds)

// Still not ready, as we have items in the queue!
q.isReadyForExpiration mustEqual false

q.remove must beSomeQItem("method man") // queue is now empty

// This should be true now because the queue is 91 seconds old and
// has no items
q.isReadyForExpiration mustEqual true
}
}
}
}

"PersistentQueue with item expiry" should {
val timer = new FakeTimer()
val scheduler = new ScheduledThreadPoolExecutor(1)
Expand Down

0 comments on commit 86540ab

Please sign in to comment.