Skip to content

Commit

Permalink
Merge branch 'main' into maintainer_updates
Browse files Browse the repository at this point in the history
  • Loading branch information
sumobrian authored May 2, 2024
2 parents f24c6d2 + afa6599 commit 7f454df
Show file tree
Hide file tree
Showing 98 changed files with 2,593 additions and 1,089 deletions.
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# W504 - Line break occurred after a binary operator
ignore = E265,E402,E999,W293,W504
max-line-length = 120
exclude = .tox,.git,*/migrations/*,*/static/CACHE/*,docs,node_modules,.venv
exclude = .tox,.git,*/migrations/*,*/static/CACHE/*,docs,node_modules,.venv,*/cdk.out/*

# F401 - Unused imports -- this is the only way to have a file-wide rule exception
per-file-ignores =
Expand Down
4 changes: 2 additions & 2 deletions FetchMigration/python/tests/test_endpoint_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from typing import Optional
from unittest.mock import MagicMock, patch

from moto import mock_iam
from moto import mock_aws

import endpoint_utils
from endpoint_info import EndpointInfo
Expand Down Expand Up @@ -284,7 +284,7 @@ def test_derive_aws_region(self):
# Should return region successfully
self.assertEqual("test-region", endpoint_utils.get_aws_region(test_config))

@mock_iam
@mock_aws
def test_get_aws_sigv4_auth(self):
result = endpoint_utils.get_aws_sigv4_auth("test")
self.assertEqual(result.service, "es")
Expand Down
29 changes: 26 additions & 3 deletions RFS/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
plugins {
id 'application'
id 'java'
id 'jacoco'
id "com.avast.gradle.docker-compose" version "0.17.4"
id 'com.bmuschko.docker-remote-api'
id 'io.freefair.lombok' version '8.6'
Expand All @@ -25,6 +26,11 @@ repositories {
mavenCentral()
}

// Build-wide extra properties.
// awsSdkVersion: version used for the AWS SDK BOM imported in the dependencies block.
ext.awsSdkVersion = '2.25.16'
// dataset: taken from the 'dataset' project property when it is set, otherwise 'skip_dataset'.
ext.dataset = findProperty('dataset') ?: 'skip_dataset'

dependencies {
implementation 'com.beust:jcommander:1.81'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.16.2'
Expand All @@ -41,7 +47,11 @@ dependencies {
implementation platform('io.projectreactor:reactor-bom:2023.0.5')
implementation 'io.projectreactor.netty:reactor-netty-core'
implementation 'io.projectreactor.netty:reactor-netty-http'
implementation 'software.amazon.awssdk:s3:2.25.16'

implementation platform("software.amazon.awssdk:bom:$awsSdkVersion")
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:s3-transfer-manager'
implementation 'software.amazon.awssdk.crt:aws-crt:0.29.18'

testImplementation 'io.projectreactor:reactor-test:3.6.5'
testImplementation 'org.apache.logging.log4j:log4j-core:2.23.1'
Expand All @@ -61,8 +71,21 @@ clean.doFirst {
delete project.file("./docker/build")
}

ext {
dataset = findProperty('dataset') ?: 'skip_dataset'
// Utility task to allow copying required libraries into a 'dependencies' folder for security scanning
tasks.register('copyDependencies', Sync) {
    // The same jar can be contributed more than once; keep only the first copy.
    duplicatesStrategy = DuplicatesStrategy.EXCLUDE

    from configurations.runtimeClasspath
    // layout.buildDirectory replaces the deprecated Project.buildDir accessor.
    into layout.buildDirectory.dir('dependencies')
}

// Generate both the XML and the HTML JaCoCo coverage reports under build/reports/jacoco/test.
jacocoTestReport {
    reports {
        xml.required = true
        // outputLocation replaces the deprecated 'destination' setter on report objects.
        xml.outputLocation = layout.buildDirectory.file('reports/jacoco/test/jacocoTestReport.xml')
        html.required = true
        html.outputLocation = layout.buildDirectory.dir('reports/jacoco/test/html')
    }
}

task demoPrintOutSnapshot (type: JavaExec) {
Expand Down
2 changes: 1 addition & 1 deletion RFS/src/main/java/com/rfs/ReindexFromSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public static void main(String[] args) throws InterruptedException {
if (snapshotDirPath != null) {
repo = new FilesystemRepo(snapshotDirPath);
} else if (s3RepoUri != null && s3Region != null && s3LocalDirPath != null) {
repo = S3Repo.create(s3LocalDirPath, s3RepoUri, s3Region);
repo = S3Repo.create(s3LocalDirPath, new S3Uri(s3RepoUri), s3Region);
} else if (snapshotLocalRepoDirPath != null) {
repo = new FilesystemRepo(snapshotLocalRepoDirPath);
} else {
Expand Down
13 changes: 13 additions & 0 deletions RFS/src/main/java/com/rfs/common/FilesystemRepo.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,43 +38,56 @@ public FilesystemRepo(Path repoRootDir) {
this.repoRootDir = repoRootDir;
}

/**
 * Returns the repository root directory this instance was constructed with.
 *
 * @return the value of the {@code repoRootDir} field
 */
