From a86431166e2e094c3332fe191bbed9dc0d65d624 Mon Sep 17 00:00:00 2001 From: Michel Zimmer Date: Fri, 24 Jun 2022 14:17:28 +0200 Subject: [PATCH] Make timings configurable - Add configurable measurement Cassandra TTLs - Add configurable Cassandra Query timeouts - Add names and debug logging to schedulers - Make scheduler intervals configurable - Add startup complete message - Add graceful shutdown via stdin if enabled --- Dockerfile | 4 +- build.sbt | 8 ++- .../in/scheduler/AggregationScheduler.scala | 8 ++- .../MeasurementCassandraRepository.scala | 11 ++- .../neuland/bandwhichd/server/boot/App.scala | 67 ++++++++++++------- .../server/boot/Configuration.scala | 39 +++++++++-- .../server/lib/scheduling/Schedule.scala | 1 + .../lib/scheduling/SchedulerOperator.scala | 14 ++-- .../lib/scheduling/SchedulersOperator.scala | 3 +- .../server/BandwhichDServerApiV1Spec.scala | 13 ++-- .../MeasurementCassandraRepositorySpec.scala | 8 +-- .../server/boot/ConfigurationFixtures.scala | 20 ++++++ 12 files changed, 141 insertions(+), 55 deletions(-) create mode 100644 src/test/scala/de/neuland/bandwhichd/server/boot/ConfigurationFixtures.scala diff --git a/Dockerfile b/Dockerfile index a6b0e04..f18975b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -26,11 +26,11 @@ LABEL org.opencontainers.image.vendor="neuland – Büro für Informatik GmbH" LABEL org.opencontainers.image.licenses="Apache-2.0" LABEL org.opencontainers.image.title="bandwhichd-server" LABEL org.opencontainers.image.description="bandwhichd server collecting measurements and calculating statistics" -LABEL org.opencontainers.image.version="0.5.1" +LABEL org.opencontainers.image.version="0.6.0-rc1" USER guest ENTRYPOINT ["/opt/java/openjdk/bin/java"] CMD ["-jar", "/opt/bandwhichd-server.jar"] EXPOSE 8080 HEALTHCHECK --interval=5s --timeout=1s --start-period=2s --retries=2 \ CMD wget --spider http://localhost:8080/v1/health || exit 1 -COPY --from=build --chown=root:root /tmp/bandwhichd-server/target/scala-3.1.2/bandwhichd-server-assembly-0.5.1.jar /opt/bandwhichd-server.jar \ No newline at end of file +COPY --from=build --chown=root:root /tmp/bandwhichd-server/target/scala-3.1.2/bandwhichd-server-assembly-0.6.0-rc1.jar /opt/bandwhichd-server.jar \ No newline at end of file diff --git a/build.sbt b/build.sbt index bbeef60..e9341e1 100644 --- a/build.sbt +++ b/build.sbt @@ -2,11 +2,16 @@ lazy val root = (project in file(".")) .settings( organization := "de.neuland-bfi", name := "bandwhichd-server", - version := "0.5.1", + version := "0.6.0-rc1", scalaVersion := "3.1.2", Compile / scalaSource := baseDirectory.value / "src" / "main" / "scala", Test / scalaSource := baseDirectory.value / "src" / "test" / "scala", Test / fork := true, + run / fork := true, + run / connectInput := true, + javaOptions := Seq( + "-Dorg.slf4j.simpleLogger.log.de.neuland.bandwhichd=debug" + ), ThisBuild / assemblyMergeStrategy := { case PathList(ps @ _*) if ps.last endsWith "module-info.class" => MergeStrategy.discard @@ -25,6 +30,7 @@ lazy val root = (project in file(".")) libraryDependencies += "io.circe" %% "circe-core" % "0.14.2", libraryDependencies += "io.circe" %% "circe-parser" % "0.14.2", libraryDependencies += "org.typelevel" %% "cats-effect" % "3.3.12", + libraryDependencies += "org.typelevel" %% "log4cats-slf4j" % "2.3.2", libraryDependencies += "org.http4s" %% "http4s-circe" % "1.0.0-M32", libraryDependencies += "org.http4s" %% "http4s-core" % "1.0.0-M32", libraryDependencies += "org.http4s" %% "http4s-dsl" % "1.0.0-M32", diff --git a/src/main/scala/de/neuland/bandwhichd/server/adapter/in/scheduler/AggregationScheduler.scala b/src/main/scala/de/neuland/bandwhichd/server/adapter/in/scheduler/AggregationScheduler.scala index dc3a4fe..188085a 100644 --- a/src/main/scala/de/neuland/bandwhichd/server/adapter/in/scheduler/AggregationScheduler.scala +++ b/src/main/scala/de/neuland/bandwhichd/server/adapter/in/scheduler/AggregationScheduler.scala @@ -5,15 +5,19 @@ import de.neuland.bandwhichd.server.application.StatsApplicationService import de.neuland.bandwhichd.server.lib.scheduling.{Schedule, Scheduler, Work} import scala.concurrent.duration.{FiniteDuration, SECONDS} +import de.neuland.bandwhichd.server.boot.Configuration +import scala.jdk.DurationConverters.* class AggregationScheduler[F[_]: Monad]( + private val configuration: Configuration, private val statsApplicationService: StatsApplicationService[F] ) extends Scheduler[F] { override def schedule: F[Schedule[F]] = Monad[F].pure( Schedule.Pausing( - FiniteDuration(10, SECONDS), - Work({ statsApplicationService.recalculate }) + getClass.getSimpleName, + configuration.aggregationSchedulerInterval.toScala, + Work(statsApplicationService.recalculate) ) ) } diff --git a/src/main/scala/de/neuland/bandwhichd/server/adapter/out/measurement/MeasurementCassandraRepository.scala b/src/main/scala/de/neuland/bandwhichd/server/adapter/out/measurement/MeasurementCassandraRepository.scala index ece55ab..3f2b6aa 100644 --- a/src/main/scala/de/neuland/bandwhichd/server/adapter/out/measurement/MeasurementCassandraRepository.scala +++ b/src/main/scala/de/neuland/bandwhichd/server/adapter/out/measurement/MeasurementCassandraRepository.scala @@ -24,13 +24,19 @@ class MeasurementCassandraRepository[F[_]: Async]( override def record(measurement: Measurement[Timing]): F[Unit] = cassandraContext.executeRawExpectNoRow( SimpleStatement - .builder("insert into measurements json ?") + .builder("insert into measurements json ? using ttl ?") .addPositionalValues( Encoder[Measurement[Timing]] .apply(measurement) - .noSpaces + .noSpaces, + measurement match + case _: Measurement.NetworkConfiguration => + configuration.measurementNetworkConfigurationTTL.toSeconds.toInt + case _: Measurement.NetworkUtilization => + configuration.measurementNetworkUtilizationTTL.toSeconds.toInt ) .setKeyspace(configuration.measurementsKeyspace) + .setTimeout(configuration.recordMeasurementQueryTimeout) .build() ) @@ -40,6 +46,7 @@ class MeasurementCassandraRepository[F[_]: Async]( SimpleStatement .builder("select json * from measurements") .setKeyspace(configuration.measurementsKeyspace) + .setTimeout(configuration.getAllMeasurementsQueryTimeout) .build() ) .flatMap(reactiveRow => diff --git a/src/main/scala/de/neuland/bandwhichd/server/boot/App.scala b/src/main/scala/de/neuland/bandwhichd/server/boot/App.scala index 4a44420..839174b 100644 --- a/src/main/scala/de/neuland/bandwhichd/server/boot/App.scala +++ b/src/main/scala/de/neuland/bandwhichd/server/boot/App.scala @@ -26,7 +26,11 @@ import org.http4s.dsl.io.* import org.http4s.ember.server.EmberServerBuilder import org.http4s.server.{Router, Server} import org.http4s.{HttpApp, HttpRoutes} +import org.typelevel.log4cats.slf4j.Slf4jLogger +import org.typelevel.log4cats.{Logger, SelfAwareStructuredLogger} +import java.io.{BufferedReader, InputStreamReader} +import java.util.Scanner import scala.io.StdIn class App[F[_]: Async]( @@ -56,6 +60,19 @@ class App[F[_]: Async]( statsRepository = statsRepository ) + // in scheduling + val aggregationScheduler: Scheduler[F] = + AggregationScheduler[F]( + configuration = configuration, + statsApplicationService = statsApplicationService + ) + + // scheduling + val schedulersOperator: SchedulersOperator[F] = + SchedulersOperator[F]( + aggregationScheduler + ) + // in http val healthController: HealthController[F] = HealthController[F]() @@ -68,12 +85,6 @@ class App[F[_]: Async]( statsApplicationService = statsApplicationService ) - // in scheduling - val aggregationScheduler: Scheduler[F] = - AggregationScheduler[F]( - statsApplicationService = statsApplicationService - ) - // http val routes: Routes[F] = Routes[F]( @@ -84,17 +95,11 @@ class App[F[_]: Async]( // org.http4s.syntax.KleisliResponseOps#orNotFound val httpApp: HttpApp[F] = routes.routes.orNotFound - - // scheduling - val schedulersOperator: Operator[F] = - SchedulersOperator[F]( - aggregationScheduler - ) } object App extends IOApp { override def run(args: List[String]): IO[ExitCode] = { - val schedulerOutcomeR = for { + val outcomeR = for { configuration <- Configuration.resource[IO] cassandraContext <- CassandraContext.resource[IO](configuration) _ <- Resource.eval( @@ -102,24 +107,34 @@ object App extends IOApp { ) main = App[IO](cassandraContext, configuration) schedulerOutcomeF <- main.schedulersOperator.resource - _ <- EmberServerBuilder + server <- EmberServerBuilder .default[IO] .withHostOption(None) .withHttpApp(main.httpApp) .build - } yield schedulerOutcomeF + logger <- Resource.eval(Slf4jLogger.create[IO]) + _ <- Resource.eval( + logger.info( + s"bandwhichd-server startup complete - ${main.schedulersOperator.size} scheduler - listening on ${server.address}" + ) + ) + lineF <- Resource.eval(IO.delay { + for { + line <- IO.interruptible { + StdIn.readLine() + } + _ <- if (line == null) IO.never else IO.unit + } yield () + }) + } yield schedulerOutcomeF.race(lineF) - for { - schedulerOutcome <- schedulerOutcomeR.use(identity) - } yield { - schedulerOutcome match - case Outcome.Succeeded(_) => - ExitCode.Success - case Outcome.Errored(throwable) => - throwable.printStackTrace() - ExitCode.Error - case Outcome.Canceled() => - ExitCode.Error + outcomeR.use { outcomeF => + for { + outcome <- outcomeF + } yield outcome match + case Right(_) => ExitCode.Success + case Left(Outcome.Succeeded(_)) => ExitCode.Success + case _ => ExitCode.Error } } } diff --git a/src/main/scala/de/neuland/bandwhichd/server/boot/Configuration.scala b/src/main/scala/de/neuland/bandwhichd/server/boot/Configuration.scala index da0fc9e..06e148f 100644 --- a/src/main/scala/de/neuland/bandwhichd/server/boot/Configuration.scala +++ b/src/main/scala/de/neuland/bandwhichd/server/boot/Configuration.scala @@ -6,19 +6,31 @@ import cats.{Defer, Monad, MonadError, Traverse} import com.comcast.ip4s.{Dns, Host, IpAddress, SocketAddress} import com.datastax.oss.driver.api.core.CqlIdentifier +import java.time.Duration +import scala.concurrent.duration.FiniteDuration import scala.util.{Failure, Success, Try} case class Configuration( contactPoints: Seq[SocketAddress[IpAddress]], localDatacenter: String, - measurementsKeyspace: CqlIdentifier + measurementsKeyspace: CqlIdentifier, + measurementNetworkConfigurationTTL: Duration, + measurementNetworkUtilizationTTL: Duration, + recordMeasurementQueryTimeout: Duration, + getAllMeasurementsQueryTimeout: Duration, + aggregationSchedulerInterval: Duration ) object Configuration { def resolve[F[_]: Sync]( contactPoints: String, localDatacenter: String, - measurementsKeyspace: String + measurementsKeyspace: String, + measurementNetworkConfigurationTTL: String, + measurementNetworkUtilizationTTL: String, + recordMeasurementQueryTimeout: String, + getAllMeasurementsQueryTimeout: String, + aggregationSchedulerInterval: String ): F[Configuration] = { val maybeHostnameContactPoints = contactPoints @@ -48,7 +60,17 @@ object Configuration { } yield Configuration( contactPoints = ipAddressContactPoints, localDatacenter = localDatacenter, - measurementsKeyspace = CqlIdentifier.fromCql(measurementsKeyspace) + measurementsKeyspace = CqlIdentifier.fromCql(measurementsKeyspace), + measurementNetworkConfigurationTTL = + Duration.parse(measurementNetworkConfigurationTTL), + measurementNetworkUtilizationTTL = + Duration.parse(measurementNetworkUtilizationTTL), + recordMeasurementQueryTimeout = + Duration.parse(recordMeasurementQueryTimeout), + getAllMeasurementsQueryTimeout = + Duration.parse(getAllMeasurementsQueryTimeout), + aggregationSchedulerInterval = + Duration.parse(aggregationSchedulerInterval) ) } @@ -56,7 +78,16 @@ object Configuration { resolve( scala.util.Properties.envOrElse("CONTACT_POINTS", "localhost:9042"), scala.util.Properties.envOrElse("LOCAL_DATACENTER", "datacenter1"), - scala.util.Properties.envOrElse("MEASUREMENTS_KEYSPACE", "bandwhichd") + scala.util.Properties.envOrElse("MEASUREMENTS_KEYSPACE", "bandwhichd"), + scala.util.Properties + .envOrElse("MEASUREMENT_NETWORK_CONFIGURATION_TTL", "PT2H"), + scala.util.Properties + .envOrElse("MEASUREMENT_NETWORK_UTILIZATION_TTL", "PT2H"), + scala.util.Properties + .envOrElse("RECORD_MEASUREMENT_QUERY_TIMEOUT", "PT2S"), + scala.util.Properties + .envOrElse("GET_ALL_MEASUREMENTS_QUERY_TIMEOUT", "PT8S"), + scala.util.Properties.envOrElse("AGGREGATION_SCHEDULER_INTERVAL", "PT10S") ) def resource[F[_]: Sync]: Resource[F, Configuration] = diff --git a/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/Schedule.scala b/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/Schedule.scala index 57bac47..0ca60cc 100644 --- a/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/Schedule.scala +++ b/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/Schedule.scala @@ -8,6 +8,7 @@ sealed trait Schedule[F[_]] { object Schedule { case class Pausing[F[_]]( + name: String, pauseDuration: FiniteDuration, work: Work[F] ) extends Schedule[F] diff --git a/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/SchedulerOperator.scala b/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/SchedulerOperator.scala index a30a3a5..431a201 100644 --- a/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/SchedulerOperator.scala +++ b/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/SchedulerOperator.scala @@ -2,6 +2,7 @@ package de.neuland.bandwhichd.server.lib.scheduling import cats.effect.* import cats.implicits.* +import org.typelevel.log4cats.slf4j.Slf4jLogger import java.util.concurrent.atomic.AtomicBoolean import scala.util.{Failure, Success, Try} @@ -11,15 +12,18 @@ class SchedulerOperator[F[_]: Async]( ) extends Operator[F] { override def resource: Resource[F, F[Outcome[F, Throwable, Unit]]] = for { - schedule <- Resource.make( - Async[F].defer(scheduler.schedule) - )(_ => Async[F].pure(())) + logger <- Resource.eval(Slf4jLogger.create[F]) + schedule <- Resource.eval(Async[F].defer(scheduler.schedule)) outcome: F[Outcome[F, Throwable, Unit]] <- Async[F].background { schedule match - case Schedule.Pausing(pauseDuration, work) => + case Schedule.Pausing(name, pauseDuration, work) => def cycle: F[Unit] = for { - _ <- work.run + _ <- logger.debug(s"Running $name") + _ <- Async[F].onError(work.run) { case e => + logger.error(e)(s"Scheduler $name failed") + } + _ <- logger.debug(s"Pausing $name for $pauseDuration") _ <- Async[F].sleep(pauseDuration) _ <- cycle } yield () diff --git a/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/SchedulersOperator.scala b/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/SchedulersOperator.scala index fcd44a6..0c1dda5 100644 --- a/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/SchedulersOperator.scala +++ b/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/SchedulersOperator.scala @@ -5,6 +5,8 @@ import cats.implicits.* class SchedulersOperator[F[_]: Async](private val schedulers: Scheduler[F]*) extends Operator[F] { + def size: Int = schedulers.size + override def resource: Resource[F, F[Outcome[F, Throwable, Unit]]] = schedulers .map(scheduler => SchedulerOperator(scheduler)) @@ -37,5 +39,4 @@ class SchedulersOperator[F[_]: Async](private val schedulers: Scheduler[F]*) } } } - } diff --git a/src/test/scala/de/neuland/bandwhichd/server/BandwhichDServerApiV1Spec.scala b/src/test/scala/de/neuland/bandwhichd/server/BandwhichDServerApiV1Spec.scala index ea66c05..a968317 100644 --- a/src/test/scala/de/neuland/bandwhichd/server/BandwhichDServerApiV1Spec.scala +++ b/src/test/scala/de/neuland/bandwhichd/server/BandwhichDServerApiV1Spec.scala @@ -9,7 +9,12 @@ import de.neuland.bandwhichd.server.adapter.in.v1.message.{ ApiV1MessageV1Fixtures, MessageController } -import de.neuland.bandwhichd.server.boot.{App, Configuration, Routes} +import de.neuland.bandwhichd.server.boot.{ + App, + Configuration, + ConfigurationFixtures, + Routes +} import de.neuland.bandwhichd.server.domain.measurement.MeasurementFixtures import de.neuland.bandwhichd.server.lib.cassandra.CassandraContext import de.neuland.bandwhichd.server.lib.test.cassandra.CassandraContainer @@ -31,11 +36,7 @@ class BandwhichDServerApiV1Spec override val container: CassandraContainer = CassandraContainer() private def configuration: Configuration = - Configuration( - contactPoints = Seq(container.container.socket), - localDatacenter = container.datacenter, - measurementsKeyspace = CqlIdentifier.fromCql("bandwhichd") - ) + ConfigurationFixtures.testDefaults(container) "bandwhichd-server v1 API" should { "have health status" in { diff --git a/src/test/scala/de/neuland/bandwhichd/server/adapter/out/measurement/MeasurementCassandraRepositorySpec.scala b/src/test/scala/de/neuland/bandwhichd/server/adapter/out/measurement/MeasurementCassandraRepositorySpec.scala index baf5839..766dacc 100644 --- a/src/test/scala/de/neuland/bandwhichd/server/adapter/out/measurement/MeasurementCassandraRepositorySpec.scala +++ b/src/test/scala/de/neuland/bandwhichd/server/adapter/out/measurement/MeasurementCassandraRepositorySpec.scala @@ -7,7 +7,7 @@ import com.datastax.oss.driver.api.core.CqlIdentifier import com.datastax.oss.driver.api.core.cql.{AsyncResultSet, SimpleStatement} import com.dimafeng.testcontainers.ForAllTestContainer import de.neuland.bandwhichd.server.adapter.out.CassandraMigration -import de.neuland.bandwhichd.server.boot.Configuration +import de.neuland.bandwhichd.server.boot.{Configuration, ConfigurationFixtures} import de.neuland.bandwhichd.server.domain.measurement.{ MeasurementFixtures, MeasurementRepository, @@ -30,11 +30,7 @@ class MeasurementCassandraRepositorySpec override val container: CassandraContainer = CassandraContainer() private def configuration: Configuration = - Configuration( - contactPoints = Seq(container.container.socket), - localDatacenter = container.datacenter, - measurementsKeyspace = CqlIdentifier.fromCql("bandwhichd") - ) + ConfigurationFixtures.testDefaults(container) "MeasurementCassandraRepository" should { "record and get measurements" in { diff --git a/src/test/scala/de/neuland/bandwhichd/server/boot/ConfigurationFixtures.scala b/src/test/scala/de/neuland/bandwhichd/server/boot/ConfigurationFixtures.scala new file mode 100644 index 0000000..b893b7c --- /dev/null +++ b/src/test/scala/de/neuland/bandwhichd/server/boot/ConfigurationFixtures.scala @@ -0,0 +1,20 @@ +package de.neuland.bandwhichd.server.boot + +import com.datastax.oss.driver.api.core.CqlIdentifier +import de.neuland.bandwhichd.server.lib.test.cassandra.CassandraContainer + +import java.time.Duration + +object ConfigurationFixtures { + def testDefaults(container: CassandraContainer): Configuration = + Configuration( + contactPoints = Seq(container.container.socket), + localDatacenter = container.datacenter, + measurementsKeyspace = CqlIdentifier.fromCql("bandwhichd"), + measurementNetworkConfigurationTTL = Duration.ofHours(2), + measurementNetworkUtilizationTTL = Duration.ofHours(2), + recordMeasurementQueryTimeout = Duration.ofSeconds(2), + getAllMeasurementsQueryTimeout = Duration.ofSeconds(8), + aggregationSchedulerInterval = Duration.ofSeconds(10) + ) +}