Skip to content

Commit

Permalink
Merge pull request #3390 from typelevel/topic/walk-with-attributes
Browse files Browse the repository at this point in the history
Add `Files.walkWithAttributes`
  • Loading branch information
mpilquist authored Mar 1, 2024
2 parents 7509023 + 30e09a7 commit 5b18a37
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 50 deletions.
31 changes: 20 additions & 11 deletions io/jvm-native/src/main/scala/fs2/io/file/FilesPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -391,34 +391,43 @@ private[file] trait FilesCompanionPlatform {
.resource(Resource.fromAutoCloseable(javaCollection))
.flatMap(ds => Stream.fromBlockingIterator[F](collectionIterator(ds), pathStreamChunkSize))

protected def walkEager(start: Path, options: WalkOptions): Stream[F, Path] = {
protected def walkEager(start: Path, options: WalkOptions): Stream[F, PathInfo] = {
val doWalk = Sync[F].interruptible {
val bldr = Vector.newBuilder[Path]
val bldr = Vector.newBuilder[PathInfo]
JFiles.walkFileTree(
start.toNioPath,
if (options.followLinks) Set(FileVisitOption.FOLLOW_LINKS).asJava else Set.empty.asJava,
options.maxDepth,
new SimpleFileVisitor[JPath] {
private def enqueue(path: JPath): FileVisitResult = {
bldr += Path.fromNioPath(path)
private def enqueue(path: JPath, attrs: JBasicFileAttributes): FileVisitResult = {
bldr += PathInfo(Path.fromNioPath(path), new DelegatingBasicFileAttributes(attrs))
FileVisitResult.CONTINUE
}

override def visitFile(file: JPath, attrs: JBasicFileAttributes): FileVisitResult =
if (Thread.interrupted()) FileVisitResult.TERMINATE else enqueue(file)
if (Thread.interrupted()) FileVisitResult.TERMINATE else enqueue(file, attrs)

override def visitFileFailed(file: JPath, t: IOException): FileVisitResult =
t match {
case _: FileSystemLoopException =>
if (options.allowCycles) enqueue(file) else throw t
if (options.allowCycles)
enqueue(
file,
JFiles.readAttributes(
file,
classOf[JBasicFileAttributes],
LinkOption.NOFOLLOW_LINKS
)
)
else throw t
case _ => FileVisitResult.CONTINUE
}

override def preVisitDirectory(
dir: JPath,
attrs: JBasicFileAttributes
): FileVisitResult =
if (Thread.interrupted()) FileVisitResult.TERMINATE else enqueue(dir)
if (Thread.interrupted()) FileVisitResult.TERMINATE else enqueue(dir, attrs)

override def postVisitDirectory(dir: JPath, t: IOException): FileVisitResult =
if (Thread.interrupted()) FileVisitResult.TERMINATE else FileVisitResult.CONTINUE
Expand All @@ -439,18 +448,18 @@ private[file] trait FilesCompanionPlatform {
protected def walkJustInTime(
start: Path,
options: WalkOptions
): Stream[F, Path] = {
): Stream[F, PathInfo] = {
import scala.collection.immutable.Queue

def loop(toWalk0: Queue[WalkEntry]): Stream[F, Path] = {
def loop(toWalk0: Queue[WalkEntry]): Stream[F, PathInfo] = {
val partialWalk = Sync[F].interruptible {
var acc = Vector.empty[Path]
var acc = Vector.empty[PathInfo]
var toWalk = toWalk0

while (acc.size < options.chunkSize && toWalk.nonEmpty && !Thread.interrupted()) {
val entry = toWalk.head
toWalk = toWalk.drop(1)
acc = acc :+ entry.path
acc = acc :+ PathInfo(entry.path, new DelegatingBasicFileAttributes(entry.attr))
if (entry.depth < options.maxDepth) {
val dir =
if (entry.attr.isDirectory) entry.path
Expand Down
8 changes: 4 additions & 4 deletions io/jvm/src/main/scala/fs2/io/file/AsyncFilesPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@ package file

private[file] trait AsyncFilesPlatform[F[_]] { self: Files.UnsealedFiles[F] =>

override def walk(
override def walkWithAttributes(
start: Path,
options: WalkOptions
): Stream[F, Path] =
): Stream[F, PathInfo] =
if (options.chunkSize == Int.MaxValue) walkEager(start, options)
else walkJustInTime(start, options)

protected def walkEager(start: Path, options: WalkOptions): Stream[F, Path]
protected def walkEager(start: Path, options: WalkOptions): Stream[F, PathInfo]

protected def walkJustInTime(
start: Path,
options: WalkOptions
): Stream[F, Path]
): Stream[F, PathInfo]
}
6 changes: 3 additions & 3 deletions io/native/src/main/scala/fs2/io/file/AsyncFilesPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ package io
package file

private[file] trait AsyncFilesPlatform[F[_]] { self: Files.UnsealedFiles[F] =>
override def walk(
override def walkWithAttributes(
start: Path,
options: WalkOptions
): Stream[F, Path] =
): Stream[F, PathInfo] =
// Disable eager walks until https://github.com/scala-native/scala-native/issues/3744
walkJustInTime(start, options)

protected def walkJustInTime(
start: Path,
options: WalkOptions
): Stream[F, Path]
): Stream[F, PathInfo]
}
74 changes: 42 additions & 32 deletions io/shared/src/main/scala/fs2/io/file/Files.scala
Original file line number Diff line number Diff line change
Expand Up @@ -385,14 +385,22 @@ sealed trait Files[F[_]] extends FilesPlatform[F] {
* For example, to eagerly walk a directory while following symbolic links, emitting all
* paths as a single chunk, use `walk(start, WalkOptions.Eager.withFollowLinks(true))`.
*/
def walk(start: Path, options: WalkOptions): Stream[F, Path]
def walk(start: Path, options: WalkOptions): Stream[F, Path] =
walkWithAttributes(start, options).map(_.path)

/** Creates a stream of paths contained in a given file tree down to a given depth.
*/
@deprecated("Use walk(start, WalkOptions.Default.withMaxDepth(..).withFollowLinks(..))", "3.10")
def walk(start: Path, maxDepth: Int, followLinks: Boolean): Stream[F, Path] =
walk(start, WalkOptions.Default)

/** Like `walk` but returns a `PathInfo`, which provides both the `Path` and `BasicFileAttributes`. */
def walkWithAttributes(start: Path): Stream[F, PathInfo] =
walkWithAttributes(start, WalkOptions.Default)

/** Like `walk` but returns a `PathInfo`, which provides both the `Path` and `BasicFileAttributes`. */
def walkWithAttributes(start: Path, options: WalkOptions): Stream[F, PathInfo]

/** Writes all data to the file at the specified path.
*
* The file is created if it does not exist and is truncated.
Expand Down Expand Up @@ -517,44 +525,46 @@ object Files extends FilesCompanionPlatform with FilesLowPriority {
case _: NoSuchFileException => ()
})

def walk(start: Path, options: WalkOptions): Stream[F, Path] = {
def walkWithAttributes(start: Path, options: WalkOptions): Stream[F, PathInfo] = {

def go(start: Path, maxDepth: Int, ancestry: List[Either[Path, FileKey]]): Stream[F, Path] =
Stream.emit(start) ++ {
if (maxDepth == 0) Stream.empty
else
Stream.eval(getBasicFileAttributes(start, followLinks = false)).mask.flatMap { attr =>
if (attr.isDirectory)
list(start).mask.flatMap { path =>
go(path, maxDepth - 1, attr.fileKey.toRight(start) :: ancestry)
def go(
start: Path,
maxDepth: Int,
ancestry: List[Either[Path, FileKey]]
): Stream[F, PathInfo] =
Stream.eval(getBasicFileAttributes(start, followLinks = false)).mask.flatMap { attr =>
Stream.emit(PathInfo(start, attr)) ++ {
if (maxDepth == 0) Stream.empty
else if (attr.isDirectory)
list(start).mask.flatMap { path =>
go(path, maxDepth - 1, attr.fileKey.toRight(start) :: ancestry)
}
else if (attr.isSymbolicLink && options.followLinks)
Stream.eval(getBasicFileAttributes(start, followLinks = true)).mask.flatMap { attr =>
val fileKey = attr.fileKey
val isCycle = Traverse[List].existsM(ancestry) {
case Right(ancestorKey) => F.pure(fileKey.contains(ancestorKey))
case Left(ancestorPath) => isSameFile(start, ancestorPath)
}
else if (attr.isSymbolicLink && options.followLinks)
Stream.eval(getBasicFileAttributes(start, followLinks = true)).mask.flatMap {
attr =>
val fileKey = attr.fileKey
val isCycle = Traverse[List].existsM(ancestry) {
case Right(ancestorKey) => F.pure(fileKey.contains(ancestorKey))
case Left(ancestorPath) => isSameFile(start, ancestorPath)
}

Stream.eval(isCycle).flatMap { isCycle =>
if (!isCycle)
list(start).mask.flatMap { path =>
go(path, maxDepth - 1, attr.fileKey.toRight(start) :: ancestry)
}
else if (options.allowCycles)
Stream.empty
else
Stream.raiseError(new FileSystemLoopException(start.toString))
Stream.eval(isCycle).flatMap { isCycle =>
if (!isCycle)
list(start).mask.flatMap { path =>
go(path, maxDepth - 1, attr.fileKey.toRight(start) :: ancestry)
}

else if (options.allowCycles)
Stream.empty
else
Stream.raiseError(new FileSystemLoopException(start.toString))
}
else
Stream.empty
}

}
else
Stream.empty
}
}

Stream.eval(getBasicFileAttributes(start, options.followLinks)) >> go(
go(
start,
options.maxDepth,
Nil
Expand Down
25 changes: 25 additions & 0 deletions io/shared/src/main/scala/fs2/io/file/PathInfo.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2013 Functional Streams for Scala
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package fs2.io.file

/** Provides a `Path` and its associated `BasicFileAttributes`. */
case class PathInfo(path: Path, attributes: BasicFileAttributes)

0 comments on commit 5b18a37

Please sign in to comment.