From 5365a094684dc641f9e53f0e16391e2691784894 Mon Sep 17 00:00:00 2001 From: ramari16 Date: Wed, 20 Nov 2024 12:35:52 -0500 Subject: [PATCH] ALS-7810: Add data dictionary table to PFB output (#127) --- .../hpds/processing/dictionary/Concept.java | 9 ++ .../dictionary/DictionaryService.java | 37 ++++++ .../hpds/processing/io/PfbWriter.java | 105 +++++++++++++++--- .../processing/upload/SignUrlService.java | 3 +- .../processing/dictionary/ConceptTest.java | 24 ++++ .../hpds/processing/io/PfbWriterTest.java | 21 +++- .../avillach/hpds/service/QueryService.java | 7 +- .../application-bdc-auth-dev.properties | 4 +- .../application-bdc-auth-prod.properties | 4 +- .../application-development.properties | 3 +- 10 files changed, 191 insertions(+), 26 deletions(-) create mode 100644 processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/dictionary/Concept.java create mode 100644 processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/dictionary/DictionaryService.java create mode 100644 processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/dictionary/ConceptTest.java diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/dictionary/Concept.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/dictionary/Concept.java new file mode 100644 index 00000000..0e9da98c --- /dev/null +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/dictionary/Concept.java @@ -0,0 +1,9 @@ +package edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import java.util.Map; + +@JsonIgnoreProperties(ignoreUnknown = true) +public record Concept(String conceptPath, String name, String display, String dataset, String description, Map meta) { +} diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/dictionary/DictionaryService.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/dictionary/DictionaryService.java new file mode 100644 index 00000000..c6c10bef --- /dev/null +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/dictionary/DictionaryService.java @@ -0,0 +1,37 @@ +package edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpMethod; +import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; +import org.springframework.web.client.RestTemplate; + +import java.util.List; + +@Service +@ConditionalOnProperty("dictionary.host") +public class DictionaryService { + + public static final ParameterizedTypeReference> CONCEPT_LIST_TYPE_REFERENCE = new ParameterizedTypeReference<>() { + }; + private final String dictionaryHost; + private final RestTemplate restTemplate; + + @Autowired + public DictionaryService(@Value("${dictionary.host}") String dictionaryHostTemplate, @Value("${TARGET_STACK:}") String targetStack) { + if (targetStack != null && !targetStack.isEmpty()) { + this.dictionaryHost = dictionaryHostTemplate.replace("___TARGET_STACK___", targetStack); + } else { + this.dictionaryHost = dictionaryHostTemplate; + } + this.restTemplate = new RestTemplate(); + } + + public List getConcepts(List conceptPaths) { + return restTemplate.exchange(dictionaryHost + "/pic-sure-api-2/PICSURE/proxy/dictionary-api/concepts/detail", HttpMethod.POST, new HttpEntity<>(conceptPaths), CONCEPT_LIST_TYPE_REFERENCE).getBody(); + } +} diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriter.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriter.java index b10fc08c..55e8e3c7 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriter.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriter.java @@ -1,5 +1,9 @@ package edu.harvard.hms.dbmi.avillach.hpds.processing.io; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary.Concept; +import edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary.DictionaryService; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.avro.file.CodecFactory; @@ -16,34 +20,43 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; public class PfbWriter implements ResultWriter { - public static final String PATIENT_TABLE_PREFIX = "pic-sure-"; + public static final String PATIENT_TABLE_PREFIX = "pic-sure-patients-"; + public static final String DATA_DICTIONARY_TABLE_PREFIX = "pic-sure-data-dictionary-"; private Logger log = LoggerFactory.getLogger(PfbWriter.class); + private final DictionaryService dictionaryService; + private final Schema metadataSchema; private final Schema nodeSchema; private final String queryId; private final String patientTableName; + private final String dataDictionaryTableName; private SchemaBuilder.FieldAssembler entityFieldAssembler; - private List fields; + private List originalFields; + private List formattedFields; private DataFileWriter dataFileWriter; private File file; private Schema entitySchema; private Schema patientDataSchema; + private Schema dataDictionarySchema; private Schema relationSchema; private static final Set SINGULAR_FIELDS = Set.of("patient_id"); - public PfbWriter(File tempFile, String queryId) { + public PfbWriter(File tempFile, String queryId, DictionaryService dictionaryService) { this.file = tempFile; this.queryId = queryId; + this.dictionaryService = dictionaryService; this.patientTableName = formatFieldName(PATIENT_TABLE_PREFIX + queryId); + this.dataDictionaryTableName = formatFieldName(DATA_DICTIONARY_TABLE_PREFIX + queryId); entityFieldAssembler = SchemaBuilder.record("entity") .namespace("edu.harvard.dbmi") .fields(); @@ -71,11 +84,21 @@ public PfbWriter(File tempFile, String queryId) { @Override public void writeHeader(String[] data) { - fields = Arrays.stream(data.clone()).map(this::formatFieldName).collect(Collectors.toList()); + originalFields = List.of(data); + formattedFields = originalFields.stream().map(this::formatFieldName).collect(Collectors.toList()); + + dataDictionarySchema = SchemaBuilder.record(dataDictionaryTableName) + .fields() + .requiredString("concept_path") + .name("drs_uri").type(SchemaBuilder.array().items(SchemaBuilder.nullable().stringType())).noDefault() + .nullableString("display", "null") + .nullableString("dataset", "null") + .nullableString("description", "null") + .endRecord(); + SchemaBuilder.FieldAssembler patientRecords = SchemaBuilder.record(patientTableName) .fields(); - - fields.forEach(field -> { + formattedFields.forEach(field -> { if (isSingularField(field)) { patientRecords.nullableString(field, "null"); } else { @@ -85,7 +108,7 @@ public void writeHeader(String[] data) { }); patientDataSchema = patientRecords.endRecord(); - Schema objectSchema = Schema.createUnion(metadataSchema, patientDataSchema); + Schema objectSchema = Schema.createUnion(metadataSchema, patientDataSchema, dataDictionarySchema); entityFieldAssembler = entityFieldAssembler.name("object").type(objectSchema).noDefault(); entityFieldAssembler.nullableString("id", "null"); @@ -104,6 +127,60 @@ public void writeHeader(String[] data) { } writeMetadata(); + writeDataDictionary(); + } + + private void writeDataDictionary() { + GenericRecord entityRecord = new GenericData.Record(entitySchema);; + Map conceptMap = Map.of(); + try { + conceptMap = dictionaryService.getConcepts(originalFields).stream() + .collect(Collectors.toMap(Concept::conceptPath, Function.identity())); + } catch (RuntimeException e) { + log.error("Error fetching concepts from dictionary service", e); + } + + for (int i = 0; i < formattedFields.size(); i++) { + String formattedField = formattedFields.get(i); + if ("patient_id".equals(formattedField)) { + continue; + } + GenericRecord dataDictionaryData = new GenericData.Record(dataDictionarySchema); + dataDictionaryData.put("concept_path", formattedField); + + Concept concept = conceptMap.get(originalFields.get(i)); + List drsUris = List.of(); + if (concept != null) { + Map meta = concept.meta(); + if (meta != null) { + String drsUriJson = meta.get("drs_uri"); + if (drsUriJson != null) { + try { + String[] drsUriArray = new ObjectMapper().readValue(drsUriJson, String[].class); + drsUris = List.of(drsUriArray); + } catch (JsonProcessingException e) { + log.error("Error parsing drs_uri as json: " + drsUriJson); + } + } + } + dataDictionaryData.put("display", concept.display()); + dataDictionaryData.put("dataset", concept.dataset()); + dataDictionaryData.put("description", concept.description()); + } + dataDictionaryData.put("drs_uri", drsUris); + + log.info("Writing " + formattedField + " to data dictonary table with drs_uris: " + drsUris); + entityRecord.put("object", dataDictionaryData); + entityRecord.put("name", dataDictionaryTableName); + entityRecord.put("id", formattedField); + entityRecord.put("relations", List.of()); + + try { + dataFileWriter.append(entityRecord); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } } private boolean isSingularField(String field) { @@ -126,7 +203,7 @@ private void writeMetadata() { GenericRecord entityRecord = new GenericData.Record(entitySchema); List nodeList = new ArrayList<>(); - for (String field : fields) { + for (String field : formattedFields) { GenericRecord nodeData = new GenericData.Record(nodeSchema); nodeData.put("name", field); nodeData.put("ontology_reference", ""); @@ -158,21 +235,21 @@ public void writeEntity(Collection entities) { @Override public void writeMultiValueEntity(Collection>> entities) { entities.forEach(entity -> { - if (entity.size() != fields.size()) { + if (entity.size() != formattedFields.size()) { throw new IllegalArgumentException("Entity length much match the number of fields in this document"); } GenericRecord patientData = new GenericData.Record(patientDataSchema); String patientId = ""; - for(int i = 0; i < fields.size(); i++) { - if ("patient_id".equals(fields.get(i))) { + for(int i = 0; i < formattedFields.size(); i++) { + if ("patient_id".equals(formattedFields.get(i))) { patientId = (entity.get(i) != null && !entity.get(i).isEmpty()) ? entity.get(i).get(0) : ""; } - if (isSingularField(fields.get(i))) { + if (isSingularField(formattedFields.get(i))) { String entityValue = (entity.get(i) != null && !entity.get(i).isEmpty()) ? entity.get(i).get(0) : ""; - patientData.put(fields.get(i), entityValue); + patientData.put(formattedFields.get(i), entityValue); } else { List fieldValue = entity.get(i) != null ? entity.get(i) : List.of(); - patientData.put(fields.get(i), fieldValue); + patientData.put(formattedFields.get(i), fieldValue); } } diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/upload/SignUrlService.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/upload/SignUrlService.java index f80ddce3..d6d950f3 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/upload/SignUrlService.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/upload/SignUrlService.java @@ -41,9 +41,8 @@ public SignUrlService( public void uploadFile(File file, String objectKey) { S3Client s3 = S3Client.builder() - .region(region) + .region(this.region) .build(); - putS3Object(s3, bucketName, objectKey, file); s3.close(); } diff --git a/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/dictionary/ConceptTest.java b/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/dictionary/ConceptTest.java new file mode 100644 index 00000000..a9dd4b4e --- /dev/null +++ b/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/dictionary/ConceptTest.java @@ -0,0 +1,24 @@ +package edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.*; + +public class ConceptTest { + + @Test + public void jsonSerialization() throws JsonProcessingException { + Concept[] concepts = new Concept[]{new Concept("\\demographics\\age\\", "age", "AGE", null, "patient age", Map.of("drs_uri", "[\"a-drs.uri\", \"another-drs.uri\"]"))}; + ObjectMapper objectMapper = new ObjectMapper(); + + String serialized = objectMapper.writeValueAsString(concepts); + Concept[] deserialized = objectMapper.readValue(serialized, Concept[].class); + + assertEquals(List.of(concepts), List.of(deserialized)); + } +} \ No newline at end of file diff --git a/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriterTest.java b/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriterTest.java index d1819741..36aa67b3 100644 --- a/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriterTest.java +++ b/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriterTest.java @@ -1,22 +1,34 @@ package edu.harvard.hms.dbmi.avillach.hpds.processing.io; +import edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary.Concept; +import edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary.DictionaryService; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import java.io.File; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.UUID; import static org.junit.jupiter.api.Assertions.*; +@ExtendWith(MockitoExtension.class) public class PfbWriterTest { + @Mock + private DictionaryService dictionaryService; + @Test public void writeValidPFB() { - PfbWriter pfbWriter = new PfbWriter(new File("target/test-result.avro"), UUID.randomUUID().toString()); + PfbWriter pfbWriter = new PfbWriter(new File("target/test-result.avro"), UUID.randomUUID().toString(), dictionaryService); + + Mockito.when(dictionaryService.getConcepts(List.of("patient_id", "\\demographics\\age\\", "\\phs123\\stroke\\"))) + .thenReturn(List.of(new Concept("\\demographics\\age\\", "age", "AGE", null, "patient age", Map.of("drs_uri", "[\"a-drs.uri\", \"another-drs.uri\"]")))); pfbWriter.writeHeader(new String[] {"patient_id", "\\demographics\\age\\", "\\phs123\\stroke\\"}); List> nullableList = new ArrayList<>(); @@ -34,26 +46,25 @@ public void writeValidPFB() { List.of(List.of(), List.of("75"), List.of()) )); pfbWriter.close(); - // todo: validate this programatically } @Test public void formatFieldName_spacesAndBackslashes_replacedWithUnderscore() { - PfbWriter pfbWriter = new PfbWriter(new File("target/test-result.avro"), UUID.randomUUID().toString()); + PfbWriter pfbWriter = new PfbWriter(new File("target/test-result.avro"), UUID.randomUUID().toString(), dictionaryService); String formattedName = pfbWriter.formatFieldName("\\Topmed Study Accession with Subject ID\\\\"); assertEquals("_Topmed_Study_Accession_with_Subject_ID__", formattedName); } @Test public void formatFieldName_startsWithDigit_prependUnderscore() { - PfbWriter pfbWriter = new PfbWriter(new File("target/test-result.avro"), UUID.randomUUID().toString()); + PfbWriter pfbWriter = new PfbWriter(new File("target/test-result.avro"), UUID.randomUUID().toString(), dictionaryService); String formattedName = pfbWriter.formatFieldName("123Topmed Study Accession with Subject ID\\\\"); assertEquals("_123Topmed_Study_Accession_with_Subject_ID__", formattedName); } @Test public void formatFieldName_randomGarbage_replaceWithUnderscore() { - PfbWriter pfbWriter = new PfbWriter(new File("target/test-result.avro"), UUID.randomUUID().toString()); + PfbWriter pfbWriter = new PfbWriter(new File("target/test-result.avro"), UUID.randomUUID().toString(), dictionaryService); String formattedName = pfbWriter.formatFieldName("$$$my garbage @vro var!able nam#"); assertEquals("___my_garbage__vro_var_able_nam_", formattedName); } diff --git a/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java b/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java index a00a8ad0..31952b49 100644 --- a/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java +++ b/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java @@ -9,6 +9,7 @@ import java.util.stream.Collectors; import edu.harvard.hms.dbmi.avillach.hpds.data.query.ResultType; +import edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary.DictionaryService; import edu.harvard.hms.dbmi.avillach.hpds.processing.io.CsvWriter; import edu.harvard.hms.dbmi.avillach.hpds.processing.io.PfbWriter; import edu.harvard.hms.dbmi.avillach.hpds.processing.io.ResultWriter; @@ -48,6 +49,8 @@ public class QueryService { private final CountProcessor countProcessor; private final MultiValueQueryProcessor multiValueQueryProcessor; + private final DictionaryService dictionaryService; + HashMap results = new HashMap<>(); @@ -57,6 +60,7 @@ public QueryService (AbstractProcessor abstractProcessor, TimeseriesProcessor timeseriesProcessor, CountProcessor countProcessor, MultiValueQueryProcessor multiValueQueryProcessor, + @Autowired(required = false) DictionaryService dictionaryService, @Value("${SMALL_JOB_LIMIT}") Integer smallJobLimit, @Value("${SMALL_TASK_THREADS}") Integer smallTaskThreads, @Value("${LARGE_TASK_THREADS}") Integer largeTaskThreads) { @@ -65,6 +69,7 @@ public QueryService (AbstractProcessor abstractProcessor, this.timeseriesProcessor = timeseriesProcessor; this.countProcessor = countProcessor; this.multiValueQueryProcessor = multiValueQueryProcessor; + this.dictionaryService = dictionaryService; SMALL_JOB_LIMIT = smallJobLimit; SMALL_TASK_THREADS = smallTaskThreads; @@ -136,7 +141,7 @@ private AsyncResult initializeResult(Query query) throws IOException { String queryId = UUIDv5.UUIDFromString(query.toString()).toString(); ResultWriter writer; if (ResultType.DATAFRAME_PFB.equals(query.getExpectedResultType())) { - writer = new PfbWriter(File.createTempFile("result-" + System.nanoTime(), ".avro"), queryId); + writer = new PfbWriter(File.createTempFile("result-" + System.nanoTime(), ".avro"), queryId, dictionaryService); } else { writer = new CsvWriter(File.createTempFile("result-" + System.nanoTime(), ".sstmp")); } diff --git a/service/src/main/resources/application-bdc-auth-dev.properties b/service/src/main/resources/application-bdc-auth-dev.properties index 7b9dce89..81aa5ca3 100644 --- a/service/src/main/resources/application-bdc-auth-dev.properties +++ b/service/src/main/resources/application-bdc-auth-dev.properties @@ -7,4 +7,6 @@ HPDS_GENOMIC_DATA_DIRECTORY=/opt/local/hpds/all/ data-export.s3.bucket-name=pic-sure-auth-dev-data-export data-export.s3.region=us-east-1 -data-export.s3.signedUrl-expiry-minutes=60 \ No newline at end of file +data-export.s3.signedUrl-expiry-minutes=60 + +dictionary.host = http://wildfly.___TARGET_STACK___:8080/ \ No newline at end of file diff --git a/service/src/main/resources/application-bdc-auth-prod.properties b/service/src/main/resources/application-bdc-auth-prod.properties index a63bc6e3..a2269d45 100644 --- a/service/src/main/resources/application-bdc-auth-prod.properties +++ b/service/src/main/resources/application-bdc-auth-prod.properties @@ -7,4 +7,6 @@ HPDS_GENOMIC_DATA_DIRECTORY=/opt/local/hpds/all/ data-export.s3.bucket-name=pic-sure-auth-prod-data-export data-export.s3.region=us-east-1 -data-export.s3.signedUrl-expiry-minutes=60 \ No newline at end of file +data-export.s3.signedUrl-expiry-minutes=60 + +dictionary.host = http://wildfly.___TARGET_STACK___:8080/ \ No newline at end of file diff --git a/service/src/main/resources/application-development.properties b/service/src/main/resources/application-development.properties index 16c335d2..5768ef1b 100644 --- a/service/src/main/resources/application-development.properties +++ b/service/src/main/resources/application-development.properties @@ -2,5 +2,4 @@ SMALL_JOB_LIMIT = 100 SMALL_TASK_THREADS = 1 LARGE_TASK_THREADS = 1 -hpds.genomicProcessor.impl=localDistributed -HPDS_GENOMIC_DATA_DIRECTORY=/opt/local/hpds/all/ \ No newline at end of file +dictionary.host = http://wildfly.___TARGET_STACK___:8080/ \ No newline at end of file