Skip to content

Commit

Permalink
remote rename failure: NullPointerException (#269) (#270)
Browse files Browse the repository at this point in the history
* remote rename failure: NullPointerException (#269)

* add parent into INode in fsimage proto

* fix typo

* fix: INodeSection.INode.getParent

* fix: replace FSEditLogProtocol with FSEditLogProtocolImpl

* fix: only async remove current folder node

* import CompletableFuture

* fix: rename childs' parent id

* fix: compile error

* fix typo
  • Loading branch information
gangliao authored Jun 10, 2020
1 parent e4dd3f9 commit 5a3b1b0
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ public static void setParent(final long id, final long parent) {
}
}

public static void setParents(final Long[] ids, final long parent) {
public static void setParents(final long[] ids, final long parent) {
try {
DatabaseConnection obj = Database.getInstance().getConnection();
String env = System.getenv("DATABASE");
Expand Down Expand Up @@ -1388,6 +1388,35 @@ public static void setUcClientMachine(final long id, final String clientMachine)
}
}

public static void removeINodeNoRecursive(final long id) {
try {
DatabaseConnection obj = Database.getInstance().getConnection();
String env = System.getenv("DATABASE");
if (env.equals("VOLT")) {
// call a stored procedure
try {
obj.getVoltClient().callProcedure(new NullCallback(), "RemoveINodeNoRecursive", id);
} catch (Exception e) {
e.printStackTrace();
}
} else {
Connection conn = obj.getConnection();
// delete file/directory
String sql = "DELETE FROM inodes WHERE id = ?;";
PreparedStatement pst = conn.prepareStatement(sql);
pst.setLong(1, id);
pst.executeUpdate();
pst.close();
}
Database.getInstance().retConnection(obj);
} catch (SQLException ex) {
System.err.println(ex.getMessage());
}
if (LOG.isInfoEnabled()) {
LOG.info("removeINodeNoRecursive: " + id);
}
}

public static void removeUc(final long id) {
try {
DatabaseConnection obj = Database.getInstance().getConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,8 @@ public void remoteLogOpenFile(INodeFile newNode, String nameNodeAddress) {
INodeSection.INode r = INodeSection.INode.newBuilder()
.setId(newNode.getId())
.setName(ByteString.copyFrom(newNode.getLocalNameBytes()))
.setType(INodeSection.INode.Type.FILE).setFile(b).build();
.setType(INodeSection.INode.Type.FILE).setFile(b)
.setParent(newNode.getParentId()).build();

byte[] data = r.toByteArray();
FSEditLogProtocol proxy = (FSEditLogProtocol) RPC.getProxy(
Expand Down Expand Up @@ -972,7 +973,8 @@ public void remoteLogMkDir(INodeDirectory newNode, String nameNodeAddress) {
INodeSection.INode r = INodeSection.INode.newBuilder()
.setId(newNode.getId())
.setName(ByteString.copyFrom(newNode.getLocalNameBytes()))
.setType(INodeSection.INode.Type.DIRECTORY).setDirectory(b).build();
.setType(INodeSection.INode.Type.DIRECTORY).setDirectory(b)
.setParent(newNode.getParentId()).build();

byte[] data = r.toByteArray();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ public INodeFile loadINodeFile(INodeSection.INode n) {
file.setBlock(file.numBlocks() - 1, ucBlk);
}
}

// set parent
file.setParent(n.getParent());
return file;
}

Expand All @@ -153,37 +156,48 @@ public INodeDirectory loadINodeDirectory(INodeSection.INode n) {
if (d.hasXAttrs()) {
dir.addXAttrFeature(new XAttrFeature(dir.getId(), loadXAttrs(d.getXAttrs(), null)));
}

// set parent
dir.setParent(n.getParent());
return dir;
}

@Override
public void logEdit(byte[] in) throws IOException {
INodeSection.INode p = NULL;
INodeSection.INode p = null;
try {
p = INodeSection.INode.parseFrom(in);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
INode n;
INode dir;
switch (p.getType()) {
case FILE:
INodeFile file = loadINodeFile(p);
file.getParent().addChild(file);
String filename = DFSUtil.bytes2String(file.getLocalNameBytes());
INodeKeyedObjects.getCache()
.put(
new CompositeKey(file.getId(), new ImmutablePair<>(file.getParentId(),
DFSUtil.bytes2String(file.getLocalNameBytes()))), file);
new CompositeKey(file.getId(), new ImmutablePair<>(file.getParentId(), filename)), file);
INodeKeyedObjects.getBackupSet().add(file.getId());
FSDirectory.getInstance().getEditLog().logOpenFile(null, file, true, false);
dir = INodeKeyedObjects.getCache().getIfPresent(Long.class, (Long)file.getParentId());
if (dir != null) {
dir.asDirectory().addChild(file);
dir.asDirectory().filter.put(String.valueOf(file.getParentId()) + filename);
}
case DIRECTORY:
INodeDirectory inode = loadINodeDirectory(p);
inode.getParent().addChild(inode);
String dirname = DFSUtil.bytes2String(inode.getLocalNameBytes());
INodeKeyedObjects.getCache()
.put(
new CompositeKey(inode.getId(), new ImmutablePair<>(inode.getParentId(),
DFSUtil.bytes2String(inode.getLocalNameBytes()))), inode);
new CompositeKey(inode.getId(), new ImmutablePair<>(inode.getParentId(), dirname)), inode);
INodeKeyedObjects.getBackupSet().add(inode.getId());
FSDirectory.getInstance().getEditLog().logMkDir(null, inode);
dir = INodeKeyedObjects.getCache().getIfPresent(Long.class, (Long)inode.getParentId());
if (dir != null) {
dir.asDirectory().addChild(inode);
dir.asDirectory().filter.put(String.valueOf(inode.getParentId()) + dirname);
}
default:
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,19 @@ public final INodeDirectory getParent() {
INode dir = INodeKeyedObjects.getCache().getIfPresent(Long.class, id);
if (dir == null) {
dir = new INodeDirectory(id);
DatabaseINode.LoadINode node = new DatabaseINode().loadINode(id);
byte[] name = (node.name != null && node.name.length() > 0) ? DFSUtil.string2Bytes(node.name) : null;
dir
.asDirectory()
.InitINodeDirectory(
node.parent,
node.id,
name,
node.permission,
node.modificationTime,
node.accessTime,
node.header);

INodeKeyedObjects.getCache().put(
new CompositeKey((Long)id,
new ImmutablePair<>(dir.getParentId(), dir.getLocalName())), dir.asDirectory());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.nio.charset.Charset;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -683,18 +685,19 @@ public void localRename(INode node) {
}

public void remoteRename(INode node, String address) {
String name = DFSUtil.bytes2String(node.getLocalNameBytes());
// String name = DFSUtil.bytes2String(node.getLocalNameBytes());
if (node.isDirectory()) {
INodeDirectory inode = node.asDirectory().copyINodeDirectory();
inode.setId(node.getId() + NameNode.getId());
long oldParent = node.getId();
long newParent = node.getId() + NameNode.getId();
inode.setId(newParent);

// TODO: using stored procedure to optimize and update the immediated childs
// update immediate childs's parent id
HashSet<Long> childs = ((INodeDirectory)node).getCurrentChildrenList2();
for (long id : childs) {
INode child = FSDirectory.getInstance().getInode(id);
if (child != null) {
child.setParent(inode.getId());
// write ahead log
if (child.isDirectory()) {
INodeKeyedObjects.getCache().invalidateAllWithIndex(Long.class, (Long) child.getId());
Expand All @@ -706,16 +709,18 @@ public void remoteRename(INode node, String address) {
}
}
// using a stored procedure to update childs' parent
Long[] kids = childs.toArray(new Long[childs.size()]);
DatabaseINode.setParents(kids, getId());
DatabaseINode.setParents(childs.toArray(new Long[childs.size()]), newParent);

// invalidate old inode
INodeKeyedObjects.getCache().invalidateAllWithIndex(Long.class, (Long) node.getId());
INodeKeyedObjects.getRemoveSet().add(node.getId());
INodeKeyedObjects.getCache().invalidateAllWithIndex(Long.class, (Long) oldParent);
CompletableFuture.runAsync(() -> {
DatabaseINode.removeINodeNoRecursive(oldParent);
}, Database.getInstance().getExecutorService());

// local sync log
FSDirectory.getInstance()
.getEditLog()
.logDelete(null, node.getId(), inode.getModificationTime(), true);
.logDelete(null, oldParent, inode.getModificationTime(), true);

// remote logging
FSDirectory.getInstance().getEditLog().remoteLogMkDir(inode, address);
Expand All @@ -728,12 +733,13 @@ public void remoteRename(INode node, String address) {
}

// invalidate old inode
INodeKeyedObjects.getCache().invalidateAllWithIndex(Long.class, (Long) node.getId());
INodeKeyedObjects.getRemoveSet().add(node.getId());
long oldId = node.getId();
INodeKeyedObjects.getCache().invalidateAllWithIndex(Long.class, (Long) oldId);
INodeKeyedObjects.getRemoveSet().add(oldId);
// local sync log
FSDirectory.getInstance()
.getEditLog()
.logDelete(null, node.getId(), inode.getModificationTime(), true);
.logDelete(null, oldId, inode.getModificationTime(), true);

FSDirectory.getInstance().getEditLog().remoteLogOpenFile(inode, address);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ public static void asyncUpdateDB() {
LOG.info("Propagate updated files/directories from cache to database.");
}
try {
List<Long> longAttr = new ArrayList<>();
List<String> strAttr = new ArrayList<>();
List<Long> fileIds = new ArrayList<>();
List<String> fileAttr = new ArrayList<>();
List<Long> longAttr = new ArrayList<>();
List<String> strAttr = new ArrayList<>();
List<Long> fileIds = new ArrayList<>();
List<String> fileAttr = new ArrayList<>();
for (Long id : concurrentUpdateSet) {
INode inode = INodeKeyedObjects.getCache().getIfPresent(Long.class, id);

Expand Down
2 changes: 2 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ message INodeSection {
optional INodeFile file = 4;
optional INodeDirectory directory = 5;
optional INodeSymlink symlink = 6;

optional uint64 parent = 7;
}

optional uint64 lastInodeId = 1;
Expand Down
12 changes: 12 additions & 0 deletions voltdb/RemoveINodeNoRecursive.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import java.util.*;
import org.voltdb.*;

public class RemoveINodeNoRecursive extends VoltProcedure {
public final SQLStmt sql = new SQLStmt("DELETE FROM inodes WHERE id = ?;");

public long run(long id) throws VoltAbortException {
voltQueueSQL(sql, id);
voltExecuteSQL();
return 1;
}
}
2 changes: 1 addition & 1 deletion voltdb/SetParents.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ public class SetParents extends VoltProcedure {

public final SQLStmt sql = new SQLStmt("UPDATE inodes SET parent = ? WHERE id = ?;");

public long run(final Long ids[], final long parent) throws VoltAbortException {
public long run(final long ids[], final long parent) throws VoltAbortException {
if (int i = 0; i < ids.length; ++i) {
voltQueueSQL(sql, parent, ids[i]);
}
Expand Down

0 comments on commit 5a3b1b0

Please sign in to comment.