Skip to content

Commit

Permalink
Compare fingerprints by hash
Browse files Browse the repository at this point in the history
  • Loading branch information
johnaohara committed Nov 30, 2023
1 parent 6d8fdc3 commit 3d5bc4d
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@ public class FingerprintDAO extends PanacheEntityBase {
@Column(columnDefinition = "jsonb")
public JsonNode fingerprint;

public Integer fp_hash;

/**
 * Builds a short diagnostic representation of this entity in the form
 * {@code FP{datasetId=..., fingerprint=..., fp_hash=...}}.
 *
 * @return the dataset id, the fingerprint JSON and its hash rendered as text
 */
@Override
public String toString() {
    StringBuilder text = new StringBuilder("FP{");
    text.append("datasetId=").append(datasetId);
    text.append(", fingerprint=").append(fingerprint);
    text.append(", fp_hash=").append(fp_hash);
    text.append('}');
    return text.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -546,18 +546,19 @@ void onNewDataPoint(DataPoint.Event event) {
log.debugf("Processing new datapoint for dataset %d at %s, variable %d (%s), value %f",
dataPoint.datasetId, dataPoint.timestamp,
variable.id, variable.name, dataPoint.value);
JsonNode fingerprint = FingerprintDAO.<FingerprintDAO>findByIdOptional(dataPoint.datasetId).map(fp -> fp.fingerprint).orElse(null);
FingerprintDAO fingerprint = FingerprintDAO.<FingerprintDAO>findByIdOptional(dataPoint.datasetId).orElse(null);
Integer fpHash = fingerprint != null ? fingerprint.fp_hash : null;

VarAndFingerprint key = new VarAndFingerprint(variable.id, fingerprint);
log.debugf("Invalidating variable %d FP %s timestamp %s, current value is %s", variable.id, fingerprint, dataPoint.timestamp, validUpTo.get(key));
VarAndFingerprint key = new VarAndFingerprint(variable.id, fpHash);
log.debugf("Invalidating variable %d FP %s timestamp %s, current value is %s", variable.id, fingerprint == null ? null : fingerprint.fingerprint, dataPoint.timestamp, validUpTo.get(key));
validUpTo.compute(key, (ignored, current) -> {
if (current == null || !dataPoint.timestamp.isAfter(current.timestamp)) {
return new UpTo(dataPoint.timestamp, false);
} else {
return current;
}
});
runChangeDetection(VariableDAO.findById(variable.id), fingerprint, event.notify, true);
runChangeDetection(VariableDAO.findById(variable.id), fpHash, event.notify, true);
} else {
log.warnf("Could not process new datapoint for dataset %d at %s, could not find variable by id %d ",
dataPoint.datasetId, dataPoint.timestamp, dataPoint.variable == null ? -1 : dataPoint.variable.id);
Expand All @@ -570,19 +571,19 @@ void onNewDataPoint(DataPoint.Event event) {

/**
 * Transactional entry point that simply delegates to {@code runChangeDetection}
 * with {@code expectExists = false}. Within this diff it is invoked from the
 * after-commit callback of {@code runChangeDetection} (via
 * {@code messageBus.executeForTest}) to continue with the next datapoint.
 *
 * NOTE(review): this hunk shows two versions of the signature and of the
 * delegating call back to back. The {@code JsonNode fingerprint} pair appears to be
 * the removed version and the {@code Integer fpHash} pair the added one - the
 * commit replaces the fingerprint JSON with its integer hash as the grouping key.
 */
@WithRoles(extras = Roles.HORREUM_SYSTEM)
@Transactional
void tryRunChangeDetection(VariableDAO variable, JsonNode fingerprint, boolean notify) {
runChangeDetection(variable, fingerprint, notify, false);
void tryRunChangeDetection(VariableDAO variable, Integer fpHash, boolean notify) {
runChangeDetection(variable, fpHash, notify, false);
}

private void runChangeDetection(VariableDAO variable, JsonNode fingerprint, boolean notify, boolean expectExists) {
UpTo valid = validUpTo.get(new VarAndFingerprint(variable.id, fingerprint));
private void runChangeDetection(VariableDAO variable, Integer fpHash, boolean notify, boolean expectExists) {
UpTo valid = validUpTo.get(new VarAndFingerprint(variable.id, fpHash));
Instant nextTimestamp = session.createNativeQuery(
"SELECT MIN(timestamp) FROM datapoint dp LEFT JOIN fingerprint fp ON dp.dataset_id = fp.dataset_id " +
"WHERE dp.variable_id = ?1 AND (timestamp > ?2 OR (timestamp = ?2 AND ?3)) AND json_equals(fp.fingerprint, ?4)", Instant.class)
"WHERE dp.variable_id = ?1 AND (timestamp > ?2 OR (timestamp = ?2 AND ?3)) AND fp_hash = ?4", Instant.class)
.setParameter(1, variable.id)
.setParameter(2, valid != null ? valid.timestamp : LONG_TIME_AGO, StandardBasicTypes.INSTANT)
.setParameter(3, valid == null || !valid.inclusive)
.setParameter(4, fingerprint, JsonBinaryType.INSTANCE)
.setParameter(4, fpHash)
.getResultStream().filter(Objects::nonNull).findFirst().orElse(null);
if (nextTimestamp == null) {
log.debugf("No further datapoints for change detection");
Expand All @@ -594,24 +595,24 @@ private void runChangeDetection(VariableDAO variable, JsonNode fingerprint, bool
int numDeleted = session.createNativeQuery("DELETE FROM change cc WHERE cc.id IN (" +
"SELECT id FROM change c LEFT JOIN fingerprint fp ON c.dataset_id = fp.dataset_id " +
"WHERE NOT c.confirmed AND c.variable_id = ?1 AND (c.timestamp > ?2 OR (c.timestamp = ?2 AND ?3)) " +
"AND json_equals(fp.fingerprint, ?4))", int.class)
"AND fp.fp_hash = ?4)", int.class)
.setParameter(1, variable.id)
.setParameter(2, valid.timestamp, StandardBasicTypes.INSTANT)
.setParameter(3, !valid.inclusive)
.setParameter(4, fingerprint, JsonBinaryType.INSTANCE)
.setParameter(4, fpHash)
.executeUpdate();
log.debugf("Deleted %d changes %s %s for variable %d, fingerprint %s", numDeleted, valid.inclusive ? ">" : ">=", valid.timestamp, variable.id, fingerprint);
log.debugf("Deleted %d changes %s %s for variable %d, fingerprint %s", numDeleted, valid.inclusive ? ">" : ">=", valid.timestamp, variable.id, fpHash);
}

var changeQuery = session.createQuery("SELECT c FROM Change c LEFT JOIN Fingerprint fp ON c.dataset.id = fp.dataset.id " +
"WHERE c.variable = ?1 AND (c.timestamp < ?2 OR (c.timestamp = ?2 AND ?3 = TRUE)) AND " +
"TRUE = function('json_equals', fp.fingerprint, ?4) " +
"fp.fp_hash = ?4 " +
"ORDER by c.timestamp DESC", ChangeDAO.class);
changeQuery
.setParameter(1, variable)
.setParameter(2, valid != null ? valid.timestamp : VERY_DISTANT_FUTURE)
.setParameter(3, valid == null || valid.inclusive)
.setParameter(4, fingerprint, JsonBinaryType.INSTANCE);
.setParameter(4, fpHash);
ChangeDAO lastChange = changeQuery.setMaxResults(1).getResultStream().findFirst().orElse(null);

Instant changeTimestamp = LONG_TIME_AGO;
Expand All @@ -624,12 +625,12 @@ private void runChangeDetection(VariableDAO variable, JsonNode fingerprint, bool
"SELECT dp FROM DataPoint dp LEFT JOIN Fingerprint fp ON dp.dataset.id = fp.dataset.id " +
"JOIN dp.dataset " + // ignore datapoints (that were not deleted yet) from deleted datasets
"WHERE dp.variable = ?1 AND dp.timestamp BETWEEN ?2 AND ?3 " +
"AND TRUE = function('json_equals', fp.fingerprint, ?4) " +
"AND fp.fp_hash = ?4 " +
"ORDER BY dp.timestamp DESC, dp.dataset.id DESC", DataPointDAO.class)
.setParameter(1, variable)
.setParameter(2, changeTimestamp)
.setParameter(3, nextTimestamp)
.setParameter(4, fingerprint, JsonBinaryType.INSTANCE)
.setParameter(4, fpHash)
.getResultList();
// Last datapoint is already in the list
if (dataPoints.isEmpty()) {
Expand Down Expand Up @@ -669,13 +670,13 @@ private void runChangeDetection(VariableDAO variable, JsonNode fingerprint, bool
}
}
Util.doAfterCommit(tm, () -> {
validateUpTo(variable, fingerprint, nextTimestamp);
messageBus.executeForTest(variable.testId, () -> tryRunChangeDetection(variable, fingerprint, notify));
validateUpTo(variable, fpHash, nextTimestamp);
messageBus.executeForTest(variable.testId, () -> tryRunChangeDetection(variable, fpHash, notify));
});
}

private void validateUpTo(VariableDAO variable, JsonNode fingerprint, Instant timestamp) {
validUpTo.compute(new VarAndFingerprint(variable.id, fingerprint), (ignored, current) -> {
private void validateUpTo(VariableDAO variable, Integer fpHash, Instant timestamp) {
validUpTo.compute(new VarAndFingerprint(variable.id, fpHash), (ignored, current) -> {
log.debugf("Attempt %s, valid up to %s, ", timestamp, current);
if (current == null || !current.timestamp.isAfter(timestamp)) {
return new UpTo(timestamp, true);
Expand Down Expand Up @@ -1277,24 +1278,25 @@ public static class Recalculation {

/**
 * Immutable key pairing a variable id with a fingerprint identity; used as the key
 * of the {@code validUpTo} map (see the {@code validUpTo.get/compute} calls in this diff).
 *
 * NOTE(review): this hunk interleaves two versions of the class. The lines using
 * {@code JsonNode fingerprint} appear to be the removed version; the lines using
 * {@code Integer fp_hash} appear to be the added one.
 *
 * NOTE(review): with the hash-based version, equality is decided by a 32-bit hash
 * (populated from {@code fpNode.hashCode()} in createFingerprint). Two distinct
 * fingerprints whose hashes collide would share one key - confirm this is acceptable.
 */
static final class VarAndFingerprint {
// Id of the variable this key refers to.
final int varId;
// Fingerprint identity: full JSON in the earlier version, its integer hash in the later one.
// May be null: onNewDataPoint passes null when no fingerprint row exists for the dataset.
final JsonNode fingerprint;
final Integer fp_hash;

VarAndFingerprint(int varId, JsonNode fingerprint) {
VarAndFingerprint(int varId, Integer fp_hash) {
this.varId = varId;
this.fingerprint = fingerprint;
this.fp_hash = fp_hash;
}


// Two keys are equal when both the variable id and the fingerprint identity match;
// Objects.equals makes a null identity equal only to another null identity.
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
VarAndFingerprint that = (VarAndFingerprint) o;
return varId == that.varId && Objects.equals(fingerprint, that.fingerprint);
return varId == that.varId && Objects.equals(fp_hash, that.fp_hash);
}

// Hash is derived from the same two components that equals() compares.
@Override
public int hashCode() {
return Objects.hash(varId, fingerprint);
return Objects.hash(varId, fp_hash);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ private void createFingerprint(int datasetId, int testId) {
fp.datasetId = datasetId;
fp.dataset = DatasetDAO.findById(datasetId);
fp.fingerprint = fpNode;
fp.fp_hash = fpNode.hashCode();
if(fp.datasetId > 0 && fp.dataset != null)
fp.persist();
}
Expand Down
10 changes: 10 additions & 0 deletions horreum-backend/src/main/resources/db/changeLog.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4342,5 +4342,15 @@
CREATE INDEX label_values_label_id ON label_values (label_id);
</sql>
</changeSet>
<changeSet id="114" author="johara">
<!--
  Adds an integer column fp_hash to the fingerprint table and creates the index
  fingerprint_fp_hash on it. The change-detection queries in this commit filter
  on this column instead of calling json_equals on the fingerprint JSON.

  NOTE(review): the column is declared without constraints or a default, and this
  changeSet contains no UPDATE, so rows that already exist keep fp_hash = NULL.
  A SQL filter of the form "fp_hash = ?" never matches NULL; confirm that existing
  fingerprints are backfilled elsewhere and that datasets without a fingerprint
  are still handled as intended.
-->
<validCheckSum>ANY</validCheckSum>
<addColumn tableName="fingerprint">
<column name="fp_hash"
type="integer"/>
</addColumn>
<sql>
CREATE INDEX fingerprint_fp_hash ON fingerprint (fp_hash);
</sql>
</changeSet>

</databaseChangeLog>

0 comments on commit 3d5bc4d

Please sign in to comment.