Skip to content

Commit

Permalink
FTP - append mode for toPath sink + improved upstream failure handling
Browse files Browse the repository at this point in the history
  • Loading branch information
svezfaz authored and sbonettiEXPE committed Mar 27, 2017
1 parent d1d4370 commit 3a4368f
Show file tree
Hide file tree
Showing 14 changed files with 110 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ private[ftp] trait FtpIOSourceStage[FtpClient, S <: RemoteFileSettings]

private[ftp] trait FtpIOSinkStage[FtpClient, S <: RemoteFileSettings]
extends FtpIOGraphStage[FtpClient, S, SinkShape[ByteString]] {

def append: Boolean

val shape: SinkShape[ByteString] = SinkShape(Inlet[ByteString](s"$name.in"))
val in: Inlet[ByteString] = shape.inlets.head.asInstanceOf[Inlet[ByteString]]

Expand Down Expand Up @@ -144,6 +147,11 @@ private[ftp] trait FtpIOSinkStage[FtpClient, S <: RemoteFileSettings]
matSuccess()
super.onUpstreamFinish()
}

override def onUpstreamFailure(exception: Throwable): Unit = {
matFailure(exception)
failStage(exception)
}
}) // end of handler

override def postStop(): Unit =
Expand All @@ -154,7 +162,7 @@ private[ftp] trait FtpIOSinkStage[FtpClient, S <: RemoteFileSettings]
}

