Skip to content

Commit

Permalink
HDFS-16477. [SPS]: Add metric PendingSPSPaths for getting the number …
Browse files Browse the repository at this point in the history
…of paths to be processed by SPS (#4009). Contributed by  tomscut.

Signed-off-by: Ayush Saxena <[email protected]>
  • Loading branch information
tomscut authored Apr 2, 2022
1 parent 4b1a6bf commit 34b3275
Show file tree
Hide file tree
Showing 17 changed files with 137 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ Each metrics record contains tags such as HAState and Hostname as additional inf
| `FSN(Read/Write)Lock`*OperationName*`NanosAvgTime` | Average time of holding the lock by operations in nanoseconds |
| `FSN(Read/Write)LockOverallNanosNumOps` | Total number of acquiring lock by all operations |
| `FSN(Read/Write)LockOverallNanosAvgTime` | Average time of holding the lock by all operations in nanoseconds |
| `PendingSPSPaths` | The number of paths to be processed by the storage policy satisfier (SPS) |

JournalNode
-----------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,4 +343,11 @@ public interface FederationMBean {
* with the highest risk of loss.
*/
long getHighestPriorityLowRedundancyECBlocks();

/**
 * Returns the number of paths that are pending processing by the storage
 * policy satisfier (SPS).
 *
 * @return The number of paths to be processed by SPS.
 */
int getPendingSPSPaths();
}
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,16 @@ public long getCurrentTokensCount() {
return 0;
}

/**
 * Reports the number of paths to be processed by the storage policy
 * satisfier, as exposed by the RBF metrics.
 *
 * @return the value reported by the RBF metrics, or 0 when fetching it
 *         fails with an {@link IOException} (the failure is only logged at
 *         debug level).
 */
@Override
public int getPendingSPSPaths() {
  // Default used when the lookup below fails.
  int pendingPaths = 0;
  try {
    pendingPaths = getRBFMetrics().getPendingSPSPaths();
  } catch (IOException ioe) {
    LOG.debug("Failed to get number of paths to be processed by sps", ioe);
  }
  return pendingPaths;
}

private Router getRouter() throws IOException {
if (this.router == null) {
throw new IOException("Router is not initialized");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,12 @@ public long getHighestPriorityLowRedundancyECBlocks() {
MembershipStats::getHighestPriorityLowRedundancyECBlocks);
}

/**
 * Reports the number of paths to be processed by the storage policy
 * satisfier, obtained by handing the per-membership
 * {@code MembershipStats#getPendingSPSPaths} accessor to
 * {@code getNameserviceAggregatedInt}.
 *
 * @return the aggregated number of pending SPS paths.
 */
@Override
public int getPendingSPSPaths() {
  final int aggregatedPendingPaths =
      getNameserviceAggregatedInt(MembershipStats::getPendingSPSPaths);
  return aggregatedPendingPaths;
}

@Override
@Metric({"RouterFederationRenameCount", "Number of federation rename"})
public int getRouterFederationRenameCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ public boolean registerNamenode(NamenodeStatusReport report)
report.getHighestPriorityLowRedundancyReplicatedBlocks());
stats.setHighestPriorityLowRedundancyECBlocks(
report.getHighestPriorityLowRedundancyECBlocks());
stats.setPendingSPSPaths(report.getPendingSPSPaths());
record.setStats(stats);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class NamenodeStatusReport {
private long numberOfMissingBlocksWithReplicationFactorOne = -1;
private long highestPriorityLowRedundancyReplicatedBlocks = -1;
private long highestPriorityLowRedundancyECBlocks = -1;
private int pendingSPSPaths = -1;

/** If the fields are valid. */
private boolean registrationValid = false;
Expand Down Expand Up @@ -367,12 +368,13 @@ public int getNumEnteringMaintenanceDataNodes() {
* @param numBlocksPendingReplication Number of blocks pending replication.
* @param numBlocksUnderReplicated Number of blocks under replication.
* @param numBlocksPendingDeletion Number of blocks pending deletion.
* @param providedSpace Space in provided storage.
* @param providedStorageSpace Space in provided storage.
* @param numPendingSPSPaths The number of paths to be processed by storage policy satisfier.
*/
public void setNamesystemInfo(long available, long total,
long numFiles, long numBlocks, long numBlocksMissing,
long numBlocksPendingReplication, long numBlocksUnderReplicated,
long numBlocksPendingDeletion, long providedSpace) {
long numBlocksPendingDeletion, long providedStorageSpace, int numPendingSPSPaths) {
this.totalSpace = total;
this.availableSpace = available;
this.numOfBlocks = numBlocks;
Expand All @@ -382,7 +384,8 @@ public void setNamesystemInfo(long available, long total,
this.numOfBlocksPendingDeletion = numBlocksPendingDeletion;
this.numOfFiles = numFiles;
this.statsValid = true;
this.providedSpace = providedSpace;
this.providedSpace = providedStorageSpace;
this.pendingSPSPaths = numPendingSPSPaths;
}

/**
Expand Down Expand Up @@ -460,6 +463,15 @@ public long getHighestPriorityLowRedundancyECBlocks() {
return this.highestPriorityLowRedundancyECBlocks;
}

/**
 * Get the number of paths to be processed by the storage policy satisfier
 * (SPS).
 *
 * <p>The field starts out as -1 and is overwritten when
 * {@code setNamesystemInfo} is called with the reported value.
 *
 * @return The number of paths to be processed by SPS.
 */
public int getPendingSPSPaths() {
  return pendingSPSPaths;
}

/**
* Get the number of blocks.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,8 @@ private void getFsNamesystemMetrics(String address,
jsonObject.getLong("PendingReplicationBlocks"),
jsonObject.getLong("UnderReplicatedBlocks"),
jsonObject.getLong("PendingDeletionBlocks"),
jsonObject.optLong("ProvidedCapacityTotal"));
jsonObject.optLong("ProvidedCapacityTotal"),
jsonObject.getInt("PendingSPSPaths"));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ public abstract void setHighestPriorityLowRedundancyECBlocks(

public abstract long getHighestPriorityLowRedundancyECBlocks();

/**
 * Sets the number of paths to be processed by the storage policy satisfier.
 *
 * @param pendingSPSPaths The number of paths to be processed by SPS.
 */
public abstract void setPendingSPSPaths(int pendingSPSPaths);

/**
 * Returns the number of paths to be processed by the storage policy
 * satisfier.
 *
 * @return The number of paths to be processed by SPS.
 */
public abstract int getPendingSPSPaths();

@Override
public SortedMap<String, String> getPrimaryKeys() {
// This record is not stored directly, no key needed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,4 +297,14 @@ public long getHighestPriorityLowRedundancyECBlocks() {
return this.translator.getProtoOrBuilder()
.getHighestPriorityLowRedundancyECBlocks();
}

/**
 * Stores the number of paths to be processed by the storage policy
 * satisfier by forwarding it to the builder obtained from the translator.
 *
 * @param pendingSPSPaths The number of paths to be processed by SPS.
 */
@Override
public void setPendingSPSPaths(int pendingSPSPaths) {
this.translator.getBuilder().setPendingSPSPaths(pendingSPSPaths);
}

/**
 * Reads the number of paths to be processed by the storage policy satisfier
 * from the proto (or builder) held by the translator.
 *
 * @return The number of paths to be processed by SPS.
 */
@Override
public int getPendingSPSPaths() {
  final int pendingPaths =
      translator.getProtoOrBuilder().getPendingSPSPaths();
  return pendingPaths;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ message NamenodeMembershipStatsRecordProto {
optional uint64 numberOfMissingBlocksWithReplicationFactorOne = 31;
optional uint64 highestPriorityLowRedundancyReplicatedBlocks = 32;
optional uint64 HighestPriorityLowRedundancyECBlocks = 33;
optional uint32 pendingSPSPaths = 34;
}

message NamenodeMembershipRecordProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ public void testNameserviceStatsDataSource()
json.getLong("numOfEnteringMaintenanceDataNodes"));
assertEquals(stats.getProvidedSpace(),
json.getLong("providedSpace"));
assertEquals(stats.getPendingSPSPaths(),
json.getInt("pendingSPSPaths"));
nameservicesFound++;
}
assertEquals(getNameservices().size(), nameservicesFound);
Expand Down Expand Up @@ -296,6 +298,7 @@ private void validateClusterStatsFederationBean(FederationMBean bean) {
long highestPriorityLowRedundancyReplicatedBlocks = 0;
long highestPriorityLowRedundancyECBlocks = 0;
long numFiles = 0;
int pendingSPSPaths = 0;
for (MembershipState mock : getActiveMemberships()) {
MembershipStats stats = mock.getStats();
numBlocks += stats.getNumOfBlocks();
Expand All @@ -316,6 +319,7 @@ private void validateClusterStatsFederationBean(FederationMBean bean) {
stats.getHighestPriorityLowRedundancyReplicatedBlocks();
highestPriorityLowRedundancyECBlocks +=
stats.getHighestPriorityLowRedundancyECBlocks();
pendingSPSPaths += stats.getPendingSPSPaths();
}

assertEquals(numBlocks, bean.getNumBlocks());
Expand All @@ -342,6 +346,7 @@ private void validateClusterStatsFederationBean(FederationMBean bean) {
bean.getHighestPriorityLowRedundancyReplicatedBlocks());
assertEquals(highestPriorityLowRedundancyECBlocks,
bean.getHighestPriorityLowRedundancyECBlocks());
assertEquals(pendingSPSPaths, bean.getPendingSPSPaths());
}

private void validateClusterStatsRouterBean(RouterMBean bean) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ public static MembershipState createMockRegistrationForNamenode(
stats.setNumOfDecomActiveDatanodes(15);
stats.setNumOfDecomDeadDatanodes(5);
stats.setNumOfBlocks(10);
stats.setPendingSPSPaths(10);
entry.setStats(stats);
return entry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,14 @@ public long getTotalECBlockGroups() {
return blocksMap.getECBlockGroups();
}

/**
 * Used by metrics.
 *
 * @return the pending SPS path count reported by the SPS manager, or 0 when
 *         no SPS manager is set.
 */
public int getPendingSPSPaths() {
  return spsManager == null ? 0 : spsManager.getPendingSPSPaths();
}

/**
* redundancyRecheckInterval is how often namenode checks for new
* reconstruction work.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4875,6 +4875,12 @@ public long getCurrentTokensCount() {
dtSecretManager.getCurrentTokensSize() : -1;
}

/**
 * Exposes, as the {@code PendingSPSPaths} metric, the number of paths to be
 * processed by the storage policy satisfier as reported by the block
 * manager.
 *
 * @return The number of paths to be processed by SPS.
 */
@Override
@Metric({"PendingSPSPaths", "The number of paths to be processed by storage policy satisfier"})
public int getPendingSPSPaths() {
  final int pendingPaths = blockManager.getPendingSPSPaths();
  return pendingPaths;
}

/**
* Returns the length of the wait Queue for the FSNameSystemLock.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,4 +254,11 @@ public interface FSNamesystemMBean {
* @return number of DTs
*/
long getCurrentTokensCount();

/**
 * Returns the number of paths that are pending processing by the storage
 * policy satisfier (SPS).
 *
 * @return The number of paths to be processed by SPS.
 */
int getPendingSPSPaths();
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class StoragePolicySatisfyManager {
private final StoragePolicySatisfier spsService;
private final boolean storagePolicyEnabled;
private volatile StoragePolicySatisfierMode mode;
private final Queue<Long> pathsToBeTraveresed;
private final Queue<Long> pathsToBeTraversed;
private final int outstandingPathsLimit;
private final Namesystem namesystem;

Expand All @@ -77,7 +77,7 @@ public StoragePolicySatisfyManager(Configuration conf,
DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_KEY,
DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_DEFAULT);
mode = StoragePolicySatisfierMode.fromString(modeVal);
pathsToBeTraveresed = new LinkedList<Long>();
pathsToBeTraversed = new LinkedList<Long>();
this.namesystem = namesystem;
// instantiate SPS service by just keeps config reference and not starting
// any supporting threads.
Expand Down Expand Up @@ -218,8 +218,8 @@ public boolean isSatisfierRunning() {
* storages.
*/
public Long getNextPathId() {
synchronized (pathsToBeTraveresed) {
return pathsToBeTraveresed.poll();
synchronized (pathsToBeTraversed) {
return pathsToBeTraversed.poll();
}
}

Expand All @@ -228,7 +228,7 @@ public Long getNextPathId() {
* @throws IOException
*/
public void verifyOutstandingPathQLimit() throws IOException {
long size = pathsToBeTraveresed.size();
long size = pathsToBeTraversed.size();
// Checking that the SPS call Q exceeds the allowed limit.
if (outstandingPathsLimit - size <= 0) {
LOG.debug("Satisifer Q - outstanding limit:{}, current size:{}",
Expand All @@ -244,15 +244,15 @@ public void verifyOutstandingPathQLimit() throws IOException {
* @throws IOException
*/
private void clearPathIds(){
synchronized (pathsToBeTraveresed) {
Iterator<Long> iterator = pathsToBeTraveresed.iterator();
synchronized (pathsToBeTraversed) {
Iterator<Long> iterator = pathsToBeTraversed.iterator();
while (iterator.hasNext()) {
Long trackId = iterator.next();
try {
namesystem.removeXattr(trackId,
HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY);
} catch (IOException e) {
LOG.debug("Failed to remove sps xatttr!", e);
LOG.debug("Failed to remove sps xattr!", e);
}
iterator.remove();
}
Expand All @@ -263,8 +263,8 @@ private void clearPathIds(){
* Clean up all sps path ids.
*/
public void removeAllPathIds() {
synchronized (pathsToBeTraveresed) {
pathsToBeTraveresed.clear();
synchronized (pathsToBeTraversed) {
pathsToBeTraversed.clear();
}
}

Expand All @@ -273,8 +273,8 @@ public void removeAllPathIds() {
* @param id
*/
public void addPathId(long id) {
synchronized (pathsToBeTraveresed) {
pathsToBeTraveresed.add(id);
synchronized (pathsToBeTraversed) {
pathsToBeTraversed.add(id);
}
}

Expand All @@ -292,4 +292,11 @@ public boolean isEnabled() {
public StoragePolicySatisfierMode getMode() {
return mode;
}

/**
 * Returns the number of path ids currently queued for the storage policy
 * satisfier.
 *
 * <p>The queue is a plain {@code LinkedList}, and the methods that add,
 * poll or clear it do so while holding the queue's monitor. The size is
 * therefore read under the same monitor, so a caller on another thread
 * (for example a metrics reader) observes a consistent, up-to-date value
 * rather than racing with concurrent structural modifications.
 *
 * @return the number of paths to be processed by storage policy satisfier.
 */
public int getPendingSPSPaths() {
  synchronized (pathsToBeTraversed) {
    return pathsToBeTraversed.size();
  }
}
}
Loading

0 comments on commit 34b3275

Please sign in to comment.