Merge pull request #245 from DSLAM-UMD/synclog
feat: separate metadata flush and logging
gangliao authored Feb 10, 2020
2 parents d6af300 + aec052c commit 55bae70
Showing 7 changed files with 108 additions and 118 deletions.
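The change keeps edit logging on the synchronous request path (logSync) while the database flush of namespace metadata moves off that path, instead of choosing one or the other via the SYNC_FILESCALE_LOG environment variable. A minimal sketch of the resulting split, assuming an executor-based background flush; only logSync, asyncUpdateDB, and BackupSetToDB appear in the diff, everything else below is illustrative.

// Illustrative sketch, not the committed implementation: the executor-based
// decoupling is an assumption about how the two paths interact.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SeparatedFlushSketch {
    interface EditLog { void logSync(); }

    private final ExecutorService dbFlusher = Executors.newSingleThreadExecutor();

    // Logging: always sync the edit log on the operation's own thread.
    void finishOperation(EditLog editLog) {
        editLog.logSync();
    }

    // Metadata flush: hand database updates to a background worker.
    void scheduleMetadataFlush(Runnable backupSetToDB) {
        dbFlusher.submit(backupSetToDB);
    }
}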
@@ -101,8 +101,9 @@ public void setReadyToFlush() {
* and resets it. Does not swap any buffers.
*/
public void flushTo(OutputStream out) throws IOException {
// bufReady.writeTo(out); // write data to file
bufReady.syncDB(); // write data to database
bufReady.writeTo(out); // write data to file
// We want to separate logging and metadata flush
// bufReady.syncDB(); // write data to database
bufReady.reset(); // erase all data in the buffer
}
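For context, the buffer flushed here is one half of a double-buffering scheme: new edits accumulate in a current buffer while the ready buffer (swapped in by setReadyToFlush, per the javadoc above) is written out. A minimal sketch of that pattern with illustrative field names; only writeTo, reset, flushTo, and setReadyToFlush appear in the hunk.

// Sketch of the double-buffer flush; ByteArrayOutputStream stands in for the
// repository's edit-log buffer type.
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class DoubleBufferSketch {
    private ByteArrayOutputStream bufCurrent = new ByteArrayOutputStream();
    private ByteArrayOutputStream bufReady = new ByteArrayOutputStream();

    // Swap buffers so new edits keep accumulating while the old ones are flushed.
    void setReadyToFlush() {
        ByteArrayOutputStream tmp = bufReady;
        bufReady = bufCurrent;
        bufCurrent = tmp;
    }

    // After this change, flushTo only writes the log bytes to the stream;
    // the database flush happens elsewhere, asynchronously.
    void flushTo(OutputStream out) throws IOException {
        bufReady.writeTo(out);
        bufReady.reset();
    }
}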

@@ -398,7 +398,7 @@ public enum DirOp {
nameCache = new NameCache<ByteArray>(threshold);
namesystem = ns;
this.editLog = ns.getEditLog();
this.editLog.logMkDir("/", rootDir);
// this.editLog.logMkDir("/", rootDir);
ezManager = new EncryptionZoneManager(this, conf);

this.quotaInitThreads = conf.getInt(
@@ -873,8 +873,22 @@ public void logUpdateBlocks(String path, INodeFile file, boolean toLogRpcIds) {
* Add create directory record to edit log
*/
public void logMkDir(String path, INode newNode) {
PermissionStatus permissions = newNode.getPermissionStatus();
MkdirOp op = MkdirOp.getInstance(cache.get())
.setInodeId(newNode.getId());
.setInodeId(newNode.getId())
.setPath(path)
.setTimestamp(newNode.getModificationTime())
.setPermissionStatus(permissions);

AclFeature f = newNode.getAclFeature();
if (f != null) {
op.setAclEntries(AclStorage.readINodeLogicalAcl(newNode));
}

XAttrFeature x = newNode.getXAttrFeature();
if (x != null) {
op.setXAttrs(x.getXAttrs());
}
logEdit(op);
}

@@ -987,6 +1001,7 @@ void logDelete(String src, long inodeId, long timestamp, boolean toLogRpcIds) {
.setInodeId(inodeId)
.setPath(src)
.setTimestamp(timestamp);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}

@@ -431,7 +431,6 @@ static abstract class AddCloseOp
boolean overwrite;
byte storagePolicyId;
byte erasureCodingPolicyId;
FSEditLogOpCodes code;

private AddCloseOp(FSEditLogOpCodes opCode) {
super(opCode);
@@ -555,30 +554,28 @@ <T extends AddCloseOp> T setErasureCodingPolicyId(byte ecPolicyId) {

@Override
public void writeFields(DataOutputStream out) throws IOException {
if (getOpCode() == OP_ADD) {
FSImageSerialization.writeLong(inodeId, out);
FSImageSerialization.writeLong(inodeId, out);
FSImageSerialization.writeString(path, out);
FSImageSerialization.writeShort(replication, out);
FSImageSerialization.writeLong(mtime, out);
FSImageSerialization.writeLong(atime, out);
FSImageSerialization.writeLong(blockSize, out);
new ArrayWritable(Block.class, blocks).write(out);
permissions.write(out);

if (this.opCode == OP_ADD) {
AclEditLogUtil.write(aclEntries, out);
XAttrEditLogProto.Builder b = XAttrEditLogProto.newBuilder();
b.addAllXAttrs(PBHelperClient.convertXAttrProto(xAttrs));
b.build().writeDelimitedTo(out);
FSImageSerialization.writeString(clientName,out);
FSImageSerialization.writeString(clientMachine,out);
FSImageSerialization.writeBoolean(overwrite, out);
FSImageSerialization.writeByte(storagePolicyId, out);
FSImageSerialization.writeByte(erasureCodingPolicyId, out);
// write clientId and callId
writeRpcIds(rpcClientId, rpcCallId, out);
}
// FSImageSerialization.writeString(path, out);
// FSImageSerialization.writeShort(replication, out);
// FSImageSerialization.writeLong(mtime, out);
// FSImageSerialization.writeLong(atime, out);
// FSImageSerialization.writeLong(blockSize, out);
// new ArrayWritable(Block.class, blocks).write(out);
// permissions.write(out);

// if (this.opCode == OP_ADD) {
// AclEditLogUtil.write(aclEntries, out);
// XAttrEditLogProto.Builder b = XAttrEditLogProto.newBuilder();
// b.addAllXAttrs(PBHelperClient.convertXAttrProto(xAttrs));
// b.build().writeDelimitedTo(out);
// FSImageSerialization.writeString(clientName,out);
// FSImageSerialization.writeString(clientMachine,out);
// FSImageSerialization.writeBoolean(overwrite, out);
// FSImageSerialization.writeByte(storagePolicyId, out);
// FSImageSerialization.writeByte(erasureCodingPolicyId, out);
// write clientId and callId
// writeRpcIds(rpcClientId, rpcCallId, out);
// }
}

@Override
@@ -1548,31 +1545,38 @@ long getInodeId() {
@Override
public
void writeFields(DataOutputStream out) throws IOException {
// FSImageSerialization.writeString(path, out);
// FSImageSerialization.writeLong(timestamp, out);
// writeRpcIds(rpcClientId, rpcCallId, out);
FSImageSerialization.writeLong(inodeId, out);
FSImageSerialization.writeString(path, out);
FSImageSerialization.writeLong(timestamp, out);
writeRpcIds(rpcClientId, rpcCallId, out);
}

@Override
void readFields(DataInputStream in, int logVersion)
throws IOException {
// if (!NameNodeLayoutVersion.supports(
// LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
// this.length = in.readInt();
// if (this.length != 2) {
// throw new IOException("Incorrect data format. " + "delete operation.");
// }
// }
// this.path = FSImageSerialization.readString(in);
// if (NameNodeLayoutVersion.supports(
// LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
// this.timestamp = FSImageSerialization.readLong(in);
// } else {
// this.timestamp = readLong(in);
// }
// // read RPC ids if necessary
// readRpcIds(in, logVersion);
if (!NameNodeLayoutVersion.supports(
LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
this.length = in.readInt();
if (this.length != 2) {
throw new IOException("Incorrect data format. " + "delete operation.");
}
}
if (NameNodeLayoutVersion.supports(
LayoutVersion.Feature.ADD_INODE_ID, logVersion)) {
this.inodeId = FSImageSerialization.readLong(in);
} else {
// This id should be updated when this editLogOp is applied
this.inodeId = HdfsConstants.GRANDFATHER_INODE_ID;
}
this.path = FSImageSerialization.readString(in);
if (NameNodeLayoutVersion.supports(
LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
this.timestamp = FSImageSerialization.readLong(in);
} else {
this.timestamp = readLong(in);
}
// read RPC ids if necessary
readRpcIds(in, logVersion);
}

@Override
@@ -1683,14 +1687,14 @@ MkdirOp setXAttrs(List<XAttr> xAttrs) {
public
void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeLong(inodeId, out);
// FSImageSerialization.writeString(path, out);
// FSImageSerialization.writeLong(timestamp, out); // mtime
// FSImageSerialization.writeLong(timestamp, out); // atime, unused at this
// permissions.write(out);
// AclEditLogUtil.write(aclEntries, out);
// XAttrEditLogProto.Builder b = XAttrEditLogProto.newBuilder();
// b.addAllXAttrs(PBHelperClient.convertXAttrProto(xAttrs));
// b.build().writeDelimitedTo(out);
FSImageSerialization.writeString(path, out);
FSImageSerialization.writeLong(timestamp, out); // mtime
FSImageSerialization.writeLong(timestamp, out); // atime, unused at this
permissions.write(out);
AclEditLogUtil.write(aclEntries, out);
XAttrEditLogProto.Builder b = XAttrEditLogProto.newBuilder();
b.addAllXAttrs(PBHelperClient.convertXAttrProto(xAttrs));
b.build().writeDelimitedTo(out);
}

@Override
@@ -4892,23 +4896,23 @@ public Writer(DataOutputBuffer out) {
* @throws IOException if an error occurs during writing.
*/
public void writeOp(FSEditLogOp op) throws IOException {
// int start = buf.getLength();
int start = buf.getLength();
// write the op code first to make padding and terminator verification
// work
buf.writeByte(op.opCode.getOpCode());
// buf.writeInt(0); // write 0 for the length first
buf.writeInt(0); // write 0 for the length first
buf.writeLong(op.txid);
op.writeFields(buf);
// int end = buf.getLength();
int end = buf.getLength();

// write the length back: content of the op + 4 bytes checksum - op_code
// int length = end - start - 1;
// buf.writeInt(length, start + 1);
int length = end - start - 1;
buf.writeInt(length, start + 1);

// checksum.reset();
// checksum.update(buf.getData(), start, end-start);
// int sum = (int)checksum.getValue();
// buf.writeInt(sum);
checksum.reset();
checksum.update(buf.getData(), start, end-start);
int sum = (int)checksum.getValue();
buf.writeInt(sum);
}
}
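The comments in writeOp describe the on-disk record framing that this change restores: a 1-byte opcode, a 4-byte length that is patched once the fields are known, an 8-byte transaction id, the op-specific fields, and a trailing 4-byte checksum. A self-contained sketch of that framing, assuming CRC32 stands in for whatever Checksum implementation the writer and reader agree on; this is an illustration, not the repository's Writer.

// Worked sketch of the record framing restored above.
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

class FrameSketch {
    static byte[] frame(byte opCode, long txid, byte[] fields) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeByte(opCode);   // 1-byte opcode
        out.writeInt(0);         // 4-byte length placeholder, patched below
        out.writeLong(txid);     // 8-byte transaction id
        out.write(fields);       // op-specific fields
        out.flush();

        byte[] record = bytes.toByteArray();
        // Patch the length using the same formula as writeOp: end - start - 1.
        ByteBuffer.wrap(record, 1, 4).putInt(record.length - 1);

        // Append a 4-byte checksum computed over everything written so far.
        CRC32 checksum = new CRC32();
        checksum.update(record, 0, record.length);
        ByteArrayOutputStream framed = new ByteArrayOutputStream();
        framed.write(record, 0, record.length);
        new DataOutputStream(framed).writeInt((int) checksum.getValue());
        return framed.toByteArray();
    }
}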

@@ -5075,8 +5079,10 @@ private static class LengthPrefixedReader extends Reader {
*
* The minimum Op has:
* 1-byte opcode
* 4-byte length
* 8-byte txid
* 8-byte inodeid
* 0-byte body
* 4-byte checksum
*/
private static final int MIN_OP_LENGTH = 17;

@@ -5087,6 +5093,11 @@ private static class LengthPrefixedReader extends Reader {
*/
private static final int OP_ID_LENGTH = 1;

/**
* The checksum length.
*
* Not included in the stored length.
*/
private static final int CHECKSUM_LENGTH = 4;

private final Checksum checksum;
@@ -5099,42 +5110,22 @@ private static class LengthPrefixedReader extends Reader {

@Override
public FSEditLogOp decodeOp() throws IOException {
in.mark(maxOpSize);

byte opCodeByte;
try {
opCodeByte = in.readByte();
} catch (EOFException eof) {
// EOF at an opcode boundary is expected.
return null;
}

if (opCodeByte == FSEditLogOpCodes.OP_INVALID.getOpCode()) {
// verifyTerminator();
long txid = decodeOpFrame();
if (txid == HdfsServerConstants.INVALID_TXID) {
return null;
}

FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
in.reset();
in.mark(maxOpSize);
FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(in.readByte());
FSEditLogOp op = cache.get(opCode);
if (op == null) {
throw new IOException("Read invalid opcode " + opCode);
}

long txid = in.readLong();
if (txid == HdfsServerConstants.INVALID_TXID) {
return null;
}
op.setTransactionId(txid);
if (op.getOpCode() == OP_ADD) {
AddOp addop = (AddOp)op;
addop.setInodeId(in.readLong());
} else if (op.getOpCode() == OP_DELETE) {
DeleteOp deleteop = (DeleteOp)op;
deleteop.setInodeId(in.readLong());
} else if (op.getOpCode() == OP_MKDIR) {
MkdirOp mkdirop = (MkdirOp)op;
mkdirop.setInodeId(in.readLong());
}
IOUtils.skipFully(in, 4 + 8); // skip length and txid
op.readFields(in, logVersion);
// skip over the checksum, which we validated above.
IOUtils.skipFully(in, CHECKSUM_LENGTH);
return op;
}
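The restored decodeOp pairs with that framing: decodeOpFrame first validates the length and checksum and yields the txid, then the stream is rewound so the opcode can be read, the already-known length and txid are skipped, the op body is parsed, and the checksum bytes are skipped. A small decode-side sketch under the same assumptions as the framing example earlier; the Frame holder type is illustrative.

// Decode-side counterpart of FrameSketch; the frame is assumed to have been
// validated (length and checksum) beforehand, as decodeOpFrame does above.
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

class DecodeSketch {
    static final class Frame {
        final byte opCode; final long txid; final byte[] fields;
        Frame(byte opCode, long txid, byte[] fields) {
            this.opCode = opCode; this.txid = txid; this.fields = fields;
        }
    }

    static Frame decode(byte[] framed) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(framed));
        byte opCode = in.readByte();              // 1-byte opcode
        int length = in.readInt();                // decodeOp skips this; decodeOpFrame validated it
        long txid = in.readLong();                // decodeOp takes the txid from decodeOpFrame instead
        byte[] fields = new byte[length - 8 - 4]; // stored length covers txid + fields + checksum
        in.readFully(fields);                     // op-specific fields, i.e. op.readFields(...)
        in.skipBytes(4);                          // trailing checksum, validated beforehand
        return new Frame(opCode, txid, fields);
    }
}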

@@ -2538,12 +2538,7 @@ private HdfsFileStatus startFileInt(String src,
// There might be transactions logged while trying to recover the lease.
// They need to be sync'ed even when an exception was thrown.
if (!skipSync) {
String syncLog = System.getenv("SYNC_FILESCALE_LOG");
if (Boolean.parseBoolean(syncLog) == false) {
INodeKeyedObjects.asyncUpdateDB();
} else {
getEditLog().logSync();
}
getEditLog().logSync();
if (toRemoveBlocks != null) {
removeBlocks(toRemoveBlocks);
toRemoveBlocks.clear();
@@ -3082,12 +3077,7 @@ boolean delete(String src, boolean recursive, boolean logRetryCache)
writeUnlock(operationName);
}

String syncLog = System.getenv("SYNC_FILESCALE_LOG");
if (Boolean.parseBoolean(syncLog) == false) {
INodeKeyedObjects.asyncUpdateDB();
} else {
getEditLog().logSync();
}
getEditLog().logSync();

if (toRemovedBlocks != null) {
removeBlocks(toRemovedBlocks); // Incremental deletion of blocks
@@ -3236,12 +3226,8 @@ boolean mkdirs(String src, PermissionStatus permissions,
} finally {
writeUnlock(operationName);
}
String syncLog = System.getenv("SYNC_FILESCALE_LOG");
if (Boolean.parseBoolean(syncLog) == false) {
INodeKeyedObjects.asyncUpdateDB();
} else {
getEditLog().logSync();
}

getEditLog().logSync();
logAuditEvent(true, operationName, src, null, auditStat);
return true;
}
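The hunks above drop the SYNC_FILESCALE_LOG branch, so the edit log is now always synced on the way out of each operation, in line with the comment in the first hunk that transactions logged while recovering the lease must be synced even when an exception was thrown. A minimal sketch of that pattern, with a hypothetical FailedBeforeLogging failure standing in for the cases that set skipSync.

// Sketch only: skipSync, logSync and the "sync even on exception" behaviour come
// from the hunks above; FailedBeforeLogging and the overall shape are illustrative.
class SyncOnExitSketch {
    interface EditLog { void logSync(); }

    static class FailedBeforeLogging extends RuntimeException {}

    static void runOperation(EditLog editLog, Runnable body) {
        boolean skipSync = false;
        try {
            body.run();
        } catch (FailedBeforeLogging e) {
            skipSync = true;      // nothing reached the edit log, nothing to sync
            throw e;
        } finally {
            // Transactions may have been logged even if an exception escaped,
            // so sync unless we know nothing was logged.
            if (!skipSync) {
                editLog.logSync();
            }
        }
    }
}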
@@ -171,11 +171,8 @@ public static IndexedCache<CompositeKey, INode> getCache() {
concurrentHashSet = ConcurrentHashMap.newKeySet();
concurrentRemoveSet = ConcurrentHashMap.newKeySet();

String syncLog = System.getenv("SYNC_FILESCALE_LOG");

if (Boolean.parseBoolean(syncLog) == false) {
BackupSetToDB();
}
// async write updates to buffer
BackupSetToDB();

// Assuming each INode has 600 bytes, then
// 10000000 * 600 / 2^30 = 5.58 GB.
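The "async write updates to buffer" comment above points at the other half of the separation: dirty namespace objects are collected and pushed to the database by a background task rather than on the logging path. A small sketch of what such a drain loop could look like; the batch interval and flush callback are assumptions, not the repository's BackupSetToDB.

// Illustrative async backup loop, assuming dirty INode ids are tracked in a
// concurrent set (as the surrounding code suggests) and drained periodically.
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class AsyncBackupSketch {
    private final Set<Long> dirtyIds = ConcurrentHashMap.newKeySet();
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    void markDirty(long inodeId) {
        dirtyIds.add(inodeId);
    }

    // Periodically drain the dirty set and hand the batch to the database writer.
    void start(Consumer<List<Long>> flushBatchToDB) {
        scheduler.scheduleWithFixedDelay(() -> {
            List<Long> batch = new ArrayList<>(dirtyIds);
            dirtyIds.removeAll(batch);
            if (!batch.isEmpty()) {
                flushBatchToDB.accept(batch);
            }
        }, 50, 50, TimeUnit.MILLISECONDS);
    }
}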
2 changes: 1 addition & 1 deletion voltdb/BatchRemoveINodes.java
@@ -59,7 +59,7 @@ public long run(final long[] ids) throws VoltAbortException {
long cid = set.get(i);
i++;
voltQueueSQL(sql1, cid);
VoltTable[] results = voltExecuteSQL();
results = voltExecuteSQL();
if (results[0].getRowCount() < 1) {
continue;
}
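For readers unfamiliar with VoltDB stored procedures, the change above simply reuses a results variable presumably declared earlier in the procedure instead of re-declaring it inside the loop. Statements are queued with voltQueueSQL and executed with voltExecuteSQL, which returns one VoltTable per queued statement. A hedged sketch of that pattern with a hypothetical table and column names; only the API calls mirror the procedure in this diff.

// Sketch of the VoltDB queue-then-execute pattern; the inodes table is hypothetical.
import org.voltdb.SQLStmt;
import org.voltdb.VoltProcedure;
import org.voltdb.VoltTable;

public class RemoveByIdSketch extends VoltProcedure {
    final SQLStmt selectChild = new SQLStmt("SELECT id FROM inodes WHERE parent = ?;");

    public long run(long[] ids) throws VoltAbortException {
        VoltTable[] results;            // declared once, reused per iteration
        for (long id : ids) {
            voltQueueSQL(selectChild, id);
            results = voltExecuteSQL(); // one VoltTable per queued statement
            if (results[0].getRowCount() < 1) {
                continue;
            }
            // ... process children ...
        }
        return ids.length;
    }
}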
