Skip to content
This repository has been archived by the owner on Aug 9, 2024. It is now read-only.

Commit

Permalink
Rename notify to notifyLogger (close #92)
Browse files Browse the repository at this point in the history
  • Loading branch information
rzats committed Aug 25, 2017
1 parent 560b5aa commit 9025634
Show file tree
Hide file tree
Showing 14 changed files with 51 additions and 62 deletions.
17 changes: 8 additions & 9 deletions src/main/scala/com.snowplowanalytics.sauna/actors/Mediator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -130,37 +130,37 @@ class Mediator(saunaSettings: SaunaSettings) extends Actor {
val updatedState = state.addRejecter(rejecter)
processedEvents.put(event, updatedState)
if (updatedState.rejecters.size == responderActors.size) {
notify(s"Warning: observer event from [${event.id}] was rejected by all responders")
notifyLogger(s"Warning: observer event from [${event.id}] was rejected by all responders")
}
case None =>
processedEvents.put(event, MessageState.empty.addRejecter(rejecter))
if (responderActors.size == 1) {
notify(s"Warning: observer event from [${event.id}] was rejected by single running responder")
notifyLogger(s"Warning: observer event from [${event.id}] was rejected by single running responder")
}
}

// Mutate `processedEvents` primary state and clean-up resources
case result: Responder.ResponderResult =>
notify(result.message)
notifyLogger(result.message)
val original = result.source.source
processedEvents.get(original) match {
case Some(state) =>
val updatedState = state.addFinisher(sender())
updatedState.check match {
case AllFinished(actorStamps) =>
notify(s"All actors finished processing message [${original.id}]. Deleting")
notifyLogger(s"All actors finished processing message [${original.id}]. Deleting")
original match {
case l: LocalFilePublished => original.observer ! Observer.DeleteLocalFile(l.file)
case s: S3FilePublished => original.observer ! Observer.DeleteS3Object(s.id, s.s3Source)
case r: KinesisRecordReceived => ()
}
processedEvents.remove(original)
case InProcess(stillWorking) =>
notify(s"Some actors still processing message [${original.id}]")
notifyLogger(s"Some actors still processing message [${original.id}]")
processedEvents.put(original, updatedState)
}
case None =>
notify(s"Mediator received unknown (not-accepted) ResponderResult [$result]")
notifyLogger(s"Mediator received unknown (not-accepted) ResponderResult [$result]")
}

// Forward notification
Expand All @@ -169,7 +169,7 @@ class Mediator(saunaSettings: SaunaSettings) extends Actor {

// Check state
case Tick =>
getWarnings.foreach(notify)
getWarnings.foreach(notifyLogger)
}

override def postStop(): Unit = {
Expand All @@ -196,8 +196,7 @@ class Mediator(saunaSettings: SaunaSettings) extends Actor {
}
}

def notify(message: String): Unit =
logger ! Notification(message)
def notifyLogger(message: String): Unit = logger ! Notification(message)
}

