Skip to content

Commit

Permalink
Merge pull request #513 from ibi-group/threaded-validation
Browse files Browse the repository at this point in the history
Threaded validation
  • Loading branch information
Robin Beer authored Mar 3, 2023
2 parents 097295a + 346b97c commit 18767dd
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 30 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# syntax=docker/dockerfile:1
FROM maven:3.8.6-openjdk-11
FROM maven:3.8.7-openjdk-18-slim

COPY . /datatools

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>4.0.5</version>
<version>4.0.6</version>
</dependency>

<!-- Miscellaneous utilities -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.conveyal.datatools.manager.auth.Auth0UserProfile;
import com.conveyal.datatools.manager.jobs.ProcessSingleFeedJob;
import com.conveyal.datatools.manager.jobs.ValidateFeedJob;
import com.conveyal.datatools.manager.jobs.ValidateMobilityDataFeedJob;
import com.conveyal.datatools.manager.models.Deployment;
import com.conveyal.datatools.manager.models.ExternalFeedSourceProperty;
import com.conveyal.datatools.manager.models.FeedRetrievalMethod;
Expand Down Expand Up @@ -354,18 +355,17 @@ public static boolean validateAll (boolean load, boolean force, String filterFee
// If the force option is not true and the validation result did not fail, re-validate.
continue;
}
MonitorableJob job;
if (filterFeedId != null && !version.feedSourceId.equals(filterFeedId)) {
// Skip all feeds except Cortland for now.
continue;
}
Auth0UserProfile systemUser = Auth0UserProfile.createSystemUser();
if (load) {
job = new ProcessSingleFeedJob(version, systemUser, false);
JobUtils.heavyExecutor.execute(new ProcessSingleFeedJob(version, systemUser, false));
} else {
job = new ValidateFeedJob(version, systemUser, false);
JobUtils.heavyExecutor.execute(new ValidateFeedJob(version, systemUser, false));
JobUtils.heavyExecutor.execute(new ValidateMobilityDataFeedJob(version, systemUser, false));
}
JobUtils.heavyExecutor.execute(job);
}
// ValidateAllFeedsJob validateAllFeedsJob = new ValidateAllFeedsJob("system", force, load);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public void jobLogic() {

// Next, validate the feed.
addNextJob(new ValidateFeedJob(feedVersion, owner, isNewVersion));
addNextJob(new ValidateMobilityDataFeedJob(feedVersion, owner, isNewVersion));

