Skip to content

Commit

Permalink
Directory sources #272
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren authored May 8, 2017
1 parent a0ad914 commit ef8db12
Show file tree
Hide file tree
Showing 9 changed files with 297 additions and 5 deletions.
21 changes: 21 additions & 0 deletions docs/src/main/paradox/file.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,27 @@ Java
: @@snip (../../../../file/src/test/java/akka/stream/alpakka/file/javadsl/FileTailSourceTest.java) { #simple-lines }


### Directory

`Directory.ls(path)` lists all files and directories
directly in a given directory:

Scala
: @@snip (../../../../file/src/test/scala/akka/stream/alpakka/file/scaladsl/DirectorySpec.scala) { #ls }

Java
: @@snip (../../../../file/src/test/java/akka/stream/alpakka/file/javadsl/DirectoryTest.java) { #ls }

`Directory.walk(path)` traverses all subdirectories and lists
files and directories depth first:

Scala
: @@snip (../../../../file/src/test/scala/akka/stream/alpakka/file/scaladsl/DirectorySpec.scala) { #walk }

Java
: @@snip (../../../../file/src/test/java/akka/stream/alpakka/file/javadsl/DirectoryTest.java) { #walk }


### DirectoryChangesSource

The `DirectoryChangesSource` will emit elements every time there is a change to a watched directory
Expand Down
41 changes: 41 additions & 0 deletions file/src/main/java/akka/stream/alpakka/file/javadsl/Directory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.file.javadsl;

import akka.NotUsed;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.StreamConverters;
import scala.None;
import scala.None$;
import scala.collection.immutable.Nil$;

import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;

public final class Directory {

/**
* List all files in the given directory
*/
public static Source<Path, NotUsed> ls(Path directory) {
return akka.stream.alpakka.file.scaladsl.Directory.ls(directory).asJava();
}

/**
* Recursively list files and directories in the given directory, depth first.
*/
public static Source<Path, NotUsed> walk(Path directory) {
return StreamConverters.fromJavaStream(() -> Files.walk(directory));
}

/**
* Recursively list files and directories in the given directory, depth first,
* with a maximum directory depth limit and a possibly set of options (See {@link java.nio.file.Files#walk} for
* details.
*/
public static Source<Path, NotUsed> walk(Path directory, int maxDepth, FileVisitOption... options) {
return StreamConverters.fromJavaStream(() -> Files.walk(directory, maxDepth, options));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ public String toString() {
* the JDK implementation is slow, it will not help lowering this
* @param maxBufferSize Maximum number of buffered directory changes before the stage fails
*/
@SuppressWarnings("unchecked")
public static Source<Pair<Path, DirectoryChange>, NotUsed> create(Path directoryPath, FiniteDuration pollInterval, int maxBufferSize) {
return Source.fromGraph(new DirectoryChangesSource(directoryPath, pollInterval, maxBufferSize, Pair::apply));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.file.scaladsl

import java.nio.file.{FileVisitOption, Files, Path}

import akka.NotUsed
import akka.stream.scaladsl.{Source, StreamConverters}

import scala.collection.immutable

object Directory {

/**
* List all files in the given directory
*/
def ls(directory: Path): Source[Path, NotUsed] = {
require(Files.isDirectory(directory), s"Path must be a directory, $directory isn't")
StreamConverters.fromJavaStream(() => Files.list(directory))
}

/**
* Recursively list files and directories in the given directory and its subdirectories. Listing is done
* depth first.
*
* @param maxDepth If defined limits the depth of the directory structure to walk through
* @param fileVisitOptions See `java.nio.files.Files.walk()` for details
*/
def walk(directory: Path,
maxDepth: Option[Int] = None,
fileVisitOptions: immutable.Seq[FileVisitOption] = Nil): Source[Path, NotUsed] = {
require(Files.isDirectory(directory), s"Path must be a directory, $directory isn't")
val factory = maxDepth match {
case None =>
() =>
Files.walk(directory, fileVisitOptions: _*)
case Some(maxDepth) =>
() =>
Files.walk(directory, maxDepth, fileVisitOptions: _*)
}

StreamConverters.fromJavaStream(factory)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import java.util.function.BiFunction

import akka.NotUsed
import akka.stream.alpakka.file.DirectoryChange
import akka.stream.alpakka.file.javadsl.DirectoryChangesSource
import akka.stream.scaladsl.Source

import scala.concurrent.duration.FiniteDuration
Expand All @@ -30,6 +29,8 @@ object DirectoryChangesSource {
def apply(directoryPath: Path,
pollInterval: FiniteDuration,
maxBufferSize: Int): Source[(Path, DirectoryChange), NotUsed] =
Source.fromGraph(new DirectoryChangesSource(directoryPath, pollInterval, maxBufferSize, tupler))
Source.fromGraph(
new akka.stream.alpakka.file.javadsl.DirectoryChangesSource(directoryPath, pollInterval, maxBufferSize, tupler)
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import java.nio.charset.{Charset, StandardCharsets}
import java.nio.file.Path

import akka.NotUsed
import akka.stream.alpakka.file.javadsl.FileTailSource
import akka.stream.alpakka.file.javadsl.{FileTailSource => JavaFileTailSource}
import akka.stream.scaladsl.Source
import akka.util.ByteString

Expand All @@ -34,7 +34,7 @@ object FileTailSource {
maxChunkSize: Int,
startingPosition: Long,
pollingInterval: FiniteDuration): Source[ByteString, NotUsed] =
Source.fromGraph(new FileTailSource(path, maxChunkSize, startingPosition, pollingInterval))
Source.fromGraph(new JavaFileTailSource(path, maxChunkSize, startingPosition, pollingInterval))

/**
* Scala API: Read the entire contents of a file as text lines, and then when the end is reached, keep reading
Expand Down
115 changes: 115 additions & 0 deletions file/src/test/java/akka/stream/alpakka/file/javadsl/DirectoryTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.file.javadsl;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.testkit.TestKit;
import com.google.common.jimfs.Configuration;
import com.google.common.jimfs.Jimfs;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import scala.concurrent.duration.FiniteDuration;

import java.nio.file.FileSystem;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;

public class DirectoryTest {

private FileSystem fs;
private ActorSystem system;
private Materializer materializer;

@Before
public void setup() {
fs = Jimfs.newFileSystem(Configuration.unix());
system = ActorSystem.create();
materializer = ActorMaterializer.create(system);
}


@Test
public void listFiles() throws Exception {
final Path dir = fs.getPath("listfiles");
Files.createDirectories(dir);
final Path file1 = Files.createFile(dir.resolve("file1"));
final Path file2 = Files.createFile(dir.resolve("file2"));

// #ls
final Source<Path, NotUsed> source = Directory.ls(dir);
// #ls

final List<Path> result = source.runWith(Sink.seq(), materializer)
.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(result.size(), 2);
assertEquals(result.get(0), file1);
assertEquals(result.get(1), file2);
}

@Test
public void walkAFileTree() throws Exception {
final Path root = fs.getPath("walk");
Files.createDirectories(root);
final Path subdir1 = root.resolve("subdir1");
Files.createDirectories(subdir1);
final Path file1 = subdir1.resolve("file1");
Files.createFile(file1);
final Path subdir2 = root.resolve("subdir2");
Files.createDirectories(subdir2);
final Path file2 = subdir2.resolve("file2");
Files.createFile(file2);

// #walk
final Source<Path, NotUsed> source = Directory.walk(root);
// #walk

final List<Path> result = source.runWith(Sink.seq(), materializer)
.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(result, Arrays.asList(root, subdir1, file1, subdir2, file2));
}

@Test
public void walkAFileTreeWithOptions() throws Exception {
final Path root = fs.getPath("walk2");
Files.createDirectories(root);
final Path subdir1 = root.resolve("subdir1");
Files.createDirectories(subdir1);
final Path file1 = subdir1.resolve("file1");
Files.createFile(file1);
final Path subdir2 = root.resolve("subdir2");
Files.createDirectories(subdir2);
final Path file2 = subdir2.resolve("file2");
Files.createFile(file2);

// #walk
final Source<Path, NotUsed> source = Directory.walk(root, 1, FileVisitOption.FOLLOW_LINKS);
// #walk

final List<Path> result = source.runWith(Sink.seq(), materializer)
.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(result, Arrays.asList(root, subdir1, subdir2));
}

@After
public void tearDown() throws Exception {
fs.close();
fs = null;
TestKit.shutdownActorSystem(system, FiniteDuration.create(10, TimeUnit.SECONDS), true);
system = null;
materializer = null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.file.scaladsl

import java.nio.file.{Files, Path}

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.testkit.TestKit
import com.google.common.jimfs.{Configuration, Jimfs}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}

class DirectorySpec
extends TestKit(ActorSystem("directoryspec"))
with WordSpecLike
with Matchers
with BeforeAndAfterAll
with ScalaFutures {

private val fs = Jimfs.newFileSystem(Configuration.forCurrentPlatform.toBuilder.build)
private implicit val mat = ActorMaterializer()

"The directory source factory" should {
"list files" in {
val dir = fs.getPath("listfiles")
Files.createDirectories(dir)
val paths = (0 to 100).map { n =>
val name = s"file$n"
Files.createFile(dir.resolve(name))
}

// #ls
val source: Source[Path, NotUsed] = Directory.ls(dir)
// #ls

val result = source.runWith(Sink.seq).futureValue
result.toSet shouldEqual paths.toSet
}

"walk a file tree" in {
val root = fs.getPath("walk")
Files.createDirectories(root)
val subdir1 = root.resolve("subdir1")
Files.createDirectories(subdir1)
val file1 = subdir1.resolve("file1")
Files.createFile(file1)
val subdir2 = root.resolve("subdir2")
Files.createDirectories(subdir2)
val file2 = subdir2.resolve("file2")
Files.createFile(file2)

// #walk
val files: Source[Path, NotUsed] = Directory.walk(root)
// #walk

val result = files.runWith(Sink.seq).futureValue
result shouldEqual List(root, subdir1, file1, subdir2, file2)
}
}

override protected def afterAll(): Unit =
fs.close()
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import akka.stream.{ActorMaterializer, Materializer}

import scala.concurrent.duration._

object FileTailSourceSpec extends {
object FileTailSourceSpec {

// small sample of usage, tails the first argument file path
def main(args: Array[String]): Unit = {
Expand Down

0 comments on commit ef8db12

Please sign in to comment.