HDFS-16429. Add DataSetLockManager to maintain block pool level lock and volume level lock
limingxiang02 committed Jan 24, 2022
1 parent 034dc8d commit e28b21d
Showing 8 changed files with 637 additions and 3 deletions.
LightWeightResizableGSet.java
@@ -20,6 +20,9 @@
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;

import java.util.Iterator;
import java.util.function.Consumer;

/**
* A low memory footprint {@link GSet} implementation,
* which uses an array for storing the elements
@@ -86,17 +89,36 @@ public LightWeightResizableGSet(int initCapacity) {
}

@Override
public E put(final E element) {
public synchronized E put(final E element) {
E existing = super.put(element);
expandIfNecessary();
return existing;
}

@Override
public synchronized E get(K key) {
return super.get(key);
}

@Override
public synchronized E remove(K key) {
return super.remove(key);
}

@Override
public synchronized int size() {
return super.size();
}

public synchronized void getIterator(Consumer<Iterator<E>> consumer) {
consumer.accept(super.values().iterator());
}

/**
* Resize the internal table to given capacity.
*/
@SuppressWarnings("unchecked")
protected void resize(int cap) {
protected synchronized void resize(int cap) {
int newCapacity = actualArrayLength(cap);
if (newCapacity == this.capacity) {
return;
@@ -121,7 +143,7 @@ protected void resize(int cap) {
/**
* Checks if we need to expand, and expands if necessary.
*/
protected void expandIfNecessary() {
protected synchronized void expandIfNecessary() {
if (size > this.threshold && capacity < MAX_ARRAY_LENGTH) {
resize(capacity * 2);
}
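The put/get/remove/size overrides above make LightWeightResizableGSet safe for concurrent callers, and the new getIterator(Consumer) runs the consumer while the set's monitor is held, so a walk cannot race with a concurrent put or remove. A minimal sketch of the intended call pattern; the Entry element type and the block name are made up for illustration, and GSet elements must implement LightWeightGSet.LinkedElement:

import java.util.Iterator;
import org.apache.hadoop.util.LightWeightGSet;
import org.apache.hadoop.util.LightWeightResizableGSet;

public class GSetIterDemo {
  // Hypothetical element type; GSet chains elements through LinkedElement.
  static class Entry implements LightWeightGSet.LinkedElement {
    final String key;
    private LightWeightGSet.LinkedElement next;
    Entry(String key) { this.key = key; }
    @Override public void setNext(LightWeightGSet.LinkedElement n) { next = n; }
    @Override public LightWeightGSet.LinkedElement getNext() { return next; }
  }

  public static void main(String[] args) {
    LightWeightResizableGSet<Entry, Entry> set = new LightWeightResizableGSet<>(16);
    set.put(new Entry("blk_1001"));   // put() is now synchronized
    // The consumer executes under the set's monitor, so the iteration
    // is isolated from concurrent modification.
    set.getIterator((Iterator<Entry> it) -> {
      while (it.hasNext()) {
        System.out.println(it.next().key);
      }
    });
  }
}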
DFSConfigKeys.java
@@ -1658,6 +1658,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
DFS_NAMESERVICES_RESOLVER_IMPL =
"dfs.datanode.nameservices.resolver.impl";

public static final String
DFS_DATANODE_LOCKMANAGER_TRACE =
"dfs.datanode.lockmanager.trace";

public static final boolean
DFS_DATANODE_LOCKMANAGER_TRACE_DEFAULT = false;

// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
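The new key is a boolean switch for lock-manager tracing in the DataNode. A sketch of how a component might read it, following the usual Configuration/DFSConfigKeys pattern; the demo class is made up, and the tracing behavior itself lives in DataSetLockManager, which is not shown in this hunk:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class LockTraceConfigDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // dfs.datanode.lockmanager.trace defaults to false.
    boolean trace = conf.getBoolean(
        DFSConfigKeys.DFS_DATANODE_LOCKMANAGER_TRACE,
        DFSConfigKeys.DFS_DATANODE_LOCKMANAGER_TRACE_DEFAULT);
    System.out.println("lock trace enabled: " + trace);
  }
}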
AutoCloseDataSetLock.java
@@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.common;

import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.StringUtils;

import java.util.concurrent.locks.Lock;

import static org.apache.hadoop.hdfs.server.datanode.DataSetLockManager.LOG;

/**
* Extends AutoCloseableLock so that users can
* use try-with-resources syntax.
*/
public class AutoCloseDataSetLock extends AutoCloseableLock {
private Lock lock;
private AutoCloseDataSetLock parentLock;
private DataNodeLockManager<AutoCloseDataSetLock> dataNodeLockManager;

public AutoCloseDataSetLock(Lock lock) {
this.lock = lock;
}

@Override
public void close() {
if (lock != null) {
lock.unlock();
if (dataNodeLockManager != null) {
dataNodeLockManager.hook();
}
} else {
LOG.error("Try to unlock null lock" +
StringUtils.getStackTrace(Thread.currentThread()));
}
if (parentLock != null) {
parentLock.close();
}
}

/**
* Actually acquire the lock.
*/
public void lock() {
if (lock != null) {
lock.lock();
return;
}
LOG.error("Try to lock null lock" +
StringUtils.getStackTrace(Thread.currentThread()));
}

public void setParentLock(AutoCloseDataSetLock parent) {
if (parentLock == null) {
this.parentLock = parent;
}
}

public void setDataNodeLockManager(DataNodeLockManager<AutoCloseDataSetLock>
dataNodeLockManager) {
this.dataNodeLockManager = dataNodeLockManager;
}
}
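Because close() unlocks the wrapped lock and then releases any parent lock, the class slots directly into try-with-resources. A stand-alone sketch; in the DataNode these locks are handed out by a DataNodeLockManager rather than constructed directly, and the ReentrantReadWriteLock here is an assumption for illustration:

import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;

public class DataSetLockDemo {
  public static void main(String[] args) {
    ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    AutoCloseDataSetLock readLock = new AutoCloseDataSetLock(rw.readLock());
    readLock.lock();   // actually acquires the wrapped lock
    try (AutoCloseDataSetLock l = readLock) {
      // Read block-pool state here. On close() the wrapped lock is
      // unlocked, hook() fires if a lock manager was attached via
      // setDataNodeLockManager(), and any parent lock set via
      // setParentLock() is closed too.
    }
  }
}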
DataNodeLockManager.java
@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.common;

/**
* Manages a set of locks for the DataNode.
*/
public interface DataNodeLockManager<T extends AutoCloseDataSetLock> {

/**
* Acquire the block pool level lock first if you want to acquire a
* volume lock; otherwise acquire only the block pool level lock.
*/
enum LockLevel {
BLOCK_POOl,
VOLUME
}

/**
* Acquire the read lock for the given resources and lock it.
*/
T readLock(LockLevel level, String... resources);

/**
* Acquire the write lock for the given resources and lock it.
*/
T writeLock(LockLevel level, String... resources);

/**
* Add a lock to LockManager.
*/
void addLock(LockLevel level, String... resources);

/**
* Remove a lock from LockManager.
*/
void removeLock(LockLevel level, String... resources);

/**
* A hook that locks call back into the LockManager after they are released.
*/
void hook();
}
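A sketch of the contract above: a VOLUME acquisition names both the block pool and the volume, and releasing the returned lock unwinds to the parent block pool lock. The resource names "bp-1" and "/data/vol1" are invented for illustration:

import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;

public class LockManagerUsageDemo {
  static void updateVolume(DataNodeLockManager<AutoCloseDataSetLock> lm) {
    // Takes the block pool level lock first, then the volume lock.
    try (AutoCloseDataSetLock l =
             lm.writeLock(LockLevel.VOLUME, "bp-1", "/data/vol1")) {
      // mutate per-volume state for block pool "bp-1"
    } // close() releases the volume lock, then its block pool parent
  }
}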
NoLockManager.java
@@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.common;

import java.util.concurrent.locks.Lock;

/**
* A no-op lock manager for unit tests or temporary replicaMap instances
* that do not need to lock via DataSetLockManager.
*/
public class NoLockManager implements DataNodeLockManager<AutoCloseDataSetLock> {
private final NoDataSetLock lock = new NoDataSetLock(null);

private static final class NoDataSetLock extends AutoCloseDataSetLock {

private NoDataSetLock(Lock lock) {
super(lock);
}

@Override
public void lock() {
}

@Override
public void close() {
}
}

public NoLockManager() {
}

@Override
public AutoCloseDataSetLock readLock(LockLevel level, String... resources) {
return lock;
}

@Override
public AutoCloseDataSetLock writeLock(LockLevel level, String... resources) {
return lock;
}

@Override
public void addLock(LockLevel level, String... resources) {
}

@Override
public void removeLock(LockLevel level, String... resources) {
}

@Override
public void hook() {
}
}
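In a test, the no-op manager can be dropped in wherever a DataNodeLockManager is expected, so the try-with-resources pattern still compiles but acquires nothing. A brief sketch; the block pool name is made up (note that BLOCK_POOl is the enum constant's actual spelling):

import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.common.NoLockManager;

public class NoLockManagerDemo {
  public static void main(String[] args) {
    NoLockManager noop = new NoLockManager();
    // Same call shape as the real manager, but lock() and close()
    // are overridden as no-ops, so nothing is actually acquired.
    try (AutoCloseDataSetLock l = noop.readLock(LockLevel.BLOCK_POOl, "bp-1")) {
      // body runs without any locking
    }
  }
}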
