Skip to content
This repository has been archived by the owner on Mar 11, 2019. It is now read-only.

Feature/reporter #36

Merged
merged 8 commits into from
Jan 28, 2015
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ We all stand on the shoulders of giants and get by with a little help from our f
* [Akka](http://akka.io) (version 2.3.6 under [Apache 2 license](http://www.apache.org/licenses/LICENSE-2.0)), for asynchronous processing
* [Typesage Config](https://github.com/typesafehub/config) (version 1.2.1 under [Apache 2 license](http://www.apache.org/licenses/LICENSE-2.0)), for reading configuration files.
* [Apache log4j2](http://logging.apache.org/log4j/2.x/) (version 2.1 under [Apache 2 license](http://www.apache.org/licenses/LICENSE-2.0)), for logging outside actors.
* [JFreeChart](http://www.jfree.org/jfreechart/) (version 1.0.19 under [LGPL license](https://www.gnu.org/licenses/lgpl.html)), for creation of interactive and animated charts.
* [Scala IO](http://jesseeichar.github.io/scala-io-doc/0.4.3/index.html#!/overview) (version 0.4.3), for an extensions of IO
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(version 0.4.3 under [BSD-2-Clause license](http://www.scala-lang.org/license.html))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, it's a BSD-3-Clause license


# Licence
This software is licensed under the *GNU Affero General Public License*, quoted below.
Expand Down
5 changes: 4 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ libraryDependencies ++= Seq(
"org.apache.logging.log4j" % "log4j-api" % "2.1",
"org.apache.logging.log4j" % "log4j-core" % "2.1",
"com.typesafe.akka" %% "akka-testkit" % "2.3.6" % "test",
"org.scalatest" %% "scalatest" % "2.2.2" % "test"
"org.scalatest" %% "scalatest" % "2.2.2" % "test",
"com.github.scala-incubator.io" %% "scala-io-core" % "0.4.3",
"com.github.scala-incubator.io" %% "scala-io-file" % "0.4.3",
"org.jfree" % "jfreechart" % "1.0.19"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reference the libraries inside the README

)

scalacOptions ++= Seq(
Expand Down
61 changes: 49 additions & 12 deletions src/main/scala/org/powerapi/core/MonitorActors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,47 +23,59 @@
package org.powerapi.core

import java.util.UUID
import akka.actor.SupervisorStrategy.{Directive, Resume}
import akka.actor.{Actor, PoisonPill, Props}

import scala.concurrent.duration.FiniteDuration

import akka.actor.{ Actor, ActorRef, ActorSystem, PoisonPill, Props }
import akka.actor.SupervisorStrategy.{ Directive, Resume }
import akka.event.LoggingReceive

import org.powerapi.core.ClockChannel.ClockTick
import scala.concurrent.duration.FiniteDuration
import org.powerapi.core.power.Power
import org.powerapi.module.PowerChannel.PowerReport
import org.powerapi.reporter.ReporterComponent

/**
* One child represents one monitor.
* Allows to publish messages in the right topics depending of the targets.
*
* @author Maxime Colmant <[email protected]>
* @author Loïc Huertas <[email protected]>
*/
class MonitorChild(eventBus: MessageBus,
muid: UUID,
frequency: FiniteDuration,
targets: List[Target]) extends ActorComponent {
targets: List[Target],
aggFunction: Seq[Power] => Power) extends ActorComponent {

import org.powerapi.core.ClockChannel.{startClock, stopClock, subscribeClockTick, unsubscribeClockTick}
import org.powerapi.core.MonitorChannel.{MonitorStart, MonitorStop, MonitorStopAll, publishMonitorTick}
import org.powerapi.module.PowerChannel.{ AggregateReport, render, subscribePowerReport, unsubscribePowerReport }

def receive: PartialFunction[Any, Unit] = LoggingReceive {
case MonitorStart(_, id, freq, targs) if muid == id && frequency == freq && targets == targs => start()
case MonitorStart(_, id, freq, targs, _) if muid == id && frequency == freq && targets == targs => start()
} orElse default

/**
* Running state.
*/
def running: Actor.Receive = LoggingReceive {
def running(aggR: AggregateReport): Actor.Receive = LoggingReceive {
case tick: ClockTick => produceMessages(tick)
case powerReport: PowerReport => aggregate(aggR, powerReport)
case MonitorStop(_, id) if muid == id => stop()
case _: MonitorStopAll => stop()
} orElse default

/**
* Start the clock, subscribe on the associated topic for receiving tick messages.
* Start the clock, subscribe on the associated topic for receiving tick messages
* and power reports.
*/
def start(): Unit = {
startClock(frequency)(eventBus)
subscribeClockTick(frequency)(eventBus)(self)
subscribePowerReport(muid)(eventBus)(self)
log.info("monitor is started, muid: {}", muid)
context.become(running)
context.become(running(AggregateReport(muid, aggFunction)))
}

/**
Expand All @@ -72,14 +84,29 @@ class MonitorChild(eventBus: MessageBus,
def produceMessages(tick: ClockTick): Unit = {
targets.foreach(target => publishMonitorTick(muid, target, tick)(eventBus))
}

/**
* Wait to retrieve power reports of all targets from a same monitor to aggregate them
* into once power report.
*/
def aggregate(aggR: AggregateReport, powerReport: PowerReport): Unit = {
aggR += powerReport
if (aggR.size == targets.size) {
render(aggR)(eventBus)
context.become(running(AggregateReport(muid, aggFunction)))
}
else
context.become(running(aggR))
}

/**
* Publish a request for stopping the clock which is responsible to produce the ticks at this frequency,
* stop to listen ticks and kill the monitor actor.
* stop to listen ticks and power reports and kill the monitor actor.
*/
def stop(): Unit = {
stopClock(frequency)(eventBus)
unsubscribeClockTick(frequency)(eventBus)(self)
unsubscribePowerReport(muid)(eventBus)(self)
log.info("monitor is stopped, muid: {}", muid)
self ! PoisonPill
}
Expand Down Expand Up @@ -131,7 +158,7 @@ class Monitors(eventBus: MessageBus) extends Supervisor {
*/
def start(msg: MonitorStart): Unit = {
val name = formatMonitorChildName(msg.muid)
val child = context.actorOf(Props(classOf[MonitorChild], eventBus, msg.muid, msg.frequency, msg.targets), name)
val child = context.actorOf(Props(classOf[MonitorChild], eventBus, msg.muid, msg.frequency, msg.targets, msg.aggFunction), name)
child ! msg
context.become(running)
}
Expand Down Expand Up @@ -160,9 +187,19 @@ class Monitors(eventBus: MessageBus) extends Supervisor {
/**
* This class is an interface for interacting directly with a MonitorChild actor.
*/
class Monitor(eventBus: MessageBus, targets: List[Target]) {
class Monitor(eventBus: MessageBus, _system: ActorSystem) {
val muid = UUID.randomUUID()


def reportTo(reporterComponent: Class[_ <: ReporterComponent], args: List[Any] = List()): Monitor = {
reportTo(_system.actorOf(Props(reporterComponent, args: _*)))
}
def reportTo(reporter: ActorRef): Monitor = {
import org.powerapi.module.PowerChannel.subscribeAggPowerReport

subscribeAggPowerReport(muid)(eventBus)(reporter)
this
}

def cancel(): Unit = {
import org.powerapi.core.MonitorChannel.stopMonitor

Expand Down
16 changes: 11 additions & 5 deletions src/main/scala/org/powerapi/core/MonitorChannel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,20 @@
package org.powerapi.core

import java.util.UUID
import akka.actor.ActorRef
import org.powerapi.core.ClockChannel.ClockTick

import scala.concurrent.duration.FiniteDuration

import akka.actor.ActorRef

/**
* Monitor channel and messages.
*
* @author Maxime Colmant <[email protected]>
*/
object MonitorChannel extends Channel {
import org.powerapi.module.PowerChannel.PowerReport
import org.powerapi.core.ClockChannel.ClockTick
import org.powerapi.core.power.Power

type M = MonitorMessage

Expand All @@ -58,11 +62,13 @@ object MonitorChannel extends Channel {
* @param muid: monitor unique identifier (MUID), which is at the origin of the report flow.
* @param frequency: clock frequency.
* @param targets: monitor targets.
* @param aggFunction: aggregate power estimation for a specific sample of power reports.
*/
case class MonitorStart(topic: String,
muid: UUID,
frequency: FiniteDuration,
targets: List[Target]) extends MonitorMessage
targets: List[Target],
aggFunction: Seq[Power] => Power) extends MonitorMessage

/**
* MonitorStop is represented as a dedicated type of message.
Expand Down Expand Up @@ -99,8 +105,8 @@ object MonitorChannel extends Channel {
/**
* External Methods used by the API (or a Monitor object) for interacting with the bus.
*/
def startMonitor(muid: UUID, frequency: FiniteDuration, targets: List[Target]): MessageBus => Unit = {
publish(MonitorStart(topic, muid, frequency, targets))
def startMonitor(muid: UUID, frequency: FiniteDuration, targets: List[Target], aggFunction: Seq[Power] => Power): MessageBus => Unit = {
publish(MonitorStart(topic, muid, frequency, targets, aggFunction))
}

def stopMonitor(muid: UUID): MessageBus => Unit = {
Expand Down
151 changes: 151 additions & 0 deletions src/main/scala/org/powerapi/core/power/Power.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* This software is licensed under the GNU Affero General Public License, quoted below.
*
* This file is a part of PowerAPI.
*
* Copyright (C) 2011-2014 Inria, University of Lille 1.
*
* PowerAPI is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* PowerAPI is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with PowerAPI.
*
* If not, please consult http://www.gnu.org/licenses/agpl-3.0.html.
*/
package org.powerapi.core.power

object Power {
def apply(value: Double, unit: PowerUnit): Power = new RawPower(value, unit)
def apply(value: Double, unit: String): Power = new RawPower(value, PowerUnitSystem(unit))

/**
* The natural ordering of powers matches the natural ordering for Double.
*/
implicit object PowerIsOrdered extends Ordering[Power] {
def compare(a: Power, b: Power) = a compare b
}
}

trait Power extends Ordered[Power] {
def value: Double
def unit: PowerUnit
def toMilliWatts: Double
def toWatts: Double
def toKiloWatts: Double
def toMegaWatts: Double
def toUnit(unit: PowerUnit): Double
def +(other: Power): Power
def -(other: Power): Power
def *(factor: Double): Power
def /(divisor: Double): Power
def min(other: Power): Power = if (this < other) this else other
def max(other: Power): Power = if (this > other) this else other

// Java API
def div(divisor: Double) = this / divisor
def gt(other: Power) = this > other
def gteq(other: Power) = this >= other
def lt(other: Power) = this < other
def lteq(other: Power) = this <= other
def minus(other: Power) = this - other
def mul(factor: Double) = this * factor
def plus(other: Power) = this + other
}

object RawPower {

implicit object RawPowerIsOrdered extends Ordering[RawPower] {
def compare(a: RawPower, b: RawPower) = a compare b
}

def apply(value: Double, unit: PowerUnit) = new RawPower(value, unit)
def apply(value: Double, unit: String) = new RawPower(value, PowerUnitSystem(unit))

// limit on abs. value of powers in their units
private final val max_mw = Double.MaxValue
private final val max_w = max_mw / 1000.0
private final val max_kw = max_w / 1000.0
private final val max_Mw = max_kw / 1000.0
}

/**
* Defines a power value.
*
* @author Loïc Huertas <[email protected]>
* @author Romain Rouvoy <[email protected]>
*/
final class RawPower(val value: Double, val unit: PowerUnit) extends Power {
import org.apache.logging.log4j.LogManager
import RawPower._

private val log = LogManager.getLogger

private[this] def bounded(max: Double) = 0.0 <= value && value <= max

require(unit match {
case MILLIWATTS => bounded(max_mw)
case WATTS => bounded(max_w)
case KILOWATTS => bounded(max_kw)
case MEGAWATTS => bounded(max_Mw)
case _ =>
val v = MEGAWATTS.convert(value, unit)
0.0 <= v && v <= max_Mw
}, "Power value is limited to 1.79e308 mW and cannot be negative")

def toMilliWatts = unit.toMilliWatts(value)
def toWatts = unit.toWatts(value)
def toKiloWatts = unit.toKiloWatts(value)
def toMegaWatts = unit.toMegaWatts(value)
def toUnit(u: PowerUnit) = toMilliWatts / MILLIWATTS.convert(1, u)

override def toString() = s"$value $unit"

def compare(other: Power) = toMilliWatts compare other.toMilliWatts

private[this] def safeAdd(a: Double, b: Double): Double = {
if ((b > 0.0) && (a > Double.MaxValue - b)) throw new IllegalArgumentException("double overflow")
if ((b < 0.0) && (a < -b)) throw new IllegalArgumentException("negative power cannot exists")
a + b
}
private[this] def add(otherValue: Double, otherUnit: PowerUnit): Power = {
val commonUnit = if (otherUnit.convert(1, unit) < 1.0) unit else otherUnit
val resultValue = safeAdd(commonUnit.convert(value, unit), commonUnit.convert(otherValue, otherUnit))
new RawPower(resultValue, commonUnit)
}

def +(other: Power) = add(other.value, other.unit)
def -(other: Power) = add(-other.value, other.unit)

private[this] def safeMul(a: Double): Double = {
if (a.isInfinite) throw new IllegalArgumentException("multiplication's result is an infinite value")
if (a.isNaN) throw new IllegalArgumentException("multiplication's result is an undefined value")
if (a > Double.MaxValue) throw new IllegalArgumentException("double overflow")
if (a < 0.0) throw new IllegalArgumentException("negative power cannot exists")
a
}

def *(factor: Double) = new RawPower({
if (factor.isInfinite || factor.isNaN) throw new IllegalArgumentException("factor must be a finite and defined value")
else safeMul(value * factor)
}, unit
)
def /(divisor: Double) = new RawPower({
if (divisor.isInfinite || divisor.isNaN) throw new IllegalArgumentException("divisor must be a finite and defined value")
else safeMul(value / divisor)
}, unit
)

override def equals(other: Any) = other match {
case x: RawPower => toMilliWatts == x.toMilliWatts
case _ => super.equals(other)
}
}

Loading