From 7b1296a81d46fff4e6c9c6c959030b6fa137c7b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andre=CC=81n?= Date: Fri, 21 Apr 2017 18:29:18 -0500 Subject: [PATCH] Directory sources #272 --- docs/src/main/paradox/file.md | 21 +++++ .../alpakka/file/javadsl/Directory.java | 26 ++++++ .../file/javadsl/DirectoryChangesSource.java | 1 + .../alpakka/file/scaladsl/Directory.scala | 36 ++++++++ .../scaladsl/DirectoryChangesSource.scala | 5 +- .../alpakka/file/javadsl/DirectoryTest.java | 92 +++++++++++++++++++ .../alpakka/file/scaladsl/DirectorySpec.scala | 67 ++++++++++++++ .../file/scaladsl/FileTailSourceSpec.scala | 2 +- 8 files changed, 247 insertions(+), 3 deletions(-) create mode 100644 file/src/main/java/akka/stream/alpakka/file/javadsl/Directory.java create mode 100644 file/src/main/scala/akka/stream/alpakka/file/scaladsl/Directory.scala create mode 100644 file/src/test/java/akka/stream/alpakka/file/javadsl/DirectoryTest.java create mode 100644 file/src/test/scala/akka/stream/alpakka/file/scaladsl/DirectorySpec.scala diff --git a/docs/src/main/paradox/file.md b/docs/src/main/paradox/file.md index 11162ae31e..80bb34cb40 100644 --- a/docs/src/main/paradox/file.md +++ b/docs/src/main/paradox/file.md @@ -54,6 +54,27 @@ Java : @@snip (../../../../file/src/test/java/akka/stream/alpakka/file/javadsl/FileTailSourceTest.java) { #simple-lines } +### Directory + +`Directory.ls(path)` lists all files and directories +directly in a given directory: + +Scala +: @@snip (../../../../file/src/test/scala/akka/stream/alpakka/file/scaladsl/DirectorySpec.scala) { #ls } + +Java +: @@snip (../../../../file/src/test/java/akka/stream/alpakka/file/javadsl/DirectoryTest.java) { #ls } + +`Directory.walk(path)` traverses all subdirectories and lists +files and directories depth first: + +Scala +: @@snip (../../../../file/src/test/scala/akka/stream/alpakka/file/scaladsl/DirectorySpec.scala) { #walk } + +Java +: @@snip (../../../../file/src/test/java/akka/stream/alpakka/file/javadsl/DirectoryTest.java) { #walk } + + ### DirectoryChangesSource The `DirectoryChangesSource` will emit elements every time there is a change to a watched directory diff --git a/file/src/main/java/akka/stream/alpakka/file/javadsl/Directory.java b/file/src/main/java/akka/stream/alpakka/file/javadsl/Directory.java new file mode 100644 index 0000000000..d4147ee716 --- /dev/null +++ b/file/src/main/java/akka/stream/alpakka/file/javadsl/Directory.java @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.file.javadsl; + +import akka.NotUsed; +import akka.stream.javadsl.Source; + +import java.nio.file.Path; + +public final class Directory { + + /** + * List all files in the given directory + */ + public static Source ls(Path directory) { + return akka.stream.alpakka.file.scaladsl.Directory.ls(directory).asJava(); + } + + /** + * Recursively list files and directories in the given directory, depth first. + */ + public static Source walk(Path directory) { + return akka.stream.alpakka.file.scaladsl.Directory.walk(directory).asJava(); + } +} diff --git a/file/src/main/java/akka/stream/alpakka/file/javadsl/DirectoryChangesSource.java b/file/src/main/java/akka/stream/alpakka/file/javadsl/DirectoryChangesSource.java index 2125d85dbe..01f38cb71b 100644 --- a/file/src/main/java/akka/stream/alpakka/file/javadsl/DirectoryChangesSource.java +++ b/file/src/main/java/akka/stream/alpakka/file/javadsl/DirectoryChangesSource.java @@ -205,6 +205,7 @@ public String toString() { * the JDK implementation is slow, it will not help lowering this * @param maxBufferSize Maximum number of buffered directory changes before the stage fails */ + @SuppressWarnings("unchecked") public static Source, NotUsed> create(Path directoryPath, FiniteDuration pollInterval, int maxBufferSize) { return Source.fromGraph(new DirectoryChangesSource(directoryPath, pollInterval, maxBufferSize, Pair::apply)); } diff --git a/file/src/main/scala/akka/stream/alpakka/file/scaladsl/Directory.scala b/file/src/main/scala/akka/stream/alpakka/file/scaladsl/Directory.scala new file mode 100644 index 0000000000..8fa3105df2 --- /dev/null +++ b/file/src/main/scala/akka/stream/alpakka/file/scaladsl/Directory.scala @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.file.scaladsl + +import java.nio.file.attribute.BasicFileAttributes +import java.nio.file.{Files, Path} +import java.util.function.BiPredicate + +import akka.NotUsed +import akka.stream.scaladsl.{Source, StreamConverters} + +object Directory { + + /** + * List all files in the given directory + */ + def ls(directory: Path): Source[Path, NotUsed] = { + require(Files.isDirectory(directory), s"Path must be a directory, $directory isn't") + StreamConverters.fromJavaStream(() => Files.list(directory)) + } + + /** + * Recursively list files in the given directory and its subdirectories. Listing is done + * depth first. + */ + def walk(directory: Path): Source[Path, NotUsed] = { + require(Files.isDirectory(directory), s"Path must be a directory, $directory isn't") + StreamConverters.fromJavaStream(() => Files.walk(directory)) + } + + private def allFilesFilter = new BiPredicate[Path, BasicFileAttributes] { + override def test(t: Path, u: BasicFileAttributes): Boolean = true + } + +} diff --git a/file/src/main/scala/akka/stream/alpakka/file/scaladsl/DirectoryChangesSource.scala b/file/src/main/scala/akka/stream/alpakka/file/scaladsl/DirectoryChangesSource.scala index 9455a21ff3..cdb9f86d06 100644 --- a/file/src/main/scala/akka/stream/alpakka/file/scaladsl/DirectoryChangesSource.scala +++ b/file/src/main/scala/akka/stream/alpakka/file/scaladsl/DirectoryChangesSource.scala @@ -8,7 +8,6 @@ import java.util.function.BiFunction import akka.NotUsed import akka.stream.alpakka.file.DirectoryChange -import akka.stream.alpakka.file.javadsl.DirectoryChangesSource import akka.stream.scaladsl.Source import scala.concurrent.duration.FiniteDuration @@ -30,6 +29,8 @@ object DirectoryChangesSource { def apply(directoryPath: Path, pollInterval: FiniteDuration, maxBufferSize: Int): Source[(Path, DirectoryChange), NotUsed] = - Source.fromGraph(new DirectoryChangesSource(directoryPath, pollInterval, maxBufferSize, tupler)) + Source.fromGraph( + new akka.stream.alpakka.file.javadsl.DirectoryChangesSource(directoryPath, pollInterval, maxBufferSize, tupler) + ) } diff --git a/file/src/test/java/akka/stream/alpakka/file/javadsl/DirectoryTest.java b/file/src/test/java/akka/stream/alpakka/file/javadsl/DirectoryTest.java new file mode 100644 index 0000000000..c8ffa941b6 --- /dev/null +++ b/file/src/test/java/akka/stream/alpakka/file/javadsl/DirectoryTest.java @@ -0,0 +1,92 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.file.javadsl; + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.stream.ActorMaterializer; +import akka.stream.Materializer; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.testkit.TestKit; +import com.google.common.jimfs.Configuration; +import com.google.common.jimfs.Jimfs; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; +import scala.concurrent.duration.FiniteDuration; + +import java.nio.file.FileSystem; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class DirectoryTest { + + private FileSystem fs; + private ActorSystem system; + private Materializer materializer; + + @Before + public void setup() { + fs = Jimfs.newFileSystem(Configuration.unix()); + system = ActorSystem.create(); + materializer = ActorMaterializer.create(system); + } + + + @Test + public void listFiles() throws Exception { + final Path dir = fs.getPath("listfiles"); + Files.createDirectories(dir); + final Path file1 = Files.createFile(dir.resolve("file1")); + final Path file2 = Files.createFile(dir.resolve("file2")); + + // #ls + final Source source = Directory.ls(dir); + // #ls + + final List result = source.runWith(Sink.seq(), materializer) + .toCompletableFuture().get(3, TimeUnit.SECONDS); + assertEquals(result.size(), 2); + assertEquals(result.get(0), file1); + assertEquals(result.get(1), file2); + } + + @Test + public void walkAFileTree() throws Exception { + final Path root = fs.getPath("walk"); + Files.createDirectories(root); + final Path subdir1 = root.resolve("subdir1"); + Files.createDirectories(subdir1); + final Path file1 = subdir1.resolve("file1"); + Files.createFile(file1); + final Path subdir2 = root.resolve("subdir2"); + Files.createDirectories(subdir2); + final Path file2 = subdir2.resolve("file2"); + Files.createFile(file2); + + // #walk + final Source source = Directory.walk(root); + // #walk + + final List result = source.runWith(Sink.seq(), materializer) + .toCompletableFuture().get(3, TimeUnit.SECONDS); + assertEquals(result, Arrays.asList(root, subdir1, file1, subdir2, file2)); + } + + @After + public void tearDown() throws Exception { + fs.close(); + fs = null; + TestKit.shutdownActorSystem(system, FiniteDuration.create(10, TimeUnit.SECONDS), true); + system = null; + materializer = null; + } +} diff --git a/file/src/test/scala/akka/stream/alpakka/file/scaladsl/DirectorySpec.scala b/file/src/test/scala/akka/stream/alpakka/file/scaladsl/DirectorySpec.scala new file mode 100644 index 0000000000..9bda4cfa3b --- /dev/null +++ b/file/src/test/scala/akka/stream/alpakka/file/scaladsl/DirectorySpec.scala @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.file.scaladsl + +import java.nio.file.{Files, Path} + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{Sink, Source} +import akka.testkit.TestKit +import com.google.common.jimfs.{Configuration, Jimfs} +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} + +class DirectorySpec + extends TestKit(ActorSystem("directoryspec")) + with WordSpecLike + with Matchers + with BeforeAndAfterAll + with ScalaFutures { + + private val fs = Jimfs.newFileSystem(Configuration.forCurrentPlatform.toBuilder.build) + private implicit val mat = ActorMaterializer() + + "The directory source factory" should { + "list files" in { + val dir = fs.getPath("listfiles") + Files.createDirectories(dir) + val paths = (0 to 100).map { n => + val name = s"file$n" + Files.createFile(dir.resolve(name)) + } + + // #ls + val source: Source[Path, NotUsed] = Directory.ls(dir) + // #ls + + val result = source.runWith(Sink.seq).futureValue + result.toSet shouldEqual paths.toSet + } + + "walk a file tree" in { + val root = fs.getPath("walk") + Files.createDirectories(root) + val subdir1 = root.resolve("subdir1") + Files.createDirectories(subdir1) + val file1 = subdir1.resolve("file1") + Files.createFile(file1) + val subdir2 = root.resolve("subdir2") + Files.createDirectories(subdir2) + val file2 = subdir2.resolve("file2") + Files.createFile(file2) + + // #walk + val files: Source[Path, NotUsed] = Directory.walk(root) + // #walk + + val result = files.runWith(Sink.seq).futureValue + result shouldEqual List(root, subdir1, file1, subdir2, file2) + } + } + + override protected def afterAll(): Unit = + fs.close() +} diff --git a/file/src/test/scala/akka/stream/alpakka/file/scaladsl/FileTailSourceSpec.scala b/file/src/test/scala/akka/stream/alpakka/file/scaladsl/FileTailSourceSpec.scala index cf2b779167..8115f08808 100644 --- a/file/src/test/scala/akka/stream/alpakka/file/scaladsl/FileTailSourceSpec.scala +++ b/file/src/test/scala/akka/stream/alpakka/file/scaladsl/FileTailSourceSpec.scala @@ -13,7 +13,7 @@ import akka.stream.{ActorMaterializer, Materializer} import scala.concurrent.duration._ -object FileTailSourceSpec extends { +object FileTailSourceSpec { // small sample of usage, tails the first argument file path def main(args: Array[String]): Unit = {