From 941da8badf64068d11a53ac57a4ba35b2ad13490 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Tue, 29 Mar 2022 15:12:09 -0400 Subject: [PATCH] fix: `Content-Encoding: gzip` along with `Transfer-Encoding: chunked` sometimes terminates early (#1608) #### The issue When `GZIPInputStream` completes processing an individual member it will call `InputStream#available()` to determine if there is more stream to try and process. If the call to `available()` returns 0 `GZIPInputStream` will determine it has processed the entirety of the underlying stream. This is spurious, as `InputStream#available()` is allowed to return 0 if it would require blocking in order for more bytes to be available. When `GZIPInputStream` is reading from a `Transfer-Encoding: chunked` response, if the chunk boundary happens to align closely enough to the member boundary `GZIPInputStream` won't consume the whole response. #### The fix Add new `OptimisticAvailabilityInputStream`, which provides an optimistic "estimate" of the number of `available()` bytes in the underlying stream. When instantiating a `GZIPInputStream` for a response, automatically decorate the provided `InputStream` with an `OptimisticAvailabilityInputStream`. #### Verification This scenario isn't unique to processing of chunked responses, and can be replicated reliably using a `java.io.SequenceInputStream` with two underlying `java.io.ByteArrayInputStream`. See GzipSupportTest.java for a reproduction. 
The need for this class has been verified for the following JVMs: * ``` openjdk version "1.8.0_292" OpenJDK Runtime Environment (AdoptOpenJDK)(build 1.8.0_292-b10) OpenJDK 64-Bit Server VM (AdoptOpenJDK)(build 25.292-b10, mixed mode) ``` * ``` openjdk version "11.0.14.1" 2022-02-08 OpenJDK Runtime Environment Temurin-11.0.14.1+1 (build 11.0.14.1+1) OpenJDK 64-Bit Server VM Temurin-11.0.14.1+1 (build 11.0.14.1+1, mixed mode) ``` * ``` openjdk version "17" 2021-09-14 OpenJDK Runtime Environment Temurin-17+35 (build 17+35) OpenJDK 64-Bit Server VM Temurin-17+35 (build 17+35, mixed mode, sharing) ``` --- .../google/api/client/http/GzipSupport.java | 90 ++++++++++++++++ .../google/api/client/http/HttpResponse.java | 3 +- .../api/client/http/GzipSupportTest.java | 101 ++++++++++++++++++ 3 files changed, 192 insertions(+), 2 deletions(-) create mode 100644 google-http-client/src/main/java/com/google/api/client/http/GzipSupport.java create mode 100644 google-http-client/src/test/java/com/google/api/client/http/GzipSupportTest.java diff --git a/google-http-client/src/main/java/com/google/api/client/http/GzipSupport.java b/google-http-client/src/main/java/com/google/api/client/http/GzipSupport.java new file mode 100644 index 000000000..6dc5df304 --- /dev/null +++ b/google-http-client/src/main/java/com/google/api/client/http/GzipSupport.java @@ -0,0 +1,90 @@ +package com.google.api.client.http; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.zip.GZIPInputStream; + +final class GzipSupport { + + private GzipSupport() {} + + static GZIPInputStream newGzipInputStream(InputStream in) throws IOException { + return new GZIPInputStream(new OptimisticAvailabilityInputStream(in)); + } + + /** + * When {@link GZIPInputStream} completes processing an individual member it will call {@link + * InputStream#available()} to determine if there is more stream to try and process. 
If the call + * to {@code available()} returns 0 {@code GZIPInputStream} will determine it has processed the + * entirety of the underlying stream. This is spurious, as {@link InputStream#available()} is + * allowed to return 0 if it would require blocking in order for more bytes to be available. When + * {@code GZIPInputStream} is reading from a {@code Transfer-Encoding: chunked} response, if the + * chunk boundary happens to align closely enough to the member boundary {@code GZIPInputStream} + * won't consume the whole response. + * + *

This class provides an optimistic "estimate" (in actuality, a lie) of the number of {@code + * available()} bytes in the underlying stream. It does this by tracking the last number of bytes + * read. If the last number of bytes read is greater than -1, we return {@link Integer#MAX_VALUE} + * to any call of {@link #available()}. + * + *

We're breaking the contract of available() in that we're lying about how much data we have + * accessible without blocking; however, in the case where we're weaving {@link GZIPInputStream} + * into response processing we already know there are going to be blocking calls to read before + * the stream is exhausted. + * + *

This scenario isn't unique to processing of chunked responses, and can be replicated + * reliably using a {@link java.io.SequenceInputStream} with two underlying {@link + * java.io.ByteArrayInputStream}. See the corresponding test class for a reproduction. + * + *

The need for this class has been verified for the following JVMs: + * + *

    + *
  1. + *
    +   * openjdk version "1.8.0_292"
    +   * OpenJDK Runtime Environment (AdoptOpenJDK)(build 1.8.0_292-b10)
    +   * OpenJDK 64-Bit Server VM (AdoptOpenJDK)(build 25.292-b10, mixed mode)
    +   *   
    + *
  2. + *
    +   * openjdk version "11.0.14.1" 2022-02-08
    +   * OpenJDK Runtime Environment Temurin-11.0.14.1+1 (build 11.0.14.1+1)
    +   * OpenJDK 64-Bit Server VM Temurin-11.0.14.1+1 (build 11.0.14.1+1, mixed mode)
    +   *   
    + *
  3. + *
    +   * openjdk version "17" 2021-09-14
    +   * OpenJDK Runtime Environment Temurin-17+35 (build 17+35)
    +   * OpenJDK 64-Bit Server VM Temurin-17+35 (build 17+35, mixed mode, sharing)
    +   *   
    + *
+ */ + private static final class OptimisticAvailabilityInputStream extends FilterInputStream { + private int lastRead = 0; + + OptimisticAvailabilityInputStream(InputStream delegate) { + super(delegate); + } + + @Override + public int available() throws IOException { + return lastRead > -1 ? Integer.MAX_VALUE : 0; + } + + @Override + public int read() throws IOException { + return lastRead = super.read(); + } + + @Override + public int read(byte[] b) throws IOException { + return lastRead = super.read(b); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return lastRead = super.read(b, off, len); + } + } +} diff --git a/google-http-client/src/main/java/com/google/api/client/http/HttpResponse.java b/google-http-client/src/main/java/com/google/api/client/http/HttpResponse.java index e97943210..130208671 100644 --- a/google-http-client/src/main/java/com/google/api/client/http/HttpResponse.java +++ b/google-http-client/src/main/java/com/google/api/client/http/HttpResponse.java @@ -30,7 +30,6 @@ import java.util.Locale; import java.util.logging.Level; import java.util.logging.Logger; -import java.util.zip.GZIPInputStream; /** * HTTP response. 
@@ -362,7 +361,7 @@ public InputStream getContent() throws IOException { // GZIPInputStream.close() --> ConsumingInputStream.close() --> // exhaust(ConsumingInputStream) lowLevelResponseContent = - new GZIPInputStream(new ConsumingInputStream(lowLevelResponseContent)); + GzipSupport.newGzipInputStream(new ConsumingInputStream(lowLevelResponseContent)); } } // logging (wrap content with LoggingInputStream) diff --git a/google-http-client/src/test/java/com/google/api/client/http/GzipSupportTest.java b/google-http-client/src/test/java/com/google/api/client/http/GzipSupportTest.java new file mode 100644 index 000000000..f7c863222 --- /dev/null +++ b/google-http-client/src/test/java/com/google/api/client/http/GzipSupportTest.java @@ -0,0 +1,101 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.google.api.client.http; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.common.io.ByteStreams; +import com.google.common.io.CountingInputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.SequenceInputStream; +import java.util.zip.GZIPInputStream; +import org.junit.Test; + +public final class GzipSupportTest { + + @SuppressWarnings("UnstableApiUsage") // CountingInputStream is @Beta + @Test + public void gzipInputStreamConsumesAllBytes() throws IOException { + byte[] data = new byte[] {(byte) 'a', (byte) 'b'}; + // `echo -n a > a.txt && gzip -n9 a.txt` + byte[] member0 = + new byte[] { + 0x1f, + (byte) 0x8b, + 0x08, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x02, + 0x03, + 0x4b, + 0x04, + 0x00, + (byte) 0x43, + (byte) 0xbe, + (byte) 0xb7, + (byte) 0xe8, + 0x01, + 0x00, + 0x00, + 0x00 + }; + // `echo -n b > b.txt && gzip -n9 b.txt` + byte[] member1 = + new byte[] { + 0x1f, + (byte) 0x8b, + 0x08, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x02, + 0x03, + 0x4b, + 0x02, + 0x00, + (byte) 0xf9, + (byte) 0xef, + (byte) 0xbe, + (byte) 0x71, + 0x01, + 0x00, + 0x00, + 0x00 + }; + int totalZippedBytes = member0.length + member1.length; + try (InputStream s = + new SequenceInputStream( + new ByteArrayInputStream(member0), new ByteArrayInputStream(member1)); + CountingInputStream countS = new CountingInputStream(s); + GZIPInputStream g = GzipSupport.newGzipInputStream(countS); + CountingInputStream countG = new CountingInputStream(g)) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ByteStreams.copy(countG, baos); + assertThat(baos.toByteArray()).isEqualTo(data); + assertThat(countG.getCount()).isEqualTo(data.length); + assertThat(countS.getCount()).isEqualTo(totalZippedBytes); + } + } +}