Skip to content

Commit

Permalink
issue-34: adding array support for codeableconcept in_valueset (cerne…
Browse files Browse the repository at this point in the history
  • Loading branch information
bdrillard authored and rbrush committed Aug 13, 2018
1 parent be8874a commit 7d0204f
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 91 deletions.
111 changes: 76 additions & 35 deletions bunsen-core/src/main/java/com/cerner/bunsen/ValueSetUdfs.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.cerner.bunsen.codes.broadcast.BroadcastableValueSets;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Row;
Expand All @@ -15,65 +16,105 @@
* Support for user-defined functions that use valuesets. This class offers users a stack
* to push and pop valuesets used in the in_valuesets UDF, allowing callers to temporarily
* override valuesets and then simply pop them to revert to the previous state.
*
* <p>
The {@code in_valueset} UDF allows the user to check whether a given CodeableConcept (or
sequence of CodeableConcepts) has a Coding field in a ValueSet specified by a Reference name. It
can be called in SparkSQL as {@code in_valueset(codeProperty, 'Reference')}.
* </p>
*/
public class ValueSetUdfs {

/**
* UDF implementation that checks if a given codeable concept is
* in a named valueset reference.
* Returns true if the given CodeableConcept row has a Coding belonging to the ValueSet having the
* given reference name, or false otherwise.
*/
static class InValuesetUdf implements UDF2<Row, String, Boolean> {
private static Boolean inValueSet(Row codeableRow,
String referenceName,
BroadcastableValueSets valueSets) {

private Broadcast<BroadcastableValueSets> broadcast;
boolean found = false;

InValuesetUdf(Broadcast<BroadcastableValueSets> broadcast) {
this.broadcast = broadcast;
}
if (codeableRow != null) {

@Override
public Boolean call(Row row, String referenceName) throws Exception {
List<Row> codingArray = codeableRow.getList(1);

// A null code or concept never matches.
if (row == null || referenceName == null) {
return false;
}
if (codingArray != null) {

BroadcastableValueSets valuesets = broadcast.getValue();
for (Row coding : codingArray) {

// Check if we are dealing with a codeable concept or directly with a coding.
boolean isCodeable = false;
String system = coding.getAs("system");
String code = coding.getAs("code");

for (String fieldName : row.schema().fieldNames()) {
if ("coding".equals(fieldName)) {
isCodeable = true;
break;
// If there exists a matching code, return true.
if (valueSets.hasCode(referenceName, system, code)) {

found = true;

break;
}
}
}
}

if (isCodeable) {
return found;
}

IndexedSeq<Row> array = row.<IndexedSeq<Row>>getAs("coding");
/**
* Returns true if the given input CodeableConcept, or sequence of CodeableConcept, has a Coding
* contained in the ValueSet having the given reference name, or false otherwise. This method
* is dynamically typed as it may be invoked over either a structure or sequence of structures in
* SparkSQL.
*/
private static Boolean inValueSet(Object input,
String referenceName,
Broadcast<BroadcastableValueSets> valueSets) {

for (int i = 0; i < array.length(); ++i) {
// A null code never matches.
if (input == null) {

Row coding = array.apply(i);
return false;
}

String sourceSystem = coding.getAs("system");
String sourceValue = coding.getAs("code");
if (input instanceof Row) {

if (valuesets.hasCode(referenceName, sourceSystem, sourceValue)) {
return true;
}
}
return inValueSet((Row) input, referenceName, valueSets.getValue());
} else {

return false;
} else {
IndexedSeq<Row> codeableConceptSeq = (IndexedSeq<Row>) input;

boolean found = false;

for (int i = 0; i < codeableConceptSeq.size(); i++) {

String sourceSystem = row.getAs("system");
String sourceValue = row.getAs("code");
if (inValueSet(codeableConceptSeq.apply(i), referenceName, valueSets.getValue())) {

return valuesets.hasCode(referenceName, sourceSystem, sourceValue);
found = true;

break;
}
}

return found;
}
}

/**
 * Spark UDF checking whether a FHIR resource's code-property field belongs to a named valueset.
 * The input code-field may be either a single CodeableConcept structure or an array of
 * CodeableConcept structures; dispatch between the two happens in {@code inValueSet}.
 */
static class InValuesetUdf implements UDF2<Object, String, Boolean> {

  /** Broadcast handle to the valuesets shared across Spark executors. */
  private final Broadcast<BroadcastableValueSets> broadcast;

  InValuesetUdf(Broadcast<BroadcastableValueSets> broadcast) {
    this.broadcast = broadcast;
  }

  /**
   * Returns true if the given code value (CodeableConcept or array thereof) has a Coding in the
   * valueset identified by the given reference name, false otherwise.
   */
  @Override
  public Boolean call(Object input, String referenceName) throws Exception {
    return inValueSet(input, referenceName, broadcast);
  }
}

Expand Down
38 changes: 25 additions & 13 deletions bunsen-core/src/test/java/com/cerner/bunsen/MockValueSets.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import com.cerner.bunsen.codes.UrlAndVersion;
import com.cerner.bunsen.codes.Value;
import com.cerner.bunsen.codes.base.AbstractValueSets;
import java.util.Collections;
import com.google.common.collect.ImmutableList;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.hl7.fhir.dstu3.model.ValueSet;
Expand Down Expand Up @@ -46,26 +46,38 @@ public MockValueSets(SparkSession spark,
public static MockValueSets createWithTestValue(SparkSession spark, FhirEncoders encoders) {

Dataset<UrlAndVersion> urlAndVersion = spark.createDataset(
Collections.singletonList(new UrlAndVersion(
"urn:cerner:bunsen:valueset:married_maritalstatus",
"0.0.1")),
ImmutableList.of(new UrlAndVersion(
"http://hl7.org/fhir/us/core/ValueSet/us-core-encounter-type",
"1.1.0"),
new UrlAndVersion(
"http://hl7.org/fhir/ValueSet/v3-ActPriority",
"2017-04-19")),
AbstractValueSets.getUrlAndVersionEncoder());

Dataset<ValueSet> valueSet = spark.createDataset(
Collections.singletonList(new ValueSet()
.setUrl("urn:cerner:bunsen:valueset:married_maritalstatus")
.setVersion("0.0.1")),
ImmutableList.of(new ValueSet()
.setUrl("http://hl7.org/fhir/us/core/ValueSet/us-core-encounter-type")
.setVersion("1.1.0"),
new ValueSet()
.setUrl("http://hl7.org/fhir/ValueSet/v3-ActPriority")
.setVersion("2017-04-19")),
encoders.of(ValueSet.class))
.withColumn("timestamp", lit("20180101120000").cast("timestamp"))
.as(encoders.of(ValueSet.class));

Dataset<Value> values = spark.createDataset(
Collections.singletonList(new Value(
"urn:cerner:bunsen:valueset:married_maritalstatus",
"0.0.1",
"http://hl7.org/fhir/v3/MaritalStatus",
"2016-11-11",
"M")),
ImmutableList.of(new Value(
"http://hl7.org/fhir/us/core/ValueSet/us-core-encounter-type",
"1.1.0",
"http://www.ama-assn.org/go/cpt",
"0.0.1",
"99200"),
new Value(
"http://hl7.org/fhir/ValueSet/v3-ActPriority",
"2017-04-19",
"http://hl7.org/fhir/v3/ActPriority",
"2017-04-19",
"EM")),
AbstractValueSets.getValueEncoder());

return new MockValueSets(spark,
Expand Down
56 changes: 37 additions & 19 deletions bunsen-core/src/test/java/com/cerner/bunsen/ValueSetUdfsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
import org.apache.spark.sql.SparkSession;
import org.hl7.fhir.dstu3.model.CodeableConcept;
import org.hl7.fhir.dstu3.model.Condition;
import org.hl7.fhir.dstu3.model.Encounter;
import org.hl7.fhir.dstu3.model.Observation;
import org.hl7.fhir.dstu3.model.Patient;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
Expand All @@ -30,8 +30,6 @@ public class ValueSetUdfsTest {

private static SparkSession spark;



private static CodeableConcept codeable(String system, String value) {

CodeableConcept concept = new CodeableConcept();
Expand All @@ -54,6 +52,7 @@ private static Observation observation(String id, String code) {
}

private static Condition condition(String id, String code) {

Condition condition = new Condition();

// Condition based on example from FHIR:
Expand All @@ -65,14 +64,19 @@ private static Condition condition(String id, String code) {
return condition;
}

private static Patient patient(String id, String marritalStatus) {
Patient patient = new Patient();
private static Encounter encounter(String id, String type, String priority) {

patient.setId(id);
Encounter encounter = new Encounter();

patient.setMaritalStatus(codeable("http://hl7.org/fhir/v3/MaritalStatus", marritalStatus));
encounter.setId(id);

return patient;
// CodeableConcept array field
encounter.setType(ImmutableList.of(codeable("http://www.ama-assn.org/go/cpt", type)));

// CodeableConcept singleton field
encounter.setPriority(codeable("http://hl7.org/fhir/v3/ActPriority", priority));

return encounter;
}

/**
Expand Down Expand Up @@ -117,8 +121,10 @@ public static void setUp() throws IOException {
.addCode("albumin",
Loinc.LOINC_CODE_SYSTEM_URI,
"14959-1")
.addReference("married",
"urn:cerner:bunsen:valueset:married_maritalstatus")
.addReference("types",
"http://hl7.org/fhir/us/core/ValueSet/us-core-encounter-type")
.addReference("priorities",
"http://hl7.org/fhir/ValueSet/v3-ActPriority")
.addDescendantsOf("leukocytes",
Loinc.LOINC_CODE_SYSTEM_URI,
"LP14419-3",
Expand Down Expand Up @@ -160,13 +166,15 @@ public static void setUp() throws IOException {

conditions.createOrReplaceTempView("test_snomed_cond");

Dataset<Patient> patients = spark.createDataset(
Dataset<Encounter> encounters = spark.createDataset(
ImmutableList.of(
patient("married", "M"),
patient("unmarried", "U")),
encoders.of(Patient.class));
encounter("emergency", null, "EM"),
encounter("routine", null, "R"),
encounter("encounter", "99200", null),
encounter("non_encounter", "99199", null)),
encoders.of(Encounter.class));

patients.createOrReplaceTempView("test_valueset_patient");
encounters.createOrReplaceTempView("test_valueset_encounter");
}

/**
Expand Down Expand Up @@ -236,10 +244,20 @@ public void testHasCyclicAncestor() {
@Test
public void testHasValueSetCode() {

Dataset<Row> results = spark.sql("select id from test_valueset_patient "
+ "where in_valueset(maritalStatus, 'married')");
Dataset<Row> encounters = spark.sql("select id from test_valueset_encounter "
+ "where in_valueset(priority, 'priorities')");

Assert.assertEquals(1, results.count());
Assert.assertEquals("married", results.head().get(0));
Assert.assertEquals(1, encounters.count());
Assert.assertEquals("emergency", encounters.head().get(0));
}

@Test
public void testArrayHasValueSetCode() {

Dataset<Row> encounters = spark.sql("select id from test_valueset_encounter "
+ "where in_valueset(type, 'types')");

Assert.assertEquals(1, encounters.count());
Assert.assertEquals("encounter", encounters.head().get(0));
}
}
Loading

0 comments on commit 7d0204f

Please sign in to comment.