From 072991afbc814175876700be1d43070a80fca09c Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 28 Jun 2016 21:53:56 +0100 Subject: [PATCH 1/3] HADOOP-13227 the initial output stream docs --- .../site/markdown/filesystem/outputstream.md | 352 ++++++++++++++++++ 1 file changed, 352 insertions(+) create mode 100644 hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md new file mode 100644 index 0000000000000..a0c7e6dad03e2 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md @@ -0,0 +1,352 @@ + + + + + + + +# Class `OutputStream` + +The `FileSystem.create()` and `FileSystem.append()` calls return an instance +of a class `FSDataOutputStream`, a subclass of `java.io.OutputStream`. + +It wraps an `OutputStream` instance, one which may implement `Streamable` +and `CanSetDropBehind`. This document covers the requirements of such implementations. + +A Java `OutputStream` allows applications to write a sequence of bytes to a destination. +In a Hadoop filesystem, that destination is the data under a path in the filesystem. + + + +## Output stream model + +An output stream consists of a buffer `buf: List[byte]` + + + +Data written to an `FSDataOutputStream` implementation + + +```java +public class FSDataOutputStream extends DataOutputStream, FilterOutputStream + implements Syncable, CanSetDropBehind, DataOutput { + // ... + } +``` + +## Durability + +1. `OutputStream.write()` MAY persist the data, synchronously or asynchronously +1. `OutputStream.flush()` flushes data to the destination. There +are no strict persistence requirements. +1. `Syncable.hflush()` synchronously sends all local data to the destination +filesystem. After returning to the caller, the data MUST be visible to other readers. +1. `Syncable.hsync()` MUST flush the data and persist data to the underlying durable +storage. +1. `close()` MUST flush out all remaining data in the buffers, and persist it. + + +## Concurrency + +1. The outcome of more than one process writing to the same file is undefined. + +1. An input stream opened to read a file *before the file was opened for writing* +MAY fetch updated data. Because of buffering and caching, this is not a requirement +—and if a input stream does pick up updated data, the point at +which the updated data is read is undefined. This surfaces in object stores +where a `seek()` call which closes and re-opens the connection may pick up +updated data, while forward stream reads do not. Similarly, in block-oriented +filesystems, the data may be cached a block at a time —and changes only picked +up when a different block is read. + +1. A Filesystem MAY allow the destination path to be manipulated while a stream +is writing to it —for example, `rename()` of the path or a parent; `delete()` of +a path or parent. In such a case, the outcome of future write operations on +the output stream is undefined. Some filesystems MAY implement locking to +prevent conflict. However, this tends to be rare on distributed filesystems, +for reasons well known in the literature. + +1. The Java API specification of `java.io.OutputStream` does not require +an instance of the class to be thread safe. +However, `org.apache.hadoop.hdfs.DFSOutputStream` +has a stronger thread safety model (possibly unintentionally). 
This fact is
+relied upon in Apache HBase, as discovered in HADOOP-11708. Implementations
+SHOULD be thread safe. *Note*: even the `DFSOutputStream` synchronization
+model permits the output stream to be `close()`'d while awaiting an acknowledgement
+from datanode or namenode writes in an `hsync()` operation.
+
+
+## Consistency
+
+There is no requirement for the data to be immediately visible to other applications
+—not until a specific call to flush buffers or persist it to the underlying storage
+medium is made.
+
+
+1. The metadata of a file (`length(FS, path)` in particular) SHOULD be consistent
+with the contents of the file after `flush()` and `sync()`.
+HDFS does not do this except when the write crosses a block boundary; to do
+otherwise would overload the Namenode. As a result, while a file is being written
+`length(Filesystem, Path)` MAY be less than the length of `data(Filesystem, Path)`.
+1. The metadata MUST be consistent with the contents of a file after the `close()`
+operation.
+1. If the filesystem supports modification timestamps, this
+timestamp MAY be updated while a file is being written, especially after a
+`Syncable.hsync()` call. The timestamps MUST be updated after the file is closed.
+1. After the contents of an input stream have been persisted (`hflush()/hsync()`)
+all new `open(FS, Path)` operations MUST return the updated data.
+
+### HDFS
+
+That HDFS file metadata often lags the content of a file being written
+to is not something everyone expects, nor is it convenient for any program trying
+to pick up updated data in a file being written. Most visible is the length
+of a file returned in the various `list` commands and `getFileStatus` —this
+is often out of date.
+
+As HDFS only supports file growth in its output operations, this means
+that the size of the file as listed in the metadata may be less than or equal
+to the number of available bytes —but never larger; this is guaranteed.
+
+The standard technique to determine whether a file is updated is:
+
+1. Remember the last read position `pos` in the file, using `0` if this is the initial
+read.
+1. Use `getFileStatus(FS, Path)` to query the updated length of the file as
+recorded in the metadata.
+1. If `Status.length > pos`, the file has grown.
+1. If the length has not changed, then
+    1. reopen the file.
+    1. `seek(pos)` to that location.
+    1. If `read() != -1`, there is new data.
+
+This algorithm works for filesystems which are consistent with metadata and
+data, as well as HDFS. What is important to know is that even if the length
+of the file is declared to be 0, in HDFS this can mean
+"there is data —it just that the metadata is not up to date"
+
+### Local Filesystem, `file:`
+
+`LocalFileSystem`, `file:`, (or any other `FileSystem` implementation based on
+`ChecksumFileSystem`) has a different issue. If an output stream
+is obtained from `create()` and `FileSystem.setWriteChecksum(false)` has
+*not* been called on the filesystem, then the FS only flushes as much
+local data as can be written to full checksummed blocks of data.
+
+That is, the flush operations are not guaranteed to write all the pending
+data until the file is finally closed.
+
+That is, `sync()` and `hsync()` cannot be guaranteed to persist the data currently
+buffered locally.
+
+For anyone thinking "this is a violation of this specification" —they are correct.
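+
+Based on the behavior just described, a minimal test-time workaround is
+sketched below; it rests on the inference that, with write checksums disabled,
+flushes are no longer held back for complete checksummed blocks. The
+configuration, method name, and path are illustrative assumptions:
+
+```java
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+// Disable checksum generation so that flush operations are not
+// buffered up into whole checksummed blocks.
+void flushEagerly(byte[] data) throws IOException {
+  FileSystem local = FileSystem.getLocal(new Configuration());
+  local.setWriteChecksum(false);
+  try (FSDataOutputStream out = local.create(new Path("/tmp/example.dat"), true)) {
+    out.write(data);
+    out.hflush(); // pending bytes can now reach the local file
+  }
+}
+```
+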
+ +### Object Stores + +Object store implementations historically cache the entire object's data +locally on disk (possibly in memory), until the final `close()` operation +triggers a single `PUT` of the data. Some implementations push out intermediate +blocks of data, synchronously or asynchronously. + +Accordingly, they tend not to implement the `Syncable` interface. +However, Exception: Azure's `PageBlobOutputStream` does implement `hsync()`, +blocking until ongoing writes complete. + + +## Model + +For this specification, the output stream can be viewed as a cached byte array +alongside the filesystem. After data is flushed, read operations on the filesystem +should be in sync with the data + + (path, cache) + +when opening a new file, the initial state of the cache is empty + + (path, []) + +When appending to a file, that is the output stream is created using +`FileSystem.append(path, buffersize, progress)`, then the initial state +of the cache is the current contents of the file + + + (path, data(FS, path)) + + +### `write(byte)` + + cache' = cache + [byte] + +### `write(byte[] buffer, int offset, int len)` + + +#### Preconditions + +The preconditions are all defined in `OutputStream.write()` + + buffer != null else raise NullPointerException + offset >= 0 else raise IndexOutOfBoundsException + len >= 0 else raise IndexOutOfBoundsException + offset < buffer.length else raise IndexOutOfBoundsException + offset + len < buffer.length else raise IndexOutOfBoundsException + + +#### Postconditions + + cache' = cache + buffer[offset...offset+len] + + +### `write(byte[] buffer)` + +This is required to be the equivalent of + + write(buffer, 0, buffer.length) + + +#### Preconditions + +With the offset of 0 and the length known to be that of the buffer, the +preconditions can be simplified to + + + buffer != null else raise NullPointerException + +#### Postconditions + +The postconditions become + + cache' = cache + buffer[0...buffer.length] + +Which is equivalent to + + cache' = cache + buffer + +### `flush()` + +Requests that the data is flushed. The specification of `ObjectStream.flush()` +declares that this SHOULD write data to the "intended destination". + +It explicitly precludes any guarantees about durability. + + +#### Preconditions + +#### Postconditions + + +### `close()` + +The `close()` operation completes the write. It is expected to block +until the write has completed (as with `Syncable.hflush()`), possibly +until it has been written to durable storage (as HDFS does). + +After `close()` completes, the data in a file MUST be visible and consistent +with the data most recently written. The metadata of the file MUST be consistent +with the data and the write history itself (i.e. any modification time fields +updated). + +Any locking/leaseholding mechanism is also required to release its lock/lease. + +The are two non-requirements of the `close()` operation; code use + +The `close()` call MAY fail during its operation. This is clearly an erroneous +outcome, but it is possible. + +1. Callers of the API MUST expect this and SHOULD code appropriately. Catching +and swallowing exceptions, while common, is not always the ideal solution. +1. Even after a failure, `close()` should place the stream into a closed state, +where future calls to `close()` are ignored, and calls to other methods +rejected. That is: caller's cannot be expected to call `close()` repeatedly +until it succeeds. +1. The duration of the `call()` operation is undefined. 
Operations which rely
+on acknowledgements from remote systems to meet the persistence guarantees
+implicitly have to await these acknowledgements. Some Object Store output streams
+upload the entire data file in the `close()` operation. This can take a large amount
+of time. The fact that many user applications assume that `close()` is both fast
+and does not fail means that this behavior is dangerous.
+
+
+
+## `org.apache.hadoop.fs.Syncable`
+
+The purpose of the `Syncable` interface is to provide guarantees that data is written
+to a filesystem for both visibility and durability.
+
+Its presence is an explicit declaration of an output stream's ability to
+meet those guarantees.
+
+The interface MUST NOT be declared as implemented by an `OutputStream` unless
+those guarantees can be met.
+
+The `Syncable` interface has been implemented by classes other than
+subclasses of `OutputStream`. Therefore the fact that
+a class implements `Syncable` does not guarantee that `extends OutputStream`
+holds.
+
+This specification only covers the required behavior of `OutputStream` subclasses
+which implement `Syncable`.
+
+### `Syncable.hflush()`
+
+Flush out the data in client's user buffer. After the return of
+this call, new readers will see the data.
+
+    FS' = FS where data(path) == cache
+
+It's not clear whether this operation is expected to be blocking, that is,
+whether, after the call returns, is it guaranteed that the data is
+now visible to all
+
+
+### `Syncable.hsync()`
+
+Similar to POSIX fsync, save the data in client's user buffer
+all the way to the disk device (but the disk may have it in its cache).
+
+That is, it is a requirement for the underlying FS to save all the data to
+the disk hardware itself, where it is expected to be durable.
+
+    FS' = FS where data(path) == cache
+
+The reference implementation, `DFSOutputStream`, will block
+until an acknowledgement is received from the datanodes: that is, all hosts
+in the replica write chain have successfully written the file.
+
+That means that callers may expect the return of the method call to
+carry visibility and durability guarantees which other
+implementations must maintain.
+
+Note, however, that the reference `DFSOutputStream.hsync()` call only actually syncs
+*the current block*. If there has been a series of writes since the last sync
+such that a block boundary has been crossed, the `hsync()` call claims only
+to sync the most recent block.
+From the javadocs of `DFSOutputStream.hsync(EnumSet syncFlags)`
+
+> Note that only the current block is flushed to the disk device.
+> To guarantee durable sync across block boundaries the stream should
+> be created with {@link CreateFlag#SYNC_BLOCK}.
+
+
+In virtual machines, the notion of "disk hardware" is really that of
+another software abstraction. There can be no durability guarantees in such
+environments.
+
+
+
+### `Syncable.sync()`
+
+Deprecated: replaced by `hflush()`
+

From 93c4f2c2e7c44c3eb175cb6039f170a14f2312d0 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Fri, 18 Nov 2016 12:30:50 +0000
Subject: [PATCH 2/3] HADOOP-13327 adding open/closed state to output stream,
 java.io.OutputStream defining actions when output streams are closed. 
Change-Id: Ia921e8d2f9af53e8e6655a806e79fcc1dba49d1d --- .../site/markdown/filesystem/filesystem.md | 14 +- .../site/markdown/filesystem/outputstream.md | 155 ++++++++++++++---- 2 files changed, 129 insertions(+), 40 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md index b87ee4cffc81e..be485c6414610 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md @@ -581,10 +581,12 @@ and MAY be a `RuntimeException` or subclass. For instance, HDFS may raise a `Inv FS' where : FS'.Files'[p] == [] - ancestors(p) is-subset-of FS'.Directories' + and ancestors(p) is-subset-of FS'.Directories' result = FSDataOutputStream +A zero byte file must exist at the end of the specified path, visible to all + The updated (valid) FileSystem must contains all the parent directories of the path, as created by `mkdirs(parent(p))`. The result is `FSDataOutputStream`, which through its operations may generate new filesystem states with updated values of @@ -600,8 +602,12 @@ The result is `FSDataOutputStream`, which through its operations may generate ne * S3N, S3A, Swift and potentially other Object Stores do not currently change the FS state until the output stream `close()` operation is completed. -This MAY be a bug, as it allows >1 client to create a file with `overwrite==false`, - and potentially confuse file/directory logic +This is a significant difference between the behavior of object stores +and that of filesystems, as it allows >1 client to create a file with `overwrite==false`, +and potentially confuse file/directory logic. In particular, using create() to acquire +an exclusive lock on a file (whoever creates the file without an error is considered +the holder of the lock) is not a valid algorithm when working with object stores. + * The Local FileSystem raises a `FileNotFoundException` when trying to create a file over a directory, hence it is listed as an exception that MAY be raised when @@ -621,7 +627,7 @@ Implementations MAY throw `UnsupportedOperationException`. #### Postconditions - FS + FS' = FS result = FSDataOutputStream Return: `FSDataOutputStream`, which can update the entry `FS.Files[p]` diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md index a0c7e6dad03e2..218944f140498 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md @@ -35,6 +35,35 @@ In a Hadoop filesystem, that destination is the data under a path in the filesys An output stream consists of a buffer `buf: List[byte]` +## Model + +For this specification, the output stream can be viewed as a cached byte array +alongside the filesystem. 
After data is flushed, read operations on the filesystem +should be in sync with the data + + Stream = (path, isOpen, cache) + +when opening a new file, the initial state of the cache is empty + + Stream = (path, true, []) + +When appending to a file, that is the output stream is created using +`FileSystem.append(path, buffersize, progress)`, then the initial state +of the cache is the current contents of the file + + + Stream = (path, true, data(FS, path)) + +After a call to `close()`, the stream is closed for all operations other +than `close()`; they MAY fail with `IOException` or `RuntimeException` instances. + + Stream = (path, false, []) + +The `close()` operation must become a no-op. That is: followon attempts to +persist data after a failure MUST NOT be made. (alternatively: users of the +API MUST NOT expect failing `close()` attemps to succeed if retried) + + Data written to an `FSDataOutputStream` implementation @@ -52,10 +81,14 @@ public class FSDataOutputStream extends DataOutputStream, FilterOutputStream 1. `OutputStream.flush()` flushes data to the destination. There are no strict persistence requirements. 1. `Syncable.hflush()` synchronously sends all local data to the destination -filesystem. After returning to the caller, the data MUST be visible to other readers. +filesystem. After returning to the caller, the data MUST be visible to other readers, +it MAY be durable. That is: it does not have to be persisted, merely guaranteed +to be consistently visible to all clients attempting to open a new stream reading +data at the path. 1. `Syncable.hsync()` MUST flush the data and persist data to the underlying durable storage. -1. `close()` MUST flush out all remaining data in the buffers, and persist it. +1. `close()` The first call to `close()` MUST flush out all remaining data in +the buffers, and persist it. ## Concurrency @@ -63,7 +96,8 @@ storage. 1. The outcome of more than one process writing to the same file is undefined. 1. An input stream opened to read a file *before the file was opened for writing* -MAY fetch updated data. Because of buffering and caching, this is not a requirement +MAY fetch data updated by writes to an OutputStream. +Because of buffering and caching, this is not a requirement —and if a input stream does pick up updated data, the point at which the updated data is read is undefined. This surfaces in object stores where a `seek()` call which closes and re-opens the connection may pick up @@ -88,13 +122,16 @@ model permits the output stream to `close()`'d while awaiting an acknowledgement from datanode or namenode writes in an `hsync()` operation. -## Consistency +## Consistency and Visibility There is no requirement for the data to be immediately visible to other applications —not until a specific call to flush buffers or persist it to the underlying storage medium are made. - +1. If an output stream is created with `FileSystem.create(path)`, with overwrite=true +and there is an existing file at the path, that is `exists(FS, path)` holds, +then, the existing data is immediately unavailable; the data at the end of the +path MUST consist of an empty byte sequence `[]`, with consistent metadata. 1. The metadata of a file (`length(FS, path)` in particular) SHOULD be consistent with the contents of the file after `flush()` and `sync()`. HDFS does not do this except when the write crosses a block boundary; to do @@ -105,8 +142,13 @@ operation. 1. 
If the filesystem supports modification timestamps, this timestamp MAY be updated while a file is being written, especially after a `Syncable.hsync()` call. The timestamps MUST be updated after the file is closed. -1. After the contents of an input stream have been persisted (`hflush()/hsync()`) +1. After the contents of an output stream have been persisted (`hflush()/hsync()`) all new `open(FS, Path)` operations MUST return the updated data. +1. After `close()` has been invoked on an output stream returned from a call +to `FileSystem.create(Path,...)`, a call to `getFileStatus(path)` MUST return the +final metadata of the written file, including length and modification time. +The metadata of the file returned in any of the FileSystem `list` operations +MUST be consistent with this metadata. ### HDFS @@ -134,9 +176,9 @@ recorded in the metadata. 1. If `read() != -1`, there is new data. This algorithm works for filesystems which are consistent with metadata and -data, as well as HDFS. What is important to know is that even if the length -of the file is declared to be 0, in HDFS this can mean -"there is data —it just that the metadata is not up to date" +data, as well as HDFS. What is important to know is that even if the +`getFileStatus(path).getLen()==0` holds, +in HDFS there may be data, but the metadata is not up to date" ### Local Filesystem, `file:` @@ -153,6 +195,7 @@ That is, `sync()` and `hsync()` cannot be guaranteed to persist the data current buffered locally. For anyone thinking "this is a violation of this specification" —they are correct. +The local filesystem is intended for testing, rather than production use. ### Object Stores @@ -163,32 +206,40 @@ blocks of data, synchronously or asynchronously. Accordingly, they tend not to implement the `Syncable` interface. However, Exception: Azure's `PageBlobOutputStream` does implement `hsync()`, -blocking until ongoing writes complete. +blocking until write operations being executed in separate threads have completed. +Equally importantly -## Model +1. The object may not be visible at the end of the path until the final `close()`. +is called; this holds for `getFileStatus()`, `open()` and all FileSystem list operations. +1. Any existing data at the end of a path `p`, may remain visible until the final +`close()` operation overwrites this data. +1. The check for existing data in a `create()` call with overwrite=false, may +take place in the `create()` call itself, in the `close()` call prior to/during +the write, or at some point in between. Expect in the special case that the +object store supports an atomic PUT operation, the check for existence of +existing data and the subsequent creation of data at the path contains a race +condition: other clients may create data at the path between the existence check +and the subsequent qrite. -For this specification, the output stream can be viewed as a cached byte array -alongside the filesystem. After data is flushed, read operations on the filesystem -should be in sync with the data - (path, cache) -when opening a new file, the initial state of the cache is empty - (path, []) +### `write(int data)` -When appending to a file, that is the output stream is created using -`FileSystem.append(path, buffersize, progress)`, then the initial state -of the cache is the current contents of the file +#### Preconditions + Stream.isOpen else raise IOException - (path, data(FS, path)) +#### Postconditions +The cache has the lower 8 bits of the data argument appended to it. 
-### `write(byte)` + Stream'.cache = Stream.cache + [data & 0xff] - cache' = cache + [byte] +There may be an explicit limit on the size of cached data, or an implicit +limit based by the available capacity of the destination filesystem. +When a limit is reached, `write()` MUST fail with an `IOException`. ### `write(byte[] buffer, int offset, int len)` @@ -197,6 +248,7 @@ of the cache is the current contents of the file The preconditions are all defined in `OutputStream.write()` + Stream.isOpen else raise IOException buffer != null else raise NullPointerException offset >= 0 else raise IndexOutOfBoundsException len >= 0 else raise IndexOutOfBoundsException @@ -204,9 +256,16 @@ The preconditions are all defined in `OutputStream.write()` offset + len < buffer.length else raise IndexOutOfBoundsException +There may be an explicit limit on the size of cached data, or an implicit +limit based by the available capacity of the destination filesystem. +When a limit is reached, `write()` MUST fail with an `IOException`. + +After the operation has returned, the buffer may be re-used. The outcome +of updates to the buffer while the `write()` operation is in progress is undefined. + #### Postconditions - cache' = cache + buffer[offset...offset+len] + Stream'.cache = Stream.cache + buffer[offset...offset+len] ### `write(byte[] buffer)` @@ -218,9 +277,12 @@ This is required to be the equivalent of #### Preconditions -With the offset of 0 and the length known to be that of the buffer, the -preconditions can be simplified to + Stream.isOpen else raise IOException + +With the offset of 0 and the length known to be that of the buffer, the +other preconditions of `write(byte[] buffer, int offset, int len)` +can be simplified to buffer != null else raise NullPointerException @@ -228,11 +290,11 @@ preconditions can be simplified to The postconditions become - cache' = cache + buffer[0...buffer.length] + Stream'.cache = Stream.cache + buffer[0...buffer.length] Which is equivalent to - cache' = cache + buffer + Stream'.cache = Stream.cache + buffer ### `flush()` @@ -244,9 +306,14 @@ It explicitly precludes any guarantees about durability. #### Preconditions + Stream.isOpen else raise IOException + #### Postconditions + FS' = FS where data(path) == cache + + ### `close()` The `close()` operation completes the write. It is expected to block @@ -260,15 +327,17 @@ updated). Any locking/leaseholding mechanism is also required to release its lock/lease. -The are two non-requirements of the `close()` operation; code use -The `close()` call MAY fail during its operation. This is clearly an erroneous -outcome, but it is possible. + FS' = FS where data(path) == cache + Stream'.isOpen = false + -1. Callers of the API MUST expect this and SHOULD code appropriately. Catching -and swallowing exceptions, while common, is not always the ideal solution. -1. Even after a failure, `close()` should place the stream into a closed state, -where future calls to `close()` are ignored, and calls to other methods +The `close()` call MAY fail during its operation. + +1. Callers of the API MUST expect for some calls to fail and SHOULD code appropriately. +Catching and swallowing exceptions, while common, is not always the ideal solution. +1. Even after a failure, `close()` MUST place the stream into a closed state. +Follow-on calls to `close()` are ignored, and calls to other methods rejected. That is: caller's cannot be expected to call `close()` repeatedly until it succeeds. 1. The duration of the `call()` operation is undefined. 
Operations which rely @@ -304,6 +373,13 @@ which implement `Syncable`. Flush out the data in client's user buffer. After the return of this call, new readers will see the data. +#### Preconditions + + Stream.isOpen else raise IOException + +#### Postconditions + + FS' = FS where data(path) == cache It's not clear whether this operation is expected to be blocking, that is, @@ -319,6 +395,13 @@ all the way to the disk device (but the disk may have it in its cache). That is, it is a requirement for the underlying FS To save all the data to the disk hardware itself, where it is expected to be durable. +#### Preconditions + + Stream.isOpen else raise IOException + +#### Postconditions + + FS' = FS where data(path) == cache The reference implementation, `DFSOutputStream` will block @@ -348,5 +431,5 @@ environments. ### `Syncable.hflush()` -Deprecated: replaced by `hflush()` +Deprecated: replaced by `hsync()` From 8177448ee29f30216219a0f63f49ca9820d1ccc3 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 1 May 2017 19:09:53 +0100 Subject: [PATCH 3/3] HADOOP-13327 spec out output stream and syncable, some minor cleanup of a few interfaces in the process --- .../apache/hadoop/fs/CanSetDropBehind.java | 2 +- .../apache/hadoop/fs/FSDataOutputStream.java | 1 - .../java/org/apache/hadoop/fs/Syncable.java | 6 +- .../site/markdown/filesystem/filesystem.md | 20 +- .../site/markdown/filesystem/outputstream.md | 589 ++++++++++++------ 5 files changed, 402 insertions(+), 216 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanSetDropBehind.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanSetDropBehind.java index 2e2d98b9c5462..0077838920a9e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanSetDropBehind.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanSetDropBehind.java @@ -36,6 +36,6 @@ public interface CanSetDropBehind { * UnsupportedOperationException If this stream doesn't support * setting the drop-behind. */ - public void setDropBehind(Boolean dropCache) + void setDropBehind(Boolean dropCache) throws IOException, UnsupportedOperationException; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java index 9a59b1d17200a..41715681f5b52 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.fs; -import java.io.*; import java.io.DataOutputStream; import java.io.FilterOutputStream; import java.io.IOException; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Syncable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Syncable.java index 85abe067f3175..9c6bc54111087 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Syncable.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Syncable.java @@ -31,17 +31,17 @@ public interface Syncable { * @deprecated As of HADOOP 0.21.0, replaced by hflush * @see #hflush() */ - @Deprecated public void sync() throws IOException; + @Deprecated void sync() throws IOException; /** Flush out the data in client's user buffer. 
After the return of * this call, new readers will see the data. * @throws IOException if any error occurs */ - public void hflush() throws IOException; + void hflush() throws IOException; /** Similar to posix fsync, flush out the data in client's user buffer * all the way to the disk device (but the disk may have it in its cache). * @throws IOException if error occurs */ - public void hsync() throws IOException; + void hsync() throws IOException; } diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md index be485c6414610..036cb42af1a0f 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md @@ -202,21 +202,21 @@ directory contains many thousands of files. Consider a directory `"/d"` with the contents: - a - part-0000001 - part-0000002 - ... - part-9999999 + a + part-0000001 + part-0000002 + ... + part-9999999 If the number of files is such that HDFS returns a partial listing in each response, then, if a listing `listStatus("/d")` takes place concurrently with the operation `rename("/d/a","/d/z"))`, the result may be one of: - [a, part-0000001, ... , part-9999999] - [part-0000001, ... , part-9999999, z] - [a, part-0000001, ... , part-9999999, z] - [part-0000001, ... , part-9999999] + [a, part-0000001, ... , part-9999999] + [part-0000001, ... , part-9999999, z] + [a, part-0000001, ... , part-9999999, z] + [part-0000001, ... , part-9999999] While this situation is likely to be a rare occurrence, it MAY happen. In HDFS these inconsistent views are only likely when listing a directory with many children. @@ -604,7 +604,7 @@ The result is `FSDataOutputStream`, which through its operations may generate ne until the output stream `close()` operation is completed. This is a significant difference between the behavior of object stores and that of filesystems, as it allows >1 client to create a file with `overwrite==false`, -and potentially confuse file/directory logic. In particular, using create() to acquire +and potentially confuse file/directory logic. In particular, using `create()` to acquire an exclusive lock on a file (whoever creates the file without an error is considered the holder of the lock) is not a valid algorithm when working with object stores. diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md index 218944f140498..07915fdfc1d24 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md @@ -12,215 +12,223 @@ limitations under the License. See accompanying LICENSE file. --> - - - +# Output Streams +With the exception of `FileSystem.copyFromLocalFile()`, +all API methods operations which write data to a filesystem is done +with OutputStream subclasses obtained through calls to +`FileSystem.create()` or `FileSystem.append()`. These return +instances of `FSDataOutputStream`, through which data can be written. 
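+
+A minimal sketch of the standard usage pattern; the `fs` instance, the path,
+and the payload here are illustrative assumptions, not part of any API:
+
+```java
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+// Create a file, write to it, and rely on close() to publish the data.
+void writeExample(FileSystem fs) throws IOException {
+  Path path = new Path("/example/part-0000");
+  try (FSDataOutputStream out = fs.create(path, true)) {
+    out.write("example data".getBytes(StandardCharsets.UTF_8));
+  } // close() flushes, persists, and makes the data visible to new readers
+}
+```
+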
+After a stream's `close()` method is called, all data written to the
+stream MUST be persisted to the filesystem and visible to all other
+clients attempting to read data from that path via `FileSystem.open()`
+—until any change happens to that data, such as it being overwritten, deleted,
+or appended to.
+
+As well as operations to write the data, the Hadoop output stream
+subclasses also provide methods to request that the data is written
+back to the filesystem, with two different requirements:
+
+* That it is flushed: that the data is written back to the filesystem, and
+that other clients attempting to read from the file MUST be returned the
+newly flushed data.
+
+* That it is persisted: that the data is not only flushed to the FileSystem,
+but that the FS has, to its satisfaction, saved the data to a persistent
+storage medium.
+
+These two guarantees are implemented by the `Syncable` interface.
+
+## Issues with the Hadoop Output Stream model
+
+There are some known issues with the output stream model as offered by Hadoop,
+specifically about the guarantees of when data is written and persisted by
+different filesystems, and when the metadata is synchronized. These are
+places where implementation aspects of HDFS and the "Local" filesystem surface in ways
+which do not match the simple model of the filesystem used in this specification.
+
+1. HDFS: The metadata on a file, specifically its length and modification time, may lag
+that of the actual data. That is: if a file is opened and read, more data may
+be returned than a call to `getFileStatus()` would indicate. It is only
+once an output stream has been closed that the metadata and data can be guaranteed
+to be consistent.
+
+1. Local: `flush()` is not guaranteed to flush all the data to the local filesystem.
+This is because the checksum process needs whole checksummable chunks to scan
+to produce a checksum: only complete chunks are flushed. When the output stream
+is closed via `close()`, then all data is written, even any trailing incomplete chunk.
+
+Object stores have their own behavior which, again, differs significantly
+from that of a classic POSIX filesystem.
+
+1. There is no guarantee that any file will be visible at the path of an output
+stream after the output stream is created.
+
+1. 
There is no guarantee that any file or data will be visible at the path of an output +stream after data is written to the output stream and it is then flushed by +any of the flush/sync operations supported in the stream's (static) interfaces. +One guarantee which Object Stores MUST make is the same as those of POSIX +filesystems: After a stream `close()` call returns, the data must be persisted +durably and visible to all callers. Though there, the eventual consistency +nature of some object stores means that existing data on a path MAY be visible +for an indeterminate period of time. The fact that old data can be seen by different +callers, even by the same process in sequential operations, means that the +requirements of the specification below are not met. - Stream = (path, true, data(FS, path)) +To exacerbate the problem, the interface whose existence is meant to +offere guarantees of durability, `Syncable`, declared as an implementation +of all `FSDataOutputStream` instances: it is impossible for an object store +to not declare that it supports the operations. Instead, they are just ignored. -After a call to `close()`, the stream is closed for all operations other -than `close()`; they MAY fail with `IOException` or `RuntimeException` instances. +[HDFS-11644](https://issues.apache.org/jira/browse/HDFS-11644), +*DFSStripedOutputStream should not implement Syncable* covers this issue +in the context of HDFS Erasure Coding. - Stream = (path, false, []) -The `close()` operation must become a no-op. That is: followon attempts to -persist data after a failure MUST NOT be made. (alternatively: users of the -API MUST NOT expect failing `close()` attemps to succeed if retried) + + + -Data written to an `FSDataOutputStream` implementation +# Class `FSDataOutputStream` ```java -public class FSDataOutputStream extends DataOutputStream, FilterOutputStream - implements Syncable, CanSetDropBehind, DataOutput { +public class FSDataOutputStream + extends DataOutputStream + implements Syncable, CanSetDropBehind { // ... } ``` -## Durability +The `FileSystem.create()` and `FileSystem.append()` calls return an instance +of a class `FSDataOutputStream`, a subclass of `java.io.OutputStream`. -1. `OutputStream.write()` MAY persist the data, synchronously or asynchronously -1. `OutputStream.flush()` flushes data to the destination. There -are no strict persistence requirements. -1. `Syncable.hflush()` synchronously sends all local data to the destination -filesystem. After returning to the caller, the data MUST be visible to other readers, -it MAY be durable. That is: it does not have to be persisted, merely guaranteed -to be consistently visible to all clients attempting to open a new stream reading -data at the path. -1. `Syncable.hsync()` MUST flush the data and persist data to the underlying durable -storage. -1. `close()` The first call to `close()` MUST flush out all remaining data in -the buffers, and persist it. +The base class wraps an `OutputStream` instance, one which may implement `Streamable` +and `CanSetDropBehind`. This document covers the requirements of such implementations. +A Java `OutputStream` allows applications to write a sequence of bytes to a destination. +In a Hadoop filesystem, that destination is the data under a path in the filesystem. -## Concurrency -1. The outcome of more than one process writing to the same file is undefined. +HDFS's `FileSystem` implementation, `DistributedFileSystem`, returns an instance +of `HdfsDataOutputStream`. 
This implementation has at least two behaviors +which are not explicitly declared by the base Java implmentation -1. An input stream opened to read a file *before the file was opened for writing* -MAY fetch data updated by writes to an OutputStream. -Because of buffering and caching, this is not a requirement -—and if a input stream does pick up updated data, the point at -which the updated data is read is undefined. This surfaces in object stores -where a `seek()` call which closes and re-opens the connection may pick up -updated data, while forward stream reads do not. Similarly, in block-oriented -filesystems, the data may be cached a block at a time —and changes only picked -up when a different block is read. +1. Writes are synchronized: more than one thread can write to the same +output stream. This is a use pattern which HBase relies on. -1. A Filesystem MAY allow the destination path to be manipulated while a stream -is writing to it —for example, `rename()` of the path or a parent; `delete()` of -a path or parent. In such a case, the outcome of future write operations on -the output stream is undefined. Some filesystems MAY implement locking to -prevent conflict. However, this tends to be rare on distributed filesystems, -for reasons well known in the literature. +1. `OutputStream.flush()` is a no-op when the file is closed. Apache Druid +has made such a call on this in the past, though it is something they may +have corrected. -1. The Java API specification of `java.io.OutputStream` does not require -an instance of the class to be thread safe. -However, `org.apache.hadoop.hdfs.DFSOutputStream` -has a stronger thread safety model (possibly unintentionally). This fact is -relied upon in Apache HBase, as discovered in HADOOP-11708. Implementations -SHOULD be thread safe. *Note*: even the `DFSOutputStream` synchronization -model permits the output stream to `close()`'d while awaiting an acknowledgement -from datanode or namenode writes in an `hsync()` operation. +As the HDFS implementation is considered the de-facto specification of +the FileSystem APIs, the fact that `write()` is thread-safe is significant, +because programs rely on this. +For compatibility, not only must other FS clients be thread-safe, +but new HDFS featues, such as encryption and Erasure Coding must also +implement consistent behavior with the core HDFS output stream. -## Consistency and Visibility +Put differently -There is no requirement for the data to be immediately visible to other applications -—not until a specific call to flush buffers or persist it to the underlying storage -medium are made. +*it isn't enough for HDFS Output Streams to implement the core semantics +of `java.io.OutputStream`: they need to implement the extra semantics +of `HdfsDataOutputStream`. Failure to do so must be considered regressions* -1. If an output stream is created with `FileSystem.create(path)`, with overwrite=true -and there is an existing file at the path, that is `exists(FS, path)` holds, -then, the existing data is immediately unavailable; the data at the end of the -path MUST consist of an empty byte sequence `[]`, with consistent metadata. -1. The metadata of a file (`length(FS, path)` in particular) SHOULD be consistent -with the contents of the file after `flush()` and `sync()`. -HDFS does not do this except when the write crosses a block boundary; to do -otherwise would overload the Namenode. 
As a result, while a file is being written -`length(Filesystem, Path)` MAY be less than the length of `data(Filesystem, Path)`. -1. The metadata MUST be consistent with the contents of a file after the `close()` -operation. -1. If the filesystem supports modification timestamps, this -timestamp MAY be updated while a file is being written, especially after a -`Syncable.hsync()` call. The timestamps MUST be updated after the file is closed. -1. After the contents of an output stream have been persisted (`hflush()/hsync()`) -all new `open(FS, Path)` operations MUST return the updated data. -1. After `close()` has been invoked on an output stream returned from a call -to `FileSystem.create(Path,...)`, a call to `getFileStatus(path)` MUST return the -final metadata of the written file, including length and modification time. -The metadata of the file returned in any of the FileSystem `list` operations -MUST be consistent with this metadata. +The concurrent `write()` call is the most significant tightening of +this specification. -### HDFS -That HDFS file metadata often lags the content of a file being written -to is not something everyone expects, nor convenient for any program trying -pick up updated data in a file being written. Most visible is the length -of a file returned in the various `list` commands and `getFileStatus` —this -is often out of data. -As HDFS only supports file growth in its output operations, this means -that the size of the file as listed in the metadata may be less than or equal -to the number of available bytes —but never larger. This is a guarantee which -is also held +## Output stream model -The standard technique to determine whether a file is updated is: +An output stream consists of a buffer `buffer: List[byte]` -1. Remember the last read position `pos` in the file, using `0` if this is the initial -read. -1. Use `getFileStatus(FS, Path)` to query the updated length of the file as -recorded in the metadata. -1. If `Status.length > pos`, the file has grown. -1. If the number has not changed, then - 1. reopen the file. - 1. `seek(pos)` to that location - 1. If `read() != -1`, there is new data. +For this specification, the output stream can be viewed as a list of bytes, +the buffer, bonded to a path in its owning filesystem. A flag, `open` +tracks whether the stream is open: after the stream is closed no more +data may be written to it -This algorithm works for filesystems which are consistent with metadata and -data, as well as HDFS. What is important to know is that even if the -`getFileStatus(path).getLen()==0` holds, -in HDFS there may be data, but the metadata is not up to date" + FS: FileSystem + path: List[PathElement] + open: bool + buffer: List[byte] -### Local Filesystem, `file:` +(Immediately) after stream operations which flush data to the filesystem, +the contents of the file at the stream's path must match that of the +buffer. That is, the following condition holds + -`LocalFileSystem`, `file:`, (or any other `FileSystem` implementation based on -`ChecksumFileSystem`) has a different issue. If an output stream -is obtained from `create()` and `FileSystem.setWriteChecksum(false)` has -*not* been called on the filesystem, then the FS only flushes as much -local data as can be written to full checksummed blocks of data. + FS'.Files(path) == buffer + + +data is flushed, read operations on the filesystem +should be in sync with the data -That is, the flush operations are not guaranteed to write all the pending -data until the file is finally closed. 
+ Stream = (FS', path, open, buffer) -That is, `sync()` and `hsync()` cannot be guaranteed to persist the data currently -buffered locally. -For anyone thinking "this is a violation of this specification" —they are correct. -The local filesystem is intended for testing, rather than production use. +### Initial State -### Object Stores +The output stream returned by a `FileSystem.create()` call must be empty -Object store implementations historically cache the entire object's data -locally on disk (possibly in memory), until the final `close()` operation -triggers a single `PUT` of the data. Some implementations push out intermediate -blocks of data, synchronously or asynchronously. + Stream = (FS, path, true, []) -Accordingly, they tend not to implement the `Syncable` interface. -However, Exception: Azure's `PageBlobOutputStream` does implement `hsync()`, -blocking until write operations being executed in separate threads have completed. +It is a requirement that after the call complets, +the the filesystem `FS'` contains a 0-byte file at the path, that is + + data(FS', path) == [] -Equally importantly +Accordingly, the the initial state of `Stream'.buffer` is consistent +with the data at the filesystem. + +The output stream returned from a call of `FileSystem.append(path, buffersize, progress)`, +contains the initial contents of the path +of the cache is the current contents of the file + + + Stream = (FS, path, true, data(FS, path)) -1. The object may not be visible at the end of the path until the final `close()`. -is called; this holds for `getFileStatus()`, `open()` and all FileSystem list operations. -1. Any existing data at the end of a path `p`, may remain visible until the final -`close()` operation overwrites this data. -1. The check for existing data in a `create()` call with overwrite=false, may -take place in the `create()` call itself, in the `close()` call prior to/during -the write, or at some point in between. Expect in the special case that the -object store supports an atomic PUT operation, the check for existence of -existing data and the subsequent creation of data at the path contains a race -condition: other clients may create data at the path between the existence check -and the subsequent qrite. + +#### Persisting data to the Filesystem + +When the stream writes data back to the Filesystem, be it in any +supported flush operation, in the `close()` operation, or at any other +time the stream chooses to do so, the contents of the file +are replaced with the current buffer + + + Stream'=(FS', path, true, buffer) + FS' = FS where data(FS', path) == buffer + + +After a call to `close()`, the stream is closed for all operations other +than `close()`; they MAY fail with `IOException` or `RuntimeException` instances. + + Stream = (FS, path, false, []) + + +The `close()` operation must become a no-op. That is: followon attempts to +persist data after a failure MUST NOT be made. (alternatively: users of the +API MUST NOT expect failing `close()` attemps to succeed if retried) + + + +```java +public abstract class OutputStream implements Closeable, Flushable { + public abstract void write(int b) throws IOException; + public void write(byte b[]) throws IOException; + public void write(byte b[], int off, int len) throws IOException; + public void flush() throws IOException ; + public void close() throws IOException; +} +``` @@ -229,72 +237,51 @@ and the subsequent qrite. 
#### Preconditions - Stream.isOpen else raise IOException + Stream.open else raise IOException #### Postconditions -The cache has the lower 8 bits of the data argument appended to it. +The buffer has the lower 8 bits of the data argument appended to it. - Stream'.cache = Stream.cache + [data & 0xff] + Stream'.buffer = Stream.buffer + [data & 0xff] There may be an explicit limit on the size of cached data, or an implicit limit based by the available capacity of the destination filesystem. -When a limit is reached, `write()` MUST fail with an `IOException`. +When a limit is reached, `write()` SHOULD fail with an `IOException`. -### `write(byte[] buffer, int offset, int len)` +### `write(byte[] data, int offset, int len)` #### Preconditions The preconditions are all defined in `OutputStream.write()` - Stream.isOpen else raise IOException - buffer != null else raise NullPointerException + Stream.open else raise IOException + data != null else raise NullPointerException offset >= 0 else raise IndexOutOfBoundsException len >= 0 else raise IndexOutOfBoundsException - offset < buffer.length else raise IndexOutOfBoundsException - offset + len < buffer.length else raise IndexOutOfBoundsException + offset < data.length else raise IndexOutOfBoundsException + offset + len < data.length else raise IndexOutOfBoundsException There may be an explicit limit on the size of cached data, or an implicit limit based by the available capacity of the destination filesystem. -When a limit is reached, `write()` MUST fail with an `IOException`. +When a limit is reached, `write()` SHOULD fail with an `IOException`. After the operation has returned, the buffer may be re-used. The outcome of updates to the buffer while the `write()` operation is in progress is undefined. #### Postconditions - Stream'.cache = Stream.cache + buffer[offset...offset+len] - - -### `write(byte[] buffer)` - -This is required to be the equivalent of - - write(buffer, 0, buffer.length) - - -#### Preconditions - - Stream.isOpen else raise IOException - - -With the offset of 0 and the length known to be that of the buffer, the -other preconditions of `write(byte[] buffer, int offset, int len)` -can be simplified to + Stream'.buffer' = Stream.buffer + data[offset...offset + len] - buffer != null else raise NullPointerException -#### Postconditions - -The postconditions become +### `write(byte[] data)` - Stream'.cache = Stream.cache + buffer[0...buffer.length] +This is defined as the equivalent of -Which is equivalent to + write(data, 0, data.length) - Stream'.cache = Stream.cache + buffer ### `flush()` @@ -304,9 +291,12 @@ declares that this SHOULD write data to the "intended destination". It explicitly precludes any guarantees about durability. +For that reason, this document doesn't provide any normative +specifications of behaviour. + #### Preconditions - Stream.isOpen else raise IOException + Stream.open else raise IOException #### Postconditions @@ -328,8 +318,8 @@ updated). Any locking/leaseholding mechanism is also required to release its lock/lease. + Stream'.open' = false FS' = FS where data(path) == cache - Stream'.isOpen = false The `close()` call MAY fail during its operation. @@ -347,10 +337,41 @@ upload the entire data file in the `close()` operation. This can take a large am of time. The fact that many user applications assume that `close()` is both fast and does not fail means that this behavior is dangerous. +Recommendations for safe use + +* Do plan for exceptions being raised, either in catch+ log or throwing further up. 
Silent +catch + swallow will hide problems when they arise. +* Heartbeat operations SHOULD take place on a separate thread, so that a long +upload in `close()` does not block the thread so long that the heartbeat times +out. ## `org.apache.hadoop.fs.Syncable` +```java +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface Syncable { + /** + * @deprecated As of HADOOP 0.21.0, replaced by hflush + * @see #hflush() + */ + @Deprecated void sync() throws IOException; + + /** Flush out the data in client's user buffer. After the return of + * this call, new readers will see the data. + * @throws IOException if any error occurs + */ + void hflush() throws IOException; + + /** Similar to posix fsync, flush out the data in client's user buffer + * all the way to the disk device (but the disk may have it in its cache). + * @throws IOException if error occurs + */ + void hsync() throws IOException; +} +``` + The purpose of `Syncable` interface is to provide guarantees that data is written to a filesystem for both visibility and durability. @@ -361,9 +382,10 @@ The interface MUST NOT be declared as implemented by an `OutputStream` unless those guarantees can be met. The `Syncable` interface has been implemented by other classes than -subclasses of `OutputStream`. Therefore the fact that -a class implements `Syncable` does not guarantee that `extends OutputStream` -holds. +subclasses of `OutputStream`, such as `org.apache.hadoop.io.SequenceFile.Writer`. + +*The fact that a class implements `Syncable` does not guarantee that `extends OutputStream` +holds.* This specification only covers the required behavior of `ObjectStream` subclasses which implement `Syncable`. @@ -371,25 +393,25 @@ which implement `Syncable`. ### `Syncable.hflush()` Flush out the data in client's user buffer. After the return of -this call, new readers will see the data. +this call, new readers will see the data. The `hflush()` operation +does not many any guarantees as to durability. Thus implementations +may cache the written data in memory —visible to all, but not yet +persisted. #### Preconditions - Stream.isOpen else raise IOException + Stream.open else raise IOException #### Postconditions FS' = FS where data(path) == cache -It's not clear whether this operation is expected to be blocking, that is, -whether, after the call returns, is it guaranteed that the data is -now visible to all - +After the call returns, the data MUST be already visible to all. ### `Syncable.hsync()` -Similar to POSIX fsync, save the data in client's user buffer +Similar to POSIX `fsync`, save the data in client's user buffer all the way to the disk device (but the disk may have it in its cache). That is, it is a requirement for the underlying FS To save all the data to @@ -397,12 +419,12 @@ the disk hardware itself, where it is expected to be durable. #### Preconditions - Stream.isOpen else raise IOException + Stream.open else raise IOException #### Postconditions - FS' = FS where data(path) == cache + FS' = FS where data(path) == buffer The reference implementation, `DFSOutputStream` will block until an acknowledgement is received from the datanodes: That is, all hosts @@ -416,6 +438,8 @@ Note, however, that the reference `DFSOutputStream.hsync()` call only actually s *the current block*. If there have been a series of writes since the last sync, such that a block boundary has been crossed. The `hsync()` call claims only to write the most recent. 

### `Syncable.hsync()`

-Similar to POSIX fsync, save the data in client's user buffer
+Similar to POSIX `fsync`, save the data in client's user buffer
all the way to the disk device (but the disk may have it in its cache).

-That is, it is a requirement for the underlying FS To save all the data to
+That is, it is a requirement for the underlying FS to save all the data to
@@ -397,12 +419,12 @@ the disk hardware itself, where it is expected to be durable.

#### Preconditions

-    Stream.isOpen else raise IOException
+    Stream.open else raise IOException

#### Postconditions

-    FS' = FS where data(path) == cache
+    FS' = FS where data(path) == buffer

-The reference implementation, `DFSOutputStream` will block
+The reference implementation, `DFSOutputStream`, will block
until an acknowledgement is received from the datanodes: That is, all hosts
@@ -416,6 +438,8 @@ Note, however, that the reference `DFSOutputStream.hsync()` call only actually s
-*the current block*. If there have been a series of writes since the last sync,
-such that a block boundary has been crossed. The `hsync()` call claims
-only to write the most recent.
+*the current block*. If there have been a series of writes since the last sync
+such that a block boundary has been crossed, the `hsync()` call claims
+only to write the most recent block.
+
+
 From the javadocs of `DFSOutputStream.hsync(EnumSet syncFlags)`

 > Note that only the current block is flushed to the disk device.
 > To guarantee durable sync across block boundaries the stream should
 > be created with {@link CreateFlag#SYNC_BLOCK}.

@@ -433,3 +457,166 @@ environments.

-Deprecated: replaced by `hsync()`
+Deprecated: replaced by `hflush()`

+### Flushing and sync-ing closed streams
+
+As the preconditions above declare, the stream must be open: invoking
+`flush()`, `hflush()` or `hsync()` on a closed stream MUST raise an
+`IOException`.
+
+
+## Durability, Concurrency, Consistency, Visibility, etc.
+
+These are the aspects of the system behaviour which are not directly
+covered in the (very simplistic) Filesystem model, but which are visible
+in production.
+
+
+### Durability
+
+1. `OutputStream.write()` MAY persist the data, synchronously or asynchronously.
+1. `OutputStream.flush()` flushes data to the destination. There
+are no strict persistence requirements.
+1. `Syncable.hflush()` synchronously sends all local data to the destination
+filesystem. After returning to the caller, the data MUST be visible to other readers;
+it MAY be durable. That is: it does not have to be persisted, merely guaranteed
+to be consistently visible to all clients attempting to open a new stream reading
+data at the path.
+1. `Syncable.hsync()` MUST flush the data and persist it to the underlying durable
+storage.
+1. The first call to `close()` MUST flush out all remaining data in
+the buffers, and persist it.
+
+
+### Concurrency
+
+1. The outcome of more than one process writing to the same file is undefined.
+
+1. An input stream opened to read a file *before the file was opened for writing*
+MAY fetch data updated by writes to an `OutputStream`.
+Because of buffering and caching, this is not a requirement
+—and if an input stream does pick up updated data, the point at
+which the updated data is read is undefined. This surfaces in object stores
+where a `seek()` call which closes and re-opens the connection may pick up
+updated data, while forward stream reads do not. Similarly, in block-oriented
+filesystems, the data may be cached a block at a time —and changes only picked
+up when a different block is read.
+
+1. A filesystem MAY allow the destination path to be manipulated while a stream
+is writing to it —for example, `rename()` of the path or a parent; `delete()` of
+a path or parent. In such a case, the outcome of future write operations on
+the output stream is undefined. Some filesystems MAY implement locking to
+prevent conflict. However, this tends to be rare on distributed filesystems,
+for reasons well known in the literature.
+
+1. The Java API specification of `java.io.OutputStream` does not require
+an instance of the class to be thread safe.
+However, `org.apache.hadoop.hdfs.DFSOutputStream`
+has a stronger thread safety model (possibly unintentionally). This fact is
+relied upon in Apache HBase, as discovered in HADOOP-11708. Implementations
+SHOULD be thread safe. *Note*: even the `DFSOutputStream` synchronization
+model permits the output stream to be `close()`'d while awaiting an acknowledgement
+from datanode or namenode writes in an `hsync()` operation.
+
+
+### Consistency and Visibility
+
+There is no requirement for the data to be immediately visible to other applications
+—not until a specific call to flush buffers or persist data to the underlying storage
+medium is made.
+
+1. If an output stream is created with `FileSystem.create(path)`, with `overwrite==true`
+and there is an existing file at the path, that is, `exists(FS, path)` holds,
+then the existing data is immediately unavailable; the data at the end of the
+path MUST consist of an empty byte sequence `[]`, with consistent metadata.
+
+1. The metadata of a file (`length(FS, path)` in particular) SHOULD be consistent
+with the contents of the file after `flush()` and `sync()`.
+HDFS does not do this except when the write crosses a block boundary; to do
+otherwise would overload the Namenode. As a result, while a file is being written
+`length(Filesystem, Path)` MAY be less than the length of `data(Filesystem, Path)`.
+
+1. The metadata MUST be consistent with the contents of a file after the `close()`
+operation.
+
+1. If the filesystem supports modification timestamps, this
+timestamp MAY be updated while a file is being written, especially after a
+`Syncable.hsync()` call. The timestamps MUST be updated after the file is closed.
+
+1. After the contents of an output stream have been persisted (`hflush()/hsync()`),
+all new `open(FS, Path)` operations MUST return the updated data.
+
+1. After `close()` has been invoked on an output stream returned from a call
+to `FileSystem.create(Path,...)`, a call to `getFileStatus(path)` MUST return the
+final metadata of the written file, including length and modification time.
+The metadata of the file returned in any of the FileSystem `list` operations
+MUST be consistent with this metadata.
+
+#### HDFS
+
+That HDFS file metadata often lags the content of a file being written
+to is not something everyone expects, nor is it convenient for any program trying
+to pick up updated data in a file being written. Most visible is the length
+of a file returned in the various `list` commands and `getFileStatus` —this
+is often out of date.
+
+As HDFS only supports file growth in its output operations, this means
+that the size of the file as listed in the metadata may be less than or equal
+to the number of available bytes —but never larger. This is a guarantee which
+applications MAY rely upon: the length in the metadata is a lower bound on the
+length of the readable data.
+
+The standard technique to determine whether a file is updated is as follows;
+a non-normative Java sketch follows the list.
+
+1. Remember the last read position `pos` in the file, using `0` if this is the initial
+read.
+1. Use `getFileStatus(FS, Path)` to query the updated length of the file as
+recorded in the metadata.
+1. If `Status.length > pos`, the file has grown.
+1. If the length has not changed, then
+    1. reopen the file,
+    1. `seek(pos)` to that location,
+    1. and if `read() != -1`, there is new data.
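+
+A sketch of the technique; the class name is assumed, and retry logic and
+error handling are omitted.
+
+```java
+import java.io.IOException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class TailPoller {
+  /** Returns true if data beyond {@code pos} can be read from the file. */
+  static boolean hasNewData(FileSystem fs, Path path, long pos)
+      throws IOException {
+    // 1. Query the metadata: a length greater than pos means the file has grown.
+    if (fs.getFileStatus(path).getLen() > pos) {
+      return true;
+    }
+    // 2. The metadata may lag the data (as on HDFS): reopen the file,
+    //    seek to the old position, and probe for a byte beyond it.
+    try (FSDataInputStream in = fs.open(path)) {
+      in.seek(pos);
+      return in.read() != -1;
+    }
+  }
+}
+```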
+
+This algorithm works for filesystems which keep metadata consistent with
+data, as well as for HDFS. What is important to know is that, even if
+`getFileStatus(path).getLen() == 0` holds, in HDFS there may still be data:
+the metadata is simply not up to date.
+
+#### Local Filesystem, `file:`
+
+`LocalFileSystem`, `file:` (or any other `FileSystem` implementation based on
+`ChecksumFileSystem`) has a different issue. If an output stream
+is obtained from `create()` and `FileSystem.setWriteChecksum(false)` has
+*not* been called on the filesystem, then the FS only flushes as much
+local data as can be written to full checksummed blocks of data.
+
+That is, the flush operations are not guaranteed to write all the pending
+data until the file is finally closed.
+
+In particular, `sync()` and `hsync()` cannot be guaranteed to persist the data
+currently buffered locally.
+
+For anyone thinking "this is a violation of this specification" —they are correct.
+The local filesystem is intended for testing, rather than production use.
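+
+As a non-normative illustration (the path used here is arbitrary), disabling
+the checksums before creating the stream avoids the partial-flush behaviour
+described above:
+
+```java
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class LocalFlushExample {
+  public static void main(String[] args) throws Exception {
+    LocalFileSystem local = FileSystem.getLocal(new Configuration());
+    // Disable checksums *before* the stream is created, so flushing is
+    // no longer restricted to whole checksummed blocks.
+    local.setWriteChecksum(false);
+    try (FSDataOutputStream out =
+             local.create(new Path("/tmp/example.dat"), true)) {
+      out.writeBytes("pending data");
+      out.flush();   // the pending bytes can now be written out
+    }
+  }
+}
+```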
+
+#### Object Stores
+
+Object store implementations historically cache the entire object's data
+locally on disk (possibly in memory), until the final `close()` operation
+triggers a single `PUT` of the data. Some implementations push out intermediate
+blocks of data, synchronously or asynchronously.
+
+Accordingly, they tend not to implement the `Syncable` interface.
+One exception: Azure's `PageBlobOutputStream` does implement `hsync()`,
+blocking until write operations being executed in separate threads have completed.
+
+Equally importantly:
+
+1. The object may not be visible at the end of the path until the final `close()`
+is called; this holds for `getFileStatus()`, `open()` and all FileSystem list operations.
+1. Any existing data at the end of a path `p` may remain visible until the final
+`close()` operation overwrites this data.
+1. The check for existing data in a `create()` call with `overwrite==false` may
+take place in the `create()` call itself, in the `close()` call prior to/during
+the write, or at some point in between. Except in the special case that the
+object store supports an atomic PUT operation, the check for existing data
+and the subsequent creation of data at the path contain a race
+condition: other clients may create data at the path between the existence check
+and the subsequent write, as the sketch after this list illustrates.
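+
+A non-normative sketch of that race; `fs` and `path` are assumed, and which
+call raises the `FileAlreadyExistsException` varies by store.
+
+```java
+import java.io.IOException;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class CreateRaceExample {
+  static void createExclusive(FileSystem fs, Path path, byte[] data)
+      throws IOException {
+    try (FSDataOutputStream out = fs.create(path, false /* overwrite */)) {
+      // The existence check may have run inside create(), or it may be
+      // deferred until the close() implied at the end of this block.
+      out.write(data);
+    } catch (FileAlreadyExistsException e) {
+      // Existing data was detected. Even when no exception is raised,
+      // another client may have created data at the path between the
+      // check and the final PUT.
+      throw e;
+    }
+  }
+}
+```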