diff --git a/src/main/java/build/buildfarm/CASTest.java b/src/main/java/build/buildfarm/CASTest.java index 4ce10572cb..14280577da 100644 --- a/src/main/java/build/buildfarm/CASTest.java +++ b/src/main/java/build/buildfarm/CASTest.java @@ -65,8 +65,8 @@ public static void main(String[] args) throws Exception { CASFileCache fileCache = new LocalCASFileCache( root, - /* maxSizeInBytes=*/ GBtoBytes(500), - new DigestUtil(HashFunction.SHA1), + /* maxSizeInBytes=*/ GBtoBytes(2 * 1024), + new DigestUtil(HashFunction.SHA256), /* expireService=*/ newDirectExecutorService(), /* accessRecorder=*/ directExecutor()); diff --git a/src/main/java/build/buildfarm/cas/CASFileCache.java b/src/main/java/build/buildfarm/cas/CASFileCache.java index c657fad5cd..1dbf93fecc 100644 --- a/src/main/java/build/buildfarm/cas/CASFileCache.java +++ b/src/main/java/build/buildfarm/cas/CASFileCache.java @@ -116,7 +116,6 @@ public abstract class CASFileCache implements ContentAddressableStorage { private static final Logger logger = Logger.getLogger(CASFileCache.class.getName()); protected static final String DEFAULT_DIRECTORIES_INDEX_NAME = "directories.sqlite"; - protected static final String DIRECTORIES_INDEX_NAME_MEMORY = ":memory:"; private final Path root; private final FileStore fileStore; @@ -129,7 +128,7 @@ public abstract class CASFileCache implements ContentAddressableStorage { private final Executor accessRecorder; private final ExecutorService expireService; - private final Map directoryStorage = Maps.newHashMap(); + private final Map directoryStorage = Maps.newConcurrentMap(); private final DirectoriesIndex directoriesIndex; private final String directoriesIndexDbName; private final LockMap locks = new LockMap(); @@ -286,24 +285,7 @@ public CASFileCache( this.onExpire = onExpire; this.delegate = delegate; this.directoriesIndexDbName = directoriesIndexDbName; - - String directoriesIndexUrl = "jdbc:sqlite:"; - if (directoriesIndexDbName.equals(DIRECTORIES_INDEX_NAME_MEMORY)) { - 
directoriesIndexUrl += directoriesIndexDbName; - } else { - // db is ephemeral for now, no reuse occurs to match it, computation - // occurs each time anyway, and expected use of put is noop on collision - Path path = getPath(directoriesIndexDbName); - try { - if (Files.exists(path)) { - Files.delete(path); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - directoriesIndexUrl += path.toString(); - } - this.directoriesIndex = new FileDirectoriesIndex(directoriesIndexUrl, root); + this.directoriesIndex = new FileDirectoriesIndex(directoriesIndexDbName, root); header.before = header.after = header; } @@ -1295,6 +1277,12 @@ public StartupCacheResults start(Consumer onPut, ExecutorService removeD CacheScanResults cacheScanResults = scanRoot(); LogCacheScanResults(cacheScanResults); deleteInvalidFileContent(cacheScanResults.deleteFiles, removeDirectoryService); + Instant phase1Ending = Instant.now(); + logger.log( + Level.INFO, + "Phase 1 Time: " + + Duration.between(startTime, phase1Ending).getSeconds() + + "s"); // Phase 2: Compute // recursively construct all directory structures. 
@@ -1302,9 +1290,20 @@ public StartupCacheResults start(Consumer onPut, ExecutorService removeD LogComputeDirectoriesResults(invalidDirectories); deleteInvalidFileContent(invalidDirectories, removeDirectoryService); - logger.log(Level.INFO, "Creating Index"); + Instant beforeIndexing = Instant.now(); + logger.log( + Level.INFO, + "Phase 2 Inserting Time: " + + Duration.between(phase1Ending, beforeIndexing).getSeconds() + + "s"); + directoriesIndex.start(); - logger.log(Level.INFO, "Index Created"); + Instant afterIndexing = Instant.now(); + logger.log( + Level.INFO, + "Phase 2 Building Index Time: " + + Duration.between(beforeIndexing, afterIndexing).getSeconds() + + "s"); // Calculate Startup time Instant endTime = Instant.now(); @@ -1400,7 +1399,7 @@ private void processRootFile( // ignore our directories index database // indexes will be removed and rebuilt for compute - if (!basename.equals(directoriesIndexDbName)) { + if (!basename.startsWith(directoriesIndexDbName)) { FileStatus stat = stat(file, false); // mark directory for later key compute @@ -1458,7 +1457,7 @@ private List computeDirectories(CacheScanResults cacheScanResults) throws IOException, InterruptedException { // create thread pool - int nThreads = Runtime.getRuntime().availableProcessors(); + int nThreads = Runtime.getRuntime().availableProcessors() * 2; String threadNameFormat = "compute-cache-pool-%d"; ExecutorService pool = Executors.newFixedThreadPool( @@ -1466,6 +1465,7 @@ private List computeDirectories(CacheScanResults cacheScanResults) ImmutableList.Builder invalidDirectories = new ImmutableList.Builder<>(); + directoriesIndex.setBatchMode(true); for (Path path : cacheScanResults.computeDirs) { pool.execute( () -> { @@ -1482,9 +1482,7 @@ private List computeDirectories(CacheScanResults cacheScanResults) if (digest != null && getDirectoryPath(digest).equals(path)) { DirectoryEntry e = new DirectoryEntry(directory, Deadline.after(10, SECONDS)); directoriesIndex.put(digest, 
inputsBuilder.build()); - synchronized (this) { directoryStorage.put(digest, e); - } } else { synchronized (invalidDirectories) { invalidDirectories.add(path); @@ -1497,6 +1495,7 @@ private List computeDirectories(CacheScanResults cacheScanResults) } joinThreads(pool, "Populating Directories...", 1, MINUTES); + directoriesIndex.setBatchMode(false); return invalidDirectories.build(); } @@ -2226,9 +2225,7 @@ private ListenableFuture putDirectorySynchronized( ? Directory.getDefaultInstance() : directoriesByDigest.get(digest), Deadline.after(10, SECONDS)); - synchronized (this) { - directoryStorage.put(digest, e); - } + directoryStorage.put(digest, e); return path; }, service); diff --git a/src/main/java/build/buildfarm/cas/DirectoriesIndex.java b/src/main/java/build/buildfarm/cas/DirectoriesIndex.java index 9af93feb54..89884e41b0 100644 --- a/src/main/java/build/buildfarm/cas/DirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/DirectoriesIndex.java @@ -34,4 +34,6 @@ interface DirectoriesIndex { void remove(Digest directory); void start(); + + void setBatchMode(boolean batchMode) throws InterruptedException; } diff --git a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java index 01f0abab69..158c233df1 100644 --- a/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/FileDirectoriesIndex.java @@ -21,6 +21,7 @@ import build.buildfarm.common.DigestUtil; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; import java.nio.charset.Charset; import java.nio.file.Files; @@ -32,8 +33,16 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.Queue; import java.util.Set; -import javax.annotation.concurrent.GuardedBy; 
+import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; /** * Ephemeral file manifestations of the entry/directory mappings Directory entries are stored in @@ -42,46 +51,92 @@ *

Sqlite db should be removed prior to using this index */ class FileDirectoriesIndex implements DirectoriesIndex { + private static final Logger logger = Logger.getLogger(CASFileCache.class.getName()); + + protected static final String DIRECTORIES_INDEX_NAME_MEMORY = ":memory:"; + private static final Charset UTF_8 = Charset.forName("UTF-8"); + private static final int DEFAULT_NUM_OF_DB = Runtime.getRuntime().availableProcessors() * 2; + private static final int MAX_QUEUE_SIZE = 10 * 1000; - private final String dbUrl; private final Path root; + private final int numOfdb; - private boolean opened = false; - private Connection conn; + private String[] dbUrls; + private boolean[] isOpen; + private Connection[] conns; - FileDirectoriesIndex(String dbUrl, Path root) { - this.dbUrl = dbUrl; + private boolean batchMode = false; + private Queue[] queues; + private AtomicInteger queueSize = new AtomicInteger(0); + + FileDirectoriesIndex(String directoriesIndexDbName, Path root, int numOfdb) { this.root = root; + this.numOfdb = numOfdb; + this.dbUrls = new String[this.numOfdb]; + String directoriesIndexUrl = "jdbc:sqlite:"; + if (directoriesIndexDbName.equals(DIRECTORIES_INDEX_NAME_MEMORY)) { + directoriesIndexUrl += directoriesIndexDbName; + Arrays.fill(dbUrls, directoriesIndexUrl); + } else { + // db is ephemeral for now, no reuse occurs to match it, computation + // occurs each time anyway, and expected use of put is noop on collision + for (int i = 0; i < dbUrls.length; i++) { + Path path = root.resolve(directoriesIndexDbName + i); + try { + if (Files.exists(path)) { + Files.delete(path); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + dbUrls[i] = directoriesIndexUrl + path.toString(); + } + } + + isOpen = new boolean[this.numOfdb]; + conns = new Connection[this.numOfdb]; + queues = new Queue[this.numOfdb]; + + for (int i = 0; i < queues.length; i++) { + queues[i] = new LinkedList<>(); + } + + open(); + } + + FileDirectoriesIndex(String dbUrl, Path 
root) { + this(dbUrl, root, DEFAULT_NUM_OF_DB); } - @GuardedBy("this") private void open() { - if (!opened) { - try { - conn = DriverManager.getConnection(dbUrl); - try (Statement safetyStatement = conn.createStatement()) { - safetyStatement.execute("PRAGMA synchronous=OFF"); - safetyStatement.execute("PRAGMA journal_mode=OFF"); - safetyStatement.execute("PRAGMA cache_size=100000"); + for (int i = 0; i < isOpen.length; i++) { + if (!isOpen[i]) { + try { + conns[i] = DriverManager.getConnection(dbUrls[i]); + try (Statement safetyStatement = conns[i].createStatement()) { + safetyStatement.execute("PRAGMA synchronous=OFF"); + safetyStatement.execute("PRAGMA journal_mode=OFF"); + safetyStatement.execute("PRAGMA cache_size=100000"); + } + } catch (SQLException e) { + throw new RuntimeException(e); } - } catch (SQLException e) { - throw new RuntimeException(e); - } - String createEntriesSql = - "CREATE TABLE entries (\n" - + " path TEXT NOT NULL,\n" - + " directory TEXT NOT NULL\n" - + ")"; + String createEntriesSql = + "CREATE TABLE entries (\n" + + " path TEXT NOT NULL,\n" + + " directory TEXT NOT NULL\n" + + ")"; - try (Statement stmt = conn.createStatement()) { - stmt.execute(createEntriesSql); - } catch (SQLException e) { - throw new RuntimeException(e); - } + try (Statement stmt = conns[i].createStatement()) { + stmt.execute(createEntriesSql); + } catch (SQLException e) { + throw new RuntimeException(e); + } - opened = true; + isOpen[i] = true; + } } } @@ -90,27 +145,85 @@ public synchronized void start() { open(); String createIndexSql = "CREATE INDEX path_idx ON entries (path)"; - try (Statement stmt = conn.createStatement()) { - stmt.execute(createIndexSql); - } catch (SQLException e) { - throw new RuntimeException(e); + int nThread = Runtime.getRuntime().availableProcessors(); + String threadNameFormat = "create-sqlite-index-%d"; + ExecutorService pool = + Executors.newFixedThreadPool( + nThread, new 
ThreadFactoryBuilder().setNameFormat(threadNameFormat).build()); + + for (Connection conn : conns) { + pool.execute( + () -> { + try (Statement stmt = conn.createStatement()) { + stmt.execute(createIndexSql); + } catch (SQLException e) { + throw new RuntimeException(e); + } + }); + } + + pool.shutdown(); + while (!pool.isTerminated()) { + try { + pool.awaitTermination(10, TimeUnit.SECONDS); + logger.log(Level.INFO, "Creating Index ..."); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + @Override + public void setBatchMode(boolean batchMode) throws InterruptedException { + this.batchMode = batchMode; + if (batchMode) { + return; + } + drainQueues(); + queueSize.set(0); + } + + private void drainQueues() { + int nThread = queues.length; + String threadNameFormat = "drain-queue-%d"; + ExecutorService pool = + Executors.newFixedThreadPool( + nThread, new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build()); + for (int i = 0; i < queues.length; i++) { + int index = i; + pool.execute(() -> addEntriesDirectory(index)); + } + + logger.log(Level.INFO, "Start to drain the queue."); + pool.shutdown(); + while (!pool.isTerminated()) { + try { + pool.awaitTermination(10, TimeUnit.SECONDS); + logger.log(Level.INFO, "Draining all queues ..."); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } + logger.log(Level.INFO, "Queue Empty"); } @Override public void close() { - try { - conn.close(); - } catch (SQLException e) { - throw new RuntimeException(e); + for (int i = 0; i < conns.length; i++) { + try { + conns[i].close(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + isOpen[i] = false; } - opened = false; } - @GuardedBy("this") + // The method should be GuardedBy(conns[Math.abs(entry.hashCode()) % numOfdb]). 
private Set removeEntryDirectories(String entry) { open(); + Connection conn = conns[Math.abs(entry.hashCode()) % numOfdb]; String selectSql = "SELECT directory FROM entries WHERE path = ?"; String deleteSql = "DELETE FROM entries where path = ?"; @@ -136,8 +249,12 @@ Path path(Digest digest) { } @Override - public synchronized Set removeEntry(String entry) { - Set directories = removeEntryDirectories(entry); + public Set removeEntry(String entry) { + int dbIndex = Math.abs(entry.hashCode()) % numOfdb; + Set directories; + synchronized (conns[dbIndex]) { + directories = removeEntryDirectories(entry); + } try { for (Digest directory : directories) { Files.delete(path(directory)); @@ -159,23 +276,50 @@ public Iterable directoryEntries(Digest directory) { } } - private synchronized void addEntriesDirectory(Set entries, Digest directory) { + // The method should be GuardedBy(conns[Math.abs(entry.hashCode()) % numOfdb]). + private void addEntriesDirectory(String entry, Digest directory) { open(); String digest = DigestUtil.toString(directory); - String insertSql = "INSERT INTO entries (path, directory) VALUES (?,?)"; - try (PreparedStatement insertStatement = conn.prepareStatement(insertSql)) { - conn.setAutoCommit(false); + String insertSql = "INSERT INTO entries (path, directory)\n" + " VALUES (?,?)"; + int dbIndex = Math.abs(entry.hashCode()) % numOfdb; + try (PreparedStatement insertStatement = conns[dbIndex].prepareStatement(insertSql)) { + conns[dbIndex].setAutoCommit(false); + insertStatement.setString(1, entry); insertStatement.setString(2, digest); - for (String entry : entries) { - insertStatement.setString(1, entry); + insertStatement.executeUpdate(); + conns[dbIndex].commit(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + private void addEntriesDirectory(int dbIndex) { + open(); + + if (queues[dbIndex].isEmpty()) { + return; + } + + logger.log(Level.INFO, "Start draining separate queue: " + dbIndex); + String insertSql = 
"INSERT INTO entries (path, directory)\n" + " VALUES (?,?)"; + try (PreparedStatement insertStatement = conns[dbIndex].prepareStatement(insertSql)) { + conns[dbIndex].setAutoCommit(false); + while (!queues[dbIndex].isEmpty()) { + String[] entry = queues[dbIndex].poll(); + if (entry == null) { + continue; + } + insertStatement.setString(1, entry[0]); + insertStatement.setString(2, entry[1]); insertStatement.addBatch(); } insertStatement.executeBatch(); - conn.commit(); + conns[dbIndex].commit(); } catch (SQLException e) { throw new RuntimeException(e); } + logger.log(Level.INFO, "Drained separate queue: " + dbIndex); } @Override @@ -185,24 +329,45 @@ public void put(Digest directory, Iterable entries) { } catch (IOException e) { throw new RuntimeException(e); } - addEntriesDirectory(ImmutableSet.copyOf(entries), directory); + + Set uniqueEntries = ImmutableSet.copyOf(entries); + + for (String entry : uniqueEntries) { + int index = Math.abs(entry.hashCode()) % numOfdb; + // BatchMode is only used in the worker startup. + if (batchMode) { + synchronized (queues[index]) { + queues[index].add(new String[]{entry, DigestUtil.toString(directory)}); + if (queues[index].size() > MAX_QUEUE_SIZE) { + addEntriesDirectory(index); + } + } + } else { + synchronized (conns[index]) { + addEntriesDirectory(entry, directory); + } + } + int current = queueSize.incrementAndGet(); + if (current % (1000000) == 0) { + logger.log(Level.INFO, current / 1000000 + " million"); + } + } } - @GuardedBy("this") - private void removeEntriesDirectory(Iterable entries, Digest directory) { + // The method should be GuardedBy(conns[Math.abs(entry.hashCode()) % numOfdb]). + private void removeEntriesDirectory(String entry, Digest directory) { open(); String digest = DigestUtil.toString(directory); String deleteSql = "DELETE FROM entries WHERE path = ? 
AND directory = ?"; - try (PreparedStatement deleteStatement = conn.prepareStatement(deleteSql)) { - conn.setAutoCommit(false); + int dbIndex = Math.abs(entry.hashCode()) % numOfdb; + try (PreparedStatement deleteStatement = conns[dbIndex].prepareStatement(deleteSql)) { + conns[dbIndex].setAutoCommit(false); // safe for multi delete + deleteStatement.setString(1, entry); deleteStatement.setString(2, digest); - for (String entry : entries) { - deleteStatement.setString(1, entry); - deleteStatement.executeUpdate(); - } - conn.commit(); + deleteStatement.executeUpdate(); + conns[dbIndex].commit(); } catch (SQLException e) { throw new RuntimeException(e); } @@ -218,6 +383,10 @@ public synchronized void remove(Digest directory) { } catch (IOException e) { throw new RuntimeException(e); } - removeEntriesDirectory(entries, directory); + for (String entry : entries) { + synchronized (conns[Math.abs(entry.hashCode()) % numOfdb]) { + removeEntriesDirectory(entry, directory); + } + } } } diff --git a/src/main/java/build/buildfarm/cas/MemoryDirectoriesIndex.java b/src/main/java/build/buildfarm/cas/MemoryDirectoriesIndex.java index 7e8f9b61f8..def4577a61 100644 --- a/src/main/java/build/buildfarm/cas/MemoryDirectoriesIndex.java +++ b/src/main/java/build/buildfarm/cas/MemoryDirectoriesIndex.java @@ -39,6 +39,9 @@ public void close() {} @Override public void start() {} + @Override + public void setBatchMode(boolean batchMode) {} + @Override public synchronized Set removeEntry(String entry) { Set directories = entryDirectories.removeAll(entry); diff --git a/src/test/java/build/buildfarm/cas/FileDirectoriesIndexTest.java b/src/test/java/build/buildfarm/cas/FileDirectoriesIndexTest.java index e83e9c6664..33323bdd84 100644 --- a/src/test/java/build/buildfarm/cas/FileDirectoriesIndexTest.java +++ b/src/test/java/build/buildfarm/cas/FileDirectoriesIndexTest.java @@ -36,7 +36,7 @@ public class FileDirectoriesIndexTest { private final DigestUtil DIGEST_UTIL = new 
DigestUtil(HashFunction.SHA256); - private final String jdbcIndexUrl = "jdbc:sqlite::memory:"; + private final String jdbcIndexUrl = ":memory:"; private Path root; private FileDirectoriesIndex directoriesIndex;