HDFS-16534. Split FsDatasetImpl from block pool locks to volume grain locks. #4141

Closed
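For context, this PR replaces the single block-pool-grain dataset lock with locks keyed by (block pool, volume storage ID). Below is a minimal, self-contained sketch of that idea — the class, method names, and key scheme are illustrative only, not Hadoop's actual DataSetLockManager API:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch: one lock per (block pool, volume) instead of one per block pool,
// so writers on different volumes of the same pool no longer serialize.
public class VolumeGrainLockSketch {
  private final ConcurrentHashMap<String, ReadWriteLock> locks =
      new ConcurrentHashMap<>();

  // Pool-level lock, analogous to LockLevel.BLOCK_POOl.
  ReadWriteLock poolLock(String bpid) {
    return locks.computeIfAbsent(bpid, k -> new ReentrantReadWriteLock());
  }

  // Volume-level lock, analogous to LockLevel.VOLUME (bpid + storage ID).
  ReadWriteLock volumeLock(String bpid, String storageId) {
    return locks.computeIfAbsent(bpid + ":" + storageId,
        k -> new ReentrantReadWriteLock());
  }

  public static void main(String[] args) throws InterruptedException {
    VolumeGrainLockSketch lm = new VolumeGrainLockSketch();
    Runnable onA = () -> {
      ReadWriteLock l = lm.volumeLock("BP-1", "DS-a");
      l.writeLock().lock();
      try {
        System.out.println("mutating a replica on volume DS-a");
      } finally {
        l.writeLock().unlock();
      }
    };
    Runnable onB = () -> {
      ReadWriteLock l = lm.volumeLock("BP-1", "DS-b");
      l.writeLock().lock();
      try {
        System.out.println("mutating a replica on volume DS-b");
      } finally {
        l.writeLock().unlock();
      }
    };
    // Both writers run concurrently: they hold different volume locks.
    Thread t1 = new Thread(onA);
    Thread t2 = new Thread(onB);
    t1.start();
    t2.start();
    t1.join();
    t2.join();
  }
}
```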
FsDatasetImpl.java
@@ -432,6 +432,9 @@ private synchronized void activateVolume(
ReplicaMap replicaMap,
Storage.StorageDirectory sd, StorageType storageType,
FsVolumeReference ref) throws IOException {
for (String bp : volumeMap.getBlockPoolList()) {
lockManager.addLock(LockLevel.VOLUME, bp, ref.getVolume().getStorageID());
}
DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
if (dnStorage != null) {
final String errorMsg = String.format(
@@ -629,6 +632,9 @@ public void removeVolumes(
synchronized (this) {
for (String storageUuid : storageToRemove) {
storageMap.remove(storageUuid);
for (String bp : volumeMap.getBlockPoolList()) {
Contributor: Do we need to synchronize this segment? IMO, lock removal is thread-safe here, right?

Contributor (author): It may add a lot of locks. All methods that add or remove locks acquire the instance monitor (synchronized) first.
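For illustration, a minimal sketch of that pattern — the names are hypothetical, not the actual FsDatasetImpl fields. Mutating the lock table inside the same monitor that guards the storage map keeps the two in sync:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of "add/remove locks under the instance monitor".
class LockTableSketch {
  private final Map<String, Object> storageMap = new HashMap<>();
  private final Map<String, Object> lockTable = new HashMap<>();

  // Volume activation: register the storage and its per-volume lock together.
  synchronized void activateVolume(String bpid, String storageId) {
    storageMap.put(storageId, new Object());
    lockTable.put(bpid + ":" + storageId, new Object());
  }

  // Volume removal: drop both under the same monitor, so no caller can see a
  // storage without its lock (or a stale lock without its storage).
  synchronized void removeVolume(String bpid, String storageId) {
    storageMap.remove(storageId);
    lockTable.remove(bpid + ":" + storageId);
  }
}
```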

lockManager.removeLock(LockLevel.VOLUME, bp, storageUuid);
}
}
}
}
@@ -906,8 +912,8 @@ ReplicaInfo getReplicaInfo(String bpid, long blkid)
@Override // FsDatasetSpi
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
long blkOffset, long metaOffset) throws IOException {
try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
ReplicaInfo info = getReplicaInfo(b);
FsVolumeReference ref = info.getVolume().obtainReference();
try {
@@ -1372,8 +1378,8 @@ static void computeChecksum(ReplicaInfo srcReplica, File dstMeta,
@Override // FsDatasetSpi
public ReplicaHandler append(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
// If the block was successfully finalized because all packets
// were successfully processed at the Datanode but the ack for
// some of the packets were not received by the client. The client
@@ -1425,7 +1431,8 @@ public ReplicaHandler append(ExtendedBlock b,
private ReplicaInPipeline append(String bpid,
ReplicaInfo replicaInfo, long newGS, long estimateBlockLen)
throws IOException {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
bpid, replicaInfo.getStorageUuid())) {
// If the block is cached, start uncaching it.
if (replicaInfo.getState() != ReplicaState.FINALIZED) {
throw new IOException("Only a Finalized replica can be appended to; "
@@ -1554,8 +1561,8 @@ public Replica recoverClose(ExtendedBlock b, long newGS,
LOG.info("Recover failed close " + b);
while (true) {
try {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
// check replica's state
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
// bump the replica's GS
@@ -1578,7 +1585,7 @@ public ReplicaHandler createRbw(
StorageType storageType, String storageId, ExtendedBlock b,
boolean allowLazyPersist) throws IOException {
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getBlockId());
@@ -1626,20 +1633,20 @@ public ReplicaHandler createRbw(
}

ReplicaInPipeline newReplicaInfo;
try {
try (AutoCloseableLock l = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), v.getStorageID())) {
newReplicaInfo = v.createRbw(b);
if (newReplicaInfo.getReplicaInfo().getState() != ReplicaState.RBW) {
throw new IOException("CreateRBW returned a replica of state "
+ newReplicaInfo.getReplicaInfo().getState()
+ " for block " + b.getBlockId());
}
volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
return new ReplicaHandler(newReplicaInfo, ref);
} catch (IOException e) {
IOUtils.cleanupWithLogger(null, ref);
throw e;
}

volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
return new ReplicaHandler(newReplicaInfo, ref);
} finally {
if (dataNodeMetrics != null) {
long createRbwMs = Time.monotonicNow() - startTimeMs;
@@ -1657,8 +1664,8 @@ public ReplicaHandler recoverRbw(
try {
while (true) {
try {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
ReplicaInfo replicaInfo =
getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
// check the replica's state
@@ -1689,8 +1696,8 @@ public ReplicaHandler recoverRbw(
private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
throws IOException {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
// check generation stamp
long replicaGenerationStamp = rbw.getGenerationStamp();
if (replicaGenerationStamp < b.getGenerationStamp() ||
@@ -1751,8 +1758,8 @@ private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
public ReplicaInPipeline convertTemporaryToRbw(
final ExtendedBlock b) throws IOException {
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
final long blockId = b.getBlockId();
final long expectedGs = b.getGenerationStamp();
final long visible = b.getNumBytes();
@@ -1887,12 +1894,12 @@ public ReplicaHandler createTemporary(StorageType storageType,
false);
}
long startHoldLockTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b
.getNumBytes());
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
ReplicaInPipeline newReplicaInfo;
FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b
Contributor: Not sure if this is thread-safe without any ReadWriteLock.

Contributor (author): volumes.getNextVolume() is thread-safe, and does not need to be protected by the dataset lock.
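A self-contained sketch of that ordering (illustrative names, not the Hadoop classes): the round-robin chooser is internally thread-safe, so the volume is picked outside any dataset lock, and only the chosen volume's write lock is taken for the mutation:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch: choose a volume lock-free, then lock only that volume.
class ChooseThenLockSketch {
  static final class Volume {
    final String storageId;
    final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    Volume(String storageId) { this.storageId = storageId; }
  }

  private final List<Volume> volumes;
  private final AtomicInteger next = new AtomicInteger();

  ChooseThenLockSketch(List<Volume> volumes) { this.volumes = volumes; }

  // Thread-safe round-robin selection; no dataset-wide lock is held here.
  Volume getNextVolume() {
    return volumes.get(Math.floorMod(next.getAndIncrement(), volumes.size()));
  }

  void createTemporary(String bpid, long blockId) {
    Volume v = getNextVolume();        // selection happens outside any lock
    v.lock.writeLock().lock();         // then lock only the chosen volume
    try {
      System.out.println("creating temporary " + bpid + "/" + blockId
          + " on " + v.storageId);
    } finally {
      v.lock.writeLock().unlock();
    }
  }
}
```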

.getNumBytes());
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
ReplicaInPipeline newReplicaInfo;
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), v.getStorageID())) {
try {
newReplicaInfo = v.createTemporary(b);
LOG.debug("creating temporary for block: {} on volume: {}",
@@ -1949,8 +1956,8 @@ public void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
ReplicaInfo replicaInfo = null;
ReplicaInfo finalizedReplicaInfo = null;
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
if (Thread.interrupted()) {
// Don't allow data modifications from interrupted threads
throw new IOException("Cannot finalize block from Interrupted Thread");
@@ -1986,7 +1993,8 @@ public void finalizeBlock(ExtendedBlock b, boolean fsyncDir)

private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
throws IOException {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
bpid, replicaInfo.getStorageUuid())) {
// Compare generation stamp of old and new replica before finalizing
if (volumeMap.get(bpid, replicaInfo.getBlockId()).getGenerationStamp()
> replicaInfo.getGenerationStamp()) {
@@ -2032,8 +2040,8 @@ private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
@Override // FsDatasetSpi
public void unfinalizeBlock(ExtendedBlock b) throws IOException {
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getLocalBlock());
if (replicaInfo != null &&
@@ -2423,10 +2431,17 @@ private void cacheBlock(String bpid, long blockId) {
long length, genstamp;
Executor volumeExecutor;

try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
ReplicaInfo info = volumeMap.get(bpid, blockId);
ReplicaInfo info = volumeMap.get(bpid, blockId);
if (info == null) {
LOG.warn("Failed to cache block with id " + blockId + ", pool " +
bpid + ": ReplicaInfo not found.");
return;
}
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, bpid,
info.getStorageUuid())) {
boolean success = false;
try {
info = volumeMap.get(bpid, blockId);
if (info == null) {
LOG.warn("Failed to cache block with id " + blockId + ", pool " +
bpid + ": ReplicaInfo not found.");
@@ -2619,7 +2634,8 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo)
curDirScannerNotifyCount = 0;
lastDirScannerNotifyTime = startTimeMs;
}
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, bpid,
vol.getStorageID())) {
memBlockInfo = volumeMap.get(bpid, blockId);
if (memBlockInfo != null &&
memBlockInfo.getState() != ReplicaState.FINALIZED) {
@@ -2860,7 +2876,14 @@ ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
Block block, long recoveryId, long xceiverStopTimeout) throws IOException {
while (true) {
try {
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
ReplicaInfo replica = map.get(bpid, block.getBlockId());
if (replica == null) {
return null;
}
LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
+ ", replica=" + replica);
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.VOLUME, bpid,
replica.getStorageUuid())) {
return initReplicaRecoveryImpl(bpid, map, block, recoveryId);
}
} catch (MustStopExistingWriter e) {
@@ -2875,7 +2898,14 @@ static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
lockManager) throws IOException {
while (true) {
try {
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
ReplicaInfo replica = map.get(bpid, block.getBlockId());
if (replica == null) {
return null;
}
LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
+ ", replica=" + replica);
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.VOLUME, bpid,
replica.getStorageUuid())) {
return initReplicaRecoveryImpl(bpid, map, block, recoveryId);
}
} catch (MustStopExistingWriter e) {
Expand All @@ -2888,9 +2918,6 @@ static ReplicaRecoveryInfo initReplicaRecoveryImpl(String bpid, ReplicaMap map,
Block block, long recoveryId)
throws IOException, MustStopExistingWriter {
final ReplicaInfo replica = map.get(bpid, block.getBlockId());
LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
+ ", replica=" + replica);

//check replica
if (replica == null) {
return null;
@@ -2964,8 +2991,8 @@ public Replica updateReplicaUnderRecovery(
final long newBlockId,
final long newlength) throws IOException {
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
oldBlock.getBlockPoolId())) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
oldBlock.getBlockPoolId(), getReplicaInfo(oldBlock).getStorageUuid())) {
//get replica
final String bpid = oldBlock.getBlockPoolId();
final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
@@ -3109,6 +3136,10 @@ public void addBlockPool(String bpid, Configuration conf)
volumeExceptions.mergeException(e);
}
volumeMap.initBlockPool(bpid);
Set<String> vols = storageMap.keySet();
for (String v : vols) {
lockManager.addLock(LockLevel.VOLUME, bpid, v);
}
}
try {
volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);
TestBPOfferService.java
@@ -88,6 +88,7 @@
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Time;
import org.junit.Before;
import org.junit.After;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
@@ -157,6 +158,13 @@ public void setupMocks() throws Exception {
Mockito.doReturn(dataSetLockManager).when(mockDn).getDataSetLockManager();
}

@After
public void checkDataSetLockManager() {
dataSetLockManager.lockLeakCheck();
// Make sure there is no lock leak.
assertNull(dataSetLockManager.getLastException());
}

/**
* Set up a mock NN with the bare minimum for a DN to register to it.
*/
ExternalVolumeImpl.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

public class ExternalVolumeImpl implements FsVolumeSpi {
private final String defaultStorageId = "test";
@Override
public FsVolumeReference obtainReference() throws ClosedChannelException {
return null;
@@ -54,7 +55,7 @@ public long getAvailable() throws IOException {

@Override
public String getStorageID() {
return null;
return defaultStorageId;
}

@Override
TestFsDatasetImpl.java
@@ -80,6 +80,7 @@
import org.apache.hadoop.util.StringUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
@@ -236,6 +237,13 @@ public void setUp() throws IOException {
assertEquals(0, dataset.getNumFailedVolumes());
}

@After
public void checkDataSetLockManager() {
manager.lockLeakCheck();
// Make sure there is no lock leak.
assertNull(manager.getLastException());
}

@Test
public void testAddVolumes() throws IOException {
final int numNewVolumes = 3;
@@ -687,6 +695,7 @@ public void testAddVolumeFailureReleasesInUseLock() throws IOException {
FsDatasetImpl spyDataset = spy(dataset);
FsVolumeImpl mockVolume = mock(FsVolumeImpl.class);
File badDir = new File(BASE_DIR, "bad");
when(mockVolume.getStorageID()).thenReturn("test");
badDir.mkdirs();
doReturn(mockVolume).when(spyDataset)
.createFsVolume(anyString(), any(StorageDirectory.class),
TestInterDatanodeProtocol.java
@@ -51,6 +51,7 @@
import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
import org.apache.hadoop.hdfs.server.datanode.extdataset.ExternalVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
@@ -218,7 +219,7 @@ private void checkBlockMetaDataInfo(boolean useDnHostname) throws Exception {
}

private static ReplicaInfo createReplicaInfo(Block b) {
return new FinalizedReplica(b, null, null);
return new FinalizedReplica(b, new ExternalVolumeImpl(), null);
}

private static void assertEquals(ReplicaInfo originalInfo, ReplicaRecoveryInfo recoveryInfo) {
Expand Down Expand Up @@ -318,6 +319,10 @@ public void testInitReplicaRecovery() throws IOException {
"replica.getGenerationStamp() < block.getGenerationStamp(), block=");
}
}

manager.lockLeakCheck();
// Make sure there is no lock leak.
assertNull(manager.getLastException());
}

/**