Skip to content

Commit

Permalink
Merge pull request #367 from tumblr/bhaskar-distributed-cache
Browse files Browse the repository at this point in the history
Introducing hazel cast for clustered operation of collins
  • Loading branch information
yl3w committed Sep 7, 2015
2 parents 10edb36 + 6560cef commit 3c5b16c
Show file tree
Hide file tree
Showing 232 changed files with 3,024 additions and 2,448 deletions.
21 changes: 12 additions & 9 deletions app/Global.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,29 @@ import play.api.Mode
import play.api.Play
import play.api.mvc.Handler
import play.api.mvc.RequestHeader
import play.api.mvc.Results
import play.api.mvc.Result
import play.api.mvc.Results

import collins.callbacks.Callback
import collins.controllers.ApiResponse
import collins.db.DB
import collins.hazelcast.HazelcastHelper
import collins.logging.LoggingHelper
import collins.metrics.MetricsReporter
import collins.models.cache.Cache
import collins.solr.SolrHelper
import collins.util.BashOutput
import collins.util.CryptoAccessor
import collins.util.JsonOutput
import collins.util.OutputType
import collins.util.Stats
import collins.util.TextOutput
import collins.util.config.CryptoConfig
import collins.util.config.Registry
import collins.util.security.AuthenticationAccessor
import collins.util.security.AuthenticationProvider
import collins.util.security.AuthenticationProviderConfig

import collins.models.cache.Cache
import collins.logging.LoggingHelper
import collins.util.config.Registry
import collins.solr.SolrHelper
import collins.metrics.MetricsReporter
import collins.callbacks.Callback