object Mediator {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import Observer.KinesisRecordReceived
class AmazonKinesisObserver(streamName: String, kclConfig: KinesisClientLibConfiguration) extends Actor with Observer {
override def preStart: Unit = {
super.preStart()
notify("Started Kinesis Observer")
notifyLogger("Started Kinesis Observer")

/**
* [[KinesisRecordReader]] instance that will return processed records
Expand All @@ -55,7 +55,7 @@ class AmazonKinesisObserver(streamName: String, kclConfig: KinesisClientLibConfi
*/
KCLWorkerRunner(kclConfig).runAsyncSingleRecordProcessor[Record](1 minute) { record: Record =>
Future {
notify(s"Received Kinesis Record from $streamName")
notifyLogger(s"Received Kinesis Record from $streamName")
context.parent ! KinesisRecordReceived(streamName, record.getSequenceNumber, record.getData, self)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class AmazonS3Observer(s3: S3, sqs: SQS, queue: Queue) extends Actor with Observ

def receive: Receive = {
case filePublished: S3FilePublished =>
notify(s"Detected new S3 file [${filePublished.id}]")
notifyLogger(s"Detected new S3 file [${filePublished.id}]")
context.parent ! filePublished

case deleteFile: DeleteS3Object =>
Expand All @@ -79,7 +79,7 @@ class AmazonS3Observer(s3: S3, sqs: SQS, queue: Queue) extends Actor with Observ
self ! S3FilePublished(decodedFileName, s3Source, self)

case None =>
notify(s"Unknown message [$message] was published")
notifyLogger(s"Unknown message [$message] was published")
sqs.delete(message)
}
}
Expand All @@ -93,7 +93,7 @@ class AmazonS3Observer(s3: S3, sqs: SQS, queue: Queue) extends Actor with Observ
throwable match {
case e: InterruptedException =>
monitor.stop()
notify("SqsMonitor thread has been stopped")
notifyLogger("SqsMonitor thread has been stopped")
case e: AbortedException => // Not sure why SQSClient throws it on actor shutdown
monitor.stop()
monitorThread.interrupt()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class LocalObserver(root: Path) extends Actor with Observer {
sizes.foreach { case (path, size) =>
updateFile(path, size) match {
case Ready =>
notify(s"Detected new local file [${path.toString}]")
notifyLogger(s"Detected new local file [${path.toString}]")
context.parent ! LocalFilePublished(path, self)
case Updated => ()
case Error(_, _) => ()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ trait Observer {
* Send message to supervisor trait to forward it to loggers
* This means observer should also be a child of supvervisor
*/
def notify(message: String): Unit = {
context.parent ! Notification(message)
}
def notifyLogger(message: String): Unit = context.parent ! Notification(message)
}

object Observer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,7 @@ trait Responder[OE <: ObserverEvent, RE <: ResponderEvent] extends Actor {
/**
* Log unstructured notification
*/
def notify(message: String): Unit = {
logger ! Notification(message)
}
def notifyLogger(message: String): Unit = logger ! Notification(message)
}

object Responder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import akka.actor.{ActorRef, Props}
// sauna
import apis.Hipchat
import apis.Hipchat._
import loggers.Logger.Notification
import observers.Observer._
import responders.Responder._
import responders.hipchat.SendRoomNotificationResponder._
Expand All @@ -52,11 +51,11 @@ class SendRoomNotificationResponder(hipchat: Hipchat, val logger: ActorRef) exte
case Right(_) =>
Some(RoomNotificationReceived(data, e))
case Left(error) =>
logger ! Notification(error)
notifyLogger(error)
None
}
case Left(error) =>
logger ! Notification(error)
notifyLogger(error)
None
}
case _ => None
Expand All @@ -74,8 +73,8 @@ class SendRoomNotificationResponder(hipchat: Hipchat, val logger: ActorRef) exte
if (message.status >= 200 && message.status <= 204)
context.parent ! RoomNotificationSent(event, s"Successfully sent HipChat notification: $message")
else
logger ! Notification(s"Unexpected response from HipChat: ${message.body}")
case Failure(error) => logger ! Notification(s"Error while sending HipChat notification: $error")
notifyLogger(s"Unexpected response from HipChat: ${message.body}")
case Failure(error) => notifyLogger(s"Error while sending HipChat notification: $error")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class DcpResponder(optimizely: Optimizely, importRegion: String, val logger: Act
case Right(event) => Some(event)
case Left(Some(error)) =>
// TODO: cases where path is only partly correct should be handled differently, without deleting source file
notify(error)
notifyLogger(error)
None
case Left(None) =>
None
Expand All @@ -100,7 +100,7 @@ class DcpResponder(optimizely: Optimizely, importRegion: String, val logger: Act
}
.onComplete {
case Success(message) => context.parent ! CustomersProfilesUploaded(event, message)
case Failure(error) => notify(error.toString)
case Failure(error) => notifyLogger(error.toString)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ class TargetingListResponder(optimizely: Optimizely, val logger: ActorRef) exten
Future.sequence(iterables).map(_.foreach(logResponse)).onComplete {
case Success(_) =>
context.parent ! TargetingListUploaded(event, s"Targeting list from ${event.source.id} has been successfully published")
case Failure(error) => notify(error.toString)
case Failure(error) => notifyLogger(error.toString)
}
case None => notify(s"Cannot read file [${event.source.id}]")
case None => notifyLogger(s"Cannot read file [${event.source.id}]")
}
}

Expand Down Expand Up @@ -110,15 +110,15 @@ class TargetingListResponder(optimizely: Optimizely, val logger: ActorRef) exten
// log results
logger ! Manifestation(id.toString, name, status, description, lastModified)
if (status == 201) {
notify(s"Successfully uploaded targeting lists with name [$name]")
notifyLogger(s"Successfully uploaded targeting lists with name [$name]")
} else {
notify(s"Unable to upload targeting list with name [$name] : [${response.body}]")
notifyLogger(s"Unable to upload targeting list with name [$name] : [${response.body}]")
}

} catch {
case e: JsonParseException =>
logger ! Manifestation(defaultId, defaultName, status, defaultDescription, defaultLastModified)
notify(s"Problems while parsing Optimizely API response. See [${response.body}]")
notifyLogger(s"Problems while parsing Optimizely API response. See [${response.body}]")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import play.api.libs.json.Json
// sauna
import apis.PagerDuty
import apis.PagerDuty.PagerDutyEvent
import loggers.Logger.Notification
import observers.Observer._
import responders.Responder._
import responders.pagerduty.CreateEventResponder._
Expand All @@ -52,11 +51,11 @@ class CreateEventResponder(pagerDuty: PagerDuty, val logger: ActorRef) extends R
case Right(_) =>
Some(PagerDutyEventReceived(data, e))
case Left(error) =>
logger ! Notification(error)
notifyLogger(error)
None
}
case Left(error) =>
logger ! Notification(error)
notifyLogger(error)
None
}
case _ => None
Expand All @@ -69,8 +68,8 @@ class CreateEventResponder(pagerDuty: PagerDuty, val logger: ActorRef) extends R
if (message.status == 200)
context.parent ! PagerDutyEventSent(event, s"Successfully created PagerDuty event: ${message.body}")
else
logger ! Notification(s"Unexpected response from PagerDuty: ${message.body}")
case Failure(error) => logger ! Notification(s"Error while creating PagerDuty event: $error")
notifyLogger(s"Unexpected response from PagerDuty: ${message.body}")
case Failure(error) => notifyLogger(s"Error while creating PagerDuty event: $error")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class RecipientsResponder(sendgrid: Sendgrid, val logger: ActorRef) extends Resp
case pathRegexp(attrs) if attrs.split(",").contains("email") =>
Some(RecipientsPublished(attrs.split(",").toList, e))
case pathRegexp(_) =>
notify(s"RecipientsResponder: attribute 'email' for [${observerEvent.id}] must be included")
notifyLogger(s"RecipientsResponder: attribute 'email' for [${observerEvent.id}] must be included")
None
case _ => None
}
Expand All @@ -82,9 +82,9 @@ class RecipientsResponder(sendgrid: Sendgrid, val logger: ActorRef) extends Resp
def process(event: RecipientsPublished): Unit = {
event.source.streamContent match {
case Some(content) =>
worker ! Delegate(RecipientsChunks.parse(content, event, notify _))
worker ! Delegate(RecipientsChunks.parse(content, event, notifyLogger _))
case None =>
notify(s"FAILURE: event's [${event.source.id}] source doesn't exist")
notifyLogger(s"FAILURE: event's [${event.source.id}] source doesn't exist")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ class RecipientsWorker(apiWrapper: Sendgrid) extends Actor {
chunks.customFields = Some(orderedFields)
self ! Start(chunks)
case Left(error) =>
notify(error)
notifyLogger(error)
}
case Success(Left(error)) =>
notify("Cannot extract custom fields information: " + error)
notifyLogger("Cannot extract custom fields information: " + error)
case Failure(exception) =>
notify(exception.toString)
notifyLogger(exception.toString)
}

// Enqueue chunks
Expand Down Expand Up @@ -118,7 +118,7 @@ class RecipientsWorker(apiWrapper: Sendgrid) extends Actor {
.postRecipients(json)
.onComplete {
case Success(response) => processResponse(chunk.length, response.body)
case Failure(err) => notify(err.toString)
case Failure(err) => notifyLogger(err.toString)
}
}

Expand Down Expand Up @@ -152,29 +152,27 @@ class RecipientsWorker(apiWrapper: Sendgrid) extends Actor {
errors.map(_.value).find(_.apply("error_indices").as[Seq[Int]].contains(errorIndex)) match {
case Some(error) =>
val reason = error.apply("message").as[String]
notify(s"Error $errorIndex caused due to [$reason]")
notifyLogger(s"Error $errorIndex caused due to [$reason]")

case None =>
notify(s"Unable to find reason for error [$errorIndex]")
notifyLogger(s"Unable to find reason for error [$errorIndex]")
}
}

if (errorCount + newCount + updatedCount != total) {
notify("Several records disappeared. It's rare Sendgrid bug. Double-check you input")
notifyLogger("Several records disappeared. It's rare Sendgrid bug. Double-check you input")
}

} catch {
case NonFatal(e) =>
notify(s"Got exception [${e.getMessage}] while parsing Sendgrid's response")
notifyLogger(s"Got exception [${e.getMessage}] while parsing Sendgrid's response")
}
}

/**
* Helper method to send notifications
*/
def notify(message: String): Unit = {
context.parent ! Notification(message)
}
def notifyLogger(message: String): Unit = context.parent ! Notification(message)
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import play.api.libs.json.Json
// sauna
import apis.Sendgrid
import apis.Sendgrid._
import loggers.Logger.Notification
import observers.Observer.{ObserverCommandEvent, ObserverEvent}
import responders.Responder.{ResponderEvent, ResponderResult}
import responders.sendgrid.SendEmailResponder._
Expand All @@ -52,11 +51,11 @@ class SendEmailResponder(sendgrid: Sendgrid, val logger: ActorRef) extends Respo
case Right(_) =>
Some(SendgridEmailReceived(data, e))
case Left(error) =>
logger ! Notification(error)
notifyLogger(error)
None
}
case Left(error) =>
logger ! Notification(error)
notifyLogger(error)
None
}
case _ => None
Expand All @@ -69,8 +68,8 @@ class SendEmailResponder(sendgrid: Sendgrid, val logger: ActorRef) extends Respo
if (message.status >= 200 && message.status <= 299)
context.parent ! SendgridEmailSent(event, s"Successfully sent Sendgrid email!")
else
logger ! Notification(s"Unexpected response from Sendgrid: ${message.body}")
case Failure(error) => logger ! Notification(s"Error while sending Sendgrid message: $error")
notifyLogger(s"Unexpected response from Sendgrid: ${message.body}")
case Failure(error) => notifyLogger(s"Error while sending Sendgrid message: $error")
}
}

Expand Down
Loading

0 comments on commit 9025634

Please sign in to comment.