diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java
index efd3a98efe0..e18c3ff2c07 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java
@@ -101,8 +101,9 @@ public void setReadyToFlush() {
    * and resets it. Does not swap any buffers.
    */
   public void flushTo(OutputStream out) throws IOException {
-    // bufReady.writeTo(out); // write data to file
-    bufReady.syncDB(); // write data to database
+    bufReady.writeTo(out); // write data to file
+    // We want to separate logging and metadata flush
+    // bufReady.syncDB(); // write data to database
     bufReady.reset(); // erase all data in the buffer
   }

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 2877f29dc2d..f286b894129 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -398,7 +398,7 @@ public enum DirOp {
     nameCache = new NameCache(threshold);
     namesystem = ns;
     this.editLog = ns.getEditLog();
-    this.editLog.logMkDir("/", rootDir);
+    // this.editLog.logMkDir("/", rootDir);
     ezManager = new EncryptionZoneManager(this, conf);

     this.quotaInitThreads = conf.getInt(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index a045e27be6d..984961930c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -873,8 +873,22 @@ public void logUpdateBlocks(String path, INodeFile file, boolean toLogRpcIds) {
    * Add create directory record to edit log
    */
   public void logMkDir(String path, INode newNode) {
+    PermissionStatus permissions = newNode.getPermissionStatus();
     MkdirOp op = MkdirOp.getInstance(cache.get())
-      .setInodeId(newNode.getId());
+      .setInodeId(newNode.getId())
+      .setPath(path)
+      .setTimestamp(newNode.getModificationTime())
+      .setPermissionStatus(permissions);
+
+    AclFeature f = newNode.getAclFeature();
+    if (f != null) {
+      op.setAclEntries(AclStorage.readINodeLogicalAcl(newNode));
+    }
+
+    XAttrFeature x = newNode.getXAttrFeature();
+    if (x != null) {
+      op.setXAttrs(x.getXAttrs());
+    }
     logEdit(op);
   }

@@ -987,6 +1001,7 @@ void logDelete(String src, long inodeId, long timestamp, boolean toLogRpcIds) {
       .setInodeId(inodeId)
       .setPath(src)
       .setTimestamp(timestamp);
+    logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
index 8d19207bd68..4e28e811b77 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
@@ -431,7 +431,6 @@ static abstract class AddCloseOp
     boolean overwrite;
     byte storagePolicyId;
     byte erasureCodingPolicyId;
-    FSEditLogOpCodes code;

     private AddCloseOp(FSEditLogOpCodes opCode) {
       super(opCode);
@@ -555,30 +554,28 @@ T setErasureCodingPolicyId(byte ecPolicyId) {

     @Override
     public void writeFields(DataOutputStream out) throws IOException {
-      if (getOpCode() == OP_ADD) {
-        FSImageSerialization.writeLong(inodeId, out);
+      FSImageSerialization.writeLong(inodeId, out);
+      FSImageSerialization.writeString(path, out);
+      FSImageSerialization.writeShort(replication, out);
+      FSImageSerialization.writeLong(mtime, out);
+      FSImageSerialization.writeLong(atime, out);
+      FSImageSerialization.writeLong(blockSize, out);
+      new ArrayWritable(Block.class, blocks).write(out);
+      permissions.write(out);
+
+      if (this.opCode == OP_ADD) {
+        AclEditLogUtil.write(aclEntries, out);
+        XAttrEditLogProto.Builder b = XAttrEditLogProto.newBuilder();
+        b.addAllXAttrs(PBHelperClient.convertXAttrProto(xAttrs));
+        b.build().writeDelimitedTo(out);
+        FSImageSerialization.writeString(clientName,out);
+        FSImageSerialization.writeString(clientMachine,out);
+        FSImageSerialization.writeBoolean(overwrite, out);
+        FSImageSerialization.writeByte(storagePolicyId, out);
+        FSImageSerialization.writeByte(erasureCodingPolicyId, out);
+        // write clientId and callId
+        writeRpcIds(rpcClientId, rpcCallId, out);
       }
-      // FSImageSerialization.writeString(path, out);
-      // FSImageSerialization.writeShort(replication, out);
-      // FSImageSerialization.writeLong(mtime, out);
-      // FSImageSerialization.writeLong(atime, out);
-      // FSImageSerialization.writeLong(blockSize, out);
-      // new ArrayWritable(Block.class, blocks).write(out);
-      // permissions.write(out);
-
-      // if (this.opCode == OP_ADD) {
-      //   AclEditLogUtil.write(aclEntries, out);
-      //   XAttrEditLogProto.Builder b = XAttrEditLogProto.newBuilder();
-      //   b.addAllXAttrs(PBHelperClient.convertXAttrProto(xAttrs));
-      //   b.build().writeDelimitedTo(out);
-      //   FSImageSerialization.writeString(clientName,out);
-      //   FSImageSerialization.writeString(clientMachine,out);
-      //   FSImageSerialization.writeBoolean(overwrite, out);
-      //   FSImageSerialization.writeByte(storagePolicyId, out);
-      //   FSImageSerialization.writeByte(erasureCodingPolicyId, out);
-      //   // write clientId and callId
-      //   writeRpcIds(rpcClientId, rpcCallId, out);
-      // }
     }

     @Override
@@ -1548,31 +1545,38 @@ long getInodeId() {

     @Override
     public void writeFields(DataOutputStream out) throws IOException {
-      // FSImageSerialization.writeString(path, out);
-      // FSImageSerialization.writeLong(timestamp, out);
-      // writeRpcIds(rpcClientId, rpcCallId, out);
       FSImageSerialization.writeLong(inodeId, out);
+      FSImageSerialization.writeString(path, out);
+      FSImageSerialization.writeLong(timestamp, out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }

     @Override
     void readFields(DataInputStream in, int logVersion) throws IOException {
-      // if (!NameNodeLayoutVersion.supports(
-      //     LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
-      //   this.length = in.readInt();
-      //   if (this.length != 2) {
-      //     throw new IOException("Incorrect data format. " + "delete operation.");
-      //   }
-      // }
-      // this.path = FSImageSerialization.readString(in);
-      // if (NameNodeLayoutVersion.supports(
-      //     LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
-      //   this.timestamp = FSImageSerialization.readLong(in);
-      // } else {
-      //   this.timestamp = readLong(in);
-      // }
-      // // read RPC ids if necessary
-      // readRpcIds(in, logVersion);
+      if (!NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+        this.length = in.readInt();
+        if (this.length != 2) {
+          throw new IOException("Incorrect data format. " + "delete operation.");
+        }
+      }
+      if (NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.ADD_INODE_ID, logVersion)) {
+        this.inodeId = FSImageSerialization.readLong(in);
+      } else {
+        // This id should be updated when this editLogOp is applied
+        this.inodeId = HdfsConstants.GRANDFATHER_INODE_ID;
+      }
+      this.path = FSImageSerialization.readString(in);
+      if (NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+        this.timestamp = FSImageSerialization.readLong(in);
+      } else {
+        this.timestamp = readLong(in);
+      }
+      // read RPC ids if necessary
+      readRpcIds(in, logVersion);
     }

     @Override
@@ -1683,14 +1687,14 @@ MkdirOp setXAttrs(List xAttrs) {
     public void writeFields(DataOutputStream out) throws IOException {
       FSImageSerialization.writeLong(inodeId, out);
-      // FSImageSerialization.writeString(path, out);
-      // FSImageSerialization.writeLong(timestamp, out); // mtime
-      // FSImageSerialization.writeLong(timestamp, out); // atime, unused at this
-      // permissions.write(out);
-      // AclEditLogUtil.write(aclEntries, out);
-      // XAttrEditLogProto.Builder b = XAttrEditLogProto.newBuilder();
-      // b.addAllXAttrs(PBHelperClient.convertXAttrProto(xAttrs));
-      // b.build().writeDelimitedTo(out);
+      FSImageSerialization.writeString(path, out);
+      FSImageSerialization.writeLong(timestamp, out); // mtime
+      FSImageSerialization.writeLong(timestamp, out); // atime, unused at this
+      permissions.write(out);
+      AclEditLogUtil.write(aclEntries, out);
+      XAttrEditLogProto.Builder b = XAttrEditLogProto.newBuilder();
+      b.addAllXAttrs(PBHelperClient.convertXAttrProto(xAttrs));
+      b.build().writeDelimitedTo(out);
     }

     @Override
@@ -4892,23 +4896,23 @@ public Writer(DataOutputBuffer out) {
      * @throws IOException if an error occurs during writing.
      */
     public void writeOp(FSEditLogOp op) throws IOException {
-      // int start = buf.getLength();
+      int start = buf.getLength();
       // write the op code first to make padding and terminator verification
       // work
       buf.writeByte(op.opCode.getOpCode());
-      // buf.writeInt(0); // write 0 for the length first
+      buf.writeInt(0); // write 0 for the length first
       buf.writeLong(op.txid);
       op.writeFields(buf);
-      // int end = buf.getLength();
+      int end = buf.getLength();
       // write the length back: content of the op + 4 bytes checksum - op_code
-      // int length = end - start - 1;
-      // buf.writeInt(length, start + 1);
+      int length = end - start - 1;
+      buf.writeInt(length, start + 1);

-      // checksum.reset();
-      // checksum.update(buf.getData(), start, end-start);
-      // int sum = (int)checksum.getValue();
-      // buf.writeInt(sum);
+      checksum.reset();
+      checksum.update(buf.getData(), start, end-start);
+      int sum = (int)checksum.getValue();
+      buf.writeInt(sum);
     }
   }

@@ -5075,8 +5079,10 @@ private static class LengthPrefixedReader extends Reader {
      *
      * The minimum Op has:
      * 1-byte opcode
+     * 4-byte length
      * 8-byte txid
-     * 8-byte inodeid
+     * 0-byte body
+     * 4-byte checksum
      */
     private static final int MIN_OP_LENGTH = 17;

@@ -5087,6 +5093,11 @@ private static class LengthPrefixedReader extends Reader {
      */
     private static final int OP_ID_LENGTH = 1;

+    /**
+     * The checksum length.
+     *
+     * Not included in the stored length.
+     */
     private static final int CHECKSUM_LENGTH = 4;

     private final Checksum checksum;
@@ -5099,42 +5110,22 @@ private static class LengthPrefixedReader extends Reader {

     @Override
     public FSEditLogOp decodeOp() throws IOException {
-      in.mark(maxOpSize);
-
-      byte opCodeByte;
-      try {
-        opCodeByte = in.readByte();
-      } catch (EOFException eof) {
-        // EOF at an opcode boundary is expected.
-        return null;
-      }
-
-      if (opCodeByte == FSEditLogOpCodes.OP_INVALID.getOpCode()) {
-        // verifyTerminator();
+      long txid = decodeOpFrame();
+      if (txid == HdfsServerConstants.INVALID_TXID) {
         return null;
       }
-
-      FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
+      in.reset();
+      in.mark(maxOpSize);
+      FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(in.readByte());
       FSEditLogOp op = cache.get(opCode);
       if (op == null) {
         throw new IOException("Read invalid opcode " + opCode);
       }
-
-      long txid = in.readLong();
-      if (txid == HdfsServerConstants.INVALID_TXID) {
-        return null;
-      }
       op.setTransactionId(txid);
-      if (op.getOpCode() == OP_ADD) {
-        AddOp addop = (AddOp)op;
-        addop.setInodeId(in.readLong());
-      } else if (op.getOpCode() == OP_DELETE) {
-        DeleteOp deleteop = (DeleteOp)op;
-        deleteop.setInodeId(in.readLong());
-      } else if (op.getOpCode() == OP_MKDIR) {
-        MkdirOp mkdirop = (MkdirOp)op;
-        mkdirop.setInodeId(in.readLong());
-      }
+      IOUtils.skipFully(in, 4 + 8); // skip length and txid
+      op.readFields(in, logVersion);
+      // skip over the checksum, which we validated above.
+      IOUtils.skipFully(in, CHECKSUM_LENGTH);
       return op;
     }

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 98757a1b31c..944de35b868 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -2538,12 +2538,7 @@ private HdfsFileStatus startFileInt(String src,
       // There might be transactions logged while trying to recover the lease.
       // They need to be sync'ed even when an exception was thrown.
       if (!skipSync) {
-        String syncLog = System.getenv("SYNC_FILESCALE_LOG");
-        if (Boolean.parseBoolean(syncLog) == false) {
-          INodeKeyedObjects.asyncUpdateDB();
-        } else {
-          getEditLog().logSync();
-        }
+        getEditLog().logSync();
         if (toRemoveBlocks != null) {
           removeBlocks(toRemoveBlocks);
           toRemoveBlocks.clear();
@@ -3082,12 +3077,7 @@ boolean delete(String src, boolean recursive, boolean logRetryCache)
       writeUnlock(operationName);
     }
-    String syncLog = System.getenv("SYNC_FILESCALE_LOG");
-    if (Boolean.parseBoolean(syncLog) == false) {
-      INodeKeyedObjects.asyncUpdateDB();
-    } else {
-      getEditLog().logSync();
-    }
+    getEditLog().logSync();
     if (toRemovedBlocks != null) {
       removeBlocks(toRemovedBlocks); // Incremental deletion of blocks
@@ -3236,12 +3226,8 @@ boolean mkdirs(String src, PermissionStatus permissions,
     } finally {
       writeUnlock(operationName);
     }
-    String syncLog = System.getenv("SYNC_FILESCALE_LOG");
-    if (Boolean.parseBoolean(syncLog) == false) {
-      INodeKeyedObjects.asyncUpdateDB();
-    } else {
-      getEditLog().logSync();
-    }
+
+    getEditLog().logSync();
     logAuditEvent(true, operationName, src, null, auditStat);
     return true;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeKeyedObjects.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeKeyedObjects.java
index 1b6d62b6a4a..a59771cc01b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeKeyedObjects.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeKeyedObjects.java
@@ -171,11 +171,8 @@ public static IndexedCache getCache() {
     concurrentHashSet = ConcurrentHashMap.newKeySet();
     concurrentRemoveSet = ConcurrentHashMap.newKeySet();

-    String syncLog = System.getenv("SYNC_FILESCALE_LOG");
-
-    if (Boolean.parseBoolean(syncLog) == false) {
-      BackupSetToDB();
-    }
+    // async write updates to buffer
+    BackupSetToDB();

     // Assuming each INode has 600 bytes, then
     // 10000000 * 600 / 2^30 = 5.58 GB.
diff --git a/voltdb/BatchRemoveINodes.java b/voltdb/BatchRemoveINodes.java
index 88a0b09ad94..8525c7b5b6f 100644
--- a/voltdb/BatchRemoveINodes.java
+++ b/voltdb/BatchRemoveINodes.java
@@ -59,7 +59,7 @@ public long run(final long[] ids) throws VoltAbortException {
       long cid = set.get(i);
       i++;
       voltQueueSQL(sql1, cid);
-      VoltTable[] results = voltExecuteSQL();
+      results = voltExecuteSQL();
       if (results[0].getRowCount() < 1) {
         continue;
       }