// We only need to snapshot the feed if there are transformations at the database level. In the case that there
// are, the snapshot namespace will be the target of these modifications. If we were to apply the modifications
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,7 @@ public void jobFinished () {
// such as BuildTransportNetwork, to finish. If those subsequent jobs fail,
// the version won't get loaded into MongoDB (even though it exists in postgres).
feedVersion.storeUser(owner);
if (isNewVersion) {
int count = feedVersion.parentFeedSource().feedVersionCount();
feedVersion.version = count + 1;
Persistence.feedVersions.create(feedVersion);
} else {
Persistence.feedVersions.replace(feedVersion.id, feedVersion);
}
feedVersion.persistFeedVersionAfterValidation(isNewVersion);
// Schedule expiration notification jobs.
Scheduler.scheduleExpirationNotifications(feedVersion.parentFeedSource());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.conveyal.datatools.manager.jobs;

import com.conveyal.datatools.common.status.FeedVersionJob;
import com.conveyal.datatools.manager.auth.Auth0UserProfile;
import com.conveyal.datatools.manager.models.FeedVersion;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This job handles the MobilityData validation of a given feed version. If the version is not new, it will simply
* replace the existing version with the version object that has updated validation info.
*/
public class ValidateMobilityDataFeedJob extends FeedVersionJob {
public static final Logger LOG = LoggerFactory.getLogger(ValidateMobilityDataFeedJob.class);

private final FeedVersion feedVersion;
private final boolean isNewVersion;

public ValidateMobilityDataFeedJob(FeedVersion version, Auth0UserProfile owner, boolean isNewVersion) {
super(owner, "Validating Feed using MobilityData", JobType.VALIDATE_FEED);
feedVersion = version;
this.isNewVersion = isNewVersion;
status.update("Waiting to begin MobilityData validation...", 0);
}

@Override
public void jobLogic () {
LOG.info("Running ValidateMobilityDataFeedJob for {}", feedVersion.id);
feedVersion.validateMobility(status);
}

@Override
public void jobFinished () {
if (!status.error) {
if (parentJobId != null && JobType.PROCESS_FEED.equals(parentJobType)) {
// Validate stage is happening as part of an overall process feed job.
// At this point all GTFS data has been loaded and validated, so we record
// the FeedVersion into mongo.
// This happens here because otherwise we would have to wait for other jobs,
// such as BuildTransportNetwork, to finish. If those subsequent jobs fail,
// the version won't get loaded into MongoDB (even though it exists in postgres).
feedVersion.persistFeedVersionAfterValidation(isNewVersion);
}
status.completeSuccessfully("MobilityData validation finished!");
} else {
// If the version was not stored successfully, call FeedVersion#delete to reset things to before the version
// was uploaded/fetched. Note: delete calls made to MongoDB on the version ID will not succeed, but that is
// expected.
feedVersion.delete();
}
}

/**
* Getter that allows a client to know the ID of the feed version that will be created as soon as the upload is
* initiated; however, we will not store the FeedVersion in the mongo application database until the upload and
* processing is completed. This prevents clients from manipulating GTFS data before it is entirely imported.
*/
@JsonProperty
public String getFeedVersionId () {
return feedVersion.id;
}

@JsonProperty
public String getFeedSourceId () {
return feedVersion.parentFeedSource().id;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.conveyal.datatools.common.status.MonitorableJob;
import com.conveyal.datatools.common.utils.Scheduler;
import com.conveyal.datatools.manager.DataManager;
import com.conveyal.datatools.manager.jobs.ValidateFeedJob;
import com.conveyal.datatools.manager.jobs.ValidateMobilityDataFeedJob;
import com.conveyal.datatools.manager.jobs.validation.RouteTypeValidatorBuilder;
import com.conveyal.datatools.manager.persistence.FeedStore;
import com.conveyal.datatools.manager.persistence.Persistence;
Expand Down Expand Up @@ -354,9 +356,41 @@ public void validate(MonitorableJob.Status status) {
if (status == null) status = new MonitorableJob.Status();

// VALIDATE GTFS feed
FileReader fr = null;
try {
LOG.info("Beginning validation...");

// FIXME: pass status to validate? Or somehow listen to events?
status.update("Validating feed...", 33);

// Validate the feed version.
// Certain extensions, if enabled, have extra validators
if (isExtensionEnabled("mtc")) {
validationResult = GTFS.validate(feedLoadResult.uniqueIdentifier, DataManager.GTFS_DATA_SOURCE,
RouteTypeValidatorBuilder::buildRouteValidator,
MTCValidator::new
);
} else {
validationResult = GTFS.validate(feedLoadResult.uniqueIdentifier, DataManager.GTFS_DATA_SOURCE,
RouteTypeValidatorBuilder::buildRouteValidator
);
}
} catch (Exception e) {
status.fail(String.format("Unable to validate feed %s", this.id), e);
// FIXME create validation result with new constructor?
validationResult = new ValidationResult();
validationResult.fatalException = "failure!";
}
}

public void validateMobility(MonitorableJob.Status status) {

// Sometimes this method is called when no status object is available.
if (status == null) status = new MonitorableJob.Status();

// VALIDATE GTFS feed
FileReader fr = null;
try {
LOG.info("Beginning MobilityData validation...");
status.update("MobilityData Analysis...", 11);

File gtfsZip = this.retrieveGtfsFile();
Expand All @@ -382,22 +416,6 @@ public void validate(MonitorableJob.Status status) {

// This will persist the document to Mongo
this.mobilityDataResult = Document.parse(json);

// FIXME: pass status to validate? Or somehow listen to events?
status.update("Validating feed...", 33);

// Validate the feed version.
// Certain extensions, if enabled, have extra validators
if (isExtensionEnabled("mtc")) {
validationResult = GTFS.validate(feedLoadResult.uniqueIdentifier, DataManager.GTFS_DATA_SOURCE,
RouteTypeValidatorBuilder::buildRouteValidator,
MTCValidator::new
);
} else {
validationResult = GTFS.validate(feedLoadResult.uniqueIdentifier, DataManager.GTFS_DATA_SOURCE,
RouteTypeValidatorBuilder::buildRouteValidator
);
}
} catch (Exception e) {
status.fail(String.format("Unable to validate feed %s", this.id), e);
// FIXME create validation result with new constructor?
Expand Down Expand Up @@ -582,4 +600,19 @@ public void assignGtfsFileAttributes(File newGtfsFile) {
public boolean isSameAs(FeedVersion otherVersion) {
return otherVersion != null && this.hash.equals(otherVersion.hash);
}

/**
* {@link ValidateFeedJob} and {@link ValidateMobilityDataFeedJob} both require to save a feed version after their
* subsequent validation checks have completed. Either could finish first, therefore this method makes sure that
* only one instance is saved (the last to finish updates).
*/
public void persistFeedVersionAfterValidation(boolean isNewVersion) {
if (isNewVersion && Persistence.feedVersions.getById(id) == null) {
int count = parentFeedSource().feedVersionCount();
version = count + 1;
Persistence.feedVersions.create(this);
} else {
Persistence.feedVersions.replace(id, this);
}
}
}

0 comments on commit 18767dd

Please sign in to comment.