Skip to content

Commit

Permalink
HDFS-16262. Async refresh of cached locations in DFSInputStream (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
bbeaudreault authored Jan 25, 2022
1 parent 43153e8 commit 94b884a
Show file tree
Hide file tree
Showing 10 changed files with 915 additions and 265 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ public class ClientContext {
*/
private final String name;

/**
* The client conf used to initialize context.
*/
private final DfsClientConf dfsClientConf;

/**
* String representation of the configuration.
*/
Expand Down Expand Up @@ -130,6 +135,17 @@ public class ClientContext {
*/
private volatile DeadNodeDetector deadNodeDetector = null;

/**
* The switch for the {@link LocatedBlocksRefresher}.
*/
private final boolean locatedBlocksRefresherEnabled;

/**
* Periodically refresh the {@link org.apache.hadoop.hdfs.protocol.LocatedBlocks} backing
* registered {@link DFSInputStream}s, to take advantage of changes in block placement.
*/
private volatile LocatedBlocksRefresher locatedBlocksRefresher = null;

/**
* Count the reference of ClientContext.
*/
Expand All @@ -146,6 +162,7 @@ private ClientContext(String name, DfsClientConf conf,
final ShortCircuitConf scConf = conf.getShortCircuitConf();

this.name = name;
this.dfsClientConf = conf;
this.confString = scConf.confAsString();
this.clientShortCircuitNum = conf.getClientShortCircuitNum();
this.shortCircuitCache = new ShortCircuitCache[this.clientShortCircuitNum];
Expand All @@ -164,6 +181,7 @@ private ClientContext(String name, DfsClientConf conf,
this.byteArrayManager = ByteArrayManager.newInstance(
conf.getWriteByteArrayManagerConf());
this.deadNodeDetectionEnabled = conf.isDeadNodeDetectionEnabled();
this.locatedBlocksRefresherEnabled = conf.isLocatedBlocksRefresherEnabled();
initTopologyResolution(config);
}

Expand Down Expand Up @@ -301,6 +319,21 @@ public DeadNodeDetector getDeadNodeDetector() {
return deadNodeDetector;
}

/**
* If true, LocatedBlocksRefresher will be periodically refreshing LocatedBlocks
* of registered DFSInputStreams.
*/
public boolean isLocatedBlocksRefresherEnabled() {
return locatedBlocksRefresherEnabled;
}

/**
* Obtain LocatedBlocksRefresher of the current client.
*/
public LocatedBlocksRefresher getLocatedBlocksRefresher() {
return locatedBlocksRefresher;
}

/**
* Increment the counter. Start the dead node detector thread if there is no
* reference.
Expand All @@ -311,6 +344,10 @@ synchronized void reference() {
deadNodeDetector = new DeadNodeDetector(name, configuration);
deadNodeDetector.start();
}
if (locatedBlocksRefresherEnabled && locatedBlocksRefresher == null) {
locatedBlocksRefresher = new LocatedBlocksRefresher(name, configuration, dfsClientConf);
locatedBlocksRefresher.start();
}
}

/**
Expand All @@ -324,5 +361,10 @@ synchronized void unreference() {
deadNodeDetector.shutdown();
deadNodeDetector = null;
}

if (counter == 0 && locatedBlocksRefresherEnabled && locatedBlocksRefresher != null) {
locatedBlocksRefresher.shutdown();
locatedBlocksRefresher = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,7 @@ public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
}

public long getRefreshReadBlkLocationsInterval() {
return dfsClientConf.getRefreshReadBlockLocationsMS();
return dfsClientConf.getLocatedBlocksRefresherInterval();
}

/**
Expand Down Expand Up @@ -3459,4 +3459,36 @@ private boolean isDeadNodeDetectionEnabled() {
public DeadNodeDetector getDeadNodeDetector() {
return clientContext.getDeadNodeDetector();
}

/**
* Obtain LocatedBlocksRefresher of the current client.
*/
public LocatedBlocksRefresher getLocatedBlockRefresher() {
return clientContext.getLocatedBlocksRefresher();
}

/**
* Adds the {@link DFSInputStream} to the {@link LocatedBlocksRefresher}, so that
* the underlying {@link LocatedBlocks} is periodically refreshed.
*/
public void addLocatedBlocksRefresh(DFSInputStream dfsInputStream) {
if (isLocatedBlocksRefresherEnabled()) {
clientContext.getLocatedBlocksRefresher().addInputStream(dfsInputStream);
}
}

/**
* Removes the {@link DFSInputStream} from the {@link LocatedBlocksRefresher}, so that
* the underlying {@link LocatedBlocks} is no longer periodically refreshed.
* @param dfsInputStream
*/
public void removeLocatedBlocksRefresh(DFSInputStream dfsInputStream) {
if (isLocatedBlocksRefresherEnabled()) {
clientContext.getLocatedBlocksRefresher().removeInputStream(dfsInputStream);
}
}

private boolean isLocatedBlocksRefresherEnabled() {
return clientContext.isLocatedBlocksRefresherEnabled();
}
}
Loading

0 comments on commit 94b884a

Please sign in to comment.