object Global extends GlobalSettings with AuthenticationAccessor with CryptoAccessor {
private[this] val logger = Logger.logger

Expand All @@ -40,18 +40,21 @@ object Global extends GlobalSettings with AuthenticationAccessor with CryptoAcce

override def onStart(app: Application) {
Registry.setupRegistry(app)
HazelcastHelper.setupHazelcast()
Cache.setupCache()
setAuthentication(AuthenticationProvider.get(AuthenticationProviderConfig.authType))
setCryptoKey(CryptoConfig.key)
LoggingHelper.setupLogging(app)
Callback.setupCallbacks()
SolrHelper.setupSolr()
MetricsReporter.setupMetrics()
Callback.setupCallbacks()
}

override def onStop(app: Application) {
DB.shutdown()
Registry.terminateRegistry()
Cache.terminateCache()
HazelcastHelper.terminateHazelcast()
SolrHelper.terminateSolr()
Callback.terminateCallbacks()
}
Expand Down
22 changes: 17 additions & 5 deletions app/collins/cache/CacheConfig.scala
Original file line number Diff line number Diff line change
@@ -1,21 +1,33 @@
package collins.cache

import com.google.common.cache.CacheBuilderSpec

import collins.guava.GuavaConfig
import collins.hazelcast.HazelcastConfig
import collins.util.config.Configurable
import collins.util.config.ConfigurationException

object CacheConfig extends Configurable {

override val namespace = "cache"
override val referenceConfigFilename = "cache_reference.conf"

def enabled = getBoolean("enabled", true)
def specification = getString("specification", "maximumSize=10000,expireAfterWrite=10s,recordStats")
def cacheType = getString("type", "in-memory")

override protected def validateConfig() {
logger.debug(s"Loading domain model cache specification enabled - $enabled")
if (enabled) {
logger.debug("Validating domain model cache specification")
CacheBuilderSpec.parse(specification)
if (cacheType != "in-memory" && cacheType != "distributed")
throw new ConfigurationException("Please specify cache type of 'in-memory' or 'distributed'")

if (cacheType == "in-memory") {
if (!GuavaConfig.enabled) {
throw new ConfigurationException("In memory cache uses Guava, please enable and configure it.")
}
} else {
if (!HazelcastConfig.enabled) {
throw new ConfigurationException("Distributed cache uses hazelcast, please enable and configure it.")
}
}
}
}
}
6 changes: 3 additions & 3 deletions app/collins/callbacks/AsyncCallbackManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import akka.actor.ActorRef
import akka.actor.actorRef2Scala

trait AsyncCallbackManager extends CallbackManager {
protected def changeQueue: ActorRef
protected var changeQueue: Option[ActorRef]

override def fire(propertyName: String, oldValue: AnyRef, newValue: AnyRef) {
override def fire(propertyName: String, oldValue: CallbackDatumHolder, newValue: CallbackDatumHolder) {
logger.debug("Async Firing %s".format(propertyName))
changeQueue ! CallbackMessage(propertyName, oldValue, newValue)
changeQueue.foreach(_ ! CallbackMessage(propertyName, oldValue, newValue))
}
}
28 changes: 12 additions & 16 deletions app/collins/callbacks/Callback.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,24 @@ package collins.callbacks

import java.beans.PropertyChangeEvent

import play.api.Application
import play.api.Logger
import play.api.Play.current
import play.api.Plugin
import play.api.libs.concurrent.Akka

import akka.actor.ActorRef
import akka.actor.Props
import akka.routing.FromConfig

object Callback extends AsyncCallbackManager {
override protected val logger = Logger(getClass)

override val changeQueue = Akka.system.actorOf(Props(CallbackMessageQueue(pcs)).
withRouter(FromConfig()), name = "change_queue_processor")
override protected[this] var changeQueue: Option[ActorRef] = None

def setupCallbacks() {
if (CallbackConfig.enabled) {
logger.debug("Loading listeners")
changeQueue = Some(Akka.system.actorOf(Props(CallbackMessageQueue(pcs)).
withRouter(FromConfig()), name = "change_queue_processor"))
loadListeners()
}
}
Expand All @@ -28,7 +28,7 @@ object Callback extends AsyncCallbackManager {
removeListeners()
}

override protected def loadListeners(): Unit = {
private[this] def loadListeners(): Unit = {
CallbackConfig.registry.foreach { descriptor =>
logger.debug("Loading callback %s".format(descriptor.name))
setupCallback(descriptor)
Expand All @@ -38,26 +38,22 @@ object Callback extends AsyncCallbackManager {
protected def setupCallback(descriptor: CallbackDescriptor) {
val eventName = descriptor.on
val matchCondition = descriptor.matchCondition
val currentConfigMatches = CallbackMatcher(matchCondition.current, _.getNewValue)
val previousConfigMatches = CallbackMatcher(matchCondition.previous, _.getOldValue)
val handlesMatch = createMatchHandler(descriptor.matchAction)
val currentConfigMatches = CallbackMatcher(matchCondition.current, _.getNewValue.asInstanceOf[CallbackDatumHolder])
val previousConfigMatches = CallbackMatcher(matchCondition.previous, _.getOldValue.asInstanceOf[CallbackDatumHolder])
val handlesMatch = descriptor.matchAction
logger.debug("Setting up callback %s - %s %s".format(descriptor.name, eventName, handlesMatch))
on(eventName, new CallbackActionHandler {

override def apply(pce: PropertyChangeEvent) {
val prevMatch = previousConfigMatches(pce)
val curMatch = currentConfigMatches(pce)

logger.debug("Callback invocation : Name %s - On %s - Prev %b -- Curr %b".format(descriptor.name, pce.getPropertyName, prevMatch, curMatch))

if (prevMatch && curMatch) {
handlesMatch(pce)
}
}
})
}

protected def createMatchHandler(cfg: CallbackAction): CallbackActionHandler = {
cfg.actionType match {
case CallbackActionType.Exec =>
runners.ExecActionRunner(cfg.command)
}
}

}
13 changes: 8 additions & 5 deletions app/collins/callbacks/CallbackActionHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package collins.callbacks
import java.beans.PropertyChangeEvent

trait CallbackActionHandler {
def apply(pce: PropertyChangeEvent)

protected def getValue(pce: PropertyChangeEvent): AnyRef =
Option(pce.getNewValue).getOrElse(pce.getOldValue)
def apply(pce: PropertyChangeEvent)

protected def getValueOption(pce: PropertyChangeEvent): Option[AnyRef] = Option(getValue(pce))
protected def getValue(pce: PropertyChangeEvent): CallbackDatumHolder = {
pce.getNewValue match {
case CallbackDatumHolder(Some(_)) => pce.getNewValue.asInstanceOf[CallbackDatumHolder]
case CallbackDatumHolder(None) => pce.getOldValue.asInstanceOf[CallbackDatumHolder]
}
}

protected def maybeNullString(s: AnyRef): String =
Option(s).filter(_ != null).map(_.toString).getOrElse("null")
Option(s).map(_.toString).getOrElse("null")
}
41 changes: 0 additions & 41 deletions app/collins/callbacks/CallbackActionRunner.scala

This file was deleted.

6 changes: 3 additions & 3 deletions app/collins/callbacks/CallbackActionType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package collins.callbacks
abstract sealed class CallbackActionType
object CallbackActionType {
object Exec extends CallbackActionType

def apply(name: String): Option[CallbackActionType] = name.toLowerCase match {
case "exec" => Some(Exec)
case _ => None
case "exec" => Some(Exec)
case _ => None
}
}
36 changes: 36 additions & 0 deletions app/collins/callbacks/CallbackDatum.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package collins.callbacks

trait CallbackDatum {
def compare(z: Any): Boolean
}

case class StringDatum(val e: String) extends CallbackDatum {
override def compare(z: Any): Boolean = {
if (z == null)
return false
val ar = z.asInstanceOf[AnyRef]
if (ar.getClass != classOf[StringDatum])
false
else {
val other = ar.asInstanceOf[StringDatum]
this.e.equals(other.e)
}
}
}

case class CallbackDatumHolder(val datum: Option[CallbackDatum]) {
override def equals(z: Any): Boolean = {
if (z == null)
return false
val ar = z.asInstanceOf[AnyRef]
if (ar.getClass != classOf[CallbackDatumHolder])
false
else {
val other = ar.asInstanceOf[CallbackDatumHolder]
(this.datum, other.datum) match {
case (Some(s), Some(t)) => s.compare(t)
case _ => false
}
}
}
}
18 changes: 9 additions & 9 deletions app/collins/callbacks/CallbackDescriptor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,31 +11,31 @@ import collins.util.config.TypesafeConfiguration

case class MatchConditional(name: String, state: Option[String], states: List[String])
case class CallbackConditional(previous: MatchConditional, current: MatchConditional)
case class CallbackAction(command: Seq[String], actionType: CallbackActionType = CallbackActionType.Exec)

case class CallbackDescriptor(name: String, override val source: TypesafeConfiguration)
extends ConfigAccessor
with ConfigSource
{
extends ConfigAccessor
with ConfigSource {
private[this] val logger = Logger("CallbackDescriptor.%s".format(name))

def on = getString("on")(ConfigValue.Required).get
def matchCondition = CallbackConditional(
MatchConditional(name, previousState, previousStates),
MatchConditional(name, currentState, currentStates)
)
MatchConditional(name, currentState, currentStates))
def matchAction = {
val cfg = getStringMap("action")
if (cfg.isEmpty) {
throw CallbackConfigException("action", name)
}
val atype = cfg.get("type").flatMap(CallbackActionType(_)).getOrElse(CallbackActionType.Exec)
val cmd = getCommand
CallbackAction(cmd, atype)
atype match {
case CallbackActionType.Exec =>
val cmd = getCommand
ExecActionRunner(cmd)
}
}

def validateConfig() {
logger.debug("validateConfig - event - %s".format(getString("on","NONE")))
logger.debug("validateConfig - event - %s".format(getString("on", "NONE")))
on
logger.debug("validateConfig - matchCondition - %s".format(matchCondition.toString))
matchCondition
Expand Down
4 changes: 1 addition & 3 deletions app/collins/callbacks/CallbackManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ trait CallbackManager {
protected val logger = Logger(getClass)
protected val pcs = new PropertyChangeSupport(this)

def fire(propertyName: String, oldValue: AnyRef, newValue: AnyRef) {
def fire(propertyName: String, oldValue: CallbackDatumHolder, newValue: CallbackDatumHolder) {
logger.debug("Firing %s".format(propertyName))
pcs.firePropertyChange(propertyName, oldValue, newValue)
}
Expand All @@ -22,8 +22,6 @@ trait CallbackManager {
})
}

protected def loadListeners(): Unit

protected def removeListeners() {
for (listener <- pcs.getPropertyChangeListeners()) pcs.removePropertyChangeListener(listener)
}
Expand Down
Loading

0 comments on commit 3c5b16c

Please sign in to comment.