Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ALS-7810: Add data dictionary table to PFB output #127

Merged
merged 16 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

import java.util.Map;

/**
 * A single data-dictionary concept returned by the dictionary service.
 * Unknown JSON properties are ignored during deserialization.
 *
 * @param conceptPath the concept path identifying this variable (used as the lookup key)
 * @param name        short variable name
 * @param display     human-readable display label
 * @param dataset     identifier of the dataset this concept belongs to
 * @param description free-text description of the variable
 * @param meta        additional metadata key/value pairs; values may themselves be
 *                    JSON-encoded strings (e.g. "drs_uri" holds a JSON string array)
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public record Concept(String conceptPath, String name, String display, String dataset, String description, Map<String, String> meta) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpMethod;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import java.util.List;

@Service
@ConditionalOnProperty("dictionary.host")
public class DictionaryService {

    /** Type token so RestTemplate can deserialize the response body into a List&lt;Concept&gt;. */
    public static final ParameterizedTypeReference<List<Concept>> CONCEPT_LIST_TYPE_REFERENCE = new ParameterizedTypeReference<>() {
    };

    // Placeholder in the configured host template that is substituted with the target stack name.
    private static final String TARGET_STACK_PLACEHOLDER = "___TARGET_STACK___";
    private static final String CONCEPT_DETAIL_PATH = "/pic-sure-api-2/PICSURE/proxy/dictionary-api/concepts/detail";

    private final String dictionaryHost;
    private final RestTemplate restTemplate;

    /**
     * Resolves the dictionary host from the configured template. When a TARGET_STACK
     * environment value is present, it replaces the placeholder in the template;
     * otherwise the template is used as-is.
     */
    @Autowired
    public DictionaryService(@Value("${dictionary.host}") String dictionaryHostTemplate, @Value("${TARGET_STACK:}") String targetStack) {
        boolean hasTargetStack = targetStack != null && !targetStack.isEmpty();
        this.dictionaryHost = hasTargetStack
                ? dictionaryHostTemplate.replace(TARGET_STACK_PLACEHOLDER, targetStack)
                : dictionaryHostTemplate;
        this.restTemplate = new RestTemplate();
    }

    /**
     * Fetches concept details for the given concept paths from the dictionary service.
     *
     * @param conceptPaths concept paths to look up
     * @return the concepts returned by the service (may be null if the response has no body)
     */
    public List<Concept> getConcepts(List<String> conceptPaths) {
        HttpEntity<List<String>> request = new HttpEntity<>(conceptPaths);
        return restTemplate
                .exchange(dictionaryHost + CONCEPT_DETAIL_PATH, HttpMethod.POST, request, CONCEPT_LIST_TYPE_REFERENCE)
                .getBody();
    }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing.io;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary.Concept;
import edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary.DictionaryService;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.CodecFactory;
Expand All @@ -16,34 +20,43 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

