Skip to content

Commit

Permalink
Improve provenance
Browse files Browse the repository at this point in the history
  • Loading branch information
NicoLaval committed Nov 12, 2024
1 parent 4a543b8 commit 23ca5ca
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 51 deletions.
104 changes: 74 additions & 30 deletions vtl-prov/src/main/java/fr/insee/vtl/prov/ProvenanceListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@
import org.antlr.v4.runtime.misc.Interval;
import org.antlr.v4.runtime.tree.ParseTreeWalker;

import javax.script.Bindings;
import javax.script.ScriptContext;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.util.*;
import java.util.stream.Collectors;
import java.util.concurrent.atomic.AtomicInteger;

/**
* ANTLR Listener that create provenance objects.
Expand All @@ -32,6 +30,9 @@ public class ProvenanceListener extends VtlBaseListener {
private boolean isInDatasetClause;

private String currentComponentID;

private String currentDataframeID;

private int stepIndex = 1;

private boolean rootAssignment = true;
Expand All @@ -41,6 +42,11 @@ public class ProvenanceListener extends VtlBaseListener {

private Map<String, String> currentAvailableDataframeUUID = new HashMap<>();

// Map of label/UUID id
private final Map<String, String> availableVariableUUID = new HashMap<>();

private Map<String, String> currentAvailableVariableUUID = new HashMap<>();

public ProvenanceListener(String id, String programName) {
program.setId(id);
program.setLabel(programName);
Expand Down Expand Up @@ -111,10 +117,14 @@ public void enterVarID(VtlParser.VarIDContext ctx) {
String dfId = ProvenanceUtils.getOrBuildUUID(availableDataframeUUID, label);
DataframeInstance df = new DataframeInstance(dfId, label);
consumedDataframe.add(df);
// Certainly don't need to reset? To check!
currentDataframeID = label;
}
if (isInDatasetClause && null != currentComponentID) {
Set<VariableInstance> usedVariables = programStep.getUsedVariables();
VariableInstance v = new VariableInstance(label);
String varUUID = ProvenanceUtils.getOrBuildUUID(availableVariableUUID, currentDataframeID + "|" + label);
VariableInstance v = new VariableInstance(varUUID, label);
v.setParentDataframe(currentDataframeID);
usedVariables.add(v);
}
} else {
Expand All @@ -137,8 +147,15 @@ public void enterComponentID(VtlParser.ComponentIDContext ctx) {
String label = ctx.getText();
ProgramStep programStep = program.getProgramStepByLabel(currentProgramStep);
Set<VariableInstance> assignedVariables = programStep.getAssignedVariables();
VariableInstance v = new VariableInstance(label);
String variableUUID = ProvenanceUtils.getOrBuildUUID(availableDataframeUUID, label);
VariableInstance v = new VariableInstance(variableUUID, label);
assignedVariables.add(v);
currentAvailableVariableUUID.put(currentDataframeID + "|" + label, variableUUID);
}

@Override
public void exitComponentID(VtlParser.ComponentIDContext ctx) {
System.out.println(ctx.IDENTIFIER());
}

@Override
Expand All @@ -149,6 +166,8 @@ public void enterCalcClauseItem(VtlParser.CalcClauseItemContext ctx) {
@Override
public void exitCalcClauseItem(VtlParser.CalcClauseItemContext ctx) {
currentComponentID = null;
availableVariableUUID.putAll(currentAvailableVariableUUID);
currentAvailableVariableUUID = new HashMap<>();
}

@Override
Expand Down Expand Up @@ -181,33 +200,58 @@ public static Program run(String expr, String id, String programName) {
public static Program runWithBindings(ScriptEngine engine, String expr, String id, String programName) {
Program program = run(expr, id, programName);
// 0 check if input dataset are empty?
// 1 - Handle input dataset
ProgramStep initialStep = program.getProgramStepByIndex(1);
Set<DataframeInstance> consumedDataframe = initialStep.getConsumedDataframe();
consumedDataframe.forEach(d ->{
Dataset ds = (Dataset) engine.getContext().getAttribute(d.getLabel());
ds.getDataStructure().values().forEach(c -> {
VariableInstance variableInstance = new VariableInstance(c.getName());
variableInstance.setRole(c.getRole());
variableInstance.setType(c.getType());
d.getHasVariableInstances().add(variableInstance);
});
});

// 2 - Split script to loop over program steps and run them
List<String> scripts = Arrays.stream(expr.split(";"))
// Keep already handled dataset
List<String> dsHandled = new ArrayList<>();
// Split script to loop over program steps and run them
AtomicInteger index = new AtomicInteger(1);
Arrays.stream(expr.split(";"))
.map(e -> e + ";")
.collect(Collectors.toList());

scripts.forEach(s -> {
try {
engine.eval(s);
} catch (ScriptException e) {
throw new RuntimeException(e);
}
});
.forEach(stepScript -> {
int i = index.getAndIncrement();
// 1 - Handle input dataset
ProgramStep step = program.getProgramStepByIndex(i);
Set<DataframeInstance> consumedDataframe = step.getConsumedDataframe();
consumedDataframe.forEach(d -> {
if (!dsHandled.contains(d.getLabel())) {
Dataset ds = (Dataset) engine.getContext().getAttribute(d.getLabel());
ds.getDataStructure().values().forEach(c -> {
VariableInstance variableInstance = step.getUsedVariables()
.stream()
.filter(v ->
v.getParentDataframe().equals(d.getLabel()) &&
v.getLabel().equals(c.getName()))
.findFirst()
.orElse(new VariableInstance(c.getName()));
variableInstance.setRole(c.getRole());
variableInstance.setType(c.getType());
d.getHasVariableInstances().add(variableInstance);
});
}
});
try {
engine.eval(stepScript);
} catch (ScriptException e) {
throw new RuntimeException(e);
}
// Improve built variables attributes
DataframeInstance producedDataframe = step.getProducedDataframe();
Dataset ds = (Dataset) engine.getContext().getAttribute(producedDataframe.getLabel());

ds.getDataStructure().values().forEach(c -> {
VariableInstance variableInstance = step.getAssignedVariables()
.stream()
.filter(v -> v.getLabel().equals(c.getName()))
.findFirst()
// TODO: refine variable detection in usedVariable
.orElse(new VariableInstance(c.getName()));
variableInstance.setRole(c.getRole());
variableInstance.setType(c.getType());
producedDataframe.getHasVariableInstances().add(variableInstance);
});
dsHandled.add((producedDataframe.getLabel()));
// Correct usedVariables ID checking ds/var of last assignment
});

Bindings bindings = engine.getBindings(ScriptContext.ENGINE_SCOPE);

return program;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public class VariableInstance {
String id;
String label;
Dataset.Role role;
String parentDataframe;
Class<?> type;

public VariableInstance(String label) {
Expand Down Expand Up @@ -51,4 +52,13 @@ public Class<?> getType() {
public void setType(Class<?> type) {
this.type = type;
}


public String getParentDataframe() {
return parentDataframe;
}

public void setParentDataframe(String parentDataframe) {
this.parentDataframe = parentDataframe;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

public class ProvenanceUtils {

public static String getOrBuildUUID(Map<String, String> availableDataframeUUID, String label) {
if (null != availableDataframeUUID.get(label)) {
return availableDataframeUUID.get(label);
public static String getOrBuildUUID(Map<String, String> availableUUID, String label) {
if (null != availableUUID.get(label)) {
return availableUUID.get(label);
}
return UUID.randomUUID().toString();
}
Expand Down
36 changes: 18 additions & 18 deletions vtl-prov/src/test/java/fr/insee/vtl/prov/RDFTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import java.io.File;
import java.io.IOException;
import java.util.Properties;
Expand Down Expand Up @@ -82,48 +81,49 @@ public void simpleTest() throws IOException {
@Test
public void simpleTestWithBindings() throws IOException {
SparkSession spark = SparkSession.builder()
.appName("test")
.master("local")
.getOrCreate();
.appName("test")
.master("local")
.getOrCreate();

ScriptEngineManager mgr = new ScriptEngineManager();
ScriptEngine engine = mgr.getEngineByExtension("vtl");
engine.put(VtlScriptEngine.PROCESSING_ENGINE_NAMES, "spark");

InMemoryDataset ds1 = new InMemoryDataset(
Java8Helpers.listOf(
Java8Helpers.mapOf("id", "A", "var1", 0L, "var2", 0.1D),
Java8Helpers.mapOf("id", "B", "var1", 1L, "var2", 0.2D),
Java8Helpers.mapOf("id", "C", "var1", 2L, "var2", 0.3D)
Java8Helpers.mapOf("id1", "A", "var1", 0L, "var2", 100L),
Java8Helpers.mapOf("id1", "B", "var1", 1L, "var2", 200L),
Java8Helpers.mapOf("id1", "C", "var1", 2L, "var2", 300L)
),
Java8Helpers.mapOf("id", String.class, "var1", Long.class, "var2", Double.class),
Java8Helpers.mapOf("id", Dataset.Role.IDENTIFIER, "var1", Dataset.Role.MEASURE, "var2", Dataset.Role.MEASURE)
Java8Helpers.mapOf("id1", String.class, "var1", Long.class, "var2", Long.class),
Java8Helpers.mapOf("id1", Dataset.Role.IDENTIFIER, "var1", Dataset.Role.MEASURE, "var2", Dataset.Role.MEASURE)
);
InMemoryDataset ds2 = new InMemoryDataset(
Java8Helpers.listOf(
Java8Helpers.mapOf("id", "A", "var1", 10L, "var2", 1.1D),
Java8Helpers.mapOf("id", "B", "var1", 11L, "var2", 1.2D),
Java8Helpers.mapOf("id", "D", "var1", 12L, "var2", 1.3D)
Java8Helpers.mapOf("id1", "A", "var1", 10L, "var2", 1L),
Java8Helpers.mapOf("id1", "B", "var1", 11L, "var2", 2L),
Java8Helpers.mapOf("id1", "D", "var1", 12L, "var2", 3L)
),
Java8Helpers.mapOf("id", String.class, "var1", Long.class, "var2", Double.class),
Java8Helpers.mapOf("id", Dataset.Role.IDENTIFIER, "var1", Dataset.Role.MEASURE, "var2", Dataset.Role.MEASURE)
Java8Helpers.mapOf("id1", String.class, "var1", Long.class, "var2", Long.class),
Java8Helpers.mapOf("id1", Dataset.Role.IDENTIFIER, "var1", Dataset.Role.MEASURE, "var2", Dataset.Role.MEASURE)
);

engine.put("ds1", ds1);
engine.put("ds2", ds2);

String script = "ds_sum := ds1 + ds2;\n" +
String script = "ds1 := ds1[calc identifier id1 := id1, var1 := cast(var1, integer), var2 := cast(var2, integer)];\n" +
"ds2 := ds2[calc identifier id1 := id1, var1 := cast(var1, integer), var2 := cast(var2, integer)];\n" +
"ds_sum := ds1 + ds2;\n" +
"ds_mul := ds_sum * 3; \n" +
"ds_res <- ds_mul [filter mod(var1, 2) = 0]" +
" [calc var_sum := var1 + var2];";
"ds_res <- ds_mul[filter mod(var1, 2) = 0][calc var_sum := var1 + var2];";

Program program = ProvenanceListener.runWithBindings(engine, script, "trevas-simple-test", "Simple test from Trevas tests");
Model model = RDFUtils.buildModel(program);
String content = RDFUtils.serialize(model, "JSON-LD");
assertThat(content).isNotEmpty();
RDFUtils.loadModelWithCredentials(model, sparqlEndpoint, sparqlEndpointUser, sparlqEndpointPassword);
RDFUtils.writeJsonLdToFile(model, "src/test/resources/output/test-simple-with-bindings.json");
assertThat(program.getProgramSteps()).hasSize(3);
assertThat(program.getProgramSteps()).hasSize(5);
}

@Test
Expand Down

0 comments on commit 23ca5ca

Please sign in to comment.