@Override
public Path getRepoRootDir() {
    return this.repoRootDir;
}

/**
 * Returns the path of the snapshot repository data file.
 *
 * <p>The lookup itself is delegated to {@code findRepoFile()}.
 *
 * @return whatever {@code findRepoFile()} returns
 * @throws IOException if {@code findRepoFile()} throws it
 */
@Override
public Path getSnapshotRepoDataFilePath() throws IOException {
    final Path repoDataFile = findRepoFile();
    return repoDataFile;
}

/**
 * Computes the location of the global metadata file for a snapshot:
 * {@code <repoRoot>/meta-<snapshotId>.dat}.
 *
 * <p>The path is only computed; this method performs no filesystem access.
 *
 * @param snapshotId identifier embedded in the file name
 * @return the metadata file path, resolved against {@link #getRepoRootDir()}
 * @throws IOException never thrown by this implementation; part of the overridden signature
 */
@Override
public Path getGlobalMetadataFilePath(String snapshotId) throws IOException {
    // Resolve against the root instead of concatenating strings: an empty (current-directory)
    // root stays relative rather than becoming "/meta-...", and the result stays on the
    // root's own FileSystem.
    return getRepoRootDir().resolve("meta-" + snapshotId + ".dat");
}

/**
 * Computes the location of the snapshot metadata file:
 * {@code <repoRoot>/snap-<snapshotId>.dat}.
 *
 * <p>The path is only computed; this method performs no filesystem access.
 *
 * @param snapshotId identifier embedded in the file name
 * @return the metadata file path, resolved against {@link #getRepoRootDir()}
 * @throws IOException never thrown by this implementation; part of the overridden signature
 */
@Override
public Path getSnapshotMetadataFilePath(String snapshotId) throws IOException {
    // Resolve against the root instead of concatenating strings: an empty (current-directory)
    // root stays relative rather than becoming "/snap-...", and the result stays on the
    // root's own FileSystem.
    return getRepoRootDir().resolve("snap-" + snapshotId + ".dat");
}

/**
 * Computes the location of an index metadata file:
 * {@code <repoRoot>/indices/<indexId>/meta-<indexFileId>.dat}.
 *
 * <p>The path is only computed; this method performs no filesystem access.
 *
 * @param indexId     name of the index directory under {@code indices/}
 * @param indexFileId identifier embedded in the file name
 * @return the metadata file path, resolved against {@link #getRepoRootDir()}
 * @throws IOException never thrown by this implementation; part of the overridden signature
 */
@Override
public Path getIndexMetadataFilePath(String indexId, String indexFileId) throws IOException {
    // The relative part is resolved as one string that always starts with "indices/", so it
    // can never be treated as absolute. Resolving (rather than concatenating onto toString())
    // keeps an empty root relative and stays on the root's own FileSystem.
    String relativePath = "indices/" + indexId + "/meta-" + indexFileId + ".dat";
    return getRepoRootDir().resolve(relativePath);
}

/**
 * Computes the directory holding a shard's files:
 * {@code <repoRoot>/indices/<indexId>/<shardId>}.
 *
 * <p>The path is only computed; the directory is neither checked nor created.
 *
 * @param indexId name of the index directory under {@code indices/}
 * @param shardId shard number used as the directory name
 * @return the shard directory path, resolved against {@link #getRepoRootDir()}
 * @throws IOException never thrown by this implementation; part of the overridden signature
 */
