Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close resources to stop Apache lib from pending #3524

Merged
merged 4 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@
* This request factory will attempt to load resources using {@link
* org.mapfish.print.config.Configuration#loadFile(String)} and {@link
* org.mapfish.print.config.Configuration#isAccessible(String)} to load the resources if the http
* method is GET and will fallback to the normal/wrapped factory to make http requests.
* method is GET and will fall back to the normal/wrapped factory to make http requests.
*/
public final class ConfigFileResolvingHttpRequestFactory implements MfClientHttpRequestFactory {
private final Configuration config;
@Nonnull private final Map<String, String> mdcContext;
private final MfClientHttpRequestFactoryImpl httpRequestFactory;
private final List<RequestConfigurator> callbacks = new CopyOnWriteArrayList<>();

/** Maximum number of attempts to try to fetch the same http request in case it is failing. */
@Value("${httpRequest.fetchRetry.maxNumber}")
private int httpRequestMaxNumberFetchRetry;

Expand Down Expand Up @@ -54,7 +55,9 @@ public void register(@Nonnull final RequestConfigurator callback) {
}

@Override
public ClientHttpRequest createRequest(final URI uri, final HttpMethod httpMethod) {
@Nonnull
public ClientHttpRequest createRequest(
@Nonnull final URI uri, @Nonnull final HttpMethod httpMethod) {
return new ConfigFileResolvingRequest(this, uri, httpMethod);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,73 +141,83 @@ private ClientHttpResponse doFileRequest() throws IOException {
}

private ClientHttpResponse doHttpRequestWithRetry(final HttpHeaders headers) throws IOException {
AtomicInteger counter = new AtomicInteger();
final AtomicInteger counter = new AtomicInteger();
do {
logFetchingURIResource(headers);
try {
ClientHttpResponse response = attemptToFetchResponse(headers, counter);
ClientHttpResponse response = attemptToFetchResponse(headers);
if (response != null) {
return response;
} else {
if (canRetry(counter)) {
sleepWithExceptionHandling();
LOGGER.debug("Retry fetching {}", this.getURI());
} else {
PrintException printException = new PrintException("Failed fetching " + getURI());
LOGGER.debug("Throwing exception since it cannot retry.", printException);
throw printException;
}
}
} catch (final IOException | RuntimeException e) {
boolean hasSlept = sleepIfPossible(e, counter);
if (!hasSlept) {
throw e;
}
} catch (final IOException e) {
handleIOException(e, counter);
}
} while (true);
}

private void logFetchingURIResource(final HttpHeaders headers) {
// Display headers, one by line <name>: <value>
LOGGER.debug(
"Fetching URI resource {}, headers:\n{}",
"Fetching {}, using headers:\n{}",
this.getURI(),
headers.entrySet().stream()
.map(entry -> entry.getKey() + "=" + String.join(", ", entry.getValue()))
.collect(Collectors.joining("\n")));
}

private ClientHttpResponse attemptToFetchResponse(
final HttpHeaders headers, final AtomicInteger counter) throws IOException {
private ClientHttpResponse attemptToFetchResponse(final HttpHeaders headers) throws IOException {
ClientHttpRequest requestUsed =
this.request != null ? this.request : createRequestFromWrapped(headers);
LOGGER.debug("Executing http request: {}", requestUsed.getURI());
ClientHttpResponse response = executeCallbacksAndRequest(requestUsed);
if (response.getRawStatusCode() < 500) {
LOGGER.debug(
"Fetching success URI resource {}, status code {}",
getURI(),
response.getRawStatusCode());
return response;
}
LOGGER.debug(
"Fetching failed URI resource {}, error code {}", getURI(), response.getRawStatusCode());
if (canRetry(counter)) {
sleepWithExceptionHandling();
LOGGER.debug("Retry fetching URI resource {}", this.getURI());
} else {
throw new PrintException(
String.format(
"Fetching failed URI resource %s, error code %s",
getURI(), response.getRawStatusCode()));
final int minStatusCodeError = HttpStatus.INTERNAL_SERVER_ERROR.value();
int rawStatusCode = minStatusCodeError;
try {
rawStatusCode = response.getRawStatusCode();
if (rawStatusCode < minStatusCodeError) {
LOGGER.debug("Successfully fetched {}, with status code {}", getURI(), rawStatusCode);
return response;
}
LOGGER.debug("Failed fetching {}, with error code {}", getURI(), rawStatusCode);
return null;
} finally {
if (rawStatusCode >= minStatusCodeError) {
response.close();
}
}
return null;
}

private void handleIOException(final IOException e, final AtomicInteger counter)
throws IOException {

private boolean sleepIfPossible(final Exception e, final AtomicInteger counter) {
if (canRetry(counter)) {
sleepWithExceptionHandling();
LOGGER.debug("Retry fetching URI resource {}", this.getURI());
LOGGER.debug("Retry fetching {} following exception", this.getURI(), e);
sebr72 marked this conversation as resolved.
Show resolved Hide resolved
return true;
} else {
LOGGER.debug("Fetching failed URI resource {}", getURI());
throw e;
LOGGER.debug(
"Reached maximum number of {} allowed requests attempts for {}",
getHttpRequestMaxNumberFetchRetry(),
getURI());
return false;
}
}

private void sleepWithExceptionHandling() {
int sleepMs = configFileResolvingHttpRequestFactory.getHttpRequestFetchRetryIntervalMillis();
LOGGER.debug("Sleeping {} ms before retrying.", sleepMs);
try {
TimeUnit.MILLISECONDS.sleep(
configFileResolvingHttpRequestFactory.getHttpRequestFetchRetryIntervalMillis());
TimeUnit.MILLISECONDS.sleep(sleepMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PrintException("Interrupted while sleeping", e);
Expand Down
22 changes: 13 additions & 9 deletions core/src/main/java/org/mapfish/print/http/HttpRequestFetcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,22 @@ private CachedClientHttpResponse(final ClientHttpResponse originalResponse) thro
this.headers = originalResponse.getHeaders();
this.status = originalResponse.getRawStatusCode();
this.statusText = originalResponse.getStatusText();
this.cachedFile =
this.cachedFile = createCachedFile(originalResponse.getBody());
}

private File createCachedFile(final InputStream originalBody) throws IOException {
File tempFile =
File.createTempFile("cacheduri", null, HttpRequestFetcher.this.temporaryDirectory);
LOGGER.debug("Caching URI resource to {}", this.cachedFile);
try (OutputStream os = Files.newOutputStream(this.cachedFile.toPath())) {
InputStream body = originalResponse.getBody();
LOGGER.debug("Caching URI resource to {}", tempFile);
try (OutputStream os = Files.newOutputStream(tempFile.toPath())) {
LOGGER.debug(
"Get from input stream {}, for response {}, body available: {}",
body.getClass(),
originalResponse.getClass(),
body.available());
IOUtils.copy(body, os);
"Get from input stream {}, body available: {}",
originalBody.getClass(),
originalBody.available());
IOUtils.copy(originalBody, os);
originalBody.close();
}
return tempFile;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -31,6 +32,7 @@
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import org.mapfish.print.config.Configuration;
import org.mapfish.print.servlet.job.impl.ThreadPoolJobManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
Expand All @@ -52,30 +54,53 @@ public class MfClientHttpRequestFactoryImpl extends HttpComponentsClientHttpRequ
* @param maxConnTotal Maximum total connections.
* @param maxConnPerRoute Maximum connections per route.
*/
public MfClientHttpRequestFactoryImpl(final int maxConnTotal, final int maxConnPerRoute) {
super(createHttpClient(maxConnTotal, maxConnPerRoute));
public MfClientHttpRequestFactoryImpl(
final int maxConnTotal,
final int maxConnPerRoute,
final ThreadPoolJobManager threadPoolJobManager) {
super(createHttpClient(maxConnTotal, maxConnPerRoute, threadPoolJobManager));
}

@Nullable
static Configuration getCurrentConfiguration() {
return CURRENT_CONFIGURATION.get();
}

private static int getIntProperty(final String name) {
/**
* Return the number of milliseconds until the timeout Use the Automatic cancellation timeout if
* not defined.
*
* @param name timeout idemtifier
* @return the number of milliseconds until the timeout
*/
private static int getTimeoutValue(
final String name, final ThreadPoolJobManager threadPoolJobManager) {
final String value = System.getProperty(name);
if (value == null) {
return -1;
long millis = TimeUnit.SECONDS.toMillis(threadPoolJobManager.getTimeout());
if (millis > Integer.MAX_VALUE) {
LOGGER.warn(
"The value of {} is too large. The timeout will be set to the maximum value of {}",
name,
Integer.MAX_VALUE);
return Integer.MAX_VALUE;
} else {
return Integer.parseInt(Long.toString(millis));
}
}
return Integer.parseInt(value);
}

private static CloseableHttpClient createHttpClient(
final int maxConnTotal, final int maxConnPerRoute) {
final int maxConnTotal,
final int maxConnPerRoute,
final ThreadPoolJobManager threadPoolJobManager) {
final RequestConfig requestConfig =
RequestConfig.custom()
.setConnectionRequestTimeout(getIntProperty("http.connectionRequestTimeout"))
.setConnectTimeout(getIntProperty("http.connectTimeout"))
.setSocketTimeout(getIntProperty("http.socketTimeout"))
.setConnectionRequestTimeout(
getTimeoutValue("http.connectionRequestTimeout", threadPoolJobManager))
.setConnectTimeout(getTimeoutValue("http.connectTimeout", threadPoolJobManager))
.setSocketTimeout(getTimeoutValue("http.socketTimeout", threadPoolJobManager))
.build();

final HttpClientBuilder httpClientBuilder =
Expand All @@ -89,11 +114,19 @@ private static CloseableHttpClient createHttpClient(
.setMaxConnTotal(maxConnTotal)
.setMaxConnPerRoute(maxConnPerRoute)
.setUserAgent(UserAgentCreator.getUserAgent());
return httpClientBuilder.build();
CloseableHttpClient closeableHttpClient = httpClientBuilder.build();
LOGGER.debug(
"Created CloseableHttpClient using connectionRequestTimeout: {} connectTimeout: {}"
+ " socketTimeout: {}",
getTimeoutValue("http.connectionRequestTimeout", threadPoolJobManager),
getTimeoutValue("http.connectTimeout", threadPoolJobManager),
getTimeoutValue("http.socketTimeout", threadPoolJobManager));
return closeableHttpClient;
}

// allow extension only for testing
@Override
@Nonnull
public ConfigurableRequest createRequest(
@Nonnull final URI uri, @Nonnull final HttpMethod httpMethod) {
HttpRequestBase httpRequest = (HttpRequestBase) createHttpUriRequest(httpMethod, uri);
Expand Down Expand Up @@ -161,20 +194,24 @@ public HttpMethod getMethod() {
return HttpMethod.valueOf(this.request.getMethod());
}

@Nonnull
@Override
public String getMethodValue() {
return this.request.getMethod();
}

@Nonnull
public URI getURI() {
return this.request.getURI();
}

@Nonnull
@Override
protected OutputStream getBodyInternal(@Nonnull final HttpHeaders headers) {
return this.outputStream;
}

@Nonnull
@Override
protected Response executeInternal(@Nonnull final HttpHeaders headers) throws IOException {
CURRENT_CONFIGURATION.set(this.configuration);
Expand Down Expand Up @@ -207,7 +244,7 @@ protected Response executeInternal(@Nonnull final HttpHeaders headers) throws IO
}
}

static class Response extends AbstractClientHttpResponse {
public static class Response extends AbstractClientHttpResponse {
private static final Logger LOGGER = LoggerFactory.getLogger(Response.class);
private static final AtomicInteger ID_COUNTER = new AtomicInteger();
private final HttpResponse response;
Expand All @@ -224,6 +261,7 @@ public int getRawStatusCode() {
return this.response.getStatusLine().getStatusCode();
}

@Nonnull
@Override
public String getStatusText() {
return this.response.getStatusLine().getReasonPhrase();
Expand All @@ -247,6 +285,7 @@ public void close() {
LOGGER.trace("Closed Http Response object: {}", this.id);
}

@Nonnull
@Override
public synchronized InputStream getBody() throws IOException {
if (this.inputStream == null) {
Expand All @@ -262,6 +301,7 @@ public synchronized InputStream getBody() throws IOException {
return this.inputStream;
}

@Nonnull
@Override
public HttpHeaders getHeaders() {
final HttpHeaders translatedHeaders = new HttpHeaders();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ public final void setTimeout(final long timeout) {
this.timeout = timeout;
}

public final long getTimeout() {
return this.timeout;
}

public final void setAbandonedTimeout(final long abandonedTimeout) {
this.abandonedTimeout = abandonedTimeout;
}
Expand Down Expand Up @@ -452,10 +456,13 @@ private boolean updateRegistry() {
final SubmittedPrintJob printJob = submittedJobs.next();
if (!printJob.getReportFuture().isDone()
&& (isTimeoutExceeded(printJob) || isAbandoned(printJob))) {
LOGGER.info("Canceling job after timeout {}", printJob.getEntry().getReferenceId());
LOGGER.info(
"About to attempt timeout based automatic cancellation of job {}",
printJob.getEntry().getReferenceId());
if (!printJob.getReportFuture().cancel(true)) {
LOGGER.info(
"Could not cancel job after timeout {}", printJob.getEntry().getReferenceId());
"Automatic cancellation after timeout failed for job {}",
printJob.getEntry().getReferenceId());
}
// remove all canceled tasks from the work queue (otherwise the queue comparator
// might stumble on non-PrintJob entries)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@
<bean id="healthCheckRegistry" class="org.mapfish.print.metrics.HealthCheckingRegistry"/>
<bean id="httpClientFactory" class="org.mapfish.print.http.MfClientHttpRequestFactoryImpl">
<constructor-arg index="0" value="${maxConnectionsTotal}" />
<constructor-arg index="1" value="${maxConnectionsPerRoute}" />
<constructor-arg index="1" value="${maxConnectionsPerRoute}"/>
<constructor-arg index="2" ref="jobManager"/>
</bean>
<bean id="metricNameStrategy" class="org.mapfish.print.metrics.MetricsNameStrategyFactory" factory-method="hostAndMethod" />
<bean id="loggingMetricsConfigurator" class="org.mapfish.print.metrics.LoggingMetricsConfigurator" lazy-init="false"/>
Expand Down
Loading
Loading