Skip to content

Commit

Permalink
ALS-7014: Implement signed URL functionality for data exports (#119)
Browse files Browse the repository at this point in the history
* ALS-7165: Update table name to include dataset id
  • Loading branch information
ramari16 authored Sep 6, 2024
1 parent b222828 commit acf1147
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 31 deletions.
11 changes: 11 additions & 0 deletions data/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<exclusions>
<!--Spring boot will complain about this dep on startup. It's not needed (we use SLF4J + Logback)-->
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

</project>
16 changes: 14 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<dockerfile-maven-version>1.4.10</dockerfile-maven-version>
<aws.version>2.20.153</aws.version>
</properties>
<repositories>
<repository>
Expand Down Expand Up @@ -199,7 +200,7 @@
<dependency>
<groupId>edu.harvard.hms.dbmi.avillach</groupId>
<artifactId>pic-sure-resource-api</artifactId>
<version>2.1.0-SNAPSHOT</version>
<version>2.2.0-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
Expand Down Expand Up @@ -320,7 +321,18 @@
<artifactId>snappy-java</artifactId>
<version>1.1.10.5</version>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>${aws.version}</version>
<exclusions>
<!--Spring boot will complain about this dep on startup. It's not needed (we use SLF4J + Logback)-->
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>

</dependencies>
</dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ public MediaType getResponseType() {
return responseType;
}

/**
 * Returns the file backing the underlying result stream.
 * Exposed so callers can hand the completed result file to external
 * consumers (e.g. uploading it to object storage) — presumably only
 * meaningful once the result has finished writing; TODO confirm.
 */
public File getFile() {
return stream.getFile();
}

public static enum Status{
SUCCESS {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,8 @@ public long estimatedSize() {
public void closeWriter() {
writer.close();
}

/**
 * Returns the file the writer is producing, by delegating to the
 * underlying ResultWriter. NOTE(review): assumes the writer has been
 * closed/flushed before the file is consumed — confirm at call sites.
 */
public File getFile() {
return writer.getFile();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.Codec;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
Expand All @@ -21,22 +20,30 @@

public class PfbWriter implements ResultWriter {

public static final String PATIENT_TABLE_PREFIX = "pic-sure-";
private Logger log = LoggerFactory.getLogger(PfbWriter.class);

private final Schema metadataSchema;
private final Schema nodeSchema;

private final String queryId;

private final String patientTableName;
private SchemaBuilder.FieldAssembler<Schema> entityFieldAssembler;

private List<String> fields;
private DataFileWriter<GenericRecord> dataFileWriter;
private File file;
private Schema entitySchema;
private Schema patientDataSchema;
private Schema relationSchema;

private static final Set<String> SINGULAR_FIELDS = Set.of("patient_id");

public PfbWriter(File tempFile) {
file = tempFile;
public PfbWriter(File tempFile, String queryId) {
this.file = tempFile;
this.queryId = queryId;
this.patientTableName = formatFieldName(PATIENT_TABLE_PREFIX + queryId);
entityFieldAssembler = SchemaBuilder.record("entity")
.namespace("edu.harvard.dbmi")
.fields();
Expand All @@ -53,12 +60,19 @@ public PfbWriter(File tempFile) {
metadataRecord.requiredString("misc");
metadataRecord = metadataRecord.name("nodes").type(SchemaBuilder.array().items(nodeSchema)).noDefault();
metadataSchema = metadataRecord.endRecord();


SchemaBuilder.FieldAssembler<Schema> relationRecord = SchemaBuilder.record("Relation")
.fields()
.requiredString("dst_name")
.requiredString("dst_id");
relationSchema = relationRecord.endRecord();
}

@Override
public void writeHeader(String[] data) {
fields = Arrays.stream(data.clone()).map(this::formatFieldName).collect(Collectors.toList());
SchemaBuilder.FieldAssembler<Schema> patientRecords = SchemaBuilder.record("patientData")
SchemaBuilder.FieldAssembler<Schema> patientRecords = SchemaBuilder.record(patientTableName)
.fields();

fields.forEach(field -> {
Expand All @@ -76,6 +90,7 @@ public void writeHeader(String[] data) {
entityFieldAssembler = entityFieldAssembler.name("object").type(objectSchema).noDefault();
entityFieldAssembler.nullableString("id", "null");
entityFieldAssembler.requiredString("name");
entityFieldAssembler = entityFieldAssembler.name("relations").type(SchemaBuilder.array().items(relationSchema)).noDefault();
entitySchema = entityFieldAssembler.endRecord();

DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(entitySchema);
Expand Down Expand Up @@ -126,6 +141,7 @@ private void writeMetadata() {
entityRecord.put("object", metadata);
entityRecord.put("name", "metadata");
entityRecord.put("id", "null");
entityRecord.put("relations", List.of());

try {
dataFileWriter.append(entityRecord);
Expand Down Expand Up @@ -163,8 +179,9 @@ public void writeMultiValueEntity(Collection<List<List<String>>> entities) {

GenericRecord entityRecord = new GenericData.Record(entitySchema);
entityRecord.put("object", patientData);
entityRecord.put("name", "patientData");
entityRecord.put("name", patientTableName);
entityRecord.put("id", patientId);
entityRecord.put("relations", List.of());

try {
dataFileWriter.append(entityRecord);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing.upload;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.presigner.S3Presigner;
import software.amazon.awssdk.services.s3.presigner.model.GetObjectPresignRequest;
import software.amazon.awssdk.services.s3.presigner.model.PresignedGetObjectRequest;

import java.io.File;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

@Component
public class SignUrlService {

    private final String bucketName;
    private final int signedUrlExpiryMinutes;
    private final Region region;

    // static final per SLF4J convention (was non-final)
    private static final Logger log = LoggerFactory.getLogger(SignUrlService.class);

    /**
     * Service that uploads exported result files to S3 and mints time-limited
     * presigned GET URLs so clients can download them directly.
     *
     * @param bucketName             target S3 bucket for data exports
     * @param region                 AWS region of the bucket (defaults to us-east-1)
     * @param signedUrlExpiryMinutes lifetime of generated presigned URLs, in minutes
     */
    @Autowired
    public SignUrlService(
        @Value("${data-export.s3.bucket-name:}") String bucketName,
        @Value("${data-export.s3.region:us-east-1}") String region,
        @Value("${data-export.s3.signedUrl-expiry-minutes:60}") int signedUrlExpiryMinutes
    ) {
        this.bucketName = bucketName;
        this.signedUrlExpiryMinutes = signedUrlExpiryMinutes;
        this.region = Region.of(region);
    }

    /**
     * Uploads a file to the configured bucket under the given object key.
     * The client is managed with try-with-resources: the original closed it
     * manually after the upload, which leaked the client whenever
     * {@link #putS3Object} threw.
     *
     * @param file      local file to upload
     * @param objectKey key to store the object under
     */
    public void uploadFile(File file, String objectKey) {
        try (S3Client s3 = S3Client.builder().region(region).build()) {
            putS3Object(s3, bucketName, objectKey, file);
        }
    }

    /**
     * Puts a single object into the bucket. Uses RequestBody.fromFile so the
     * file is streamed from disk rather than loaded fully into memory.
     *
     * @param s3         client to use (caller owns its lifecycle)
     * @param bucketName destination bucket
     * @param objectKey  destination key
     * @param file       file whose contents become the object body
     */
    public void putS3Object(S3Client s3, String bucketName, String objectKey, File file) {
        Map<String, String> metadata = new HashMap<>();
        PutObjectRequest putOb = PutObjectRequest.builder()
            .bucket(bucketName)
            .key(objectKey)
            .metadata(metadata)
            .build();

        s3.putObject(putOb, RequestBody.fromFile(file));
        // parameterized logging instead of string concatenation
        log.info("Successfully placed {} into bucket {}", objectKey, bucketName);
    }

    /**
     * Creates a presigned GET URL for the given key, valid for the configured
     * {@code signedUrlExpiryMinutes} (the original comment incorrectly said
     * "10 minutes"; the duration is configurable and defaults to 60).
     *
     * @param keyName key of the object to sign for
     * @return externally usable presigned URL
     */
    public String createPresignedGetUrl(String keyName) {
        PresignedGetObjectRequest presignedRequest;
        try (S3Presigner presigner = S3Presigner.builder().region(region).build()) {
            GetObjectRequest objectRequest = GetObjectRequest.builder()
                .bucket(bucketName)
                .key(keyName)
                .build();

            GetObjectPresignRequest presignRequest = GetObjectPresignRequest.builder()
                .signatureDuration(Duration.ofMinutes(signedUrlExpiryMinutes))
                .getObjectRequest(objectRequest)
                .build();

            presignedRequest = presigner.presignGetObject(presignRequest);
        }
        // Security: do NOT log the URL itself — a presigned URL is a bearer
        // credential granting download access until it expires.
        log.info("Generated presigned GET URL for key [{}]", keyName);

        return presignedRequest.url().toExternalForm();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.*;

Expand All @@ -15,7 +16,7 @@ public class PfbWriterTest {

@Test
public void writeValidPFB() {
PfbWriter pfbWriter = new PfbWriter(new File("target/test-result.avro"));
PfbWriter pfbWriter = new PfbWriter(new File("target/test-result.avro"), UUID.randomUUID().toString());

pfbWriter.writeHeader(new String[] {"patient_id", "\\demographics\\age\\", "\\phs123\\stroke\\"});
List<List<String>> nullableList = new ArrayList<>();
Expand All @@ -38,21 +39,21 @@ public void writeValidPFB() {

@Test
public void formatFieldName_spacesAndBackslashes_replacedWithUnderscore() {
    // Backslash-delimited concept paths contain characters that are not
    // legal in Avro names; each should be replaced by an underscore.
    PfbWriter writer = new PfbWriter(new File("target/test-result.avro"), UUID.randomUUID().toString());

    String actual = writer.formatFieldName("\\Topmed Study Accession with Subject ID\\\\");

    assertEquals("_Topmed_Study_Accession_with_Subject_ID__", actual);
}

@Test
public void formatFieldName_startsWithDigit_prependUnderscore() {
    // Avro names may not begin with a digit, so a leading underscore is
    // expected in addition to the usual character replacement.
    PfbWriter writer = new PfbWriter(new File("target/test-result.avro"), UUID.randomUUID().toString());

    String actual = writer.formatFieldName("123Topmed Study Accession with Subject ID\\\\");

    assertEquals("_123Topmed_Study_Accession_with_Subject_ID__", actual);
}

@Test
public void formatFieldName_randomGarbage_replaceWithUnderscore() {
    // Every non-alphanumeric character in an arbitrary string should be
    // mapped to an underscore.
    PfbWriter writer = new PfbWriter(new File("target/test-result.avro"), UUID.randomUUID().toString());

    String actual = writer.formatFieldName("$$$my garbage @vro var!able nam#");

    assertEquals("___my_garbage__vro_var_able_nam_", actual);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package edu.harvard.hms.dbmi.avillach.hpds.service;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.Map.Entry;
import java.util.stream.Collectors;

import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.InfoColumnMeta;
import edu.harvard.hms.dbmi.avillach.hpds.processing.upload.SignUrlService;
import edu.harvard.hms.dbmi.avillach.hpds.service.util.Paginator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -28,7 +28,6 @@
import edu.harvard.dbmi.avillach.domain.*;
import edu.harvard.dbmi.avillach.util.UUIDv5;
import edu.harvard.hms.dbmi.avillach.hpds.crypto.Crypto;
import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.FileBackedByteIndexedInfoStore;
import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta;
import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query;
import edu.harvard.hms.dbmi.avillach.hpds.processing.*;
Expand All @@ -41,13 +40,15 @@ public class PicSureService {

/**
 * Wires the query/processing collaborators used by the PIC-SURE resource
 * endpoints, including the signed-URL export service, and loads the default
 * encryption key at startup.
 */
@Autowired
public PicSureService(QueryService queryService, TimelineProcessor timelineProcessor, CountProcessor countProcessor,
                      VariantListProcessor variantListProcessor, AbstractProcessor abstractProcessor, Paginator paginator,
                      SignUrlService signUrlService) {
    this.queryService = queryService;
    this.timelineProcessor = timelineProcessor;
    this.countProcessor = countProcessor;
    this.variantListProcessor = variantListProcessor;
    this.abstractProcessor = abstractProcessor;
    this.paginator = paginator;
    this.signUrlService = signUrlService;
    Crypto.loadDefaultKey();
}

Expand All @@ -67,6 +68,8 @@ public PicSureService(QueryService queryService, TimelineProcessor timelineProce

private final Paginator paginator;

private final SignUrlService signUrlService;

private static final String QUERY_METADATA_FIELD = "queryMetadata";
private static final int RESPONSE_CACHE_SIZE = 50;

Expand Down Expand Up @@ -213,19 +216,7 @@ private QueryStatus convertToQueryStatus(AsyncResult entity) {
public ResponseEntity queryResult(@PathVariable("resourceQueryId") UUID queryId, @RequestBody QueryRequest resultRequest) throws IOException {
AsyncResult result = queryService.getResultFor(queryId.toString());
if (result == null) {
// This happens sometimes when users immediately request the status for a query
// before it can be initialized. We wait a bit and try again before throwing an
// error.
try {
Thread.sleep(100);
} catch (InterruptedException e) {
return ResponseEntity.status(500).build();
}

result = queryService.getResultFor(queryId.toString());
if (result == null) {
return ResponseEntity.status(404).build();
}
return ResponseEntity.status(404).build();
}
if (result.getStatus() == AsyncResult.Status.SUCCESS) {
result.open();
Expand All @@ -237,6 +228,25 @@ public ResponseEntity queryResult(@PathVariable("resourceQueryId") UUID queryId,
}
}

/**
 * Uploads the result file of a successfully completed query to S3 and
 * returns a time-limited presigned GET URL for downloading it.
 *
 * @param queryId       id of a previously submitted query
 * @param resultRequest request body (payload unused here; presumably carries
 *                      auth context validated upstream — TODO confirm)
 * @return 200 with a {@code SignedUrlResponse} on success; 404 when the query
 *         id is unknown; 400 when the query has not completed successfully
 * @throws IOException propagated from the upload/signing path
 */
@PostMapping(value = "/query/{resourceQueryId}/signed-url")
public ResponseEntity querySignedURL(@PathVariable("resourceQueryId") UUID queryId, @RequestBody QueryRequest resultRequest) throws IOException {
    AsyncResult result = queryService.getResultFor(queryId.toString());
    if (result == null) {
        return ResponseEntity.status(404).build();
    }
    if (result.getStatus() != AsyncResult.Status.SUCCESS) {
        return ResponseEntity.status(400).body("Status : " + result.getStatus().name());
    }
    File file = result.getFile();
    signUrlService.uploadFile(file, file.getName());
    String presignedGetUrl = signUrlService.createPresignedGetUrl(file.getName());
    // Security fix: do not log the presigned URL — it is a bearer credential
    // granting download access to the export until it expires.
    log.info("Generated signed URL for query {}", queryId);
    return ResponseEntity.ok()
            .contentType(MediaType.APPLICATION_JSON)
            .body(new SignedUrlResponse(presignedGetUrl));
}

@PostMapping("/query/{resourceQueryId}/status")
public QueryStatus queryStatus(@PathVariable("resourceQueryId") UUID queryId, @RequestBody QueryRequest request) {
return convertToQueryStatus(queryService.getStatusFor(queryId.toString()));
Expand Down
Loading

0 comments on commit acf1147

Please sign in to comment.