protected[this] def doPreStart(): Unit = {
osOpt = Some(ftpLike.storeFileStream(path, handler.get).get)
osOpt = Some(ftpLike.storeFileStream(path, handler.get, append).get)
pull(in)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ protected[ftp] trait FtpLike[FtpClient, S <: RemoteFileSettings] {

def retrieveFileInputStream(name: String, handler: Handler): Try[InputStream]

def storeFileStream(name: String, handler: Handler): Try[OutputStream]
def storeFileStream(name: String, handler: Handler, append: Boolean): Try[OutputStream]
}

object FtpLike {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ private[ftp] trait FtpOperations { _: FtpLike[FTPClient, FtpFileSettings] =>
if (is != null) is else throw new IOException(s"$name: No such file or directory")
}

def storeFileStream(name: String, handler: Handler): Try[OutputStream] = Try {
val os = handler.storeFileStream(name)
def storeFileStream(name: String, handler: Handler, append: Boolean): Try[OutputStream] = Try {
val os = if (append) handler.appendFileStream(name) else handler.storeFileStream(name)
if (os != null) os else throw new IOException(s"Could not write to $name")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,16 @@ private[ftp] trait FtpSourceFactory[FtpClient] { self =>

protected[this] def createIOSink(
_path: String,
_connectionSettings: S
_connectionSettings: S,
_append: Boolean
)(implicit _ftpLike: FtpLike[FtpClient, S]): FtpIOSinkStage[FtpClient, S] =
new FtpIOSinkStage[FtpClient, S] {
lazy val name: String = ftpIOSinkName
val path: String = _path
val connectionSettings: S = _connectionSettings
val ftpClient: () => FtpClient = self.ftpClient
val ftpLike: FtpLike[FtpClient, S] = _ftpLike
val append: Boolean = _append
}

protected[this] def defaultSettings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import java.io.{InputStream, OutputStream}
import java.nio.file.Paths
import java.nio.file.attribute.PosixFilePermissions

import com.jcraft.jsch.ChannelSftp.{APPEND, OVERWRITE}

private[ftp] trait SftpOperations { _: FtpLike[JSch, SftpSettings] =>

type Handler = ChannelSftp
Expand Down Expand Up @@ -77,7 +79,7 @@ private[ftp] trait SftpOperations { _: FtpLike[JSch, SftpSettings] =>
handler.get(name)
}

def storeFileStream(name: String, handler: Handler): Try[OutputStream] = Try {
handler.put(name)
def storeFileStream(name: String, handler: Handler, append: Boolean): Try[OutputStream] = Try {
if (append) handler.put(name, APPEND) else handler.put(name, OVERWRITE)
}
}
24 changes: 20 additions & 4 deletions ftp/src/main/scala/akka/stream/alpakka/ftp/javadsl/FtpApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import akka.stream.alpakka.ftp.impl.{FtpLike, FtpSourceFactory}
import akka.stream.IOResult
import akka.stream.javadsl.Source
import akka.stream.javadsl.Sink
import akka.stream.scaladsl.{Source => ScalaSource}
import akka.stream.scaladsl.{Sink => ScalaSink}
import akka.stream.scaladsl.{Source ScalaSource}
import akka.stream.scaladsl.{Sink ScalaSink}
import akka.util.ByteString
import com.jcraft.jsch.JSch
import org.apache.commons.net.ftp.FTPClient
Expand Down Expand Up @@ -158,16 +158,32 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] =>
*
* @param path the file path
* @param connectionSettings connection settings
* @param append append data is a file already exists, overwrite the file if not
* @return A [[Sink]] of [[ByteString]] that materializes to a [[CompletionStage]] of [[IOResult]]
*/
def toPath(
path: String,
connectionSettings: S
connectionSettings: S,
append: Boolean
): Sink[ByteString, CompletionStage[IOResult]] = {
import scala.compat.java8.FutureConverters._
ScalaSink.fromGraph(createIOSink(path, connectionSettings)).mapMaterializedValue(_.toJava).asJava
ScalaSink.fromGraph(createIOSink(path, connectionSettings, append)).mapMaterializedValue(_.toJava).asJava
}

/**
* Java API: creates a [[Sink]] of [[ByteString]] to some file path.
* If a file already exists at the specified target path, it will get overwritten.
*
* @param path the file path
* @param connectionSettings connection settings
* @return A [[Sink]] of [[ByteString]] that materializes to a [[CompletionStage]] of [[IOResult]]
*/
def toPath(
path: String,
connectionSettings: S
): Sink[ByteString, CompletionStage[IOResult]] =
toPath(path, connectionSettings, append = false)

protected[this] implicit def ftpLike: FtpLike[FtpClient, S]
}

Expand Down
1 change: 1 addition & 0 deletions ftp/src/main/scala/akka/stream/alpakka/ftp/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ object FtpCredentials {

/**
* Non-anonymous credentials
*
* @param username the username
* @param password the password
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,15 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] =>
*
* @param path the file path
* @param connectionSettings connection settings
* @param append append data is a file already exists, overwrite the file if not
* @return A [[Sink]] of [[ByteString]] that materializes to a [[Future]] of [[IOResult]]
*/
def toPath(
path: String,
connectionSettings: S
connectionSettings: S,
append: Boolean = false
): Sink[ByteString, Future[IOResult]] =
Sink.fromGraph(createIOSink(path, connectionSettings))
Sink.fromGraph(createIOSink(path, connectionSettings, append))

protected[this] implicit def ftpLike: FtpLike[FtpClient, S]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import java.nio.file.attribute.PosixFilePermissions;
import java.util.*;

import static java.nio.file.StandardOpenOption.APPEND;
import static java.nio.file.StandardOpenOption.CREATE;

public class JimfsFtpFile implements FtpFile {

private final Logger LOG = LoggerFactory.getLogger(JimfsFtpFile.class);
Expand Down Expand Up @@ -306,7 +309,9 @@ public OutputStream createOutputStream(long offset)
throw new IOException("No write permission : " + path.getFileName());
}

return Files.newOutputStream(path, StandardOpenOption.CREATE);
final OpenOption openOption = offset > 0 ? APPEND : CREATE;

return Files.newOutputStream(path, openOption);
}

public InputStream createInputStream(long offset)
Expand Down
4 changes: 2 additions & 2 deletions ftp/src/test/scala/akka/stream/alpakka/ftp/BaseFtpSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ trait BaseFtpSpec extends PlainFtpSupportImpl with BaseSpec {
//#retrieving

//#storing
protected def storeToPath(path: String): Sink[ByteString, Future[IOResult]] =
Ftp.toPath(path, settings)
protected def storeToPath(path: String, append: Boolean): Sink[ByteString, Future[IOResult]] =
Ftp.toPath(path, settings, append)
//#storing
}
4 changes: 2 additions & 2 deletions ftp/src/test/scala/akka/stream/alpakka/ftp/BaseFtpsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ trait BaseFtpsSpec extends FtpsSupportImpl with BaseSpec {
protected def retrieveFromPath(path: String): Source[ByteString, Future[IOResult]] =
Ftps.fromPath(path, settings)

protected def storeToPath(path: String): Sink[ByteString, Future[IOResult]] =
Ftps.toPath(path, settings)
protected def storeToPath(path: String, append: Boolean): Sink[ByteString, Future[IOResult]] =
Ftps.toPath(path, settings, append)

}
4 changes: 2 additions & 2 deletions ftp/src/test/scala/akka/stream/alpakka/ftp/BaseSftpSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ trait BaseSftpSpec extends SftpSupportImpl with BaseSpec {
protected def retrieveFromPath(path: String): Source[ByteString, Future[IOResult]] =
Sftp.fromPath(path, settings)

protected def storeToPath(path: String): Sink[ByteString, Future[IOResult]] =
Sftp.toPath(path, settings)
protected def storeToPath(path: String, append: Boolean): Sink[ByteString, Future[IOResult]] =
Sftp.toPath(path, settings, append)
}
2 changes: 1 addition & 1 deletion ftp/src/test/scala/akka/stream/alpakka/ftp/BaseSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ trait BaseSpec

protected def retrieveFromPath(path: String): Source[ByteString, Future[IOResult]]

protected def storeToPath(path: String): Sink[ByteString, Future[IOResult]]
protected def storeToPath(path: String, append: Boolean): Sink[ByteString, Future[IOResult]]

protected def startServer(): Unit

Expand Down
69 changes: 53 additions & 16 deletions ftp/src/test/scala/akka/stream/alpakka/ftp/CommonFtpStageSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,58 @@ trait CommonFtpStageSpec extends BaseSpec {
}
}

"FtpIOSink" should {
"write a file to a path from a stream of bytes" in {
val fileName = "sample_io"
val result = Source.single(ByteString(getLoremIpsum)).runWith(storeToPath(s"/$fileName")).futureValue
"FTPIOSink" when {
val fileName = "sample_io"

val expectedNumOfBytes = getLoremIpsum.getBytes().length
result shouldBe IOResult.createSuccessful(expectedNumOfBytes)
"no file is already present at the target location" should {
"create a new file from the provided stream of bytes regardless of the append mode" in {
List(true, false).foreach { mode
val result = Source.single(ByteString(getLoremIpsum)).runWith(storeToPath(s"/$fileName", mode)).futureValue

val storedContents = getFtpFileContents(FtpBaseSupport.FTP_ROOT_DIR, fileName)
storedContents shouldBe getLoremIpsum.getBytes
val expectedNumOfBytes = getLoremIpsum.getBytes().length
result shouldBe IOResult.createSuccessful(expectedNumOfBytes)

val storedContents = getFtpFileContents(FtpBaseSupport.FTP_ROOT_DIR, fileName)
storedContents shouldBe getLoremIpsum.getBytes

cleanFiles()
}
}
}

"a file is already present at the target location" should {

val reversedLoremIpsum = getLoremIpsum.reverse
val expectedNumOfBytes = reversedLoremIpsum.length

"overwrite it when not in append mode" in {
putFileOnFtp(FtpBaseSupport.FTP_ROOT_DIR, fileName)

val result =
Source.single(ByteString(reversedLoremIpsum)).runWith(storeToPath(s"/$fileName", append = false)).futureValue

result shouldBe IOResult.createSuccessful(expectedNumOfBytes)

val storedContents = getFtpFileContents(FtpBaseSupport.FTP_ROOT_DIR, fileName)
storedContents shouldBe reversedLoremIpsum.getBytes
}

"append to its contents when in append mode" in {
putFileOnFtp(FtpBaseSupport.FTP_ROOT_DIR, fileName)

val result =
Source.single(ByteString(reversedLoremIpsum)).runWith(storeToPath(s"/$fileName", append = true)).futureValue

result shouldBe IOResult.createSuccessful(expectedNumOfBytes)

val storedContents = getFtpFileContents(FtpBaseSupport.FTP_ROOT_DIR, fileName)

storedContents shouldBe getLoremIpsum.getBytes ++ reversedLoremIpsum.getBytes
}
}
}

it should {
"write a bigger file (~2 MB) to a path from a stream of bytes" in {
val fileName = "sample_bigger_file"
val fileContents = new Array[Byte](2000020)
Expand All @@ -139,7 +179,7 @@ trait CommonFtpStageSpec extends BaseSpec {
val result = Source[Byte](fileContents.toList)
.grouped(8192)
.map(s ByteString.apply(s.toArray))
.runWith(storeToPath(s"/$fileName"))
.runWith(storeToPath(s"/$fileName", append = false))
.futureValue

val expectedNumOfBytes = fileContents.length
Expand All @@ -149,17 +189,14 @@ trait CommonFtpStageSpec extends BaseSpec {
storedContents shouldBe fileContents
}

"overwrite a file if it's already present" in {
"fail and report the exception in the result status if upstream fails" in {
val fileName = "sample_io"
putFileOnFtp(FtpBaseSupport.FTP_ROOT_DIR, fileName)
val brokenSource = Source(10.to(0, -1)).map(x ByteString(10 / x))

val reversedLoremIpsum = getLoremIpsum.reverse
Source.single(ByteString(reversedLoremIpsum)).runWith(storeToPath(s"/$fileName")).futureValue
val result = brokenSource.runWith(storeToPath(s"/$fileName", append = false)).futureValue

val storedContents = getFtpFileContents(FtpBaseSupport.FTP_ROOT_DIR, fileName)
storedContents shouldBe reversedLoremIpsum.getBytes
result.status.failed.get shouldBe a[ArithmeticException]
}

}

}

0 comments on commit 3a4368f

Please sign in to comment.