Skip to content

Commit

Permalink
support reading non-compressed warc archives
Browse files Browse the repository at this point in the history
  • Loading branch information
eiennohito committed Apr 9, 2024
1 parent 79ba62b commit 6e41cbc
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ object Pipeline {
return defMethod.invoke(null)
} catch {
case _: NoSuchMethodException => throw new IllegalArgumentException(
s"could not instantiate $clz, ${par.getName} did not configured or have default value"
s"could not instantiate $clz, ${par.getName} was not configured or did not have a default value"
)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,35 @@
package com.worksap.nlp.uzushio.lib.utils

import com.google.common.io.CountingInputStream
import com.worksap.nlp.uzushio.lib.utils.WarcFileReader.MAX_RECORD_SIZE
import com.worksap.nlp.uzushio.lib.warc.WarcRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.log4j.LogManager
import org.archive.io.warc.WARCReaderFactory

import java.io.{FilterInputStream, InputStream}
import scala.collection.JavaConverters._
import java.io.BufferedInputStream

/** Reads [[WarcRecord]]s from a WARC file using Hadoop filesystem APIs. */
class WarcFileReader(conf: Configuration, filePath: Path) {
@transient private lazy val logger = LogManager.getLogger(this.getClass.getSimpleName)

/** Opens a warc file and setup an iterator of records. */
private val fs = filePath.getFileSystem(conf)
private def fs = filePath.getFileSystem(conf)
private val fileSize = fs.getFileStatus(filePath).getLen
private val fsin = new CountingInputStream(fs.open(filePath))
private val fsin = {
val rawStream = fs.open(filePath)
val wrapped = if (rawStream.markSupported()) {
rawStream
} else new BufferedInputStream(rawStream)
//noinspection UnstableApiUsage
new CountingInputStream(wrapped)
}
private val reader = WARCReaderFactory.get(filePath.getName, fsin, true)
private val recordIter = reader.iterator

/** Init counters to report progress. */
private var recordsRead: Long = 0
private var bytesRead: Long = 0

/** Closes the file and reader. */
def close(): Unit = {
Expand Down Expand Up @@ -62,34 +68,13 @@ class WarcFileReader(conf: Configuration, filePath: Path) {
def getRecordsRead: Long = recordsRead

/** Returns the number of bytes that have been read. */
def getBytesRead: Long = bytesRead
def bytesRead: Long = fsin.getCount

/** Returns the proportion of the file that has been read. */
def getProgress: Float = {
if (fileSize <= 0) return 1.0f
bytesRead.toFloat / fileSize.toFloat
}

/** InputStream that records the number of bytes read. */
private class CountingInputStream(in: InputStream) extends FilterInputStream(in) {
override def read(): Int = {
val result = in.read()
if (result != -1) bytesRead += 1
result
}

override def read(b: Array[Byte], off: Int, len: Int): Int = {
val result = in.read(b, off, len)
if (result != -1) bytesRead += result
result
}

override def skip(n: Long): Long = {
val result = in.skip(n)
bytesRead += result
result
}
}
}

object WarcFileReader {
Expand Down

0 comments on commit 6e41cbc

Please sign in to comment.