public class PfbWriter implements ResultWriter {

public static final String PATIENT_TABLE_PREFIX = "pic-sure-";
public static final String PATIENT_TABLE_PREFIX = "pic-sure-patients-";
public static final String DATA_DICTIONARY_TABLE_PREFIX = "pic-sure-data-dictionary-";
private Logger log = LoggerFactory.getLogger(PfbWriter.class);

private final DictionaryService dictionaryService;

private final Schema metadataSchema;
private final Schema nodeSchema;

private final String queryId;

private final String patientTableName;
private final String dataDictionaryTableName;
private SchemaBuilder.FieldAssembler<Schema> entityFieldAssembler;

private List<String> fields;
private List<String> originalFields;
private List<String> formattedFields;
private DataFileWriter<GenericRecord> dataFileWriter;
private File file;
private Schema entitySchema;
private Schema patientDataSchema;
private Schema dataDictionarySchema;
private Schema relationSchema;

private static final Set<String> SINGULAR_FIELDS = Set.of("patient_id");

public PfbWriter(File tempFile, String queryId) {
public PfbWriter(File tempFile, String queryId, DictionaryService dictionaryService) {
this.file = tempFile;
this.queryId = queryId;
this.dictionaryService = dictionaryService;
this.patientTableName = formatFieldName(PATIENT_TABLE_PREFIX + queryId);
this.dataDictionaryTableName = formatFieldName(DATA_DICTIONARY_TABLE_PREFIX + queryId);
entityFieldAssembler = SchemaBuilder.record("entity")
.namespace("edu.harvard.dbmi")
.fields();
Expand Down Expand Up @@ -71,11 +84,21 @@ public PfbWriter(File tempFile, String queryId) {

@Override
public void writeHeader(String[] data) {
fields = Arrays.stream(data.clone()).map(this::formatFieldName).collect(Collectors.toList());
originalFields = List.of(data);
formattedFields = originalFields.stream().map(this::formatFieldName).collect(Collectors.toList());

dataDictionarySchema = SchemaBuilder.record(dataDictionaryTableName)
.fields()
.requiredString("concept_path")
.name("drs_uri").type(SchemaBuilder.array().items(SchemaBuilder.nullable().stringType())).noDefault()
.nullableString("display", "null")
.nullableString("dataset", "null")
.nullableString("description", "null")
.endRecord();

SchemaBuilder.FieldAssembler<Schema> patientRecords = SchemaBuilder.record(patientTableName)
.fields();

fields.forEach(field -> {
formattedFields.forEach(field -> {
if (isSingularField(field)) {
patientRecords.nullableString(field, "null");
} else {
Expand All @@ -85,7 +108,7 @@ public void writeHeader(String[] data) {
});
patientDataSchema = patientRecords.endRecord();

Schema objectSchema = Schema.createUnion(metadataSchema, patientDataSchema);
Schema objectSchema = Schema.createUnion(metadataSchema, patientDataSchema, dataDictionarySchema);

entityFieldAssembler = entityFieldAssembler.name("object").type(objectSchema).noDefault();
entityFieldAssembler.nullableString("id", "null");
Expand All @@ -104,6 +127,60 @@ public void writeHeader(String[] data) {
}

writeMetadata();
writeDataDictionary();
}

/**
 * Writes one row per query field into the data-dictionary table of the PFB output.
 * Concept details (display, dataset, description, drs_uri) are fetched from the
 * dictionary service; if that lookup fails, rows are still written with empty metadata.
 */
private void writeDataDictionary() {
    GenericRecord entityRecord = new GenericData.Record(entitySchema);
    Map<String, Concept> conceptMap = Map.of();
    try {
        conceptMap = dictionaryService.getConcepts(originalFields).stream()
            .collect(Collectors.toMap(Concept::conceptPath, Function.identity()));
    } catch (RuntimeException e) {
        // Best-effort: the dictionary service may be unavailable; still emit the table.
        log.error("Error fetching concepts from dictionary service", e);
    }

    // Reuse a single mapper for the whole loop; ObjectMapper construction is
    // expensive and a configured instance is safe to reuse.
    ObjectMapper objectMapper = new ObjectMapper();
    for (int i = 0; i < formattedFields.size(); i++) {
        String formattedField = formattedFields.get(i);
        if ("patient_id".equals(formattedField)) {
            // patient_id is not a concept path and has no dictionary entry
            continue;
        }
        GenericRecord dataDictionaryData = new GenericData.Record(dataDictionarySchema);
        dataDictionaryData.put("concept_path", formattedField);

        // The concept map is keyed by the original (unformatted) concept path.
        Concept concept = conceptMap.get(originalFields.get(i));
        List<String> drsUris = List.of();
        if (concept != null) {
            Map<String, String> meta = concept.meta();
            if (meta != null) {
                // drs_uri is stored as a JSON-encoded string array inside the meta map.
                String drsUriJson = meta.get("drs_uri");
                if (drsUriJson != null) {
                    try {
                        String[] drsUriArray = objectMapper.readValue(drsUriJson, String[].class);
                        drsUris = List.of(drsUriArray);
                    } catch (JsonProcessingException e) {
                        log.error("Error parsing drs_uri as json: {}", drsUriJson);
                    }
                }
            }
            dataDictionaryData.put("display", concept.display());
            dataDictionaryData.put("dataset", concept.dataset());
            dataDictionaryData.put("description", concept.description());
        }
        dataDictionaryData.put("drs_uri", drsUris);

        log.info("Writing {} to data dictionary table with drs_uris: {}", formattedField, drsUris);
        entityRecord.put("object", dataDictionaryData);
        entityRecord.put("name", dataDictionaryTableName);
        entityRecord.put("id", formattedField);
        entityRecord.put("relations", List.of());

        try {
            dataFileWriter.append(entityRecord);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

private boolean isSingularField(String field) {
Expand All @@ -126,7 +203,7 @@ private void writeMetadata() {
GenericRecord entityRecord = new GenericData.Record(entitySchema);

List<GenericRecord> nodeList = new ArrayList<>();
for (String field : fields) {
for (String field : formattedFields) {
GenericRecord nodeData = new GenericData.Record(nodeSchema);
nodeData.put("name", field);
nodeData.put("ontology_reference", "");
Expand Down Expand Up @@ -158,21 +235,21 @@ public void writeEntity(Collection<String[]> entities) {
@Override
public void writeMultiValueEntity(Collection<List<List<String>>> entities) {
entities.forEach(entity -> {
if (entity.size() != fields.size()) {
if (entity.size() != formattedFields.size()) {
throw new IllegalArgumentException("Entity length much match the number of fields in this document");
}
GenericRecord patientData = new GenericData.Record(patientDataSchema);
String patientId = "";
for(int i = 0; i < fields.size(); i++) {
if ("patient_id".equals(fields.get(i))) {
for(int i = 0; i < formattedFields.size(); i++) {
if ("patient_id".equals(formattedFields.get(i))) {
patientId = (entity.get(i) != null && !entity.get(i).isEmpty()) ? entity.get(i).get(0) : "";
}
if (isSingularField(fields.get(i))) {
if (isSingularField(formattedFields.get(i))) {
String entityValue = (entity.get(i) != null && !entity.get(i).isEmpty()) ? entity.get(i).get(0) : "";
patientData.put(fields.get(i), entityValue);
patientData.put(formattedFields.get(i), entityValue);
} else {
List<String> fieldValue = entity.get(i) != null ? entity.get(i) : List.of();
patientData.put(fields.get(i), fieldValue);
patientData.put(formattedFields.get(i), fieldValue);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ public SignUrlService(

/**
 * Uploads the given file to the configured S3 bucket under the supplied object key.
 * A fresh S3 client is built per call; try-with-resources guarantees it is closed
 * even if the upload throws.
 */
public void uploadFile(File file, String objectKey) {
    try (S3Client s3 = S3Client.builder()
            .region(this.region)
            .build()) {
        putS3Object(s3, bucketName, objectKey, file);
    }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.*;

public class ConceptTest {

    /**
     * Round-trips a Concept array through Jackson and verifies the deserialized
     * result is equal to the original (record equality covers every component,
     * including the meta map).
     */
    @Test
    public void jsonSerialization() throws JsonProcessingException {
        ObjectMapper objectMapper = new ObjectMapper();
        Concept original = new Concept(
                "\\demographics\\age\\", "age", "AGE", null, "patient age",
                Map.of("drs_uri", "[\"a-drs.uri\", \"another-drs.uri\"]"));
        Concept[] concepts = new Concept[]{original};

        String serialized = objectMapper.writeValueAsString(concepts);
        Concept[] deserialized = objectMapper.readValue(serialized, Concept[].class);

        assertEquals(List.of(concepts), List.of(deserialized));
    }
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,34 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing.io;

import edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary.Concept;
import edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary.DictionaryService;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.*;


@ExtendWith(MockitoExtension.class)
public class PfbWriterTest {

@Mock
private DictionaryService dictionaryService;

@Test
public void writeValidPFB() {
PfbWriter pfbWriter = new PfbWriter(new File("target/test-result.avro"), UUID.randomUUID().toString());
PfbWriter pfbWriter = new PfbWriter(new File("target/test-result.avro"), UUID.randomUUID().toString(), dictionaryService);

Mockito.when(dictionaryService.getConcepts(List.of("patient_id", "\\demographics\\age\\", "\\phs123\\stroke\\")))
.thenReturn(List.of(new Concept("\\demographics\\age\\", "age", "AGE", null, "patient age", Map.of("drs_uri", "[\"a-drs.uri\", \"another-drs.uri\"]"))));

pfbWriter.writeHeader(new String[] {"patient_id", "\\demographics\\age\\", "\\phs123\\stroke\\"});
List<List<String>> nullableList = new ArrayList<>();
Expand All @@ -34,26 +46,25 @@ public void writeValidPFB() {
List.of(List.of(), List.of("75"), List.of())
));
pfbWriter.close();
// todo: validate this programatically
}

// Spaces and backslashes in a concept path must each be replaced with an underscore.
// (Removed the stale diff line constructing PfbWriter with the old 2-arg constructor.)
@Test
public void formatFieldName_spacesAndBackslashes_replacedWithUnderscore() {
    PfbWriter pfbWriter = new PfbWriter(new File("target/test-result.avro"), UUID.randomUUID().toString(), dictionaryService);
    String formattedName = pfbWriter.formatFieldName("\\Topmed Study Accession with Subject ID\\\\");
    assertEquals("_Topmed_Study_Accession_with_Subject_ID__", formattedName);
}

// A field name starting with a digit is not a valid Avro name, so an underscore is prepended.
// (Removed the stale diff line constructing PfbWriter with the old 2-arg constructor.)
@Test
public void formatFieldName_startsWithDigit_prependUnderscore() {
    PfbWriter pfbWriter = new PfbWriter(new File("target/test-result.avro"), UUID.randomUUID().toString(), dictionaryService);
    String formattedName = pfbWriter.formatFieldName("123Topmed Study Accession with Subject ID\\\\");
    assertEquals("_123Topmed_Study_Accession_with_Subject_ID__", formattedName);
}

// Every character outside the Avro-safe set is replaced with an underscore.
// (Removed the stale diff line constructing PfbWriter with the old 2-arg constructor.)
@Test
public void formatFieldName_randomGarbage_replaceWithUnderscore() {
    PfbWriter pfbWriter = new PfbWriter(new File("target/test-result.avro"), UUID.randomUUID().toString(), dictionaryService);
    String formattedName = pfbWriter.formatFieldName("$$$my garbage @vro var!able nam#");
    assertEquals("___my_garbage__vro_var_able_nam_", formattedName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.stream.Collectors;

import edu.harvard.hms.dbmi.avillach.hpds.data.query.ResultType;
import edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary.DictionaryService;
import edu.harvard.hms.dbmi.avillach.hpds.processing.io.CsvWriter;
import edu.harvard.hms.dbmi.avillach.hpds.processing.io.PfbWriter;
import edu.harvard.hms.dbmi.avillach.hpds.processing.io.ResultWriter;
Expand Down Expand Up @@ -48,6 +49,8 @@ public class QueryService {
private final CountProcessor countProcessor;
private final MultiValueQueryProcessor multiValueQueryProcessor;

private final DictionaryService dictionaryService;

HashMap<String, AsyncResult> results = new HashMap<>();


Expand All @@ -57,6 +60,7 @@ public QueryService (AbstractProcessor abstractProcessor,
TimeseriesProcessor timeseriesProcessor,
CountProcessor countProcessor,
MultiValueQueryProcessor multiValueQueryProcessor,
@Autowired(required = false) DictionaryService dictionaryService,
@Value("${SMALL_JOB_LIMIT}") Integer smallJobLimit,
@Value("${SMALL_TASK_THREADS}") Integer smallTaskThreads,
@Value("${LARGE_TASK_THREADS}") Integer largeTaskThreads) {
Expand All @@ -65,6 +69,7 @@ public QueryService (AbstractProcessor abstractProcessor,
this.timeseriesProcessor = timeseriesProcessor;
this.countProcessor = countProcessor;
this.multiValueQueryProcessor = multiValueQueryProcessor;
this.dictionaryService = dictionaryService;

SMALL_JOB_LIMIT = smallJobLimit;
SMALL_TASK_THREADS = smallTaskThreads;
Expand Down Expand Up @@ -136,7 +141,7 @@ private AsyncResult initializeResult(Query query) throws IOException {
String queryId = UUIDv5.UUIDFromString(query.toString()).toString();
ResultWriter writer;
if (ResultType.DATAFRAME_PFB.equals(query.getExpectedResultType())) {
writer = new PfbWriter(File.createTempFile("result-" + System.nanoTime(), ".avro"), queryId);
writer = new PfbWriter(File.createTempFile("result-" + System.nanoTime(), ".avro"), queryId, dictionaryService);
} else {
writer = new CsvWriter(File.createTempFile("result-" + System.nanoTime(), ".sstmp"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ HPDS_GENOMIC_DATA_DIRECTORY=/opt/local/hpds/all/

data-export.s3.bucket-name=pic-sure-auth-dev-data-export
data-export.s3.region=us-east-1
data-export.s3.signedUrl-expiry-minutes=60

dictionary.host = http://wildfly.___TARGET_STACK___:8080/
Loading
Loading