Skip to content

Commit

Permalink
Fixes apache#3195 - code review feedback
Browse files Browse the repository at this point in the history
 * Simplify error handling code
 * Change how data location is calculated to avoid performance issue
  • Loading branch information
mcdan committed Feb 26, 2018
1 parent f88f9d4 commit 83f10fb
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,22 @@ package whisk.core.containerpool

import java.time.Instant

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Success
import scala.util.Failure
import akka.actor.FSM
import akka.actor.Props
import akka.actor.Stash
import akka.actor.{FSM, Props, Stash}
import akka.actor.Status.{Failure => FailureMessage}
import akka.pattern.pipe
import spray.json._
import spray.json.DefaultJsonProtocol._
import spray.json._
import whisk.common.{AkkaLogging, Counter, LoggingMarkers, TransactionId}
import whisk.core.connector.ActivationMessage
import whisk.core.containerpool.logging.LogCollectingException
import whisk.core.entity.ExecManifest.ImageName
import whisk.core.entity._
import whisk.core.entity.size._
import whisk.core.entity.ExecManifest.ImageName
import whisk.http.Messages

import scala.util.Try
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// States
sealed trait ContainerState
Expand Down Expand Up @@ -382,25 +378,21 @@ class ContainerProxy(
val activationWithLogs: Future[Either[ActivationLogReadingError, WhiskActivation]] = activation
.flatMap { activation =>
val start = tid.started(this, LoggingMarkers.INVOKER_COLLECT_LOGS)
Try(
val collected: Future[ActivationLogs] = try {
collectLogs(tid, job.msg.user, activation, container, job.action)
.andThen {
case Success(_) => tid.finished(this, start)
case Failure(t) => tid.failed(this, start, s"reading logs failed: $t")
}
.map(logs => Right(activation.withLogs(logs)))
.recover {
case LogCollectingException(logs) =>
Left(ActivationLogReadingError(activation.withLogs(logs)))
case _ =>
Left(ActivationLogReadingError(activation.withLogs(ActivationLogs(Vector(Messages.logFailure)))))
}) match {
case Success(e) => e
case Failure(t) =>
tid.failed(this, start, s"reading logs failed: $t")
Future.successful(
Left(ActivationLogReadingError(activation.withLogs(ActivationLogs(Vector(Messages.logFailure))))))
}
} catch { case t: Throwable => Future.failed(t) }
collected
.andThen {
case Success(_) => tid.finished(this, start)
case Failure(t) => tid.failed(this, start, s"reading logs failed: $t")
}
.map(logs => Right(activation.withLogs(logs)))
.recover {
case LogCollectingException(logs) =>
Left(ActivationLogReadingError(activation.withLogs(logs)))
case _ =>
Left(ActivationLogReadingError(activation.withLogs(ActivationLogs(Vector(Messages.logFailure)))))
}
}

// Storing the record. Entirely asynchronous and not waited upon.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,14 @@ import akka.actor.ActorSystem
import akka.stream.alpakka.file.scaladsl.FileTailSource
import akka.stream.scaladsl.{FileIO, Source => AkkaSource}
import akka.util.ByteString

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.blocking
import spray.json.DefaultJsonProtocol._
import spray.json._
import whisk.common.Logging
import whisk.common.TransactionId
import whisk.core.containerpool.ContainerId
import whisk.core.containerpool.ContainerAddress
import whisk.common.{Logging, TransactionId}
import whisk.core.containerpool.{ContainerAddress, ContainerId}

import scala.io.Source
import scala.concurrent.{blocking, ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration
import scala.io.Source

class DockerClientWithFileAccess(dockerHost: Option[String] = None,
containersDirectory: File = Paths.get("containers").toFile)(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,10 @@ class DockerContainer(protected val id: ContainerId,
def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Source[ByteString, Any] = {
docker
.rawContainerLogs(id, logFileOffset.get(), if (waitForSentinel) Some(filePollInterval) else None)
.recover {
case _: RuntimeException =>
ByteString(LogLine(Instant.now.toString, "stderr", "Ouch.").toJson.compactPrint)
}
// This stage only throws 'FramingException' so we cannot decide whether we got truncated due to a size
// constraint (like StreamLimitReachedException below) or due to the file being truncated itself.
.via(Framing.delimiter(delimiter, limit.toBytes.toInt))
Expand All @@ -255,6 +259,8 @@ class DockerContainer(protected val id: ContainerId,
// FramingException can also mean exceeding the limits, we cannot decide which case happened so we resort
// to the general error message. This will be the last element of the stream.
ByteString(LogLine(Instant.now.toString, "stderr", Messages.logFailure).toJson.compactPrint)
case _: RuntimeException =>
ByteString(LogLine(Instant.now.toString, "stderr", "Ouch.").toJson.compactPrint)
}
.takeWithin(waitForLogs)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,5 @@ object DockerContainerFactoryProvider extends ContainerFactoryProvider {
}

/**
 * Configuration for the Docker container factory.
 *
 * @param dataroot optional root directory of the Docker data area; when
 *                 absent, the absolute default `/containers` is used.
 */
case class DockerContainerFactoryConfig(dataroot: Option[String]) {

  /**
   * Directory holding per-container data.
   *
   * Resolves to `<dataroot>/containers` when a data root is configured, or
   * to `/containers` otherwise. Held in a `val` (not a `def`) so the path
   * is computed once instead of on every access — the performance fix this
   * change is about.
   */
  val containerPath: File =
    Paths.get(dataroot.fold("/containers")(root => s"$root/containers")).toFile
}

0 comments on commit 83f10fb

Please sign in to comment.