Merge pull request #204 from DSL-UMD/distributed

[FIX] awaitility schedule conflicts

gangliao authored Sep 28, 2019
2 parents 9d0d181 + 92f5eb7 commit 1129490
Showing 3 changed files with 55 additions and 54 deletions.
6 changes: 0 additions & 6 deletions hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -36,12 +36,6 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
   </properties>
 
   <dependencies>
-    <dependency>
-      <groupId>org.awaitility</groupId>
-      <artifactId>awaitility</artifactId>
-      <version>4.0.1</version>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>commons-pool2</artifactId>
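For context, the deleted artifact backed the single polling wait removed in the next file. A minimal, self-contained illustration of that Awaitility idiom (written for this note, not taken from the repository; it compiles only with the very org.awaitility:awaitility:4.0.1 dependency this hunk drops):

    import static java.util.concurrent.TimeUnit.SECONDS;
    import static org.awaitility.Awaitility.await;

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class AwaitIdiom {
      public static void main(String[] args) {
        Set<Long> pending = ConcurrentHashMap.newKeySet();
        for (long id = 0; id < 2048; id++) pending.add(id);

        // A second thread stands in for the background flusher that the
        // NameNode code waited on.
        new Thread(pending::clear).start();

        // Poll until the condition holds; fail with ConditionTimeoutException
        // after 10 seconds otherwise.
        await().atMost(10, SECONDS).until(() -> pending.size() < 1024);
        System.out.println("drained, " + pending.size() + " ids left");
      }
    }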
FSNamesystem.java
@@ -2552,7 +2552,7 @@ private HdfsFileStatus startFileInt(String src,

     String syncStr = System.getenv("SYNC_COMMAND_LOGGING");
     if (syncStr != null && Boolean.parseBoolean(syncStr) == true) {
-      await().atMost(10, SECONDS).until(() -> INodeKeyedObjects.getBackupSet().size() < 1024);
+      INodeKeyedObjects.syncUpdateDB();
     }
 
     return stat;
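This hunk is the heart of the fix. The old code parked the RPC handler for up to 10 seconds on a condition that only the separately scheduled BackupSetToDB task could make true, so the two schedules could conflict: with the background task slow, starved, or not yet started, Awaitility gave up with a ConditionTimeoutException. A minimal sketch of that failure mode, assuming no flusher thread is running (illustrative code, not from the repository):

    import static java.util.concurrent.TimeUnit.SECONDS;
    import static org.awaitility.Awaitility.await;

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import org.awaitility.core.ConditionTimeoutException;

    public class ScheduleConflict {
      public static void main(String[] args) {
        Set<Long> pending = ConcurrentHashMap.newKeySet();
        for (long id = 0; id < 2048; id++) pending.add(id); // backlog above the 1024 threshold

        try {
          // Nothing ever drains the set, so this wait can only time out.
          await().atMost(2, SECONDS).until(() -> pending.size() < 1024);
        } catch (ConditionTimeoutException e) {
          System.out.println("timed out waiting on a flusher that never ran");
        }

        // The replacement behavior: drain on the calling thread instead,
        // a stand-in for INodeKeyedObjects.syncUpdateDB() below.
        pending.clear();
        System.out.println("after inline flush: " + pending.size() + " ids pending");
      }
    }

Note also that syncUpdateDB(), shown in the next file, returns immediately while fewer than 1024 ids are pending, so calling it on every guarded create amortizes to roughly one batched database write per 1024 mutations.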
INodeKeyedObjects.java
@@ -33,59 +33,62 @@ public static Set<Long> getBackupSet() {
     return concurrentHashSet;
   }
 
-  public static void BackupSetToDB() {
+  public static void syncUpdateDB() {
     // In HDFS, the default log buffer size is 512 * 1024 bytes, or 512 KB.
     // We assume that each object size is 512 bytes, then the size of
     // concurrentHashSet should be 1024 which only records INode Id.
     // Note: Using INode Id, it's easy to find INode object in cache.
 
+    int i = 0;
+    final int num = 1024;
+    if (concurrentHashSet.size() >= num) {
+      Iterator<Long> iterator = concurrentHashSet.iterator();
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Sync files/directories from cache to database.");
+      }
+
+      List<Long> longAttr = new ArrayList<>();
+      List<String> strAttr = new ArrayList<>();
+
+      List<Long> fileIds = new ArrayList<>();
+      List<String> fileAttr = new ArrayList<>();
+      while (iterator.hasNext()) {
+        INode inode =
+            INodeKeyedObjects.getCache().getIfPresent(Long.class, iterator.next());
+
+        strAttr.add(inode.getLocalName());
+        longAttr.add(inode.getParentId());
+        longAttr.add(inode.getId());
+        longAttr.add(inode.getModificationTime());
+        longAttr.add(inode.getAccessTime());
+        longAttr.add(inode.getPermissionLong());
+        if (inode.isDirectory()) {
+          longAttr.add(0L);
+        } else {
+          longAttr.add(inode.asFile().getHeaderLong());
+          FileUnderConstructionFeature uc =
+              inode.asFile().getFileUnderConstructionFeature();
+          if (uc != null) {
+            fileIds.add(inode.getId());
+            fileAttr.add(uc.getClientName(inode.getId()));
+            fileAttr.add(uc.getClientMachine(inode.getId()));
+          }
+        }
+        iterator.remove();
+        if (++i >= num) break;
+      }
+      try {
+        DatabaseINode.batchUpdateINodes(longAttr, strAttr, fileIds, fileAttr);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  public static void BackupSetToDB() {
     final Runnable updateToDB =
         new Runnable() {
           public void run() {
-            int i = 0;
-            final int num = 1024;
-            if (concurrentHashSet.size() >= num) {
-              Iterator<Long> iterator = concurrentHashSet.iterator();
-              if (LOG.isInfoEnabled()) {
-                LOG.info("Sync files/directories from cache to database.");
-              }
-
-              List<Long> longAttr = new ArrayList<>();
-              List<String> strAttr = new ArrayList<>();
-
-              List<Long> fileIds = new ArrayList<>();
-              List<String> fileAttr = new ArrayList<>();
-              while (iterator.hasNext()) {
-                INode inode =
-                    INodeKeyedObjects.getCache().getIfPresent(Long.class, iterator.next());
-
-                strAttr.add(inode.getLocalName());
-                longAttr.add(inode.getParentId());
-                longAttr.add(inode.getId());
-                longAttr.add(inode.getModificationTime());
-                longAttr.add(inode.getAccessTime());
-                longAttr.add(inode.getPermissionLong());
-                if (inode.isDirectory()) {
-                  longAttr.add(0L);
-                } else {
-                  longAttr.add(inode.asFile().getHeaderLong());
-                  FileUnderConstructionFeature uc =
-                      inode.asFile().getFileUnderConstructionFeature();
-                  if (uc != null) {
-                    fileIds.add(inode.getId());
-                    fileAttr.add(uc.getClientName(inode.getId()));
-                    fileAttr.add(uc.getClientMachine(inode.getId()));
-                  }
-                }
-                iterator.remove();
-                if (++i >= num) break;
-              }
-              try {
-                DatabaseINode.batchUpdateINodes(longAttr, strAttr, fileIds, fileAttr);
-              } catch (Exception e) {
-                e.printStackTrace();
-              }
-            }
+            syncUpdateDB();
           }
         };

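The new method packs one string and six longs per inode (the local name; then parent id, inode id, modification time, access time, permission bits, and the file header long or 0L for directories), plus a parallel id/attribute list holding two strings per under-construction file. Only the call site of DatabaseINode.batchUpdateINodes appears in this diff, so the pairing below is inferred from that packing order; a standalone decoding sketch under that assumption:

    import java.util.List;

    // Hypothetical consumer of the flat lists built by syncUpdateDB(); the
    // 6-longs-per-inode stride is read off the sequence of adds above.
    final class BatchLayout {
      static final int LONGS_PER_INODE = 6; // parentId, id, mtime, atime, permission, header/0L

      static void dump(List<Long> longAttr, List<String> strAttr,
                       List<Long> fileIds, List<String> fileAttr) {
        for (int k = 0; k < strAttr.size(); k++) {
          int base = k * LONGS_PER_INODE;
          System.out.printf("inode %d: name=%s parent=%d header=%d%n",
              longAttr.get(base + 1), strAttr.get(k),
              longAttr.get(base), longAttr.get(base + 5));
        }
        for (int k = 0; k < fileIds.size(); k++) {
          // Two strings per under-construction file: client name, client machine.
          System.out.printf("uc file %d: client=%s machine=%s%n",
              fileIds.get(k), fileAttr.get(2 * k), fileAttr.get(2 * k + 1));
        }
      }
    }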
@@ -117,7 +120,11 @@ public void run() {
   public static IndexedCache<CompositeKey, INode> getCache() {
     if (cache == null) {
       concurrentHashSet = ConcurrentHashMap.newKeySet();
-      BackupSetToDB();
+
+      String syncStr = System.getenv("SYNC_COMMAND_LOGGING");
+      if (syncStr == null || Boolean.parseBoolean(syncStr) == false) {
+        BackupSetToDB();
+      }
 
       // Assuming each INode has 600 bytes, then
       // 10000000 * 600 / 2^30 = 5.58 GB.
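Taken together, the two SYNC_COMMAND_LOGGING checks make the flush paths mutually exclusive: a truthy value selects the inline flush in startFileInt and keeps getCache() from ever scheduling the background task, while an unset or false value leaves only the scheduled path, which no request thread blocks on anymore. Condensed into one place (an illustrative composition; in the commit the two branches live in different files):

    public class SyncModeSwitch {
      public static void main(String[] args) {
        String syncStr = System.getenv("SYNC_COMMAND_LOGGING");
        boolean syncLogging = syncStr != null && Boolean.parseBoolean(syncStr);
        if (syncLogging) {
          // Foreground mode: guarded operations such as startFileInt flush
          // by calling INodeKeyedObjects.syncUpdateDB() on their own thread.
          System.out.println("sync mode: inline flush, no scheduled flusher");
        } else {
          // Background mode: getCache() schedules BackupSetToDB()'s runnable,
          // and no request thread ever waits for it.
          System.out.println("async mode: scheduled flusher only");
        }
      }
    }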
