Skip to content

Commit

Permalink
Merge branch 'trunk' into YARN-7707
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 authored Jul 12, 2023
2 parents 331daec + 680af87 commit 514d9be
Show file tree
Hide file tree
Showing 25 changed files with 628 additions and 158 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.impl;

import javax.annotation.Nullable;

/**
* Holds reference to an object to be attached to a stream or store to avoid
* the reference being lost to GC.
*/
public class BackReference {
private final Object reference;

public BackReference(@Nullable Object reference) {
this.reference = reference;
}

/**
* is the reference null?
* @return true if the ref. is null, else false.
*/
public boolean isNull() {
return reference == null;
}

@Override
public String toString() {
return "BackReference{" +
"reference=" + reference +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2321,6 +2321,15 @@ The switch to turn S3A auditing on or off.
</description>
</property>

<property>
<name>ipc.server.handler.queue.size</name>
<value>100</value>
<description>
Indicates how many calls per handler are allowed in the queue. This value can
determine the maximum call queue size by multiplying the number of handler threads.
</description>
</property>

<property>
<name>ipc.server.listen.queue.size</name>
<value>256</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,13 +308,6 @@ Map<String, BPOfferService> getBpByNameserviceId() {
return bpByNameserviceId;
}

boolean isSlownodeByNameserviceId(String nsId) {
if (bpByNameserviceId.containsKey(nsId)) {
return bpByNameserviceId.get(nsId).isSlownode();
}
return false;
}

boolean isSlownodeByBlockPoolId(String bpId) {
if (bpByBlockPoolId.containsKey(bpId)) {
return bpByBlockPoolId.get(bpId).isSlownode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4240,10 +4240,6 @@ public DataSetLockManager getDataSetLockManager() {
return dataSetLockManager;
}

boolean isSlownodeByNameserviceId(String nsId) {
return blockPoolManager.isSlownodeByNameserviceId(nsId);
}

boolean isSlownodeByBlockPoolId(String bpId) {
return blockPoolManager.isSlownodeByBlockPoolId(bpId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import javax.annotation.Nullable;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
Expand Down Expand Up @@ -155,6 +156,9 @@ public class AzureBlobFileSystem extends FileSystem
/** Rate limiting for operations which use it to throttle their IO. */
private RateLimiting rateLimiting;

/** Storing full path uri for better logging. */
private URI fullPathUri;

@Override
public void initialize(URI uri, Configuration configuration)
throws IOException {
Expand All @@ -165,7 +169,7 @@ public void initialize(URI uri, Configuration configuration)
setConf(configuration);

LOG.debug("Initializing AzureBlobFileSystem for {}", uri);

this.fullPathUri = uri;
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
abfsCounters = new AbfsCountersImpl(uri);
// name of the blockFactory to be used.
Expand All @@ -192,6 +196,7 @@ public void initialize(URI uri, Configuration configuration)
.withAbfsCounters(abfsCounters)
.withBlockFactory(blockFactory)
.withBlockOutputActiveBlocks(blockOutputActiveBlocks)
.withBackReference(new BackReference(this))
.build();

this.abfsStore = new AzureBlobFileSystemStore(systemStoreBuilder);
Expand Down Expand Up @@ -236,7 +241,7 @@ public void initialize(URI uri, Configuration configuration)
public String toString() {
final StringBuilder sb = new StringBuilder(
"AzureBlobFileSystem{");
sb.append("uri=").append(uri);
sb.append("uri=").append(fullPathUri);
sb.append(", user='").append(abfsStore.getUser()).append('\'');
sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\'');
sb.append("[" + CAPABILITY_SAFE_READAHEAD + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
Expand Down Expand Up @@ -189,6 +190,9 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
/** Bounded ThreadPool for this instance. */
private ExecutorService boundedThreadPool;

/** ABFS instance reference to be held by the store to avoid GC close. */
private BackReference fsBackRef;

/**
* FileSystem Store for {@link AzureBlobFileSystem} for Abfs operations.
* Built using the {@link AzureBlobFileSystemStoreBuilder} with parameters
Expand All @@ -202,6 +206,7 @@ public AzureBlobFileSystemStore(
String[] authorityParts = authorityParts(uri);
final String fileSystemName = authorityParts[0];
final String accountName = authorityParts[1];
this.fsBackRef = abfsStoreBuilder.fsBackRef;

leaseRefs = Collections.synchronizedMap(new WeakHashMap<>());

Expand Down Expand Up @@ -711,6 +716,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
.withExecutorService(new SemaphoredDelegatingExecutor(boundedThreadPool,
blockOutputActiveBlocks, true))
.withTracingContext(tracingContext)
.withAbfsBackRef(fsBackRef)
.build();
}

Expand Down Expand Up @@ -818,6 +824,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext(
abfsConfiguration.shouldReadBufferSizeAlways())
.withReadAheadBlockSize(abfsConfiguration.getReadAheadBlockSize())
.withBufferedPreadDisabled(bufferedPreadDisabled)
.withAbfsBackRef(fsBackRef)
.build();
}

Expand Down Expand Up @@ -1871,6 +1878,7 @@ public static final class AzureBlobFileSystemStoreBuilder {
private AbfsCounters abfsCounters;
private DataBlocks.BlockFactory blockFactory;
private int blockOutputActiveBlocks;
private BackReference fsBackRef;

public AzureBlobFileSystemStoreBuilder withUri(URI value) {
this.uri = value;
Expand Down Expand Up @@ -1906,6 +1914,12 @@ public AzureBlobFileSystemStoreBuilder withBlockOutputActiveBlocks(
return this;
}

public AzureBlobFileSystemStoreBuilder withBackReference(
BackReference fsBackRef) {
this.fsBackRef = fsBackRef;
return this;
}

public AzureBlobFileSystemStoreBuilder build() {
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.util.Preconditions;

import org.slf4j.Logger;
Expand Down Expand Up @@ -121,6 +122,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
*/
private long nextReadPos;

/** ABFS instance to be held by the input stream to avoid GC close. */
private final BackReference fsBackRef;

public AbfsInputStream(
final AbfsClient client,
final Statistics statistics,
Expand Down Expand Up @@ -152,6 +156,7 @@ public AbfsInputStream(
this.tracingContext.setStreamID(inputStreamId);
this.context = abfsInputStreamContext;
readAheadBlockSize = abfsInputStreamContext.getReadAheadBlockSize();
this.fsBackRef = abfsInputStreamContext.getFsBackRef();

// Propagate the config values to ReadBufferManager so that the first instance
// to initialize can set the readAheadBlockSize
Expand Down Expand Up @@ -857,4 +862,9 @@ long getFCursorAfterLastRead() {
long getLimit() {
return this.limit;
}

@VisibleForTesting
BackReference getFsBackRef() {
return fsBackRef;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.util.Preconditions;

/**
Expand Down Expand Up @@ -51,6 +53,9 @@ public class AbfsInputStreamContext extends AbfsStreamContext {

private boolean bufferedPreadDisabled;

/** A BackReference to the FS instance that created this OutputStream. */
private BackReference fsBackRef;

public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
super(sasTokenRenewPeriodForStreamsInSeconds);
}
Expand Down Expand Up @@ -122,6 +127,12 @@ public AbfsInputStreamContext withBufferedPreadDisabled(
return this;
}

public AbfsInputStreamContext withAbfsBackRef(
final BackReference fsBackRef) {
this.fsBackRef = fsBackRef;
return this;
}

public AbfsInputStreamContext build() {
if (readBufferSize > readAheadBlockSize) {
LOG.debug(
Expand Down Expand Up @@ -180,4 +191,8 @@ public int getReadAheadBlockSize() {
public boolean isBufferedPreadDisabled() {
return bufferedPreadDisabled;
}

public BackReference getFsBackRef() {
return fsBackRef;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.UUID;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
Expand Down Expand Up @@ -126,6 +127,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
/** Executor service to carry out the parallel upload requests. */
private final ListeningExecutorService executorService;

/** ABFS instance to be held by the output stream to avoid GC close. */
private final BackReference fsBackRef;

public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
throws IOException {
this.client = abfsOutputStreamContext.getClient();
Expand All @@ -147,6 +151,7 @@ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
this.numOfAppendsToServerSinceLastFlush = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics();
this.fsBackRef = abfsOutputStreamContext.getFsBackRef();

if (this.isAppendBlob) {
this.maxConcurrentRequestCount = 1;
Expand Down Expand Up @@ -488,6 +493,12 @@ public synchronized void close() throws IOException {
}

try {
// Check if Executor Service got shutdown before the writes could be
// completed.
if (hasActiveBlockDataToUpload() && executorService.isShutdown()) {
throw new PathIOException(path, "Executor Service closed before "
+ "writes could be completed.");
}
flushInternal(true);
} catch (IOException e) {
// Problems surface in try-with-resources clauses if
Expand Down Expand Up @@ -766,4 +777,14 @@ public String toString() {
sb.append("}");
return sb.toString();
}

@VisibleForTesting
BackReference getFsBackRef() {
return fsBackRef;
}

@VisibleForTesting
ListeningExecutorService getExecutorService() {
return executorService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.fs.store.DataBlocks;

/**
Expand Down Expand Up @@ -65,6 +66,9 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {

private TracingContext tracingContext;

/** A BackReference to the FS instance that created this OutputStream. */
private BackReference fsBackRef;

public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
super(sasTokenRenewPeriodForStreamsInSeconds);
}
Expand Down Expand Up @@ -157,6 +161,12 @@ public AbfsOutputStreamContext withTracingContext(
return this;
}

public AbfsOutputStreamContext withAbfsBackRef(
final BackReference fsBackRef) {
this.fsBackRef = fsBackRef;
return this;
}

public AbfsOutputStreamContext build() {
// Validation of parameters to be done here.
if (streamStatistics == null) {
Expand Down Expand Up @@ -261,4 +271,8 @@ public ExecutorService getExecutorService() {
public TracingContext getTracingContext() {
return tracingContext;
}

public BackReference getFsBackRef() {
return fsBackRef;
}
}
Loading

0 comments on commit 514d9be

Please sign in to comment.