Skip to content

Commit

Permalink
Fallback to next urls if download fails in HttpDownloader
Browse files Browse the repository at this point in the history
Fixes #8974.

`HttpDownloader` never retried `IOException` that could have occurred during `ByteStreams.copy(payload, out)`, HttpConnector would have retried connection phase errors but not payload ones.

This chageset adds fallback to next url(s) if present for both payload read errors and connection errors and adds (completely) missing tests for `HttpDownloader`.

Note, that this PR technically disables questionable `HttpConnectorMultiplexer` optimization that attempts to connect to multiple urls at the same time (by starting threads that race with each other) and picking the one that connected faster. There is a way to keep that optimization while falling back to next urls, but it would require all exceptions to contain url that caused it so that `HttpDownloader` could retry download on other urls. I think making `HttpDownloader` more reliable and actually using provided `urls` as fallback is much more important than mentioned optimization.

Closes #9015.

PiperOrigin-RevId: 261702678
  • Loading branch information
artem-zinnatullin authored and katre committed Aug 6, 2019
1 parent 80094d4 commit 427fa4e
Show file tree
Hide file tree
Showing 5 changed files with 446 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.io.ByteStreams;
import com.google.devtools.build.lib.bazel.repository.cache.RepositoryCache;
import com.google.devtools.build.lib.bazel.repository.cache.RepositoryCache.KeyType;
Expand All @@ -35,8 +36,11 @@
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -199,21 +203,60 @@ public Path download(
HttpConnectorMultiplexer multiplexer =
new HttpConnectorMultiplexer(eventHandler, connector, httpStreamFactory, clock, sleeper);

// Connect to the best mirror and download the file, while reporting progress to the CLI.
semaphore.acquire();
// Iterate over urls and download the file falling back to the next url if previous failed,
// while reporting progress to the CLI.
boolean success = false;
try (HttpStream payload = multiplexer.connect(urls, checksum, authHeaders);
OutputStream out = destination.getOutputStream()) {
ByteStreams.copy(payload, out);
success = true;
} catch (InterruptedIOException e) {
throw new InterruptedException();
} catch (IOException e) {
throw new IOException(
"Error downloading " + urls + " to " + destination + ": " + e.getMessage());
} finally {
semaphore.release();
eventHandler.post(new FetchEvent(urls.get(0).toString(), success));

List<IOException> ioExceptions = ImmutableList.of();

for (URL url : urls) {
semaphore.acquire();

try (HttpStream payload =
multiplexer.connect(Collections.singletonList(url), checksum, authHeaders);
OutputStream out = destination.getOutputStream()) {
try {
ByteStreams.copy(payload, out);
} catch (SocketTimeoutException e) {
// SocketTimeoutExceptions are InterruptedIOExceptions; however they do not signify
// an external interruption, but simply a failed download due to some server timing
// out. So rethrow them as ordinary IOExceptions.
throw new IOException(e);
}
success = true;
break;
} catch (InterruptedIOException e) {
throw new InterruptedException(e.getMessage());
} catch (IOException e) {
if (ioExceptions.isEmpty()) {
ioExceptions = new ArrayList<>(1);
}
ioExceptions.add(e);
eventHandler.handle(
Event.warn("Download from " + url + " failed: " + e.getClass() + " " + e.getMessage()));
continue;
} finally {
semaphore.release();
eventHandler.post(new FetchEvent(url.toString(), success));
}
}

if (!success) {
final IOException exception =
new IOException(
"Error downloading "
+ urls
+ " to "
+ destination
+ (ioExceptions.isEmpty()
? ""
: ": " + Iterables.getLast(ioExceptions).getMessage()));

for (IOException cause : ioExceptions) {
exception.addSuppressed(cause);
}

throw exception;
}

if (isCachingByProvidedChecksum) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ java_test(
"//src/main/java/com/google/devtools/build/lib:util",
"//src/main/java/com/google/devtools/build/lib/bazel/repository/cache",
"//src/main/java/com/google/devtools/build/lib/bazel/repository/downloader",
"//src/main/java/com/google/devtools/build/lib/vfs",
"//src/test/java/com/google/devtools/build/lib:foundations_testutil",
"//src/test/java/com/google/devtools/build/lib:test_runner",
"//src/test/java/com/google/devtools/build/lib:testutil",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
HttpConnectorMultiplexerIntegrationTest.class,
HttpConnectorMultiplexerTest.class,
HttpConnectorTest.class,
HttpDownloaderTest.class,
HttpStreamTest.class,
HttpUtilsTest.class,
ProgressInputStreamTest.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public class HttpConnectorMultiplexerIntegrationTest {
private final Sleeper sleeper = mock(Sleeper.class);
private final Locale locale = Locale.US;
private final HttpConnector connector =
new HttpConnector(locale, eventHandler, proxyHelper, sleeper);
new HttpConnector(locale, eventHandler, proxyHelper, sleeper, 0.1f);
private final ProgressInputStream.Factory progressInputStreamFactory =
new ProgressInputStream.Factory(locale, clock, eventHandler);
private final HttpStream.Factory httpStreamFactory =
Expand Down Expand Up @@ -102,7 +102,7 @@ public void normalRequest() throws Exception {
try (ServerSocket server1 = new ServerSocket(0, 1, InetAddress.getByName(null));
ServerSocket server2 = new ServerSocket(0, 1, InetAddress.getByName(null))) {
for (final ServerSocket server : asList(server1, server2)) {
@SuppressWarnings("unused")
@SuppressWarnings("unused")
Future<?> possiblyIgnoredError =
executor.submit(
new Callable<Object>() {
Expand Down Expand Up @@ -241,4 +241,52 @@ public Object call() throws Exception {
HELLO_SHA256);
}
}

@Test
public void firstUrlSocketTimeout_secondOk() throws Exception {
final Phaser phaser = new Phaser(3);
try (ServerSocket server1 = new ServerSocket(0, 1, InetAddress.getByName(null));
ServerSocket server2 = new ServerSocket(0, 1, InetAddress.getByName(null))) {

@SuppressWarnings("unused")
Future<?> possiblyIgnoredError =
executor.submit(
() -> {
phaser.arriveAndAwaitAdvance();
try (Socket socket = server1.accept()) {
// Do nothing to cause SocketTimeoutException on client side.
}
return null;
});

@SuppressWarnings("unused")
Future<?> possiblyIgnoredError2 =
executor.submit(
() -> {
phaser.arriveAndAwaitAdvance();
try (Socket socket = server2.accept()) {
readHttpRequest(socket.getInputStream());
sendLines(
socket,
"HTTP/1.1 200 OK",
"Date: Fri, 31 Dec 1999 23:59:59 GMT",
"Connection: close",
"",
"hello");
}
return null;
});

phaser.arriveAndAwaitAdvance();
phaser.arriveAndDeregister();
try (HttpStream stream =
multiplexer.connect(
ImmutableList.of(
new URL(String.format("http://localhost:%d", server1.getLocalPort())),
new URL(String.format("http://localhost:%d", server2.getLocalPort()))),
HELLO_SHA256)) {
assertThat(toByteArray(stream)).isEqualTo("hello".getBytes(US_ASCII));
}
}
}
}
Loading

0 comments on commit 427fa4e

Please sign in to comment.