Skip to content

Commit

Permalink
FTP - toPath sink #182
Browse files Browse the repository at this point in the history
  • Loading branch information
svezfaz committed Feb 15, 2017
1 parent 996ec9e commit 6425a59
Show file tree
Hide file tree
Showing 24 changed files with 375 additions and 87 deletions.
22 changes: 18 additions & 4 deletions docs/src/main/paradox/ftp.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ Scala
: @@snip (../../../../ftp/src/test/scala/akka/stream/alpakka/ftp/BaseFtpSpec.scala) { #create-settings }

Java
: @@snip (../../../../ftp/src/test/java/akka/stream/alpakka/ftp/FtpSourceTest.java) { #create-settings }
: @@snip (../../../../ftp/src/test/java/akka/stream/alpakka/ftp/FtpStageTest.java) { #create-settings }

The configuration above will create an anonymous connection with a remote FTP server in passive mode. For both FTPs and SFTP servers, you will need to provide the specialized versions of these settings: @scaladoc[FtpsSettings](akka.stream.alpakka.ftp.RemoteFileSettings$$FtpsSettings) or @scaladoc[SftpSettings](akka.stream.alpakka.ftp.RemoteFileSettings$$SftpSettings)
respectively.
Expand All @@ -59,7 +59,7 @@ Scala
: @@snip (../../../../ftp/src/test/scala/akka/stream/alpakka/ftp/BaseFtpSpec.scala) { #traversing }

Java
: @@snip (../../../../ftp/src/test/java/akka/stream/alpakka/ftp/FtpSourceTest.java) { #traversing }
: @@snip (../../../../ftp/src/test/java/akka/stream/alpakka/ftp/FtpStageTest.java) { #traversing }

This source will emit @scaladoc[FtpFile](akka.stream.alpakka.ftp.FtpFile) elements with no significant materialization.

Expand All @@ -73,9 +73,23 @@ Scala
: @@snip (../../../../ftp/src/test/scala/akka/stream/alpakka/ftp/BaseFtpSpec.scala) { #retrieving }

Java
: @@snip (../../../../ftp/src/test/java/akka/stream/alpakka/ftp/FtpSourceTest.java) { #retrieving }
: @@snip (../../../../ftp/src/test/java/akka/stream/alpakka/ftp/FtpStageTest.java) { #retrieving }

This souce will emit @scaladoc[ByteString](akka.util.ByteString) elements and materializes to @scaladoc[Future](scala.concurrent.Future) in Scala API and @extref[CompletionStage](java-api:java/util/concurrent/CompletionStage) in Java API of @scaladoc[IOResult](akka.stream.IOResult) when the stream finishes.
This source will emit @scaladoc[ByteString](akka.util.ByteString) elements and materializes to @scaladoc[Future](scala.concurrent.Future) in Scala API and @extref[CompletionStage](java-api:java/util/concurrent/CompletionStage) in Java API of @scaladoc[IOResult](akka.stream.IOResult) when the stream finishes.

For both FTPs and SFTP servers, you will need to use the `FTPs` and `SFTP` API respectively.

### Writing files

In order to store a remote file from a stream of bytes, you need to use the `toPath` method in the FTP API:

Scala
: @@snip (../../../../ftp/src/test/scala/akka/stream/alpakka/ftp/BaseFtpSpec.scala) { #storing }

Java
: @@snip (../../../../ftp/src/test/java/akka/stream/alpakka/ftp/FtpStageTest.java) { #storing }

This sink will consume @scaladoc[ByteString](akka.util.ByteString) elements and materializes to @scaladoc[Future](scala.concurrent.Future) in Scala API and @extref[CompletionStage](java-api:java/util/concurrent/CompletionStage) in Java API of @scaladoc[IOResult](akka.stream.IOResult) when the stream finishes.

For both FTPs and SFTP servers, you will need to use the `FTPs` and `SFTP` API respectively.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ private[ftp] trait FtpBrowserGraphStage[FtpClient, S <: RemoteFileSettings] exte

val shape: SourceShape[FtpFile] = SourceShape(Outlet[FtpFile](s"$name.out"))

val out = shape.outlets.head.asInstanceOf[Outlet[FtpFile]]

override def initialAttributes: Attributes =
super.initialAttributes and Attributes.name(name) and IODispatcher

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
package akka.stream.alpakka.ftp
package impl

import akka.stream.Shape
import akka.stream.stage.GraphStageLogic
import akka.stream.{Outlet, Shape}

import scala.util.control.NonFatal

private[ftp] abstract class FtpGraphStageLogic[T, FtpClient, S <: RemoteFileSettings](
Expand All @@ -16,7 +17,6 @@ private[ftp] abstract class FtpGraphStageLogic[T, FtpClient, S <: RemoteFileSett
) extends GraphStageLogic(shape) {

protected[this] implicit val client = ftpClient()
protected[this] val out = shape.outlets.head.asInstanceOf[Outlet[T]]
protected[this] var handler: Option[ftpLike.Handler] = Option.empty[ftpLike.Handler]
protected[this] var isConnected: Boolean = false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,23 @@
package akka.stream.alpakka.ftp
package impl

import akka.stream.stage.{GraphStageWithMaterializedValue, OutHandler}
import akka.stream.{Attributes, IOResult, Outlet, SourceShape}
import akka.stream.stage.{GraphStageWithMaterializedValue, InHandler, OutHandler}
import akka.stream.impl.Stages.DefaultAttributes.IODispatcher
import akka.stream.{Attributes, IOResult, Inlet, Outlet, Shape, SinkShape, SourceShape}
import akka.util.ByteString
import akka.util.ByteString.ByteString1C
import scala.concurrent.{Future, Promise}
import scala.util.control.NonFatal
import java.io.InputStream
import java.io.{InputStream, OutputStream}
import java.nio.file.Path

private[ftp] trait FtpIOGraphStage[FtpClient, S <: RemoteFileSettings]
extends GraphStageWithMaterializedValue[SourceShape[ByteString], Future[IOResult]] {
private[ftp] trait FtpIOGraphStage[FtpClient, S <: RemoteFileSettings, Sh <: Shape]
extends GraphStageWithMaterializedValue[Sh, Future[IOResult]] {

def name: String

def path: Path

def chunkSize: Int

def connectionSettings: S

implicit def ftpClient: () => FtpClient
Expand All @@ -32,7 +30,16 @@ private[ftp] trait FtpIOGraphStage[FtpClient, S <: RemoteFileSettings]
override def initialAttributes: Attributes =
super.initialAttributes and Attributes.name(name) and IODispatcher

val shape = SourceShape(Outlet[ByteString](s"$name.out"))
override def shape: Sh
}

private[ftp] trait FtpIOSourceStage[FtpClient, S <: RemoteFileSettings]
extends FtpIOGraphStage[FtpClient, S, SourceShape[ByteString]] {

def chunkSize: Int

val shape: SourceShape[ByteString] = SourceShape(Outlet[ByteString](s"$name.out"))
val out: Outlet[ByteString] = shape.outlets.head.asInstanceOf[Outlet[ByteString]]

def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {

Expand Down Expand Up @@ -114,3 +121,70 @@ private[ftp] trait FtpIOGraphStage[FtpClient, S <: RemoteFileSettings]
}

}

private[ftp] trait FtpIOSinkStage[FtpClient, S <: RemoteFileSettings]
extends FtpIOGraphStage[FtpClient, S, SinkShape[ByteString]] {
val shape: SinkShape[ByteString] = SinkShape(Inlet[ByteString](s"$name.in"))
val in: Inlet[ByteString] = shape.inlets.head.asInstanceOf[Inlet[ByteString]]

def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {

val matValuePromise = Promise[IOResult]()

val logic = new FtpGraphStageLogic[ByteString, FtpClient, S](shape, ftpLike, connectionSettings, ftpClient) {

private[this] var osOpt: Option[OutputStream] = None
private[this] var writtenBytesTotal: Long = 0L

setHandler(in,
new InHandler {
override def onPush(): Unit = {
write(grab(in))
pull(in)
}

override def onUpstreamFinish(): Unit =
try {
osOpt.foreach(_.close())
disconnect()
} finally {
matSuccess()
super.onUpstreamFinish()
}
}) // end of handler

override def postStop(): Unit =
try {
osOpt.foreach(_.close())
} finally {
super.postStop()
}

protected[this] def doPreStart(): Unit = {
val tryOs = ftpLike.storeFileStream(path.toAbsolutePath.toString, handler.get)
if (tryOs.isSuccess) {
osOpt = tryOs.toOption
pull(in)
} else
tryOs.failed.foreach { case NonFatal(t) => throw t }
}

protected[this] def matSuccess(): Boolean =
matValuePromise.trySuccess(IOResult.createSuccessful(writtenBytesTotal))

protected[this] def matFailure(t: Throwable): Boolean =
matValuePromise.trySuccess(IOResult.createFailed(writtenBytesTotal, t))

/** BLOCKING I/O WRITE */
private[this] def write(bytes: ByteString) =
osOpt.foreach { os =>
os.write(bytes.toArray)
writtenBytesTotal += bytes.size
}

} // end of stage logic

(logic, matValuePromise.future)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ package impl
import akka.stream.alpakka.ftp.RemoteFileSettings.SftpSettings
import com.jcraft.jsch.JSch
import org.apache.commons.net.ftp.FTPClient

import scala.collection.immutable
import scala.util.Try
import java.io.InputStream
import java.io.{InputStream, OutputStream}

protected[ftp] trait FtpLike[FtpClient, S <: RemoteFileSettings] {

Expand All @@ -24,6 +25,8 @@ protected[ftp] trait FtpLike[FtpClient, S <: RemoteFileSettings] {
def listFiles(handler: Handler): immutable.Seq[FtpFile]

def retrieveFileInputStream(name: String, handler: Handler): Try[InputStream]

def storeFileStream(name: String, handler: Handler): Try[OutputStream]
}

object FtpLike {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ package akka.stream.alpakka.ftp
package impl

import org.apache.commons.net.ftp.{FTP, FTPClient}

import scala.collection.immutable
import scala.util.Try
import java.io.{IOException, InputStream}
import java.io.{IOException, InputStream, OutputStream}
import java.nio.file.Paths

private[ftp] trait FtpOperations { _: FtpLike[FTPClient, FtpFileSettings] =>
Expand Down Expand Up @@ -51,4 +52,9 @@ private[ftp] trait FtpOperations { _: FtpLike[FTPClient, FtpFileSettings] =>
val is = handler.retrieveFileStream(name)
if (is != null) is else throw new IOException(s"$name: No such file or directory")
}

def storeFileStream(name: String, handler: Handler): Try[OutputStream] = Try {
val os = handler.storeFileStream(name)
if (os != null) os else throw new IOException(s"$name: Could not connect")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import org.apache.commons.net.ftp.FTPClient
import java.net.InetAddress
import java.nio.file.Path

private[ftp] trait FtpSourceFactory[FtpClient] { self =>
private[ftp] trait FtpStageFactory[FtpClient] { self =>

type S <: RemoteFileSettings

Expand All @@ -23,6 +23,8 @@ private[ftp] trait FtpSourceFactory[FtpClient] { self =>

protected[this] def ftpIOSourceName: String

protected[this] def ftpIOSinkName: String

protected[this] def createBrowserGraph(
_basePath: String,
_connectionSettings: S
Expand All @@ -35,12 +37,12 @@ private[ftp] trait FtpSourceFactory[FtpClient] { self =>
val ftpLike: FtpLike[FtpClient, S] = _ftpLike
}

protected[this] def createIOGraph(
protected[this] def createIOSource(
_path: Path,
_connectionSettings: S,
_chunkSize: Int
)(implicit _ftpLike: FtpLike[FtpClient, S]): FtpIOGraphStage[FtpClient, S] =
new FtpIOGraphStage[FtpClient, S] {
)(implicit _ftpLike: FtpLike[FtpClient, S]): FtpIOSourceStage[FtpClient, S] =
new FtpIOSourceStage[FtpClient, S] {
lazy val name: String = ftpIOSourceName
val path: Path = _path
val connectionSettings: S = _connectionSettings
Expand All @@ -49,35 +51,53 @@ private[ftp] trait FtpSourceFactory[FtpClient] { self =>
val chunkSize: Int = _chunkSize
}

protected[this] def createIOSink(
_path: Path,
_connectionSettings: S
)(implicit _ftpLike: FtpLike[FtpClient, S]): FtpIOSinkStage[FtpClient, S] =
new FtpIOSinkStage[FtpClient, S] {
lazy val name: String = ftpIOSinkName
val path: Path = _path
val connectionSettings: S = _connectionSettings
val ftpClient: () => FtpClient = self.ftpClient
val ftpLike: FtpLike[FtpClient, S] = _ftpLike
}

protected[this] def defaultSettings(
hostname: String,
username: Option[String] = None,
password: Option[String] = None
): S
}

private[ftp] trait FtpSource extends FtpSourceFactory[FTPClient] {
private[ftp] trait FtpStage extends FtpStageFactory[FTPClient] {
protected final val FtpBrowserSourceName = "FtpBrowserSource"
protected final val FtpIOSourceName = "FtpIOSource"
protected final val FtpIOSinkName = "FtpIOSink"
protected val ftpClient: () => FTPClient = () => new FTPClient
protected val ftpBrowserSourceName: String = FtpBrowserSourceName
protected val ftpIOSourceName: String = FtpIOSourceName
protected val ftpIOSinkName: String = FtpIOSinkName
}

private[ftp] trait FtpsSource extends FtpSourceFactory[FTPClient] {
private[ftp] trait FtpsStage extends FtpStageFactory[FTPClient] {
protected final val FtpsBrowserSourceName = "FtpsBrowserSource"
protected final val FtpsIOSourceName = "FtpsIOSource"
protected final val FtpsIOSinkName = "FtpsIOSink"
protected val ftpClient: () => FTPClient = () => new FTPClient
protected val ftpBrowserSourceName: String = FtpsBrowserSourceName
protected val ftpIOSourceName: String = FtpsIOSourceName
protected val ftpIOSinkName: String = FtpsIOSinkName
}

private[ftp] trait SftpSource extends FtpSourceFactory[JSch] {
private[ftp] trait SftpStage extends FtpStageFactory[JSch] {
protected final val sFtpBrowserSourceName = "sFtpBrowserSource"
protected final val sFtpIOSourceName = "sFtpIOSource"
protected final val sFtpIOSinkName = "sFtpIOSink"
protected val ftpClient: () => JSch = () => new JSch
protected val ftpBrowserSourceName: String = sFtpBrowserSourceName
protected val ftpIOSourceName: String = sFtpIOSourceName
protected val ftpIOSinkName: String = sFtpIOSinkName
}

private[ftp] trait FtpDefaultSettings {
Expand Down Expand Up @@ -128,17 +148,17 @@ private[ftp] trait SftpDefaultSettings {
)
}

private[ftp] trait FtpSourceParams extends FtpSource with FtpDefaultSettings {
private[ftp] trait FtpStageParams extends FtpStage with FtpDefaultSettings {
type S = FtpFileSettings
protected[this] val ftpLike: FtpLike[FTPClient, S] = FtpLike.ftpLikeInstance
}

private[ftp] trait FtpsSourceParams extends FtpsSource with FtpsDefaultSettings {
private[ftp] trait FtpsStageParams extends FtpsStage with FtpsDefaultSettings {
type S = FtpFileSettings
protected[this] val ftpLike: FtpLike[FTPClient, S] = FtpLike.ftpLikeInstance
}

private[ftp] trait SftpSourceParams extends SftpSource with SftpDefaultSettings {
private[ftp] trait SftpStageParams extends SftpStage with SftpDefaultSettings {
type S = SftpSettings
protected[this] val ftpLike: FtpLike[JSch, S] = FtpLike.sFtpLikeInstance
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ package impl

import akka.stream.alpakka.ftp.RemoteFileSettings.SftpSettings
import com.jcraft.jsch.{ChannelSftp, JSch}

import scala.collection.immutable
import scala.util.Try
import java.io.InputStream
import java.io.{InputStream, OutputStream}
import java.nio.file.Paths

private[ftp] trait SftpOperations { _: FtpLike[JSch, SftpSettings] =>
Expand Down Expand Up @@ -58,4 +59,8 @@ private[ftp] trait SftpOperations { _: FtpLike[JSch, SftpSettings] =>
def retrieveFileInputStream(name: String, handler: Handler): Try[InputStream] = Try {
handler.get(name)
}

def storeFileStream(name: String, handler: Handler): Try[OutputStream] = Try {
handler.put(name)
}
}
Loading

0 comments on commit 6425a59

Please sign in to comment.