diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/FileSegment.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/FileSegment.scala
new file mode 100644
index 0000000000000..eb9c07e9cf61f
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/FileSegment.scala
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.storage
+
+private[streaming] case class FileSegment(path: String, offset: Long, length: Int)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala
new file mode 100644
index 0000000000000..de53c59c68269
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.storage
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, Path}
+
+private[streaming] object HdfsUtils {
+
+  def getOutputStream(path: String): FSDataOutputStream = {
+    // Path.getFileSystem is not thread-safe, so serialize calls to it
+    val dfsPath = new Path(path)
+    val conf = new Configuration()
+    val dfs = this.synchronized {
+      dfsPath.getFileSystem(conf)
+    }
+    // If the file exists and we have append support, append instead of creating a new file
+    val stream: FSDataOutputStream = {
+      if (dfs.isFile(dfsPath)) {
+        if (conf.getBoolean("hdfs.append.support", false)) {
+          dfs.append(dfsPath)
+        } else {
+          throw new IllegalStateException("File exists and there is no append support!")
+        }
+      } else {
+        dfs.create(dfsPath)
+      }
+    }
+    stream
+  }
+
+  def getInputStream(path: String): FSDataInputStream = {
+    val dfsPath = new Path(path)
+    val dfs = this.synchronized {
+      dfsPath.getFileSystem(new Configuration())
+    }
+    val instream = dfs.open(dfsPath)
+    instream
+  }
+
+  def checkState(state: Boolean, errorMsg: => String) {
+    if (!state) {
+      throw new IllegalStateException(errorMsg)
+    }
+  }
+
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala
new file mode 100644
index 0000000000000..aee5d192102e8
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.storage
+
+import java.io.Closeable
+
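+/**
+ * A reader for random access to records in a write ahead log file written by
+ * [[WriteAheadLogWriter]]. To read a record, pass in the [[FileSegment]] that the
+ * writer returned when the record was written. Illustrative usage (the path is a
+ * placeholder):
+ *
+ * {{{
+ *   val reader = new WriteAheadLogRandomReader("hdfs:///logs/wal-0")
+ *   val bytes = reader.read(segment)  // segment: returned by WriteAheadLogWriter#write
+ *   reader.close()
+ * }}}
+ */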
+private[streaming] class WriteAheadLogRandomReader(path: String) extends Closeable {
+
+  private val instream = HdfsUtils.getInputStream(path)
+  private var closed = false
+
+  def read(segment: FileSegment): Array[Byte] = synchronized {
+    assertOpen()
+    instream.seek(segment.offset)
+    val nextLength = instream.readInt()
+    HdfsUtils.checkState(nextLength == segment.length,
+      "Expected message length to be " + segment.length + ", but was " + nextLength)
+    val buffer = new Array[Byte](nextLength)
+    instream.readFully(buffer)
+    buffer
+  }
+
+  override def close(): Unit = synchronized {
+    closed = true
+    instream.close()
+  }
+
+  private def assertOpen() {
+    HdfsUtils.checkState(!closed, "Stream is closed. Create a new Reader to read from the file.")
+  }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala
new file mode 100644
index 0000000000000..75791c2470181
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.storage
+
+import java.io.Closeable
+
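+/**
+ * A sequential reader that iterates over the records of a write ahead log file in
+ * the order they were written. Illustrative usage (the path is a placeholder):
+ *
+ * {{{
+ *   val reader = new WriteAheadLogReader("hdfs:///logs/wal-0")
+ *   reader.foreach(bytes => println(bytes.length))
+ *   reader.close()
+ * }}}
+ */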
+private[streaming] class WriteAheadLogReader(path: String)
+  extends Iterator[Array[Byte]] with Closeable {
+
+  private val instream = HdfsUtils.getInputStream(path)
+  private var closed = false
+  private var nextItem: Option[Array[Byte]] = None
+
+  override def hasNext: Boolean = synchronized {
+    assertOpen()
+    if (nextItem.isDefined) { // handle the case where hasNext is called without calling next
+      true
+    } else {
+      val available = instream.available()
+      if (available < 4) { // Length of next block (an Int = 4 bytes) is unavailable
+        false
+      } else {
+        val length = instream.readInt()
+        if (instream.available() < length) {
+          false
+        } else {
+          val buffer = new Array[Byte](length)
+          instream.readFully(buffer)
+          nextItem = Some(buffer)
+          true
+        }
+      }
+    }
+  }
+
+  override def next(): Array[Byte] = synchronized {
+    // TODO: Possible error case where there are not enough bytes in the stream
+    // TODO: How to handle that?
+    val data = nextItem.getOrElse {
+      throw new IllegalStateException("next called without calling hasNext or after hasNext " +
+        "returned false")
+    }
+    nextItem = None // Ensure the next hasNext call loads new data.
+    data
+  }
+
+  override def close(): Unit = synchronized {
+    closed = true
+    instream.close()
+  }
+
+  private def assertOpen() {
+    HdfsUtils.checkState(!closed, "Stream is closed. Create a new Reader to read from the file.")
+  }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala
new file mode 100644
index 0000000000000..f151c17ff66d8
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.storage
+
+import java.io.Closeable
+
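+/**
+ * A writer that appends length-prefixed records to a write ahead log file. Each
+ * write returns a [[FileSegment]] that can later be passed to
+ * [[WriteAheadLogRandomReader]] to read that record back. Illustrative usage
+ * (the path is a placeholder):
+ *
+ * {{{
+ *   val writer = new WriteAheadLogWriter("hdfs:///logs/wal-0")
+ *   val segment = writer.write("record".getBytes("UTF-8"))
+ *   writer.close()
+ * }}}
+ */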
+private[streaming] class WriteAheadLogWriter(path: String) extends Closeable {
+  private val stream = HdfsUtils.getOutputStream(path)
+  private var nextOffset = stream.getPos
+  private var closed = false
+
+  // Each record is written as:
+  // - Length - Int (4 bytes)
+  // - Data - Length bytes
+  def write(data: Array[Byte]): FileSegment = synchronized {
+    assertOpen()
+    val segment = FileSegment(path, nextOffset, data.length)
+    stream.writeInt(data.length)
+    stream.write(data)
+    stream.hflush()
+    nextOffset = stream.getPos
+    segment
+  }
+
+  override def close(): Unit = synchronized {
+    closed = true
+    stream.close()
+  }
+
+  private def assertOpen() {
+    HdfsUtils.checkState(!closed, "Stream is closed. Create a new Writer to write to the file.")
+  }
+}
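
A minimal end-to-end sketch of how these classes fit together (the path and
payload below are illustrative placeholders, not part of this patch):

  val path = "hdfs:///tmp/wal-0"
  val writer = new WriteAheadLogWriter(path)
  val segment = writer.write("hello".getBytes("UTF-8"))
  writer.close()

  // Random access: read a single record back using its FileSegment
  val randomReader = new WriteAheadLogRandomReader(path)
  assert(randomReader.read(segment).sameElements("hello".getBytes("UTF-8")))
  randomReader.close()

  // Sequential access: iterate over all records in write order
  val sequentialReader = new WriteAheadLogReader(path)
  sequentialReader.foreach(record => println(record.length))
  sequentialReader.close()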