@Override
public Path getShardDirPath(String indexId, int shardId) throws IOException {
    // The relative part is resolved as one string that always starts with "indices/", so it
    // can never be treated as absolute. Resolving (rather than concatenating onto toString())
    // keeps an empty root relative and stays on the root's own FileSystem.
    String relativePath = "indices/" + indexId + "/" + shardId;
    return getRepoRootDir().resolve(relativePath);
}

/**
 * Computes the location of a shard's snapshot metadata file: the file
 * {@code snap-<snapshotId>.dat} inside the directory returned by
 * {@link #getShardDirPath(String, int)}.
 *
 * @param snapshotId identifier embedded in the file name
 * @param indexId    forwarded to {@code getShardDirPath}
 * @param shardId    forwarded to {@code getShardDirPath}
 * @return the shard metadata file path
 * @throws IOException if {@code getShardDirPath} throws it
 */
@Override
public Path getShardMetadataFilePath(String snapshotId, String indexId, int shardId) throws IOException {
    final String fileName = "snap-" + snapshotId + ".dat";
    return getShardDirPath(indexId, shardId).resolve(fileName);
}

/**
 * Computes the location of a blob file: {@code blobName} resolved against the directory
 * returned by {@link #getShardDirPath(String, int)}.
 *
 * @param indexId  forwarded to {@code getShardDirPath}
 * @param shardId  forwarded to {@code getShardDirPath}
 * @param blobName file name of the blob within the shard directory
 * @return the blob file path
 * @throws IOException if {@code getShardDirPath} throws it
 */
@Override
public Path getBlobFilePath(String indexId, int shardId, String blobName) throws IOException {
    final Path shardDir = getShardDirPath(indexId, shardId);
    return shardDir.resolve(blobName);
}

/**
 * Intentionally a no-op for this implementation: the body is empty and
 * {@code shardMetadata} is not read.
 *
 * @param shardMetadata unused here
 * @throws IOException never thrown by this implementation; part of the overridden signature
 */
@Override
public void prepBlobFiles(ShardMetadata.Data shardMetadata) throws IOException {
// No work necessary for local filesystem
}
}
132 changes: 86 additions & 46 deletions RFS/src/main/java/com/rfs/common/S3Repo.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,37 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedDirectoryDownload;
import software.amazon.awssdk.transfer.s3.model.DirectoryDownload;
import software.amazon.awssdk.transfer.s3.model.DownloadDirectoryRequest;

import java.util.Comparator;
import java.util.Optional;

