Skip to content

Commit

Permalink
HDFS-17496. DataNode supports more fine-grained dataset lock based on…
Browse files Browse the repository at this point in the history
… blockid. (apache#6764). Contributed by farmmamba.

Signed-off-by: He Xiaoqiao <[email protected]>
  • Loading branch information
hfutatzhanghb authored Jan 2, 2025
1 parent 305e3e7 commit 94d6a77
Show file tree
Hide file tree
Showing 10 changed files with 248 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1744,6 +1744,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final boolean
DFS_DATANODE_LOCKMANAGER_TRACE_DEFAULT = false;

public static final String DFS_DATANODE_DATASET_SUBLOCK_COUNT_KEY =
"dfs.datanode.dataset.sublock.count";
public static final long DFS_DATANODE_DATASET_SUBLOCK_COUNT_DEFAULT = 1000L;

// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public interface DataNodeLockManager<T extends AutoCloseDataSetLock> {
*/
enum LockLevel {
BLOCK_POOl,
VOLUME
VOLUME,
DIR
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ private String generateLockName(LockLevel level, String... resources) {
+ resources[0] + "volume lock :" + resources[1]);
}
return resources[0] + resources[1];
} else if (resources.length == 3 && level == LockLevel.DIR) {
if (resources[0] == null || resources[1] == null || resources[2] == null) {
throw new IllegalArgumentException("acquire a null dataset lock : "
+ resources[0] + ",volume lock :" + resources[1]
+ ",subdir lock :" + resources[2]);
}
return resources[0] + resources[1] + resources[2];
} else {
throw new IllegalArgumentException("lock level do not match resource");
}
Expand Down Expand Up @@ -153,7 +160,7 @@ public DataSetLockManager() {
public AutoCloseDataSetLock readLock(LockLevel level, String... resources) {
if (level == LockLevel.BLOCK_POOl) {
return getReadLock(level, resources[0]);
} else {
} else if (level == LockLevel.VOLUME){
AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]);
AutoCloseDataSetLock volLock = getReadLock(level, resources);
volLock.setParentLock(bpLock);
Expand All @@ -162,14 +169,25 @@ public AutoCloseDataSetLock readLock(LockLevel level, String... resources) {
resources[0]);
}
return volLock;
} else {
AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]);
AutoCloseDataSetLock volLock = getReadLock(LockLevel.VOLUME, resources[0], resources[1]);
volLock.setParentLock(bpLock);
AutoCloseDataSetLock dirLock = getReadLock(level, resources);
dirLock.setParentLock(volLock);
if (openLockTrace) {
LOG.debug("Sub lock " + resources[0] + resources[1] + resources[2] + " parent lock " +
resources[0] + resources[1]);
}
return dirLock;
}
}

@Override
public AutoCloseDataSetLock writeLock(LockLevel level, String... resources) {
if (level == LockLevel.BLOCK_POOl) {
return getWriteLock(level, resources[0]);
} else {
} else if (level == LockLevel.VOLUME) {
AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]);
AutoCloseDataSetLock volLock = getWriteLock(level, resources);
volLock.setParentLock(bpLock);
Expand All @@ -178,6 +196,17 @@ public AutoCloseDataSetLock writeLock(LockLevel level, String... resources) {
resources[0]);
}
return volLock;
} else {
AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]);
AutoCloseDataSetLock volLock = getReadLock(LockLevel.VOLUME, resources[0], resources[1]);
volLock.setParentLock(bpLock);
AutoCloseDataSetLock dirLock = getWriteLock(level, resources);
dirLock.setParentLock(volLock);
if (openLockTrace) {
LOG.debug("Sub lock " + resources[0] + resources[1] + resources[2] + " parent lock " +
resources[0] + resources[1]);
}
return dirLock;
}
}

Expand Down Expand Up @@ -224,8 +253,13 @@ public void addLock(LockLevel level, String... resources) {
String lockName = generateLockName(level, resources);
if (level == LockLevel.BLOCK_POOl) {
lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair));
} else if (level == LockLevel.VOLUME) {
lockMap.addLock(resources[0], new ReentrantReadWriteLock(isFair));
lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair));
} else {
lockMap.addLock(resources[0], new ReentrantReadWriteLock(isFair));
lockMap.addLock(generateLockName(LockLevel.VOLUME, resources[0], resources[1]),
new ReentrantReadWriteLock(isFair));
lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair));
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.datanode;

import java.util.List;

/**
* This interface is used to generate sub lock name for a blockid.
*/
public interface DataSetSubLockStrategy {

/**
* Generate sub lock name for the given blockid.
* @param blockid the block id.
* @return sub lock name for the input blockid.
*/
String blockIdToSubLock(long blockid);

List<String> getAllSubLockName();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.datanode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

public class ModDataSetSubLockStrategy implements DataSetSubLockStrategy {
public static final Logger LOG = LoggerFactory.getLogger(DataSetSubLockStrategy.class);

private static final String LOCK_NAME_PERFIX = "SubLock";
private long modFactor;

public ModDataSetSubLockStrategy(long mod) {
if (mod <= 0) {
mod = 1L;
}
this.modFactor = mod;
}

@Override
public String blockIdToSubLock(long blockid) {
return LOCK_NAME_PERFIX + (blockid % modFactor);
}

@Override
public List<String> getAllSubLockName() {
List<String> res = new ArrayList<>();
for (long i = 0L; i < modFactor; i++) {
res.add(LOCK_NAME_PERFIX + i);
}
return res;
}
}
Loading

0 comments on commit 94d6a77

Please sign in to comment.