diff --git a/tika-core/src/main/java/org/apache/tika/io/IOUtils.java b/tika-core/src/main/java/org/apache/tika/io/IOUtils.java index 18e86f1e77..b6c18c19b1 100644 --- a/tika-core/src/main/java/org/apache/tika/io/IOUtils.java +++ b/tika-core/src/main/java/org/apache/tika/io/IOUtils.java @@ -1264,4 +1264,24 @@ public static long skip(final InputStream input, final long toSkip) throws IOExc return toSkip - remain; } + public static long skip(final InputStream input, final long toSkip, byte[] buffer) throws IOException { + if (toSkip < 0) { + throw new IllegalArgumentException("Skip count must be non-negative, actual: " + toSkip); + } + /* + * N.B. no need to synchronize this because: - we don't care if the buffer is created multiple times (the data + * is ignored) - we always use the same size buffer, so if it it is recreated it will still be OK (if the buffer + * size were variable, we would need to synch. to ensure some other thread did not create a smaller one) + */ + long remain = toSkip; + while (remain > 0) { + // See https://issues.apache.org/jira/browse/IO-203 for why we use read() rather than delegating to skip() + final long n = input.read(buffer, 0, (int) Math.min(remain, buffer.length)); + if (n < 0) { // EOF + break; + } + remain -= n; + } + return toSkip - remain; + } } diff --git a/tika-core/src/main/java/org/apache/tika/io/TikaInputStream.java b/tika-core/src/main/java/org/apache/tika/io/TikaInputStream.java index cf5cf84ec4..a0275a86e3 100644 --- a/tika-core/src/main/java/org/apache/tika/io/TikaInputStream.java +++ b/tika-core/src/main/java/org/apache/tika/io/TikaInputStream.java @@ -479,6 +479,7 @@ public static TikaInputStream get(URL url, Metadata metadata) private int consecutiveEOFs = 0; + private byte[] skipBuffer; /** * Creates a TikaInputStream instance. This private constructor is used * by the static factory methods based on the available information. @@ -705,7 +706,13 @@ public long getPosition() { */ @Override public long skip(long ln) throws IOException { - long n = IOUtils.skip(super.in, ln); + //On TIKA-3092, we found that using the static byte array buffer + //caused problems with multithreading with the FlateInputStream + //from a POIFS document stream + if (skipBuffer == null) { + skipBuffer = new byte[4096]; + } + long n = IOUtils.skip(super.in, ln, skipBuffer); if (n != ln) { throw new IOException("tried to skip "+ln + " but actually skipped: "+n); } diff --git a/tika-core/src/test/java/org/apache/tika/MultiThreadedTikaTest.java b/tika-core/src/test/java/org/apache/tika/MultiThreadedTikaTest.java index 9450f24ac0..67f1ffdcaa 100644 --- a/tika-core/src/test/java/org/apache/tika/MultiThreadedTikaTest.java +++ b/tika-core/src/test/java/org/apache/tika/MultiThreadedTikaTest.java @@ -25,7 +25,6 @@ import org.apache.tika.mime.MediaType; import org.apache.tika.parser.ParseContext; import org.apache.tika.parser.Parser; -import org.apache.tika.parser.RecursiveParserWrapper; import org.apache.tika.sax.AbstractRecursiveParserWrapperHandler; import org.apache.tika.sax.BasicContentHandlerFactory; import org.apache.tika.sax.RecursiveParserWrapperHandler; diff --git a/tika-parsers/src/main/java/org/apache/tika/parser/hwp/HwpStreamReader.java b/tika-parsers/src/main/java/org/apache/tika/parser/hwp/HwpStreamReader.java index 978b36159f..2ba548c759 100644 --- a/tika-parsers/src/main/java/org/apache/tika/parser/hwp/HwpStreamReader.java +++ b/tika-parsers/src/main/java/org/apache/tika/parser/hwp/HwpStreamReader.java @@ -24,6 +24,7 @@ import org.apache.poi.util.LittleEndian; public class HwpStreamReader { + private byte[] skipBuffer = new byte[4096]; private InputStream input; private byte[] buf; @@ -32,15 +33,6 @@ public HwpStreamReader(InputStream inputStream) { buf = new byte[4]; } - /** - * More data to read ? - * - * @return - * @throws IOException - */ - public boolean available() throws IOException { - return input.available() > 0; - } /** * unsigned 1 byte @@ -129,6 +121,12 @@ public long uint32() throws IOException { * @throws IOException */ public void ensureSkip(long n) throws IOException { - IOUtils.skipFully(input, n); + //Leaving this for anyone who can figure out why this doesn't + //work. See HwpV5ParserTest#testMultiThreadedSkipFully + //long skipped = org.apache.tika.io.IOUtils.skip(input, n); + long skipped = org.apache.tika.io.IOUtils.skip(input, n, skipBuffer); + if (skipped != n) { + throw new EOFException(); + } } } \ No newline at end of file diff --git a/tika-parsers/src/main/java/org/apache/tika/parser/hwp/HwpTextExtractorV5.java b/tika-parsers/src/main/java/org/apache/tika/parser/hwp/HwpTextExtractorV5.java index 8f95831f7a..48b8c02934 100644 --- a/tika-parsers/src/main/java/org/apache/tika/parser/hwp/HwpTextExtractorV5.java +++ b/tika-parsers/src/main/java/org/apache/tika/parser/hwp/HwpTextExtractorV5.java @@ -37,6 +37,7 @@ import java.util.zip.Inflater; import java.util.zip.InflaterInputStream; +import org.apache.commons.compress.compressors.deflate.DeflateParameters; import org.apache.poi.hpsf.NoPropertySetStreamException; import org.apache.poi.hpsf.Property; import org.apache.poi.hpsf.PropertySet; @@ -261,7 +262,6 @@ private void parseBodyText(FileHeader header, DirectoryNode root, if (entry.getName().startsWith("Section") && entry instanceof DocumentEntry) { LOG.debug("extract {}", entry.getName()); - InputStream input = new DocumentInputStream( (DocumentEntry) entry); @@ -384,7 +384,7 @@ public InputStream createDecryptStream(InputStream input, Key key) */ private void parse(HwpStreamReader reader, XHTMLContentHandler xhtml) throws IOException, SAXException { - StringBuffer buf = new StringBuffer(1024); + StringBuilder buf = new StringBuilder(); TagInfo tag = new TagInfo(); while (true) { @@ -421,7 +421,7 @@ private void parse(HwpStreamReader reader, XHTMLContentHandler xhtml) * @throws IOException */ private void writeParaText(HwpStreamReader reader, long datasize, - StringBuffer buf) throws IOException { + StringBuilder buf) throws IOException { int[] chars = reader.uint16((int) (datasize / 2)); for (int index = 0; index < chars.length; index++) { diff --git a/tika-parsers/src/test/java/org/apache/tika/parser/hwp/HwpV5ParserTest.java b/tika-parsers/src/test/java/org/apache/tika/parser/hwp/HwpV5ParserTest.java index 76dc470998..523f17c11a 100644 --- a/tika-parsers/src/test/java/org/apache/tika/parser/hwp/HwpV5ParserTest.java +++ b/tika-parsers/src/test/java/org/apache/tika/parser/hwp/HwpV5ParserTest.java @@ -18,13 +18,19 @@ import static org.junit.Assert.assertEquals; +import org.apache.commons.io.filefilter.RegexFileFilter; +import org.apache.tika.MultiThreadedTikaTest; import org.apache.tika.TikaTest; import org.apache.tika.metadata.Metadata; import org.apache.tika.metadata.TikaCoreProperties; +import org.apache.tika.parser.ParseContext; import org.apache.tika.parser.Parser; +import org.apache.tika.parser.RecursiveParserWrapper; import org.junit.Test; -public class HwpV5ParserTest extends TikaTest { +import java.nio.file.Paths; + +public class HwpV5ParserTest extends MultiThreadedTikaTest { @Test public void testHwpV5Parser() throws Exception { @@ -65,4 +71,15 @@ public void testExisting() throws Exception { assertEquals("next1009", metadata.get(TikaCoreProperties.CREATOR)); assertEquals("\uD14C\uC2A4\uD2B8", metadata.get(TikaCoreProperties.TITLE)); } + + @Test + public void testMultiThreadedSkipFully() throws Exception { + //TIKA-3092 + int numThreads = 2; + int numIterations = 50; + ParseContext[] parseContexts = new ParseContext[numThreads]; + + testMultiThreaded(new RecursiveParserWrapper(AUTO_DETECT_PARSER), parseContexts, numThreads, numIterations, + new RegexFileFilter(".*\\.hwp")); + } }