From 2907e7097682c1f361cf46ea6f778936c42faa53 Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Fri, 26 Jun 2020 00:49:55 -0400 Subject: [PATCH 01/36] split FileDirectoriesIndex single db to 100 to improve concurrency --- .../build/buildfarm/cas/CASFileCache.java | 24 +-- .../buildfarm/cas/FileDirectoriesIndex.java | 193 ++++++++++++------ .../cas/FileDirectoriesIndexTest.java | 2 +- 3 files changed, 135 insertions(+), 84 deletions(-) diff --git a/src/main/java/build/buildfarm/cas/CASFileCache.java b/src/main/java/build/buildfarm/cas/CASFileCache.java index c657fad5cd..30488fcd67 100644 --- a/src/main/java/build/buildfarm/cas/CASFileCache.java +++ b/src/main/java/build/buildfarm/cas/CASFileCache.java @@ -115,9 +115,6 @@ public abstract class CASFileCache implements ContentAddressableStorage { private static final Logger logger = Logger.getLogger(CASFileCache.class.getName()); - protected static final String DEFAULT_DIRECTORIES_INDEX_NAME = "directories.sqlite"; - protected static final String DIRECTORIES_INDEX_NAME_MEMORY = ":memory:"; - private final Path root; private final FileStore fileStore; private final long maxSizeInBytes; @@ -252,7 +249,7 @@ public CASFileCache( expireService, accessRecorder, /* storage=*/ Maps.newConcurrentMap(), - /* directoriesIndexDbName=*/ DEFAULT_DIRECTORIES_INDEX_NAME, + /* directoriesIndexDbName=*/ FileDirectoriesIndex.DEFAULT_DIRECTORIES_INDEX_NAME, /* onPut=*/ (digest) -> {}, /* onExpire=*/ (digests) -> {}, /* delegate=*/ null); @@ -286,24 +283,7 @@ public CASFileCache( this.onExpire = onExpire; this.delegate = delegate; this.directoriesIndexDbName = directoriesIndexDbName; - - String directoriesIndexUrl = "jdbc:sqlite:"; - if (directoriesIndexDbName.equals(DIRECTORIES_INDEX_NAME_MEMORY)) { - directoriesIndexUrl += directoriesIndexDbName; - } else { - // db is ephemeral for now, no reuse occurs to match it, computation - // occurs each time anyway, and expected use of put is noop on collision - Path path = getPath(directoriesIndexDbName); - try { - if (Files.exists(path)) { - Files.delete(path); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - directoriesIndexUrl += path.toString(); - } - this.directoriesIndex = new FileDirectoriesIndex(directoriesIndexUrl, root); + this.directoriesIndex = new FileDirectoriesIndex(directoriesIndexDbName, root); header.before = header.after = header; } diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index 01f0abab69..33c796bc62 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -21,6 +21,8 @@ import build.buildfarm.common.DigestUtil; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + import java.io.IOException; import java.nio.charset.Charset; import java.nio.file.Files; @@ -32,8 +34,13 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.Arrays; import java.util.Set; -import javax.annotation.concurrent.GuardedBy; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.logging.Level; +import java.util.logging.Logger; /** * Ephemeral file manifestations of the entry/directory mappings Directory entries are stored in @@ -42,46 +49,84 @@ *

Sqlite db should be removed prior to using this index */ class FileDirectoriesIndex implements DirectoriesIndex { + public static final Logger logger = Logger.getLogger(FileDirectoriesIndex.class.getName()); + + protected static final String DEFAULT_DIRECTORIES_INDEX_NAME = "directories.sqlite"; + protected static final String DIRECTORIES_INDEX_NAME_MEMORY = ":memory:"; + private static final Charset UTF_8 = Charset.forName("UTF-8"); + private static final int DEFAULT_NUM_OF_DB = 100; - private final String dbUrl; private final Path root; + private final int numOfdb; - private boolean opened = false; - private Connection conn; + private String[] dbUrls; + private boolean[] isOpen; + private Connection[] conns; - FileDirectoriesIndex(String dbUrl, Path root) { - this.dbUrl = dbUrl; + FileDirectoriesIndex(String directoriesIndexDbName, Path root, int numOfdb) { this.root = root; + this.numOfdb = numOfdb; + this.dbUrls = new String[this.numOfdb]; + String directoriesIndexUrl = "jdbc:sqlite:"; + if (directoriesIndexDbName.equals(DIRECTORIES_INDEX_NAME_MEMORY)) { + directoriesIndexUrl += directoriesIndexDbName; + Arrays.fill(dbUrls, directoriesIndexUrl); + } else { + // db is ephemeral for now, no reuse occurs to match it, computation + // occurs each time anyway, and expected use of put is noop on collision + for (int i = 0; i < dbUrls.length; i++) { + Path path = root.resolve(directoriesIndexDbName + i); + try { + if (Files.exists(path)) { + Files.delete(path); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + dbUrls[i] = directoriesIndexUrl + path.toString(); + } + } + + isOpen = new boolean[this.numOfdb]; + conns = new Connection[this.numOfdb]; + open(); + } + + FileDirectoriesIndex(String dbUrl, Path root) { + this(dbUrl, root, DEFAULT_NUM_OF_DB); } - @GuardedBy("this") private void open() { - if (!opened) { - try { - conn = DriverManager.getConnection(dbUrl); - try (Statement safetyStatement = conn.createStatement()) { - safetyStatement.execute("PRAGMA synchronous=OFF"); - safetyStatement.execute("PRAGMA journal_mode=OFF"); - safetyStatement.execute("PRAGMA cache_size=100000"); + for (int i = 0; i < isOpen.length; i++) { + if (!isOpen[i]) { + try { + logger.log(Level.WARNING, "Creating: " + i + " -> " + dbUrls[i]); + conns[i] = DriverManager.getConnection(dbUrls[i]); + try (Statement safetyStatement = conns[i].createStatement()) { + safetyStatement.execute("PRAGMA synchronous=OFF"); + safetyStatement.execute("PRAGMA journal_mode=OFF"); + safetyStatement.execute("PRAGMA cache_size=100000"); + } + } catch (SQLException e) { + throw new RuntimeException(e); } - } catch (SQLException e) { - throw new RuntimeException(e); - } - String createEntriesSql = - "CREATE TABLE entries (\n" - + " path TEXT NOT NULL,\n" - + " directory TEXT NOT NULL\n" - + ")"; + String createEntriesSql = + "CREATE TABLE entries (\n" + + " path TEXT NOT NULL,\n" + + " directory TEXT NOT NULL\n" + + ")"; - try (Statement stmt = conn.createStatement()) { - stmt.execute(createEntriesSql); - } catch (SQLException e) { - throw new RuntimeException(e); - } + try (Statement stmt = conns[i].createStatement()) { + logger.log(Level.WARNING, "Creating: " + dbUrls[i]); + stmt.execute(createEntriesSql); + } catch (SQLException e) { + throw new RuntimeException(e); + } - opened = true; + isOpen[i] = true; + } } } @@ -90,27 +135,42 @@ public synchronized void start() { open(); String createIndexSql = "CREATE INDEX path_idx ON entries (path)"; - try (Statement stmt = conn.createStatement()) { - stmt.execute(createIndexSql); - } catch (SQLException e) { - throw new RuntimeException(e); + int nThread = Runtime.getRuntime().availableProcessors(); + String threadNameFormat = "create-sqlite-index-%d"; + ExecutorService pool = Executors.newFixedThreadPool( + nThread, new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build() + ); + + for (Connection conn : conns) { + pool.execute( + () -> { + try (Statement stmt = conn.createStatement()) { + stmt.execute(createIndexSql); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + ); } } @Override public void close() { - try { - conn.close(); - } catch (SQLException e) { - throw new RuntimeException(e); + for (int i = 0; i < conns.length; i++) { + try { + conns[i].close(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + isOpen[i] = false; } - opened = false; } - @GuardedBy("this") + // The method should be GuardedBy(conns[Math.abs(entry.hashCode()) % DATABASE_NUMBER]). private Set removeEntryDirectories(String entry) { open(); + Connection conn = conns[Math.abs(entry.hashCode()) % numOfdb]; String selectSql = "SELECT directory FROM entries WHERE path = ?"; String deleteSql = "DELETE FROM entries where path = ?"; @@ -136,8 +196,12 @@ Path path(Digest digest) { } @Override - public synchronized Set removeEntry(String entry) { - Set directories = removeEntryDirectories(entry); + public Set removeEntry(String entry) { + int dbIndex = Math.abs(entry.hashCode()) % numOfdb; + Set directories; + synchronized (conns[dbIndex]) { + directories = removeEntryDirectories(entry); + } try { for (Digest directory : directories) { Files.delete(path(directory)); @@ -159,20 +223,19 @@ public Iterable directoryEntries(Digest directory) { } } - private synchronized void addEntriesDirectory(Set entries, Digest directory) { + // The method should be GuardedBy(conns[Math.abs(entry.hashCode()) % DATABASE_NUMBER]). + private void addEntriesDirectory(String entry, Digest directory) { open(); String digest = DigestUtil.toString(directory); - String insertSql = "INSERT INTO entries (path, directory) VALUES (?,?)"; - try (PreparedStatement insertStatement = conn.prepareStatement(insertSql)) { - conn.setAutoCommit(false); + String insertSql = "INSERT INTO entries (path, directory)\n" + " VALUES (?,?)"; + int dbIndex = Math.abs(entry.hashCode()) % numOfdb; + try (PreparedStatement insertStatement = conns[dbIndex].prepareStatement(insertSql)) { + conns[dbIndex].setAutoCommit(false); insertStatement.setString(2, digest); - for (String entry : entries) { - insertStatement.setString(1, entry); - insertStatement.addBatch(); - } - insertStatement.executeBatch(); - conn.commit(); + insertStatement.setString(1, entry); + insertStatement.executeUpdate(); + conns[dbIndex].commit(); } catch (SQLException e) { throw new RuntimeException(e); } @@ -185,24 +248,28 @@ public void put(Digest directory, Iterable entries) { } catch (IOException e) { throw new RuntimeException(e); } - addEntriesDirectory(ImmutableSet.copyOf(entries), directory); + Set uniqueEntries = ImmutableSet.copyOf(entries); + for (String entry : uniqueEntries) { + synchronized (conns[Math.abs(entry.hashCode()) % numOfdb]) { + addEntriesDirectory(entry, directory); + } + } } - @GuardedBy("this") - private void removeEntriesDirectory(Iterable entries, Digest directory) { + // The method should be GuardedBy(conns[Math.abs(entry.hashCode()) % DATABASE_NUMBER]). + private void removeEntriesDirectory(String entry, Digest directory) { open(); String digest = DigestUtil.toString(directory); String deleteSql = "DELETE FROM entries WHERE path = ? AND directory = ?"; - try (PreparedStatement deleteStatement = conn.prepareStatement(deleteSql)) { - conn.setAutoCommit(false); + int dbIndex = Math.abs(entry.hashCode()) % numOfdb; + try (PreparedStatement deleteStatement = conns[dbIndex].prepareStatement(deleteSql)) { + conns[dbIndex].setAutoCommit(false); // safe for multi delete + deleteStatement.setString(1, entry); deleteStatement.setString(2, digest); - for (String entry : entries) { - deleteStatement.setString(1, entry); - deleteStatement.executeUpdate(); - } - conn.commit(); + deleteStatement.executeUpdate(); + conns[dbIndex].commit(); } catch (SQLException e) { throw new RuntimeException(e); } @@ -218,6 +285,10 @@ public synchronized void remove(Digest directory) { } catch (IOException e) { throw new RuntimeException(e); } - removeEntriesDirectory(entries, directory); + for (String entry : entries) { + synchronized (conns[Math.abs(entry.hashCode()) % numOfdb]) { + removeEntriesDirectory(entry, directory); + } + } } } diff --git a/src/test/java/build/buildfarm/cas/FileDirectoriesIndexTest.java b/src/test/java/build/buildfarm/cas/FileDirectoriesIndexTest.java index e83e9c6664..33323bdd84 100644 --- a/src/test/java/build/buildfarm/cas/FileDirectoriesIndexTest.java +++ b/src/test/java/build/buildfarm/cas/FileDirectoriesIndexTest.java @@ -36,7 +36,7 @@ public class FileDirectoriesIndexTest { private final DigestUtil DIGEST_UTIL = new DigestUtil(HashFunction.SHA256); - private final String jdbcIndexUrl = "jdbc:sqlite::memory:"; + private final String jdbcIndexUrl = ":memory:"; private Path root; private FileDirectoriesIndex directoriesIndex; From 64c885073afaad8d312caf12fc4d49b6df2d3c5d Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Fri, 26 Jun 2020 01:03:39 -0400 Subject: [PATCH 02/36] fix compiling error --- src/main/java/build/buildfarm/cas/CASFileCache.java | 4 +++- src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java | 1 - 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/build/buildfarm/cas/CASFileCache.java b/src/main/java/build/buildfarm/cas/CASFileCache.java index 30488fcd67..1282162ac9 100644 --- a/src/main/java/build/buildfarm/cas/CASFileCache.java +++ b/src/main/java/build/buildfarm/cas/CASFileCache.java @@ -115,6 +115,8 @@ public abstract class CASFileCache implements ContentAddressableStorage { private static final Logger logger = Logger.getLogger(CASFileCache.class.getName()); + protected static final String DEFAULT_DIRECTORIES_INDEX_NAME = "directories.sqlite"; + private final Path root; private final FileStore fileStore; private final long maxSizeInBytes; @@ -249,7 +251,7 @@ public CASFileCache( expireService, accessRecorder, /* storage=*/ Maps.newConcurrentMap(), - /* directoriesIndexDbName=*/ FileDirectoriesIndex.DEFAULT_DIRECTORIES_INDEX_NAME, + /* directoriesIndexDbName=*/ DEFAULT_DIRECTORIES_INDEX_NAME, /* onPut=*/ (digest) -> {}, /* onExpire=*/ (digests) -> {}, /* delegate=*/ null); diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index 33c796bc62..a1bf01b181 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -51,7 +51,6 @@ class FileDirectoriesIndex implements DirectoriesIndex { public static final Logger logger = Logger.getLogger(FileDirectoriesIndex.class.getName()); - protected static final String DEFAULT_DIRECTORIES_INDEX_NAME = "directories.sqlite"; protected static final String DIRECTORIES_INDEX_NAME_MEMORY = ":memory:"; private static final Charset UTF_8 = Charset.forName("UTF-8"); From c17cdd382bf3b0f12e440873c8a4223d8e875366 Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Fri, 26 Jun 2020 01:22:28 -0400 Subject: [PATCH 03/36] remove logging --- .../java/build/buildfarm/cas/FileDirectoriesIndex.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index a1bf01b181..16ab716a87 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -36,11 +36,8 @@ import java.sql.Statement; import java.util.Arrays; import java.util.Set; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.logging.Level; -import java.util.logging.Logger; /** * Ephemeral file manifestations of the entry/directory mappings Directory entries are stored in @@ -49,8 +46,6 @@ *

Sqlite db should be removed prior to using this index */ class FileDirectoriesIndex implements DirectoriesIndex { - public static final Logger logger = Logger.getLogger(FileDirectoriesIndex.class.getName()); - protected static final String DIRECTORIES_INDEX_NAME_MEMORY = ":memory:"; private static final Charset UTF_8 = Charset.forName("UTF-8"); @@ -100,7 +95,6 @@ private void open() { for (int i = 0; i < isOpen.length; i++) { if (!isOpen[i]) { try { - logger.log(Level.WARNING, "Creating: " + i + " -> " + dbUrls[i]); conns[i] = DriverManager.getConnection(dbUrls[i]); try (Statement safetyStatement = conns[i].createStatement()) { safetyStatement.execute("PRAGMA synchronous=OFF"); @@ -118,7 +112,6 @@ private void open() { + ")"; try (Statement stmt = conns[i].createStatement()) { - logger.log(Level.WARNING, "Creating: " + dbUrls[i]); stmt.execute(createEntriesSql); } catch (SQLException e) { throw new RuntimeException(e); From 502cc2d22455aa69bc07f626c9eb3bf671ca2ddf Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Tue, 30 Jun 2020 17:51:56 -0400 Subject: [PATCH 04/36] add queue for each sqlite db as buffer zone --- .../build/buildfarm/cas/CASFileCache.java | 2 + .../build/buildfarm/cas/DirectoriesIndex.java | 2 + .../buildfarm/cas/FileDirectoriesIndex.java | 103 ++++++++++++++++-- .../buildfarm/cas/MemoryDirectoriesIndex.java | 3 + 4 files changed, 101 insertions(+), 9 deletions(-) diff --git a/src/main/java/build/buildfarm/cas/CASFileCache.java b/src/main/java/build/buildfarm/cas/CASFileCache.java index 1282162ac9..0f3a9e8047 100644 --- a/src/main/java/build/buildfarm/cas/CASFileCache.java +++ b/src/main/java/build/buildfarm/cas/CASFileCache.java @@ -1448,6 +1448,7 @@ private List computeDirectories(CacheScanResults cacheScanResults) ImmutableList.Builder invalidDirectories = new ImmutableList.Builder<>(); + directoriesIndex.setBatchMode(true); for (Path path : cacheScanResults.computeDirs) { pool.execute( () -> { @@ -1479,6 +1480,7 @@ private List computeDirectories(CacheScanResults cacheScanResults) } joinThreads(pool, "Populating Directories...", 1, MINUTES); + directoriesIndex.setBatchMode(false); return invalidDirectories.build(); } diff --git a/src/main/java/build/buildfarm/cas/DirectoriesIndex.java b/src/main/java/build/buildfarm/cas/DirectoriesIndex.java index 9af93feb54..89884e41b0 100644 --- a/src/main/java/build/buildfarm/cas/DirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/DirectoriesIndex.java @@ -34,4 +34,6 @@ interface DirectoriesIndex { void remove(Digest directory); void start(); + + void setBatchMode(boolean batchMode) throws InterruptedException; } diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index 16ab716a87..aa68407f37 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.ThreadFactoryBuilder; - import java.io.IOException; import java.nio.charset.Charset; import java.nio.file.Files; @@ -35,9 +34,12 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.Arrays; +import java.util.LinkedList; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; /** * Ephemeral file manifestations of the entry/directory mappings Directory entries are stored in @@ -50,6 +52,7 @@ class FileDirectoriesIndex implements DirectoriesIndex { private static final Charset UTF_8 = Charset.forName("UTF-8"); private static final int DEFAULT_NUM_OF_DB = 100; + private static final int QUEUE_SIZE = 100; private final Path root; private final int numOfdb; @@ -58,6 +61,9 @@ class FileDirectoriesIndex implements DirectoriesIndex { private boolean[] isOpen; private Connection[] conns; + private boolean batchMode = false; + private Queue[] queues; + FileDirectoriesIndex(String directoriesIndexDbName, Path root, int numOfdb) { this.root = root; this.numOfdb = numOfdb; @@ -84,6 +90,12 @@ class FileDirectoriesIndex implements DirectoriesIndex { isOpen = new boolean[this.numOfdb]; conns = new Connection[this.numOfdb]; + queues = new Queue[this.numOfdb]; + + for (int i = 0; i < queues.length; i++) { + queues[i] = new LinkedList<>(); + } + open(); } @@ -129,9 +141,9 @@ public synchronized void start() { String createIndexSql = "CREATE INDEX path_idx ON entries (path)"; int nThread = Runtime.getRuntime().availableProcessors(); String threadNameFormat = "create-sqlite-index-%d"; - ExecutorService pool = Executors.newFixedThreadPool( - nThread, new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build() - ); + ExecutorService pool = + Executors.newFixedThreadPool( + nThread, new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build()); for (Connection conn : conns) { pool.execute( @@ -141,8 +153,42 @@ nThread, new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build() } catch (SQLException e) { throw new RuntimeException(e); } - } - ); + }); + } + + pool.shutdown(); + while (!pool.isTerminated()) { + try { + pool.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + @Override + public void setBatchMode(boolean batchMode) throws InterruptedException { + this.batchMode = batchMode; + if (batchMode) { + return; + } + int nThread = Runtime.getRuntime().availableProcessors(); + String threadNameFormat = "drain-queue-%d"; + ExecutorService pool = + Executors.newFixedThreadPool( + nThread, new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build()); + for (int i = 0; i < queues.length; i++) { + int index = i; + pool.execute(() -> addEntriesDirectory(index)); + } + + pool.shutdown(); + while (!pool.isTerminated()) { + try { + pool.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } } @@ -224,8 +270,8 @@ private void addEntriesDirectory(String entry, Digest directory) { int dbIndex = Math.abs(entry.hashCode()) % numOfdb; try (PreparedStatement insertStatement = conns[dbIndex].prepareStatement(insertSql)) { conns[dbIndex].setAutoCommit(false); - insertStatement.setString(2, digest); insertStatement.setString(1, entry); + insertStatement.setString(2, digest); insertStatement.executeUpdate(); conns[dbIndex].commit(); } catch (SQLException e) { @@ -233,6 +279,25 @@ private void addEntriesDirectory(String entry, Digest directory) { } } + private void addEntriesDirectory(int dbIndex) { + open(); + + String insertSql = "INSERT INTO entries (path, directory)\n" + " VALUES (?,?)"; + try (PreparedStatement insertStatement = conns[dbIndex].prepareStatement(insertSql)) { + conns[dbIndex].setAutoCommit(false); + while (!queues[dbIndex].isEmpty()) { + MapEntry e = queues[dbIndex].poll(); + insertStatement.setString(1, e.entry); + insertStatement.setString(2, e.digest); + insertStatement.addBatch(); + } + insertStatement.executeBatch(); + conns[dbIndex].commit(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + @Override public void put(Digest directory, Iterable entries) { try { @@ -242,8 +307,18 @@ public void put(Digest directory, Iterable entries) { } Set uniqueEntries = ImmutableSet.copyOf(entries); for (String entry : uniqueEntries) { - synchronized (conns[Math.abs(entry.hashCode()) % numOfdb]) { - addEntriesDirectory(entry, directory); + int index = Math.abs(entry.hashCode()) % numOfdb; + if (batchMode) { + synchronized (queues[index]) { + queues[index].add(new MapEntry(entry, DigestUtil.toString(directory))); + if (queues[index].size() >= QUEUE_SIZE) { + addEntriesDirectory(index); + } + } + } else { + synchronized (conns[index]) { + addEntriesDirectory(entry, directory); + } } } } @@ -283,4 +358,14 @@ public synchronized void remove(Digest directory) { } } } + + private static class MapEntry { + String entry; + String digest; + + MapEntry(String entry, String digest) { + this.entry = entry; + this.digest = digest; + } + } } diff --git a/src/main/java/build/buildfarm/cas/MemoryDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/MemoryDirectoriesIndex.java index 7e8f9b61f8..def4577a61 100644 --- a/src/main/java/build/buildfarm/cas/MemoryDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/MemoryDirectoriesIndex.java @@ -39,6 +39,9 @@ public void close() {} @Override public void start() {} + @Override + public void setBatchMode(boolean batchMode) {} + @Override public synchronized Set removeEntry(String entry) { Set directories = entryDirectories.removeAll(entry); From 5417f0949c9a9934328f57bc7515769a195de0d9 Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Tue, 30 Jun 2020 18:08:48 -0400 Subject: [PATCH 05/36] add logging for debug --- src/main/java/build/buildfarm/cas/CASFileCache.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/java/build/buildfarm/cas/CASFileCache.java b/src/main/java/build/buildfarm/cas/CASFileCache.java index 0f3a9e8047..7e44d07936 100644 --- a/src/main/java/build/buildfarm/cas/CASFileCache.java +++ b/src/main/java/build/buildfarm/cas/CASFileCache.java @@ -1284,9 +1284,14 @@ public StartupCacheResults start(Consumer onPut, ExecutorService removeD LogComputeDirectoriesResults(invalidDirectories); deleteInvalidFileContent(invalidDirectories, removeDirectoryService); + Instant beforeIndexTime= Instant.now(); + logger.log(Level.INFO, "Inserting done: " + Duration.between(startTime, beforeIndexTime).getSeconds() + "s"); + logger.log(Level.INFO, "Creating Index"); directoriesIndex.start(); logger.log(Level.INFO, "Index Created"); + Instant afterIndexTime= Instant.now(); + logger.log(Level.INFO, "Inserting done: " + Duration.between(beforeIndexTime, afterIndexTime).getSeconds() + "s"); // Calculate Startup time Instant endTime = Instant.now(); From ede212edafaca6a709768f00281448bcc071dcb8 Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Wed, 1 Jul 2020 11:02:18 -0400 Subject: [PATCH 06/36] adjust cas-test cas size and HashFunction and add logging --- src/main/java/build/buildfarm/CASTest.java | 4 +-- .../build/buildfarm/cas/CASFileCache.java | 25 +++++++++++++------ .../buildfarm/cas/FileDirectoriesIndex.java | 2 +- 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/src/main/java/build/buildfarm/CASTest.java b/src/main/java/build/buildfarm/CASTest.java index 4ce10572cb..14280577da 100644 --- a/src/main/java/build/buildfarm/CASTest.java +++ b/src/main/java/build/buildfarm/CASTest.java @@ -65,8 +65,8 @@ public static void main(String[] args) throws Exception { CASFileCache fileCache = new LocalCASFileCache( root, - /* maxSizeInBytes=*/ GBtoBytes(500), - new DigestUtil(HashFunction.SHA1), + /* maxSizeInBytes=*/ GBtoBytes(2 * 1024), + new DigestUtil(HashFunction.SHA256), /* expireService=*/ newDirectExecutorService(), /* accessRecorder=*/ directExecutor()); diff --git a/src/main/java/build/buildfarm/cas/CASFileCache.java b/src/main/java/build/buildfarm/cas/CASFileCache.java index 7e44d07936..c549cc0136 100644 --- a/src/main/java/build/buildfarm/cas/CASFileCache.java +++ b/src/main/java/build/buildfarm/cas/CASFileCache.java @@ -1277,6 +1277,12 @@ public StartupCacheResults start(Consumer onPut, ExecutorService removeD CacheScanResults cacheScanResults = scanRoot(); LogCacheScanResults(cacheScanResults); deleteInvalidFileContent(cacheScanResults.deleteFiles, removeDirectoryService); + Instant phase1Ending = Instant.now(); + logger.log( + Level.INFO, + "TIME_LOGGING: Phase 1 done: " + + Duration.between(startTime, phase1Ending).getSeconds() + + "s"); // Phase 2: Compute // recursively construct all directory structures. @@ -1284,14 +1290,19 @@ public StartupCacheResults start(Consumer onPut, ExecutorService removeD LogComputeDirectoriesResults(invalidDirectories); deleteInvalidFileContent(invalidDirectories, removeDirectoryService); - Instant beforeIndexTime= Instant.now(); - logger.log(Level.INFO, "Inserting done: " + Duration.between(startTime, beforeIndexTime).getSeconds() + "s"); + Instant beforeIndexing = Instant.now(); + logger.log( + Level.INFO, + "TIME_LOGGING: Phase 2 Inserting done: " + + Duration.between(phase1Ending, beforeIndexing).getSeconds() + + "s"); - logger.log(Level.INFO, "Creating Index"); directoriesIndex.start(); - logger.log(Level.INFO, "Index Created"); - Instant afterIndexTime= Instant.now(); - logger.log(Level.INFO, "Inserting done: " + Duration.between(beforeIndexTime, afterIndexTime).getSeconds() + "s"); + Instant afterIndexing = Instant.now(); + logger.log( + Level.INFO, + "TIME_LOGGING: Index Created: " + Duration.between(beforeIndexing, afterIndexing)); + Instant afterIndexTime = Instant.now(); // Calculate Startup time Instant endTime = Instant.now(); @@ -1387,7 +1398,7 @@ private void processRootFile( // ignore our directories index database // indexes will be removed and rebuilt for compute - if (!basename.equals(directoriesIndexDbName)) { + if (!basename.startsWith(directoriesIndexDbName)) { FileStatus stat = stat(file, false); // mark directory for later key compute diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index aa68407f37..17c9b46b10 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -52,7 +52,7 @@ class FileDirectoriesIndex implements DirectoriesIndex { private static final Charset UTF_8 = Charset.forName("UTF-8"); private static final int DEFAULT_NUM_OF_DB = 100; - private static final int QUEUE_SIZE = 100; + private static final int QUEUE_SIZE = 1000; private final Path root; private final int numOfdb; From 1cc0f23875bfe49ff5a11ec110cb3d893c8b60c4 Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Wed, 1 Jul 2020 14:21:35 -0400 Subject: [PATCH 07/36] drain all queues each time any queue is full --- src/main/java/build/buildfarm/cas/CASFileCache.java | 5 +++-- .../java/build/buildfarm/cas/FileDirectoriesIndex.java | 9 ++++++++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/main/java/build/buildfarm/cas/CASFileCache.java b/src/main/java/build/buildfarm/cas/CASFileCache.java index c549cc0136..7da98d7e52 100644 --- a/src/main/java/build/buildfarm/cas/CASFileCache.java +++ b/src/main/java/build/buildfarm/cas/CASFileCache.java @@ -1301,8 +1301,9 @@ public StartupCacheResults start(Consumer onPut, ExecutorService removeD Instant afterIndexing = Instant.now(); logger.log( Level.INFO, - "TIME_LOGGING: Index Created: " + Duration.between(beforeIndexing, afterIndexing)); - Instant afterIndexTime = Instant.now(); + "TIME_LOGGING: Index Created: " + + Duration.between(beforeIndexing, afterIndexing).getSeconds() + + "s"); // Calculate Startup time Instant endTime = Instant.now(); diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index 17c9b46b10..dd31e9e21d 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -172,6 +172,10 @@ public void setBatchMode(boolean batchMode) throws InterruptedException { if (batchMode) { return; } + drainQueues(); + } + + private void drainQueues() { int nThread = Runtime.getRuntime().availableProcessors(); String threadNameFormat = "drain-queue-%d"; ExecutorService pool = @@ -190,6 +194,7 @@ public void setBatchMode(boolean batchMode) throws InterruptedException { throw new RuntimeException(e); } } + } @Override @@ -312,7 +317,9 @@ public void put(Digest directory, Iterable entries) { synchronized (queues[index]) { queues[index].add(new MapEntry(entry, DigestUtil.toString(directory))); if (queues[index].size() >= QUEUE_SIZE) { - addEntriesDirectory(index); + synchronized (this) { + drainQueues(); + } } } } else { From 91890bea3d9c79c18f729bf6b826888fb27d3c31 Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Wed, 1 Jul 2020 14:54:04 -0400 Subject: [PATCH 08/36] avoid npe --- .../java/build/buildfarm/cas/FileDirectoriesIndex.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index dd31e9e21d..d426980c5f 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -287,11 +287,18 @@ private void addEntriesDirectory(String entry, Digest directory) { private void addEntriesDirectory(int dbIndex) { open(); + if (queues[dbIndex].isEmpty()) { + return; + } + String insertSql = "INSERT INTO entries (path, directory)\n" + " VALUES (?,?)"; try (PreparedStatement insertStatement = conns[dbIndex].prepareStatement(insertSql)) { conns[dbIndex].setAutoCommit(false); while (!queues[dbIndex].isEmpty()) { MapEntry e = queues[dbIndex].poll(); + if (e == null) { + continue; + } insertStatement.setString(1, e.entry); insertStatement.setString(2, e.digest); insertStatement.addBatch(); From 2648403223b08aab3817215f1e6493495677e8dd Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Wed, 1 Jul 2020 17:23:27 -0400 Subject: [PATCH 09/36] synchronize queues draining --- .../buildfarm/cas/FileDirectoriesIndex.java | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index d426980c5f..56bbc4abf7 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -40,6 +40,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; /** * Ephemeral file manifestations of the entry/directory mappings Directory entries are stored in @@ -48,11 +51,13 @@ *

Sqlite db should be removed prior to using this index */ class FileDirectoriesIndex implements DirectoriesIndex { + private static final Logger logger = Logger.getLogger(CASFileCache.class.getName()); + protected static final String DIRECTORIES_INDEX_NAME_MEMORY = ":memory:"; private static final Charset UTF_8 = Charset.forName("UTF-8"); private static final int DEFAULT_NUM_OF_DB = 100; - private static final int QUEUE_SIZE = 1000; + private static final int MAX_QUEUE_SIZE = 10000; private final Path root; private final int numOfdb; @@ -63,6 +68,7 @@ class FileDirectoriesIndex implements DirectoriesIndex { private boolean batchMode = false; private Queue[] queues; + private AtomicInteger queueSize = new AtomicInteger(0); FileDirectoriesIndex(String directoriesIndexDbName, Path root, int numOfdb) { this.root = root; @@ -85,6 +91,7 @@ class FileDirectoriesIndex implements DirectoriesIndex { throw new RuntimeException(e); } dbUrls[i] = directoriesIndexUrl + path.toString(); + logger.log(Level.INFO, "Database url: " + dbUrls[i]); } } @@ -173,9 +180,10 @@ public void setBatchMode(boolean batchMode) throws InterruptedException { return; } drainQueues(); + queueSize.set(0); } - private void drainQueues() { + private synchronized void drainQueues() { int nThread = Runtime.getRuntime().availableProcessors(); String threadNameFormat = "drain-queue-%d"; ExecutorService pool = @@ -186,15 +194,16 @@ private void drainQueues() { pool.execute(() -> addEntriesDirectory(index)); } + logger.log(Level.INFO, "Start to drain the queue."); pool.shutdown(); while (!pool.isTerminated()) { try { - pool.awaitTermination(1, TimeUnit.SECONDS); + pool.awaitTermination(5, TimeUnit.SECONDS); } catch (InterruptedException e) { throw new RuntimeException(e); } } - + logger.log(Level.INFO, "Queue Empty"); } @Override @@ -321,13 +330,12 @@ public void put(Digest directory, Iterable entries) { for (String entry : uniqueEntries) { int index = Math.abs(entry.hashCode()) % numOfdb; if (batchMode) { + if (queueSize.get() >= MAX_QUEUE_SIZE) { + drainQueues(); + } synchronized (queues[index]) { queues[index].add(new MapEntry(entry, DigestUtil.toString(directory))); - if (queues[index].size() >= QUEUE_SIZE) { - synchronized (this) { - drainQueues(); - } - } + queueSize.incrementAndGet(); } } else { synchronized (conns[index]) { From 57859effeb126022b7f83dd565b573543f17b4d0 Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Wed, 1 Jul 2020 17:41:50 -0400 Subject: [PATCH 10/36] increae max total queue size --- src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index 56bbc4abf7..2ebd72ab87 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -57,7 +57,7 @@ class FileDirectoriesIndex implements DirectoriesIndex { private static final Charset UTF_8 = Charset.forName("UTF-8"); private static final int DEFAULT_NUM_OF_DB = 100; - private static final int MAX_QUEUE_SIZE = 10000; + private static final int MAX_QUEUE_SIZE = 1000 * 1000; private final Path root; private final int numOfdb; From 48c1d543df51e0c2569921d56e946769aa774026 Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Wed, 1 Jul 2020 20:51:38 -0400 Subject: [PATCH 11/36] adjust synchronization point and number of database --- .../build/buildfarm/cas/FileDirectoriesIndex.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index 2ebd72ab87..e124c5f8b1 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -56,7 +56,7 @@ class FileDirectoriesIndex implements DirectoriesIndex { protected static final String DIRECTORIES_INDEX_NAME_MEMORY = ":memory:"; private static final Charset UTF_8 = Charset.forName("UTF-8"); - private static final int DEFAULT_NUM_OF_DB = 100; + private static final int DEFAULT_NUM_OF_DB = Runtime.getRuntime().availableProcessors(); private static final int MAX_QUEUE_SIZE = 1000 * 1000; private final Path root; @@ -91,7 +91,6 @@ class FileDirectoriesIndex implements DirectoriesIndex { throw new RuntimeException(e); } dbUrls[i] = directoriesIndexUrl + path.toString(); - logger.log(Level.INFO, "Database url: " + dbUrls[i]); } } @@ -183,7 +182,7 @@ public void setBatchMode(boolean batchMode) throws InterruptedException { queueSize.set(0); } - private synchronized void drainQueues() { + private void drainQueues() { int nThread = Runtime.getRuntime().availableProcessors(); String threadNameFormat = "drain-queue-%d"; ExecutorService pool = @@ -330,8 +329,10 @@ public void put(Digest directory, Iterable entries) { for (String entry : uniqueEntries) { int index = Math.abs(entry.hashCode()) % numOfdb; if (batchMode) { - if (queueSize.get() >= MAX_QUEUE_SIZE) { - drainQueues(); + synchronized (this) { + if (queueSize.get() >= MAX_QUEUE_SIZE) { + drainQueues(); + } } synchronized (queues[index]) { queues[index].add(new MapEntry(entry, DigestUtil.toString(directory))); From 2070ff1c95670f1377b04ac06e0e1ae39c319ab1 Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Wed, 1 Jul 2020 21:01:08 -0400 Subject: [PATCH 12/36] adjust synchronization point --- .../build/buildfarm/cas/FileDirectoriesIndex.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index e124c5f8b1..1880b5853b 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -57,7 +57,7 @@ class FileDirectoriesIndex implements DirectoriesIndex { private static final Charset UTF_8 = Charset.forName("UTF-8"); private static final int DEFAULT_NUM_OF_DB = Runtime.getRuntime().availableProcessors(); - private static final int MAX_QUEUE_SIZE = 1000 * 1000; + private static final int MAX_QUEUE_SIZE = 10 * 1000 * 1000; private final Path root; private final int numOfdb; @@ -320,6 +320,11 @@ private void addEntriesDirectory(int dbIndex) { @Override public void put(Digest directory, Iterable entries) { + synchronized (this) { + if (queueSize.get() >= MAX_QUEUE_SIZE) { + drainQueues(); + } + } try { asCharSink(path(directory), UTF_8).writeLines(entries); } catch (IOException e) { @@ -329,11 +334,6 @@ public void put(Digest directory, Iterable entries) { for (String entry : uniqueEntries) { int index = Math.abs(entry.hashCode()) % numOfdb; if (batchMode) { - synchronized (this) { - if (queueSize.get() >= MAX_QUEUE_SIZE) { - drainQueues(); - } - } synchronized (queues[index]) { queues[index].add(new MapEntry(entry, DigestUtil.toString(directory))); queueSize.incrementAndGet(); From d452210a5b6d95fbb07fd94ffdadcaa32ee55199 Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Wed, 1 Jul 2020 23:41:36 -0400 Subject: [PATCH 13/36] adjust synchronization --- .../build/buildfarm/cas/FileDirectoriesIndex.java | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index 1880b5853b..86170c8c0e 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -319,11 +319,10 @@ private void addEntriesDirectory(int dbIndex) { } @Override - public void put(Digest directory, Iterable entries) { - synchronized (this) { - if (queueSize.get() >= MAX_QUEUE_SIZE) { - drainQueues(); - } + public synchronized void put(Digest directory, Iterable entries) { + if (queueSize.get() >= MAX_QUEUE_SIZE) { + drainQueues(); + queueSize.set(0); } try { asCharSink(path(directory), UTF_8).writeLines(entries); @@ -334,15 +333,11 @@ public void put(Digest directory, Iterable entries) { for (String entry : uniqueEntries) { int index = Math.abs(entry.hashCode()) % numOfdb; if (batchMode) { - synchronized (queues[index]) { queues[index].add(new MapEntry(entry, DigestUtil.toString(directory))); queueSize.incrementAndGet(); - } } else { - synchronized (conns[index]) { addEntriesDirectory(entry, directory); } - } } } From 8a574afdf29c6aaeddf9fb015fcf206d0563f882 Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Thu, 2 Jul 2020 00:24:32 -0400 Subject: [PATCH 14/36] maintain all in memory and then flush to disk --- .../buildfarm/cas/FileDirectoriesIndex.java | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index 86170c8c0e..22d0b11692 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -57,7 +57,7 @@ class FileDirectoriesIndex implements DirectoriesIndex { private static final Charset UTF_8 = Charset.forName("UTF-8"); private static final int DEFAULT_NUM_OF_DB = Runtime.getRuntime().availableProcessors(); - private static final int MAX_QUEUE_SIZE = 10 * 1000 * 1000; + private static final int MAX_QUEUE_SIZE = 1000 * 1000 * 1000; private final Path root; private final int numOfdb; @@ -319,11 +319,13 @@ private void addEntriesDirectory(int dbIndex) { } @Override - public synchronized void put(Digest directory, Iterable entries) { - if (queueSize.get() >= MAX_QUEUE_SIZE) { - drainQueues(); - queueSize.set(0); - } + public void put(Digest directory, Iterable entries) { + //synchronized (this) { + // if (queueSize.get() >= MAX_QUEUE_SIZE) { + // drainQueues(); + // queueSize.set(0); + // } + //} try { asCharSink(path(directory), UTF_8).writeLines(entries); } catch (IOException e) { @@ -333,11 +335,16 @@ public synchronized void put(Digest directory, Iterable entries) { for (String entry : uniqueEntries) { int index = Math.abs(entry.hashCode()) % numOfdb; if (batchMode) { + synchronized (queues[index]) { queues[index].add(new MapEntry(entry, DigestUtil.toString(directory))); - queueSize.incrementAndGet(); + int current = queueSize.incrementAndGet(); + logger.log(Level.INFO, "Current queue Size:" + current); + } } else { + synchronized (conns[index]) { addEntriesDirectory(entry, directory); } + } } } From 88a4a070fdc442d3d889f6575f688c38127022cb Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Thu, 2 Jul 2020 00:38:07 -0400 Subject: [PATCH 15/36] avoid logging per entry --- src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index 22d0b11692..37db34154c 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -338,7 +338,9 @@ public void put(Digest directory, Iterable entries) { synchronized (queues[index]) { queues[index].add(new MapEntry(entry, DigestUtil.toString(directory))); int current = queueSize.incrementAndGet(); - logger.log(Level.INFO, "Current queue Size:" + current); + if (current % 100000 == 0) { + logger.log(Level.INFO, "Current queue Size:" + current); + } } } else { synchronized (conns[index]) { From 0c7023b7d2eaef8e7263d523e54bddb4128633f4 Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Thu, 2 Jul 2020 01:53:26 -0400 Subject: [PATCH 16/36] drain each queue independently --- .../java/build/buildfarm/cas/FileDirectoriesIndex.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index 37db34154c..83be5fc660 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -57,7 +57,7 @@ class FileDirectoriesIndex implements DirectoriesIndex { private static final Charset UTF_8 = Charset.forName("UTF-8"); private static final int DEFAULT_NUM_OF_DB = Runtime.getRuntime().availableProcessors(); - private static final int MAX_QUEUE_SIZE = 1000 * 1000 * 1000; + private static final int MAX_QUEUE_SIZE = 10 * 1000; private final Path root; private final int numOfdb; @@ -334,12 +334,12 @@ public void put(Digest directory, Iterable entries) { Set uniqueEntries = ImmutableSet.copyOf(entries); for (String entry : uniqueEntries) { int index = Math.abs(entry.hashCode()) % numOfdb; + // BatchMode is only used in the worker startup. if (batchMode) { synchronized (queues[index]) { queues[index].add(new MapEntry(entry, DigestUtil.toString(directory))); - int current = queueSize.incrementAndGet(); - if (current % 100000 == 0) { - logger.log(Level.INFO, "Current queue Size:" + current); + if (queues[index].size() > MAX_QUEUE_SIZE) { + addEntriesDirectory(index); } } } else { From 6580d554ea30b15b8109c794cdb7cca0a52d1678 Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Thu, 2 Jul 2020 02:06:16 -0400 Subject: [PATCH 17/36] add logging for debugging --- src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index 83be5fc660..6b12413e8b 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -299,6 +299,7 @@ private void addEntriesDirectory(int dbIndex) { return; } + logger.log(Level.INFO, "Start draining separate queue: " + dbIndex); String insertSql = "INSERT INTO entries (path, directory)\n" + " VALUES (?,?)"; try (PreparedStatement insertStatement = conns[dbIndex].prepareStatement(insertSql)) { conns[dbIndex].setAutoCommit(false); @@ -316,6 +317,7 @@ private void addEntriesDirectory(int dbIndex) { } catch (SQLException e) { throw new RuntimeException(e); } + logger.log(Level.INFO, "Drained separate queue: " + dbIndex); } @Override @@ -338,6 +340,10 @@ public void put(Digest directory, Iterable entries) { if (batchMode) { synchronized (queues[index]) { queues[index].add(new MapEntry(entry, DigestUtil.toString(directory))); + int current = queueSize.incrementAndGet(); + if (current % (1000 * 10) == 0) { + logger.log(Level.INFO, "Entry added: " + current); + } if (queues[index].size() > MAX_QUEUE_SIZE) { addEntriesDirectory(index); } From 385af36e4d0b5287adebf46b5bc5f5f96f365ace Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Thu, 2 Jul 2020 02:07:42 -0400 Subject: [PATCH 18/36] adjust queue size --- src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index 6b12413e8b..913159fa26 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -57,7 +57,7 @@ class FileDirectoriesIndex implements DirectoriesIndex { private static final Charset UTF_8 = Charset.forName("UTF-8"); private static final int DEFAULT_NUM_OF_DB = Runtime.getRuntime().availableProcessors(); - private static final int MAX_QUEUE_SIZE = 10 * 1000; + private static final int MAX_QUEUE_SIZE = 100 * 1000; private final Path root; private final int numOfdb; From bff954d92dc76e6b107c27c72904e9e033b556fa Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Thu, 2 Jul 2020 02:44:40 -0400 Subject: [PATCH 19/36] adjust queue size and queue number --- .../java/build/buildfarm/cas/FileDirectoriesIndex.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index 913159fa26..c4741093d1 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -56,8 +56,8 @@ class FileDirectoriesIndex implements DirectoriesIndex { protected static final String DIRECTORIES_INDEX_NAME_MEMORY = ":memory:"; private static final Charset UTF_8 = Charset.forName("UTF-8"); - private static final int DEFAULT_NUM_OF_DB = Runtime.getRuntime().availableProcessors(); - private static final int MAX_QUEUE_SIZE = 100 * 1000; + private static final int DEFAULT_NUM_OF_DB = Runtime.getRuntime().availableProcessors() * 10; + private static final int MAX_QUEUE_SIZE = 10 * 1000; private final Path root; private final int numOfdb; @@ -197,7 +197,7 @@ private void drainQueues() { pool.shutdown(); while (!pool.isTerminated()) { try { - pool.awaitTermination(5, TimeUnit.SECONDS); + pool.awaitTermination(1, TimeUnit.SECONDS); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -341,8 +341,8 @@ public void put(Digest directory, Iterable entries) { synchronized (queues[index]) { queues[index].add(new MapEntry(entry, DigestUtil.toString(directory))); int current = queueSize.incrementAndGet(); - if (current % (1000 * 10) == 0) { - logger.log(Level.INFO, "Entry added: " + current); + if (current % (1000 * 1000) == 0) { + logger.log(Level.INFO, "Entry added: " + current / (1000 * 1000) + "million"); } if (queues[index].size() > MAX_QUEUE_SIZE) { addEntriesDirectory(index); From cec07a6e98f4c92d9fadff477b01325a9d869cc2 Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Sun, 5 Jul 2020 17:34:36 -0400 Subject: [PATCH 20/36] drain queue concurrently --- .../build/buildfarm/cas/FileDirectoriesIndex.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index c4741093d1..306addd647 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -56,8 +56,9 @@ class FileDirectoriesIndex implements DirectoriesIndex { protected static final String DIRECTORIES_INDEX_NAME_MEMORY = ":memory:"; private static final Charset UTF_8 = Charset.forName("UTF-8"); - private static final int DEFAULT_NUM_OF_DB = Runtime.getRuntime().availableProcessors() * 10; + private static final int DEFAULT_NUM_OF_DB = Runtime.getRuntime().availableProcessors() * 2; private static final int MAX_QUEUE_SIZE = 10 * 1000; + private static final int MAX_TOTAL_QUEUE_SIZE = 2 * 1000 * 1000; private final Path root; private final int numOfdb; @@ -333,6 +334,11 @@ public void put(Digest directory, Iterable entries) { } catch (IOException e) { throw new RuntimeException(e); } + synchronized (this) { + if (queueSize.get() > MAX_TOTAL_QUEUE_SIZE) { + drainQueues(); + } + } Set uniqueEntries = ImmutableSet.copyOf(entries); for (String entry : uniqueEntries) { int index = Math.abs(entry.hashCode()) % numOfdb; @@ -344,9 +350,9 @@ public void put(Digest directory, Iterable entries) { if (current % (1000 * 1000) == 0) { logger.log(Level.INFO, "Entry added: " + current / (1000 * 1000) + "million"); } - if (queues[index].size() > MAX_QUEUE_SIZE) { - addEntriesDirectory(index); - } + //if (queues[index].size() > MAX_QUEUE_SIZE) { + // addEntriesDirectory(index); + //} } } else { synchronized (conns[index]) { From 16e19f63cc98449fbb8b21253a8d6002fe153ff8 Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Sun, 5 Jul 2020 17:43:07 -0400 Subject: [PATCH 21/36] increase total queue size --- src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index 306addd647..262142656e 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -58,7 +58,7 @@ class FileDirectoriesIndex implements DirectoriesIndex { private static final Charset UTF_8 = Charset.forName("UTF-8"); private static final int DEFAULT_NUM_OF_DB = Runtime.getRuntime().availableProcessors() * 2; private static final int MAX_QUEUE_SIZE = 10 * 1000; - private static final int MAX_TOTAL_QUEUE_SIZE = 2 * 1000 * 1000; + private static final int MAX_TOTAL_QUEUE_SIZE = 20 * 1000 * 1000; private final Path root; private final int numOfdb; @@ -337,6 +337,7 @@ public void put(Digest directory, Iterable entries) { synchronized (this) { if (queueSize.get() > MAX_TOTAL_QUEUE_SIZE) { drainQueues(); + queueSize.set(0); } } Set uniqueEntries = ImmutableSet.copyOf(entries); From 75bdbe3b052b0fd4856d4568b9829e8c80feca08 Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Sun, 5 Jul 2020 18:03:00 -0400 Subject: [PATCH 22/36] increase total number of database to reduce lock retension in adding --- .../java/build/buildfarm/cas/FileDirectoriesIndex.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index 262142656e..a6a8861262 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -56,7 +56,7 @@ class FileDirectoriesIndex implements DirectoriesIndex { protected static final String DIRECTORIES_INDEX_NAME_MEMORY = ":memory:"; private static final Charset UTF_8 = Charset.forName("UTF-8"); - private static final int DEFAULT_NUM_OF_DB = Runtime.getRuntime().availableProcessors() * 2; + private static final int DEFAULT_NUM_OF_DB = Runtime.getRuntime().availableProcessors() * 10; private static final int MAX_QUEUE_SIZE = 10 * 1000; private static final int MAX_TOTAL_QUEUE_SIZE = 20 * 1000 * 1000; @@ -346,11 +346,12 @@ public void put(Digest directory, Iterable entries) { // BatchMode is only used in the worker startup. if (batchMode) { synchronized (queues[index]) { - queues[index].add(new MapEntry(entry, DigestUtil.toString(directory))); - int current = queueSize.incrementAndGet(); + int current = queueSize.get(); if (current % (1000 * 1000) == 0) { logger.log(Level.INFO, "Entry added: " + current / (1000 * 1000) + "million"); } + queues[index].add(new MapEntry(entry, DigestUtil.toString(directory))); + queueSize.incrementAndGet(); //if (queues[index].size() > MAX_QUEUE_SIZE) { // addEntriesDirectory(index); //} From 61cf41b93cc4c714c823214e8ad6d9519f1272ef Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Sun, 5 Jul 2020 18:11:32 -0400 Subject: [PATCH 23/36] drain queue independently --- .../buildfarm/cas/FileDirectoriesIndex.java | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index a6a8861262..9900b4b2d3 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -334,27 +334,20 @@ public void put(Digest directory, Iterable entries) { } catch (IOException e) { throw new RuntimeException(e); } - synchronized (this) { - if (queueSize.get() > MAX_TOTAL_QUEUE_SIZE) { - drainQueues(); - queueSize.set(0); - } - } Set uniqueEntries = ImmutableSet.copyOf(entries); for (String entry : uniqueEntries) { int index = Math.abs(entry.hashCode()) % numOfdb; // BatchMode is only used in the worker startup. if (batchMode) { synchronized (queues[index]) { - int current = queueSize.get(); + queues[index].add(new MapEntry(entry, DigestUtil.toString(directory))); + int current = queueSize.incrementAndGet(); if (current % (1000 * 1000) == 0) { - logger.log(Level.INFO, "Entry added: " + current / (1000 * 1000) + "million"); + logger.log(Level.INFO, "Entry added: " + current / (1000 * 1000) + " million"); + } + if (queues[index].size() > MAX_QUEUE_SIZE) { + addEntriesDirectory(index); } - queues[index].add(new MapEntry(entry, DigestUtil.toString(directory))); - queueSize.incrementAndGet(); - //if (queues[index].size() > MAX_QUEUE_SIZE) { - // addEntriesDirectory(index); - //} } } else { synchronized (conns[index]) { From bccffb24000616bb1a87f7e677035f857aad150b Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Sun, 5 Jul 2020 18:26:40 -0400 Subject: [PATCH 24/36] incresease separate queue size --- src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index 9900b4b2d3..d6bde214cd 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -57,7 +57,7 @@ class FileDirectoriesIndex implements DirectoriesIndex { private static final Charset UTF_8 = Charset.forName("UTF-8"); private static final int DEFAULT_NUM_OF_DB = Runtime.getRuntime().availableProcessors() * 10; - private static final int MAX_QUEUE_SIZE = 10 * 1000; + private static final int MAX_QUEUE_SIZE = 100 * 1000; private static final int MAX_TOTAL_QUEUE_SIZE = 20 * 1000 * 1000; private final Path root; From 2b97d0e322d85acfe7fa7ed77d25f0b32f2c3f0b Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Mon, 6 Jul 2020 01:00:23 -0400 Subject: [PATCH 25/36] using string array to replace MapEntry --- .../buildfarm/cas/FileDirectoriesIndex.java | 35 ++++++------------- 1 file changed, 10 insertions(+), 25 deletions(-) diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index d6bde214cd..7702f68cc4 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -57,8 +57,7 @@ class FileDirectoriesIndex implements DirectoriesIndex { private static final Charset UTF_8 = Charset.forName("UTF-8"); private static final int DEFAULT_NUM_OF_DB = Runtime.getRuntime().availableProcessors() * 10; - private static final int MAX_QUEUE_SIZE = 100 * 1000; - private static final int MAX_TOTAL_QUEUE_SIZE = 20 * 1000 * 1000; + private static final int MAX_QUEUE_SIZE = 10 * 1000; private final Path root; private final int numOfdb; @@ -68,7 +67,7 @@ class FileDirectoriesIndex implements DirectoriesIndex { private Connection[] conns; private boolean batchMode = false; - private Queue[] queues; + private Queue[] queues; private AtomicInteger queueSize = new AtomicInteger(0); FileDirectoriesIndex(String directoriesIndexDbName, Path root, int numOfdb) { @@ -167,6 +166,7 @@ public synchronized void start() { while (!pool.isTerminated()) { try { pool.awaitTermination(10, TimeUnit.SECONDS); + logger.log(Level.INFO, "Creating Index ..."); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -198,7 +198,8 @@ private void drainQueues() { pool.shutdown(); while (!pool.isTerminated()) { try { - pool.awaitTermination(1, TimeUnit.SECONDS); + pool.awaitTermination(10, TimeUnit.SECONDS); + logger.log(Level.INFO, "Draining all queues ..."); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -305,12 +306,12 @@ private void addEntriesDirectory(int dbIndex) { try (PreparedStatement insertStatement = conns[dbIndex].prepareStatement(insertSql)) { conns[dbIndex].setAutoCommit(false); while (!queues[dbIndex].isEmpty()) { - MapEntry e = queues[dbIndex].poll(); - if (e == null) { + String[] entry = queues[dbIndex].poll(); + if (entry == null) { continue; } - insertStatement.setString(1, e.entry); - insertStatement.setString(2, e.digest); + insertStatement.setString(1, entry[0]); + insertStatement.setString(2, entry[1]); insertStatement.addBatch(); } insertStatement.executeBatch(); @@ -323,12 +324,6 @@ private void addEntriesDirectory(int dbIndex) { @Override public void put(Digest directory, Iterable entries) { - //synchronized (this) { - // if (queueSize.get() >= MAX_QUEUE_SIZE) { - // drainQueues(); - // queueSize.set(0); - // } - //} try { asCharSink(path(directory), UTF_8).writeLines(entries); } catch (IOException e) { @@ -340,7 +335,7 @@ public void put(Digest directory, Iterable entries) { // BatchMode is only used in the worker startup. if (batchMode) { synchronized (queues[index]) { - queues[index].add(new MapEntry(entry, DigestUtil.toString(directory))); + queues[index].add(new String[]{entry, DigestUtil.toString(directory)}); int current = queueSize.incrementAndGet(); if (current % (1000 * 1000) == 0) { logger.log(Level.INFO, "Entry added: " + current / (1000 * 1000) + " million"); @@ -392,14 +387,4 @@ public synchronized void remove(Digest directory) { } } } - - private static class MapEntry { - String entry; - String digest; - - MapEntry(String entry, String digest) { - this.entry = entry; - this.digest = digest; - } - } } From 589fcbac2ba21d9fe078067bd51ef1135982d711 Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Mon, 6 Jul 2020 01:05:08 -0400 Subject: [PATCH 26/36] adjust logging --- src/main/java/build/buildfarm/cas/CASFileCache.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/build/buildfarm/cas/CASFileCache.java b/src/main/java/build/buildfarm/cas/CASFileCache.java index 7da98d7e52..b7b6468502 100644 --- a/src/main/java/build/buildfarm/cas/CASFileCache.java +++ b/src/main/java/build/buildfarm/cas/CASFileCache.java @@ -1280,7 +1280,7 @@ public StartupCacheResults start(Consumer onPut, ExecutorService removeD Instant phase1Ending = Instant.now(); logger.log( Level.INFO, - "TIME_LOGGING: Phase 1 done: " + "Phase 1 Time: " + Duration.between(startTime, phase1Ending).getSeconds() + "s"); @@ -1293,7 +1293,7 @@ public StartupCacheResults start(Consumer onPut, ExecutorService removeD Instant beforeIndexing = Instant.now(); logger.log( Level.INFO, - "TIME_LOGGING: Phase 2 Inserting done: " + "Phase 2 Inserting Time: " + Duration.between(phase1Ending, beforeIndexing).getSeconds() + "s"); @@ -1301,7 +1301,7 @@ public StartupCacheResults start(Consumer onPut, ExecutorService removeD Instant afterIndexing = Instant.now(); logger.log( Level.INFO, - "TIME_LOGGING: Index Created: " + "Phase 2 Building Index Time: " + Duration.between(beforeIndexing, afterIndexing).getSeconds() + "s"); From 783ea4549299772a7a02741d710cc504a637981e Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Wed, 8 Jul 2020 02:24:33 -0400 Subject: [PATCH 27/36] use 40 times number of queue --- src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index 7702f68cc4..67e26328c4 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -56,7 +56,7 @@ class FileDirectoriesIndex implements DirectoriesIndex { protected static final String DIRECTORIES_INDEX_NAME_MEMORY = ":memory:"; private static final Charset UTF_8 = Charset.forName("UTF-8"); - private static final int DEFAULT_NUM_OF_DB = Runtime.getRuntime().availableProcessors() * 10; + private static final int DEFAULT_NUM_OF_DB = Runtime.getRuntime().availableProcessors() * 40; private static final int MAX_QUEUE_SIZE = 10 * 1000; private final Path root; From 2e3fe95a2d9da0882d33b2516b81a45470f2c2a0 Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Wed, 8 Jul 2020 11:13:58 -0400 Subject: [PATCH 28/36] logging out of synchronization block --- .../java/build/buildfarm/cas/FileDirectoriesIndex.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index 67e26328c4..8f5ceb25d0 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -336,10 +336,7 @@ public void put(Digest directory, Iterable entries) { if (batchMode) { synchronized (queues[index]) { queues[index].add(new String[]{entry, DigestUtil.toString(directory)}); - int current = queueSize.incrementAndGet(); - if (current % (1000 * 1000) == 0) { - logger.log(Level.INFO, "Entry added: " + current / (1000 * 1000) + " million"); - } + queueSize.incrementAndGet(); if (queues[index].size() > MAX_QUEUE_SIZE) { addEntriesDirectory(index); } @@ -349,6 +346,10 @@ public void put(Digest directory, Iterable entries) { addEntriesDirectory(entry, directory); } } + int current = queueSize.get(); + if (current % (1000 * 1000) == 0) { + logger.log(Level.INFO, "Entry added: " + current / (1000 * 1000) + " million"); + } } } From 5b031225b859c84c2888c0058aa1b6245471ff7f Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Wed, 8 Jul 2020 12:54:25 -0400 Subject: [PATCH 29/36] drain queue concurrently --- .../buildfarm/cas/FileDirectoriesIndex.java | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index 8f5ceb25d0..4a7fe75064 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -56,8 +56,8 @@ class FileDirectoriesIndex implements DirectoriesIndex { protected static final String DIRECTORIES_INDEX_NAME_MEMORY = ":memory:"; private static final Charset UTF_8 = Charset.forName("UTF-8"); - private static final int DEFAULT_NUM_OF_DB = Runtime.getRuntime().availableProcessors() * 40; - private static final int MAX_QUEUE_SIZE = 10 * 1000; + private static final int DEFAULT_NUM_OF_DB = Runtime.getRuntime().availableProcessors() * 2; + private static final int MAX_QUEUE_SIZE = 1000 * 1000; private final Path root; private final int numOfdb; @@ -329,17 +329,21 @@ public void put(Digest directory, Iterable entries) { } catch (IOException e) { throw new RuntimeException(e); } - Set uniqueEntries = ImmutableSet.copyOf(entries); - for (String entry : uniqueEntries) { + + synchronized (this) { + if (queueSize.get() > MAX_QUEUE_SIZE) { + drainQueues(); + queueSize.set(0); + } + } + + for (String entry : entries) { int index = Math.abs(entry.hashCode()) % numOfdb; // BatchMode is only used in the worker startup. if (batchMode) { synchronized (queues[index]) { queues[index].add(new String[]{entry, DigestUtil.toString(directory)}); queueSize.incrementAndGet(); - if (queues[index].size() > MAX_QUEUE_SIZE) { - addEntriesDirectory(index); - } } } else { synchronized (conns[index]) { From 68e9d80755cd649340ff4679d7f4c3df67e3bae1 Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Wed, 8 Jul 2020 13:16:52 -0400 Subject: [PATCH 30/36] adjust number of thread --- .../java/build/buildfarm/cas/FileDirectoriesIndex.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index 4a7fe75064..4e472ce0dd 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -184,7 +184,7 @@ public void setBatchMode(boolean batchMode) throws InterruptedException { } private void drainQueues() { - int nThread = Runtime.getRuntime().availableProcessors(); + int nThread = queues.length; String threadNameFormat = "drain-queue-%d"; ExecutorService pool = Executors.newFixedThreadPool( @@ -330,6 +330,8 @@ public void put(Digest directory, Iterable entries) { throw new RuntimeException(e); } + Set uniqueEntries = ImmutableSet.copyOf(entries); + synchronized (this) { if (queueSize.get() > MAX_QUEUE_SIZE) { drainQueues(); @@ -337,7 +339,7 @@ public void put(Digest directory, Iterable entries) { } } - for (String entry : entries) { + for (String entry : uniqueEntries) { int index = Math.abs(entry.hashCode()) % numOfdb; // BatchMode is only used in the worker startup. if (batchMode) { @@ -350,10 +352,6 @@ public void put(Digest directory, Iterable entries) { addEntriesDirectory(entry, directory); } } - int current = queueSize.get(); - if (current % (1000 * 1000) == 0) { - logger.log(Level.INFO, "Entry added: " + current / (1000 * 1000) + " million"); - } } } From ab29fa7f149e3dc8a441f87b221f929ac7dba4d2 Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Wed, 8 Jul 2020 13:27:11 -0400 Subject: [PATCH 31/36] add synchronization --- .../buildfarm/cas/FileDirectoriesIndex.java | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index 4e472ce0dd..e6efa55d58 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -303,21 +303,23 @@ private void addEntriesDirectory(int dbIndex) { logger.log(Level.INFO, "Start draining separate queue: " + dbIndex); String insertSql = "INSERT INTO entries (path, directory)\n" + " VALUES (?,?)"; - try (PreparedStatement insertStatement = conns[dbIndex].prepareStatement(insertSql)) { - conns[dbIndex].setAutoCommit(false); - while (!queues[dbIndex].isEmpty()) { - String[] entry = queues[dbIndex].poll(); - if (entry == null) { - continue; + synchronized (queues[dbIndex]) { + try (PreparedStatement insertStatement = conns[dbIndex].prepareStatement(insertSql)) { + conns[dbIndex].setAutoCommit(false); + while (!queues[dbIndex].isEmpty()) { + String[] entry = queues[dbIndex].poll(); + if (entry == null) { + continue; + } + insertStatement.setString(1, entry[0]); + insertStatement.setString(2, entry[1]); + insertStatement.addBatch(); } - insertStatement.setString(1, entry[0]); - insertStatement.setString(2, entry[1]); - insertStatement.addBatch(); + insertStatement.executeBatch(); + conns[dbIndex].commit(); + } catch (SQLException e) { + throw new RuntimeException(e); } - insertStatement.executeBatch(); - conns[dbIndex].commit(); - } catch (SQLException e) { - throw new RuntimeException(e); } logger.log(Level.INFO, "Drained separate queue: " + dbIndex); } From 4848ce10501b0abdb0038fef8e0dac566a3043d5 Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Wed, 8 Jul 2020 22:38:22 -0400 Subject: [PATCH 32/36] double producer count --- src/main/java/build/buildfarm/cas/CASFileCache.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/build/buildfarm/cas/CASFileCache.java b/src/main/java/build/buildfarm/cas/CASFileCache.java index b7b6468502..7b017436ec 100644 --- a/src/main/java/build/buildfarm/cas/CASFileCache.java +++ b/src/main/java/build/buildfarm/cas/CASFileCache.java @@ -1457,7 +1457,7 @@ private List computeDirectories(CacheScanResults cacheScanResults) throws IOException, InterruptedException { // create thread pool - int nThreads = Runtime.getRuntime().availableProcessors(); + int nThreads = Runtime.getRuntime().availableProcessors() * 2; String threadNameFormat = "compute-cache-pool-%d"; ExecutorService pool = Executors.newFixedThreadPool( From 8432eba9e01c7f2bfcd2e6b4be588cd2dc14520b Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Thu, 9 Jul 2020 16:56:15 -0400 Subject: [PATCH 33/36] use ConcurrentHashMap --- src/main/java/build/buildfarm/cas/CASFileCache.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/main/java/build/buildfarm/cas/CASFileCache.java b/src/main/java/build/buildfarm/cas/CASFileCache.java index 7b017436ec..1dbf93fecc 100644 --- a/src/main/java/build/buildfarm/cas/CASFileCache.java +++ b/src/main/java/build/buildfarm/cas/CASFileCache.java @@ -128,7 +128,7 @@ public abstract class CASFileCache implements ContentAddressableStorage { private final Executor accessRecorder; private final ExecutorService expireService; - private final Map directoryStorage = Maps.newHashMap(); + private final Map directoryStorage = Maps.newConcurrentMap(); private final DirectoriesIndex directoriesIndex; private final String directoriesIndexDbName; private final LockMap locks = new LockMap(); @@ -1482,9 +1482,7 @@ private List computeDirectories(CacheScanResults cacheScanResults) if (digest != null && getDirectoryPath(digest).equals(path)) { DirectoryEntry e = new DirectoryEntry(directory, Deadline.after(10, SECONDS)); directoriesIndex.put(digest, inputsBuilder.build()); - synchronized (this) { directoryStorage.put(digest, e); - } } else { synchronized (invalidDirectories) { invalidDirectories.add(path); @@ -2227,9 +2225,7 @@ private ListenableFuture putDirectorySynchronized( ? Directory.getDefaultInstance() : directoriesByDigest.get(digest), Deadline.after(10, SECONDS)); - synchronized (this) { - directoryStorage.put(digest, e); - } + directoryStorage.put(digest, e); return path; }, service); From 8cc33ce8b403553fe25943810809d38095c34050 Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Thu, 9 Jul 2020 19:46:04 -0400 Subject: [PATCH 34/36] unlimited queue --- .../build/buildfarm/cas/FileDirectoriesIndex.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index e6efa55d58..27aec8750f 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -57,7 +57,7 @@ class FileDirectoriesIndex implements DirectoriesIndex { private static final Charset UTF_8 = Charset.forName("UTF-8"); private static final int DEFAULT_NUM_OF_DB = Runtime.getRuntime().availableProcessors() * 2; - private static final int MAX_QUEUE_SIZE = 1000 * 1000; + private static final int MAX_QUEUE_SIZE = 1000 * 1000 * 1000; private final Path root; private final int numOfdb; @@ -334,12 +334,12 @@ public void put(Digest directory, Iterable entries) { Set uniqueEntries = ImmutableSet.copyOf(entries); - synchronized (this) { - if (queueSize.get() > MAX_QUEUE_SIZE) { - drainQueues(); - queueSize.set(0); - } - } + //synchronized (this) { + // if (queueSize.get() > MAX_QUEUE_SIZE) { + // drainQueues(); + // queueSize.set(0); + // } + //} for (String entry : uniqueEntries) { int index = Math.abs(entry.hashCode()) % numOfdb; From 042c9e07800b163afd96213753f742e4335cee60 Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Thu, 9 Jul 2020 19:54:13 -0400 Subject: [PATCH 35/36] add logging --- src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index 27aec8750f..2360ca8c31 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -347,13 +347,16 @@ public void put(Digest directory, Iterable entries) { if (batchMode) { synchronized (queues[index]) { queues[index].add(new String[]{entry, DigestUtil.toString(directory)}); - queueSize.incrementAndGet(); } } else { synchronized (conns[index]) { addEntriesDirectory(entry, directory); } } + int current = queueSize.incrementAndGet(); + if (current % (1000000) == 0) { + logger.log(Level.INFO, current / 1000000 + " million"); + } } } From b258c246b87268b0c54e5d32b545b2efff6af7f5 Mon Sep 17 00:00:00 2001 From: Jianqiu Date: Thu, 9 Jul 2020 21:20:53 -0400 Subject: [PATCH 36/36] drain queue independently --- .../buildfarm/cas/FileDirectoriesIndex.java | 40 ++++++++----------- 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index 2360ca8c31..158c233df1 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -57,7 +57,7 @@ class FileDirectoriesIndex implements DirectoriesIndex { private static final Charset UTF_8 = Charset.forName("UTF-8"); private static final int DEFAULT_NUM_OF_DB = Runtime.getRuntime().availableProcessors() * 2; - private static final int MAX_QUEUE_SIZE = 1000 * 1000 * 1000; + private static final int MAX_QUEUE_SIZE = 10 * 1000; private final Path root; private final int numOfdb; @@ -303,23 +303,21 @@ private void addEntriesDirectory(int dbIndex) { logger.log(Level.INFO, "Start draining separate queue: " + dbIndex); String insertSql = "INSERT INTO entries (path, directory)\n" + " VALUES (?,?)"; - synchronized (queues[dbIndex]) { - try (PreparedStatement insertStatement = conns[dbIndex].prepareStatement(insertSql)) { - conns[dbIndex].setAutoCommit(false); - while (!queues[dbIndex].isEmpty()) { - String[] entry = queues[dbIndex].poll(); - if (entry == null) { - continue; - } - insertStatement.setString(1, entry[0]); - insertStatement.setString(2, entry[1]); - insertStatement.addBatch(); + try (PreparedStatement insertStatement = conns[dbIndex].prepareStatement(insertSql)) { + conns[dbIndex].setAutoCommit(false); + while (!queues[dbIndex].isEmpty()) { + String[] entry = queues[dbIndex].poll(); + if (entry == null) { + continue; } - insertStatement.executeBatch(); - conns[dbIndex].commit(); - } catch (SQLException e) { - throw new RuntimeException(e); + insertStatement.setString(1, entry[0]); + insertStatement.setString(2, entry[1]); + insertStatement.addBatch(); } + insertStatement.executeBatch(); + conns[dbIndex].commit(); + } catch (SQLException e) { + throw new RuntimeException(e); } logger.log(Level.INFO, "Drained separate queue: " + dbIndex); } @@ -334,19 +332,15 @@ public void put(Digest directory, Iterable entries) { Set uniqueEntries = ImmutableSet.copyOf(entries); - //synchronized (this) { - // if (queueSize.get() > MAX_QUEUE_SIZE) { - // drainQueues(); - // queueSize.set(0); - // } - //} - for (String entry : uniqueEntries) { int index = Math.abs(entry.hashCode()) % numOfdb; // BatchMode is only used in the worker startup. if (batchMode) { synchronized (queues[index]) { queues[index].add(new String[]{entry, DigestUtil.toString(directory)}); + if (queues[index].size() > MAX_QUEUE_SIZE) { + addEntriesDirectory(index); + } } } else { synchronized (conns[index]) {