Skip to content

Commit

Permalink
HDFS-16534. Split FsDatasetImpl from block pool locks to volume grain…
Browse files Browse the repository at this point in the history
… locks. (apache#4141) Contributed by limingxiang.

Signed-off-by: He Xiaoqiao <[email protected]>
  • Loading branch information
Hexiaoqiao authored and HarshitGupta11 committed Nov 28, 2022
1 parent 1927182 commit f2d8f09
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,9 @@ private synchronized void activateVolume(
ReplicaMap replicaMap,
Storage.StorageDirectory sd, StorageType storageType,
FsVolumeReference ref) throws IOException {
for (String bp : volumeMap.getBlockPoolList()) {
lockManager.addLock(LockLevel.VOLUME, bp, ref.getVolume().getStorageID());
}
DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
if (dnStorage != null) {
final String errorMsg = String.format(
Expand Down Expand Up @@ -629,6 +632,9 @@ public void removeVolumes(
synchronized (this) {
for (String storageUuid : storageToRemove) {
storageMap.remove(storageUuid);
for (String bp : volumeMap.getBlockPoolList()) {
lockManager.removeLock(LockLevel.VOLUME, bp, storageUuid);
}
}
}
}
Expand Down Expand Up @@ -906,8 +912,8 @@ ReplicaInfo getReplicaInfo(String bpid, long blkid)
@Override // FsDatasetSpi
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
long blkOffset, long metaOffset) throws IOException {
try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
ReplicaInfo info = getReplicaInfo(b);
FsVolumeReference ref = info.getVolume().obtainReference();
try {
Expand Down Expand Up @@ -1372,8 +1378,8 @@ static void computeChecksum(ReplicaInfo srcReplica, File dstMeta,
@Override // FsDatasetSpi
public ReplicaHandler append(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
// If the block was successfully finalized because all packets
// were successfully processed at the Datanode but the ack for
// some of the packets were not received by the client. The client
Expand Down Expand Up @@ -1425,7 +1431,8 @@ public ReplicaHandler append(ExtendedBlock b,
private ReplicaInPipeline append(String bpid,
ReplicaInfo replicaInfo, long newGS, long estimateBlockLen)
throws IOException {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
bpid, replicaInfo.getStorageUuid())) {
// If the block is cached, start uncaching it.
if (replicaInfo.getState() != ReplicaState.FINALIZED) {
throw new IOException("Only a Finalized replica can be appended to; "
Expand Down Expand Up @@ -1554,8 +1561,8 @@ public Replica recoverClose(ExtendedBlock b, long newGS,
LOG.info("Recover failed close " + b);
while (true) {
try {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
// check replica's state
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
// bump the replica's GS
Expand All @@ -1578,7 +1585,7 @@ public ReplicaHandler createRbw(
StorageType storageType, String storageId, ExtendedBlock b,
boolean allowLazyPersist) throws IOException {
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getBlockId());
Expand Down Expand Up @@ -1626,20 +1633,20 @@ public ReplicaHandler createRbw(
}

ReplicaInPipeline newReplicaInfo;
try {
try (AutoCloseableLock l = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), v.getStorageID())) {
newReplicaInfo = v.createRbw(b);
if (newReplicaInfo.getReplicaInfo().getState() != ReplicaState.RBW) {
throw new IOException("CreateRBW returned a replica of state "
+ newReplicaInfo.getReplicaInfo().getState()
+ " for block " + b.getBlockId());
}
volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
return new ReplicaHandler(newReplicaInfo, ref);
} catch (IOException e) {
IOUtils.cleanupWithLogger(null, ref);
throw e;
}

volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
return new ReplicaHandler(newReplicaInfo, ref);
} finally {
if (dataNodeMetrics != null) {
long createRbwMs = Time.monotonicNow() - startTimeMs;
Expand All @@ -1657,8 +1664,8 @@ public ReplicaHandler recoverRbw(
try {
while (true) {
try {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
ReplicaInfo replicaInfo =
getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
// check the replica's state
Expand Down Expand Up @@ -1689,8 +1696,8 @@ public ReplicaHandler recoverRbw(
private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
throws IOException {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
// check generation stamp
long replicaGenerationStamp = rbw.getGenerationStamp();
if (replicaGenerationStamp < b.getGenerationStamp() ||
Expand Down Expand Up @@ -1751,8 +1758,8 @@ private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
public ReplicaInPipeline convertTemporaryToRbw(
final ExtendedBlock b) throws IOException {
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
final long blockId = b.getBlockId();
final long expectedGs = b.getGenerationStamp();
final long visible = b.getNumBytes();
Expand Down Expand Up @@ -1887,12 +1894,12 @@ public ReplicaHandler createTemporary(StorageType storageType,
false);
}
long startHoldLockTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b
.getNumBytes());
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
ReplicaInPipeline newReplicaInfo;
FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b
.getNumBytes());
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
ReplicaInPipeline newReplicaInfo;
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), v.getStorageID())) {
try {
newReplicaInfo = v.createTemporary(b);
LOG.debug("creating temporary for block: {} on volume: {}",
Expand Down Expand Up @@ -1949,8 +1956,8 @@ public void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
ReplicaInfo replicaInfo = null;
ReplicaInfo finalizedReplicaInfo = null;
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
if (Thread.interrupted()) {
// Don't allow data modifications from interrupted threads
throw new IOException("Cannot finalize block from Interrupted Thread");
Expand Down Expand Up @@ -1986,7 +1993,8 @@ public void finalizeBlock(ExtendedBlock b, boolean fsyncDir)

private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
throws IOException {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
bpid, replicaInfo.getStorageUuid())) {
// Compare generation stamp of old and new replica before finalizing
if (volumeMap.get(bpid, replicaInfo.getBlockId()).getGenerationStamp()
> replicaInfo.getGenerationStamp()) {
Expand Down Expand Up @@ -2032,8 +2040,8 @@ private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
@Override // FsDatasetSpi
public void unfinalizeBlock(ExtendedBlock b) throws IOException {
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getLocalBlock());
if (replicaInfo != null &&
Expand Down Expand Up @@ -2423,10 +2431,17 @@ private void cacheBlock(String bpid, long blockId) {
long length, genstamp;
Executor volumeExecutor;

try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
ReplicaInfo info = volumeMap.get(bpid, blockId);
ReplicaInfo info = volumeMap.get(bpid, blockId);
if (info == null) {
LOG.warn("Failed to cache block with id " + blockId + ", pool " +
bpid + ": ReplicaInfo not found.");
return;
}
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, bpid,
info.getStorageUuid())) {
boolean success = false;
try {
info = volumeMap.get(bpid, blockId);
if (info == null) {
LOG.warn("Failed to cache block with id " + blockId + ", pool " +
bpid + ": ReplicaInfo not found.");
Expand Down Expand Up @@ -2619,7 +2634,8 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo)
curDirScannerNotifyCount = 0;
lastDirScannerNotifyTime = startTimeMs;
}
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, bpid,
vol.getStorageID())) {
memBlockInfo = volumeMap.get(bpid, blockId);
if (memBlockInfo != null &&
memBlockInfo.getState() != ReplicaState.FINALIZED) {
Expand Down Expand Up @@ -2860,7 +2876,14 @@ ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
Block block, long recoveryId, long xceiverStopTimeout) throws IOException {
while (true) {
try {
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
ReplicaInfo replica = map.get(bpid, block.getBlockId());
if (replica == null) {
return null;
}
LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
+ ", replica=" + replica);
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.VOLUME, bpid,
replica.getStorageUuid())) {
return initReplicaRecoveryImpl(bpid, map, block, recoveryId);
}
} catch (MustStopExistingWriter e) {
Expand All @@ -2875,7 +2898,14 @@ static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
lockManager) throws IOException {
while (true) {
try {
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
ReplicaInfo replica = map.get(bpid, block.getBlockId());
if (replica == null) {
return null;
}
LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
+ ", replica=" + replica);
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.VOLUME, bpid,
replica.getStorageUuid())) {
return initReplicaRecoveryImpl(bpid, map, block, recoveryId);
}
} catch (MustStopExistingWriter e) {
Expand All @@ -2888,9 +2918,6 @@ static ReplicaRecoveryInfo initReplicaRecoveryImpl(String bpid, ReplicaMap map,
Block block, long recoveryId)
throws IOException, MustStopExistingWriter {
final ReplicaInfo replica = map.get(bpid, block.getBlockId());
LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
+ ", replica=" + replica);

//check replica
if (replica == null) {
return null;
Expand Down Expand Up @@ -2964,8 +2991,8 @@ public Replica updateReplicaUnderRecovery(
final long newBlockId,
final long newlength) throws IOException {
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
oldBlock.getBlockPoolId())) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
oldBlock.getBlockPoolId(), getReplicaInfo(oldBlock).getStorageUuid())) {
//get replica
final String bpid = oldBlock.getBlockPoolId();
final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
Expand Down Expand Up @@ -3109,6 +3136,10 @@ public void addBlockPool(String bpid, Configuration conf)
volumeExceptions.mergeException(e);
}
volumeMap.initBlockPool(bpid);
Set<String> vols = storageMap.keySet();
for (String v : vols) {
lockManager.addLock(LockLevel.VOLUME, bpid, v);
}
}
try {
volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Time;
import org.junit.Before;
import org.junit.After;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
Expand Down Expand Up @@ -157,6 +158,13 @@ public void setupMocks() throws Exception {
Mockito.doReturn(dataSetLockManager).when(mockDn).getDataSetLockManager();
}

/**
 * Post-test guard: asks the dataset lock manager to run its leak check and
 * fails the test if the manager reports a recorded exception afterwards.
 * NOTE(review): presumably a non-null "last exception" marks a lock that was
 * acquired but never released -- confirm against the lock manager.
 */
@After
public void checkDataSetLockManager() {
  // Run the leak check, then read back whatever exception the manager last recorded.
  dataSetLockManager.lockLeakCheck();
  final Object recordedLeak = dataSetLockManager.getLastException();
  // A clean run is expected to leave nothing recorded.
  assertNull(recordedLeak);
}

/**
* Set up a mock NN with the bare minimum for a DN to register to it.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

public class ExternalVolumeImpl implements FsVolumeSpi {
private final String defaultStroageId = "test";
@Override
public FsVolumeReference obtainReference() throws ClosedChannelException {
return null;
Expand All @@ -54,7 +55,7 @@ public long getAvailable() throws IOException {

@Override
public String getStorageID() {
return null;
return defaultStroageId;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.apache.hadoop.util.StringUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
Expand Down Expand Up @@ -236,6 +237,13 @@ public void setUp() throws IOException {
assertEquals(0, dataset.getNumFailedVolumes());
}

/**
 * Runs after every test case: triggers the lock manager's leak check and
 * asserts that no exception was recorded by it.
 * NOTE(review): a recorded exception is assumed to signal a leaked
 * (unreleased) dataset lock -- verify against the lock manager.
 */
@After
public void checkDataSetLockManager() {
  // Trigger the check first so the value read below reflects this test's run.
  manager.lockLeakCheck();
  final Object leakEvidence = manager.getLastException();
  // Nothing recorded means no leak was reported.
  assertNull(leakEvidence);
}

@Test
public void testAddVolumes() throws IOException {
final int numNewVolumes = 3;
Expand Down Expand Up @@ -687,6 +695,7 @@ public void testAddVolumeFailureReleasesInUseLock() throws IOException {
FsDatasetImpl spyDataset = spy(dataset);
FsVolumeImpl mockVolume = mock(FsVolumeImpl.class);
File badDir = new File(BASE_DIR, "bad");
when(mockVolume.getStorageID()).thenReturn("test");
badDir.mkdirs();
doReturn(mockVolume).when(spyDataset)
.createFsVolume(anyString(), any(StorageDirectory.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
import org.apache.hadoop.hdfs.server.datanode.extdataset.ExternalVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
Expand Down Expand Up @@ -218,7 +219,7 @@ private void checkBlockMetaDataInfo(boolean useDnHostname) throws Exception {
}

private static ReplicaInfo createReplicaInfo(Block b) {
return new FinalizedReplica(b, null, null);
return new FinalizedReplica(b, new ExternalVolumeImpl(), null);
}

private static void assertEquals(ReplicaInfo originalInfo, ReplicaRecoveryInfo recoveryInfo) {
Expand Down Expand Up @@ -318,6 +319,10 @@ public void testInitReplicaRecovery() throws IOException {
"replica.getGenerationStamp() < block.getGenerationStamp(), block=");
}
}

manager.lockLeakCheck();
// Make sure no lock has leaked.
assertNull(manager.getLastException());
}

/**
Expand Down

0 comments on commit f2d8f09

Please sign in to comment.