From ef8db1264283c4959ce1aaded643c799fbea4eb6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Mon, 8 May 2017 11:09:06 +0200 Subject: [PATCH] Directory sources #272 --- docs/src/main/paradox/file.md | 21 ++++ .../alpakka/file/javadsl/Directory.java | 41 +++++++ .../file/javadsl/DirectoryChangesSource.java | 1 + .../alpakka/file/scaladsl/Directory.scala | 46 +++++++ .../scaladsl/DirectoryChangesSource.scala | 5 +- .../file/scaladsl/FileTailSource.scala | 4 +- .../alpakka/file/javadsl/DirectoryTest.java | 115 ++++++++++++++++++ .../alpakka/file/scaladsl/DirectorySpec.scala | 67 ++++++++++ .../file/scaladsl/FileTailSourceSpec.scala | 2 +- 9 files changed, 297 insertions(+), 5 deletions(-) create mode 100644 file/src/main/java/akka/stream/alpakka/file/javadsl/Directory.java create mode 100644 file/src/main/scala/akka/stream/alpakka/file/scaladsl/Directory.scala create mode 100644 file/src/test/java/akka/stream/alpakka/file/javadsl/DirectoryTest.java create mode 100644 file/src/test/scala/akka/stream/alpakka/file/scaladsl/DirectorySpec.scala diff --git a/docs/src/main/paradox/file.md b/docs/src/main/paradox/file.md index 3c21200df9..b3e68612df 100644 --- a/docs/src/main/paradox/file.md +++ b/docs/src/main/paradox/file.md @@ -54,6 +54,27 @@ Java : @@snip (../../../../file/src/test/java/akka/stream/alpakka/file/javadsl/FileTailSourceTest.java) { #simple-lines } +### Directory + +`Directory.ls(path)` lists all files and directories +directly in a given directory: + +Scala +: @@snip (../../../../file/src/test/scala/akka/stream/alpakka/file/scaladsl/DirectorySpec.scala) { #ls } + +Java +: @@snip (../../../../file/src/test/java/akka/stream/alpakka/file/javadsl/DirectoryTest.java) { #ls } + +`Directory.walk(path)` traverses all subdirectories and lists +files and directories depth first: + +Scala +: @@snip (../../../../file/src/test/scala/akka/stream/alpakka/file/scaladsl/DirectorySpec.scala) { #walk } + +Java +: @@snip (../../../../file/src/test/java/akka/stream/alpakka/file/javadsl/DirectoryTest.java) { #walk } + + ### DirectoryChangesSource The `DirectoryChangesSource` will emit elements every time there is a change to a watched directory diff --git a/file/src/main/java/akka/stream/alpakka/file/javadsl/Directory.java b/file/src/main/java/akka/stream/alpakka/file/javadsl/Directory.java new file mode 100644 index 0000000000..b1380ac43c --- /dev/null +++ b/file/src/main/java/akka/stream/alpakka/file/javadsl/Directory.java @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.file.javadsl; + +import akka.NotUsed; +import akka.stream.javadsl.Source; +import akka.stream.javadsl.StreamConverters; +import scala.None; +import scala.None$; +import scala.collection.immutable.Nil$; + +import java.nio.file.FileVisitOption; +import java.nio.file.Files; +import java.nio.file.Path; + +public final class Directory { + + /** + * List all files in the given directory + */ + public static Source ls(Path directory) { + return akka.stream.alpakka.file.scaladsl.Directory.ls(directory).asJava(); + } + + /** + * Recursively list files and directories in the given directory, depth first. + */ + public static Source walk(Path directory) { + return StreamConverters.fromJavaStream(() -> Files.walk(directory)); + } + + /** + * Recursively list files and directories in the given directory, depth first, + * with a maximum directory depth limit and a possibly set of options (See {@link java.nio.file.Files#walk} for + * details. + */ + public static Source walk(Path directory, int maxDepth, FileVisitOption... options) { + return StreamConverters.fromJavaStream(() -> Files.walk(directory, maxDepth, options)); + } +} diff --git a/file/src/main/java/akka/stream/alpakka/file/javadsl/DirectoryChangesSource.java b/file/src/main/java/akka/stream/alpakka/file/javadsl/DirectoryChangesSource.java index 2125d85dbe..01f38cb71b 100644 --- a/file/src/main/java/akka/stream/alpakka/file/javadsl/DirectoryChangesSource.java +++ b/file/src/main/java/akka/stream/alpakka/file/javadsl/DirectoryChangesSource.java @@ -205,6 +205,7 @@ public String toString() { * the JDK implementation is slow, it will not help lowering this * @param maxBufferSize Maximum number of buffered directory changes before the stage fails */ + @SuppressWarnings("unchecked") public static Source, NotUsed> create(Path directoryPath, FiniteDuration pollInterval, int maxBufferSize) { return Source.fromGraph(new DirectoryChangesSource(directoryPath, pollInterval, maxBufferSize, Pair::apply)); } diff --git a/file/src/main/scala/akka/stream/alpakka/file/scaladsl/Directory.scala b/file/src/main/scala/akka/stream/alpakka/file/scaladsl/Directory.scala new file mode 100644 index 0000000000..22bed90eb4 --- /dev/null +++ b/file/src/main/scala/akka/stream/alpakka/file/scaladsl/Directory.scala @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.file.scaladsl + +import java.nio.file.{FileVisitOption, Files, Path} + +import akka.NotUsed +import akka.stream.scaladsl.{Source, StreamConverters} + +import scala.collection.immutable + +object Directory { + + /** + * List all files in the given directory + */ + def ls(directory: Path): Source[Path, NotUsed] = { + require(Files.isDirectory(directory), s"Path must be a directory, $directory isn't") + StreamConverters.fromJavaStream(() => Files.list(directory)) + } + + /** + * Recursively list files and directories in the given directory and its subdirectories. Listing is done + * depth first. + * + * @param maxDepth If defined limits the depth of the directory structure to walk through + * @param fileVisitOptions See `java.nio.files.Files.walk()` for details + */ + def walk(directory: Path, + maxDepth: Option[Int] = None, + fileVisitOptions: immutable.Seq[FileVisitOption] = Nil): Source[Path, NotUsed] = { + require(Files.isDirectory(directory), s"Path must be a directory, $directory isn't") + val factory = maxDepth match { + case None => + () => + Files.walk(directory, fileVisitOptions: _*) + case Some(maxDepth) => + () => + Files.walk(directory, maxDepth, fileVisitOptions: _*) + } + + StreamConverters.fromJavaStream(factory) + } + +} diff --git a/file/src/main/scala/akka/stream/alpakka/file/scaladsl/DirectoryChangesSource.scala b/file/src/main/scala/akka/stream/alpakka/file/scaladsl/DirectoryChangesSource.scala index 9455a21ff3..cdb9f86d06 100644 --- a/file/src/main/scala/akka/stream/alpakka/file/scaladsl/DirectoryChangesSource.scala +++ b/file/src/main/scala/akka/stream/alpakka/file/scaladsl/DirectoryChangesSource.scala @@ -8,7 +8,6 @@ import java.util.function.BiFunction import akka.NotUsed import akka.stream.alpakka.file.DirectoryChange -import akka.stream.alpakka.file.javadsl.DirectoryChangesSource import akka.stream.scaladsl.Source import scala.concurrent.duration.FiniteDuration @@ -30,6 +29,8 @@ object DirectoryChangesSource { def apply(directoryPath: Path, pollInterval: FiniteDuration, maxBufferSize: Int): Source[(Path, DirectoryChange), NotUsed] = - Source.fromGraph(new DirectoryChangesSource(directoryPath, pollInterval, maxBufferSize, tupler)) + Source.fromGraph( + new akka.stream.alpakka.file.javadsl.DirectoryChangesSource(directoryPath, pollInterval, maxBufferSize, tupler) + ) } diff --git a/file/src/main/scala/akka/stream/alpakka/file/scaladsl/FileTailSource.scala b/file/src/main/scala/akka/stream/alpakka/file/scaladsl/FileTailSource.scala index 470db37bbe..30d08fda45 100644 --- a/file/src/main/scala/akka/stream/alpakka/file/scaladsl/FileTailSource.scala +++ b/file/src/main/scala/akka/stream/alpakka/file/scaladsl/FileTailSource.scala @@ -7,7 +7,7 @@ import java.nio.charset.{Charset, StandardCharsets} import java.nio.file.Path import akka.NotUsed -import akka.stream.alpakka.file.javadsl.FileTailSource +import akka.stream.alpakka.file.javadsl.{FileTailSource => JavaFileTailSource} import akka.stream.scaladsl.Source import akka.util.ByteString @@ -34,7 +34,7 @@ object FileTailSource { maxChunkSize: Int, startingPosition: Long, pollingInterval: FiniteDuration): Source[ByteString, NotUsed] = - Source.fromGraph(new FileTailSource(path, maxChunkSize, startingPosition, pollingInterval)) + Source.fromGraph(new JavaFileTailSource(path, maxChunkSize, startingPosition, pollingInterval)) /** * Scala API: Read the entire contents of a file as text lines, and then when the end is reached, keep reading diff --git a/file/src/test/java/akka/stream/alpakka/file/javadsl/DirectoryTest.java b/file/src/test/java/akka/stream/alpakka/file/javadsl/DirectoryTest.java new file mode 100644 index 0000000000..c0464a477c --- /dev/null +++ b/file/src/test/java/akka/stream/alpakka/file/javadsl/DirectoryTest.java @@ -0,0 +1,115 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.file.javadsl; + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.stream.ActorMaterializer; +import akka.stream.Materializer; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.testkit.TestKit; +import com.google.common.jimfs.Configuration; +import com.google.common.jimfs.Jimfs; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; +import scala.concurrent.duration.FiniteDuration; + +import java.nio.file.FileSystem; +import java.nio.file.FileVisitOption; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class DirectoryTest { + + private FileSystem fs; + private ActorSystem system; + private Materializer materializer; + + @Before + public void setup() { + fs = Jimfs.newFileSystem(Configuration.unix()); + system = ActorSystem.create(); + materializer = ActorMaterializer.create(system); + } + + + @Test + public void listFiles() throws Exception { + final Path dir = fs.getPath("listfiles"); + Files.createDirectories(dir); + final Path file1 = Files.createFile(dir.resolve("file1")); + final Path file2 = Files.createFile(dir.resolve("file2")); + + // #ls + final Source source = Directory.ls(dir); + // #ls + + final List result = source.runWith(Sink.seq(), materializer) + .toCompletableFuture().get(3, TimeUnit.SECONDS); + assertEquals(result.size(), 2); + assertEquals(result.get(0), file1); + assertEquals(result.get(1), file2); + } + + @Test + public void walkAFileTree() throws Exception { + final Path root = fs.getPath("walk"); + Files.createDirectories(root); + final Path subdir1 = root.resolve("subdir1"); + Files.createDirectories(subdir1); + final Path file1 = subdir1.resolve("file1"); + Files.createFile(file1); + final Path subdir2 = root.resolve("subdir2"); + Files.createDirectories(subdir2); + final Path file2 = subdir2.resolve("file2"); + Files.createFile(file2); + + // #walk + final Source source = Directory.walk(root); + // #walk + + final List result = source.runWith(Sink.seq(), materializer) + .toCompletableFuture().get(3, TimeUnit.SECONDS); + assertEquals(result, Arrays.asList(root, subdir1, file1, subdir2, file2)); + } + + @Test + public void walkAFileTreeWithOptions() throws Exception { + final Path root = fs.getPath("walk2"); + Files.createDirectories(root); + final Path subdir1 = root.resolve("subdir1"); + Files.createDirectories(subdir1); + final Path file1 = subdir1.resolve("file1"); + Files.createFile(file1); + final Path subdir2 = root.resolve("subdir2"); + Files.createDirectories(subdir2); + final Path file2 = subdir2.resolve("file2"); + Files.createFile(file2); + + // #walk + final Source source = Directory.walk(root, 1, FileVisitOption.FOLLOW_LINKS); + // #walk + + final List result = source.runWith(Sink.seq(), materializer) + .toCompletableFuture().get(3, TimeUnit.SECONDS); + assertEquals(result, Arrays.asList(root, subdir1, subdir2)); + } + + @After + public void tearDown() throws Exception { + fs.close(); + fs = null; + TestKit.shutdownActorSystem(system, FiniteDuration.create(10, TimeUnit.SECONDS), true); + system = null; + materializer = null; + } +} diff --git a/file/src/test/scala/akka/stream/alpakka/file/scaladsl/DirectorySpec.scala b/file/src/test/scala/akka/stream/alpakka/file/scaladsl/DirectorySpec.scala new file mode 100644 index 0000000000..9bda4cfa3b --- /dev/null +++ b/file/src/test/scala/akka/stream/alpakka/file/scaladsl/DirectorySpec.scala @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.file.scaladsl + +import java.nio.file.{Files, Path} + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{Sink, Source} +import akka.testkit.TestKit +import com.google.common.jimfs.{Configuration, Jimfs} +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} + +class DirectorySpec + extends TestKit(ActorSystem("directoryspec")) + with WordSpecLike + with Matchers + with BeforeAndAfterAll + with ScalaFutures { + + private val fs = Jimfs.newFileSystem(Configuration.forCurrentPlatform.toBuilder.build) + private implicit val mat = ActorMaterializer() + + "The directory source factory" should { + "list files" in { + val dir = fs.getPath("listfiles") + Files.createDirectories(dir) + val paths = (0 to 100).map { n => + val name = s"file$n" + Files.createFile(dir.resolve(name)) + } + + // #ls + val source: Source[Path, NotUsed] = Directory.ls(dir) + // #ls + + val result = source.runWith(Sink.seq).futureValue + result.toSet shouldEqual paths.toSet + } + + "walk a file tree" in { + val root = fs.getPath("walk") + Files.createDirectories(root) + val subdir1 = root.resolve("subdir1") + Files.createDirectories(subdir1) + val file1 = subdir1.resolve("file1") + Files.createFile(file1) + val subdir2 = root.resolve("subdir2") + Files.createDirectories(subdir2) + val file2 = subdir2.resolve("file2") + Files.createFile(file2) + + // #walk + val files: Source[Path, NotUsed] = Directory.walk(root) + // #walk + + val result = files.runWith(Sink.seq).futureValue + result shouldEqual List(root, subdir1, file1, subdir2, file2) + } + } + + override protected def afterAll(): Unit = + fs.close() +} diff --git a/file/src/test/scala/akka/stream/alpakka/file/scaladsl/FileTailSourceSpec.scala b/file/src/test/scala/akka/stream/alpakka/file/scaladsl/FileTailSourceSpec.scala index cf2b779167..8115f08808 100644 --- a/file/src/test/scala/akka/stream/alpakka/file/scaladsl/FileTailSourceSpec.scala +++ b/file/src/test/scala/akka/stream/alpakka/file/scaladsl/FileTailSourceSpec.scala @@ -13,7 +13,7 @@ import akka.stream.{ActorMaterializer, Materializer} import scala.concurrent.duration._ -object FileTailSourceSpec extends { +object FileTailSourceSpec { // small sample of usage, tails the first argument file path def main(args: Array[String]): Unit = {