public class S3Repo implements SourceRepo {
private static final Logger logger = LogManager.getLogger(S3Repo.class);
private static final double S3_TARGET_THROUGHPUT_GIBPS = 8.0; // Arbitrarily chosen
private static final long S3_MAX_MEMORY_BYTES = 1024L * 1024 * 1024; // Arbitrarily chosen
private static final long S3_MINIMUM_PART_SIZE_BYTES = 8L * 1024 * 1024; // Default, but be explicit

private final Path s3LocalDir;
private final String s3RepoUri;
private final S3Uri s3RepoUri;
private final String s3Region;
private final S3Client s3Client;
private final S3AsyncClient s3Client;

private static int extractVersion(String key) {
try {
Expand All @@ -35,130 +43,162 @@ private static int extractVersion(String key) {
}
}

protected String findRepoFileUri() {
String bucketName = getS3BucketName();
String prefix = getS3ObjectsPrefix();

protected S3Uri findRepoFileUri() {
ListObjectsV2Request listRequest = ListObjectsV2Request.builder()
.bucket(bucketName)
.prefix(prefix)
.bucket(s3RepoUri.bucketName)
.prefix(s3RepoUri.key)
.build();

ListObjectsV2Response listResponse = s3Client.listObjectsV2(listRequest);
ListObjectsV2Response listResponse = s3Client.listObjectsV2(listRequest).join();

Optional<S3Object> highestVersionedIndexFile = listResponse.contents().stream()
.filter(s3Object -> s3Object.key().matches(".*index-\\d+$")) // Regex to match index files
.max(Comparator.comparingInt(s3Object -> extractVersion(s3Object.key())));

return highestVersionedIndexFile
.map(s3Object -> "s3://" + bucketName + "/" + s3Object.key())
.orElse(null);
String rawUri = highestVersionedIndexFile
.map(s3Object -> "s3://" + s3RepoUri.bucketName + "/" + s3Object.key())
.orElse("");
return new S3Uri(rawUri);
}

/**
 * Creates {@code localPath} as a directory, together with any missing parent directories,
 * by delegating to {@link Files#createDirectories}. An already-existing directory is not
 * an error for that call.
 *
 * NOTE(review): presumably {@code protected} so tests can override it and avoid touching
 * the real filesystem; confirm against the test code.
 *
 * @param localPath directory to create
 * @throws IOException if {@code Files.createDirectories} fails
 */
protected void ensureS3LocalDirectoryExists(Path localPath) throws IOException {
Files.createDirectories(localPath);
}

private void downloadFile(String s3Uri, Path localPath) throws IOException {
logger.info("Downloading file from S3: " + s3Uri + " to " + localPath);
/**
 * Reports whether {@code localPath} exists, as determined by {@link Files#exists}.
 *
 * NOTE(review): presumably {@code protected} so tests can override it and avoid touching
 * the real filesystem; confirm against the test code.
 *
 * @param localPath path to check
 * @return the result of {@code Files.exists(localPath)}
 */
protected boolean doesFileExistLocally(Path localPath) {
return Files.exists(localPath);
}

private void ensureFileExistsLocally(S3Uri s3Uri, Path localPath) throws IOException {
ensureS3LocalDirectoryExists(localPath.getParent());

String bucketName = s3Uri.split("/")[2];
String key = s3Uri.split(bucketName + "/")[1];
if (doesFileExistLocally(localPath)) {
logger.debug("File already exists locally: " + localPath);
return;
}

logger.info("Downloading file from S3: " + s3Uri.uri + " to " + localPath);
GetObjectRequest getObjectRequest = GetObjectRequest.builder()
.bucket(bucketName)
.key(key)
.bucket(s3Uri.bucketName)
.key(s3Uri.key)
.build();

s3Client.getObject(getObjectRequest, ResponseTransformer.toFile(localPath));
s3Client.getObject(getObjectRequest, AsyncResponseTransformer.toFile(localPath)).join();
}

public static S3Repo create(Path s3LocalDir, String s3Uri, String s3Region) {
S3Client s3Client = S3Client.builder()
.region(Region.of(s3Region))
.credentialsProvider(DefaultCredentialsProvider.create())
.build();
public static S3Repo create(Path s3LocalDir, S3Uri s3Uri, String s3Region) {
S3AsyncClient s3Client = S3AsyncClient.crtBuilder()
.credentialsProvider(DefaultCredentialsProvider.create())
.region(Region.of(s3Region))
.retryConfiguration(r -> r.numRetries(3))
.targetThroughputInGbps(S3_TARGET_THROUGHPUT_GIBPS)
.maxNativeMemoryLimitInBytes(S3_MAX_MEMORY_BYTES)
.minimumPartSizeInBytes(S3_MINIMUM_PART_SIZE_BYTES)
.build();

return new S3Repo(s3LocalDir, s3Uri, s3Region, s3Client);
}

public S3Repo(Path s3LocalDir, String s3Uri, String s3Region, S3Client s3Client) {
public S3Repo(Path s3LocalDir, S3Uri s3Uri, String s3Region, S3AsyncClient s3Client) {
this.s3LocalDir = s3LocalDir;
this.s3RepoUri = s3Uri;
this.s3Region = s3Region;
this.s3Client = s3Client;
}

/**
 * Returns the local directory this repo was constructed with.
 *
 * @return the value of the {@code s3LocalDir} field
 */
@Override
public Path getRepoRootDir() {
    return this.s3LocalDir;
}

@Override
public Path getSnapshotRepoDataFilePath() throws IOException {
String repoFileS3Uri = findRepoFileUri();
S3Uri repoFileS3Uri = findRepoFileUri();

String relativeFileS3Uri = repoFileS3Uri.substring(s3RepoUri.length() + 1);
String relativeFileS3Uri = repoFileS3Uri.uri.substring(s3RepoUri.uri.length() + 1);

Path localFilePath = s3LocalDir.resolve(relativeFileS3Uri);
downloadFile(repoFileS3Uri, localFilePath);
ensureFileExistsLocally(repoFileS3Uri, localFilePath);

return localFilePath;
}

@Override
public Path getGlobalMetadataFilePath(String snapshotId) throws IOException {
String suffix = "meta-" + snapshotId + ".dat";
Path filePath = s3LocalDir.resolve(suffix);
downloadFile(s3RepoUri + "/" + suffix, filePath);
S3Uri fileUri = new S3Uri(s3RepoUri.uri + "/" + suffix);
ensureFileExistsLocally(fileUri, filePath);
return filePath;
}

@Override
public Path getSnapshotMetadataFilePath(String snapshotId) throws IOException {
String suffix = "snap-" + snapshotId + ".dat";
Path filePath = s3LocalDir.resolve(suffix);
downloadFile(s3RepoUri + "/" + suffix, filePath);
S3Uri fileUri = new S3Uri(s3RepoUri.uri + "/" + suffix);
ensureFileExistsLocally(fileUri, filePath);
return filePath;
}

@Override
public Path getIndexMetadataFilePath(String indexId, String indexFileId) throws IOException {
String suffix = "indices/" + indexId + "/meta-" + indexFileId + ".dat";
Path filePath = s3LocalDir.resolve(suffix);
downloadFile(s3RepoUri + "/" + suffix, filePath);
S3Uri fileUri = new S3Uri(s3RepoUri.uri + "/" + suffix);
ensureFileExistsLocally(fileUri, filePath);
return filePath;
}

/**
 * Computes the local directory for a shard's files:
 * {@code <s3LocalDir>/indices/<indexId>/<shardId>}.
 *
 * <p>Only the path is computed; nothing is downloaded or created by this method.
 *
 * @param indexId name of the index directory under {@code indices/}
 * @param shardId shard number used as the directory name
 * @return the shard directory path under {@code s3LocalDir}
 * @throws IOException never thrown by this implementation; part of the overridden signature
 */
@Override
public Path getShardDirPath(String indexId, int shardId) throws IOException {
    return s3LocalDir.resolve("indices/" + indexId + "/" + shardId);
}

@Override
public Path getShardMetadataFilePath(String snapshotId, String indexId, int shardId) throws IOException {
String suffix = "indices/" + indexId + "/" + shardId + "/snap-" + snapshotId + ".dat";
Path filePath = s3LocalDir.resolve(suffix);
downloadFile(s3RepoUri + "/" + suffix, filePath);
S3Uri fileUri = new S3Uri(s3RepoUri.uri + "/" + suffix);
ensureFileExistsLocally(fileUri, filePath);
return filePath;
}

@Override
public Path getBlobFilePath(String indexId, int shardId, String blobName) throws IOException {
String suffix = "indices/" + indexId + "/" + shardId + "/" + blobName;
Path filePath = s3LocalDir.resolve(suffix);
downloadFile(s3RepoUri + "/" + suffix, filePath);
S3Uri fileUri = new S3Uri(s3RepoUri.uri + "/" + suffix);
ensureFileExistsLocally(fileUri, filePath);
return filePath;
}

public String getS3BucketName() {
String[] parts = s3RepoUri.substring(5).split("/", 2);
return parts[0];
}
@Override
public void prepBlobFiles(ShardMetadata.Data shardMetadata) throws IOException {
S3TransferManager transferManager = S3TransferManager.builder().s3Client(s3Client).build();

Path shardDirPath = getShardDirPath(shardMetadata.getIndexId(), shardMetadata.getShardId());
ensureS3LocalDirectoryExists(shardDirPath);

public String getS3ObjectsPrefix() {
String[] parts = s3RepoUri.substring(5).split("/", 2);
String prefix = parts.length > 1 ? parts[1] : "";
String blobFilesS3Prefix = s3RepoUri.key + "indices/" + shardMetadata.getIndexId() + "/" + shardMetadata.getShardId() + "/";

if (!prefix.isEmpty() && prefix.endsWith("/")) {
prefix = prefix.substring(0, prefix.length() - 1);
}
logger.info("Downloading blob files from S3: s3://" + s3RepoUri.bucketName + "/" + blobFilesS3Prefix + " to " + shardDirPath);
DirectoryDownload directoryDownload = transferManager.downloadDirectory(
DownloadDirectoryRequest.builder()
.destination(shardDirPath)
.bucket(s3RepoUri.bucketName)
.listObjectsV2RequestTransformer(l -> l.prefix(blobFilesS3Prefix))
.build()
);

// Wait for the transfer to complete
CompletedDirectoryDownload completedDirectoryDownload = directoryDownload.completionFuture().join();

logger.info("Blob file download(s) complete");

return prefix;
// Print out any failed downloads
completedDirectoryDownload.failedTransfers().forEach(logger::error);
}
}
Loading

0 comments on commit 7f454df

Please sign in to comment.