Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Threaded validation #513

Merged
merged 8 commits into from
Mar 3, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.conveyal.datatools.manager.auth.Auth0UserProfile;
import com.conveyal.datatools.manager.jobs.ProcessSingleFeedJob;
import com.conveyal.datatools.manager.jobs.ValidateFeedJob;
import com.conveyal.datatools.manager.jobs.ValidateMobilityDataFeedJob;
import com.conveyal.datatools.manager.models.Deployment;
import com.conveyal.datatools.manager.models.ExternalFeedSourceProperty;
import com.conveyal.datatools.manager.models.FeedRetrievalMethod;
Expand Down Expand Up @@ -354,18 +355,17 @@ public static boolean validateAll (boolean load, boolean force, String filterFee
// If the force option is not true and the validation result did not fail, re-validate.
continue;
}
MonitorableJob job;
if (filterFeedId != null && !version.feedSourceId.equals(filterFeedId)) {
// Skip all feeds except Cortland for now.
continue;
}
Auth0UserProfile systemUser = Auth0UserProfile.createSystemUser();
if (load) {
job = new ProcessSingleFeedJob(version, systemUser, false);
JobUtils.heavyExecutor.execute(new ProcessSingleFeedJob(version, systemUser, false));
} else {
job = new ValidateFeedJob(version, systemUser, false);
JobUtils.heavyExecutor.execute(new ValidateFeedJob(version, systemUser, false));
JobUtils.heavyExecutor.execute(new ValidateMobilityDataFeedJob(version, systemUser));
}
JobUtils.heavyExecutor.execute(job);
}
// ValidateAllFeedsJob validateAllFeedsJob = new ValidateAllFeedsJob("system", force, load);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public void jobLogic() {

// Next, validate the feed.
addNextJob(new ValidateFeedJob(feedVersion, owner, isNewVersion));
addNextJob(new ValidateMobilityDataFeedJob(feedVersion, owner));

// We only need to snapshot the feed if there are transformations at the database level. In the case that there
// are, the snapshot namespace will be the target of these modifications. If we were to apply the modifications
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.conveyal.datatools.manager.jobs;

import com.conveyal.datatools.common.status.FeedVersionJob;
import com.conveyal.datatools.manager.auth.Auth0UserProfile;
import com.conveyal.datatools.manager.models.FeedVersion;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This job runs the MobilityData validation for a given feed version. All of the validation work is delegated to
 * {@code FeedVersion#validateMobility}; this class only wires that call into the job/status framework and exposes
 * the IDs of the feed version and feed source being validated.
 */
public class ValidateMobilityDataFeedJob extends FeedVersionJob {
    public static final Logger LOG = LoggerFactory.getLogger(ValidateMobilityDataFeedJob.class);

    /** The feed version to validate. Assigned once at construction and never reassigned. */
    private final FeedVersion feedVersion;

    /**
     * Create a job that validates the given feed version with the MobilityData validator.
     *
     * @param version the feed version to validate
     * @param owner the user on whose behalf the job runs
     */
    public ValidateMobilityDataFeedJob(FeedVersion version, Auth0UserProfile owner) {
        super(owner, "Validating Feed using MobilityData", JobType.VALIDATE_FEED);
        feedVersion = version;
        status.update("Waiting to begin MobilityData validation...", 0);
    }

    /**
     * Run the MobilityData validation, passing this job's status object so progress can be reported.
     */
    @Override
    public void jobLogic () {
        LOG.info("Running ValidateMobilityDataFeedJob for {}", feedVersion.id);
        feedVersion.validateMobility(status);
    }

    /**
     * Mark the job as successfully completed.
     */
    @Override
    public void jobFinished () {
        // NOTE(review): this is called unconditionally. Confirm that completeSuccessfully does not overwrite a
        // failure that validateMobility may have recorded on the status object.
        status.completeSuccessfully("MobilityData validation finished!");
    }

    /**
     * Getter that allows a client to know the ID of the feed version this job is validating.
     */
    @JsonProperty
    public String getFeedVersionId () {
        return feedVersion.id;
    }

    /**
     * Getter that allows a client to know the ID of the feed source that owns the feed version being validated.
     */
    @JsonProperty
    public String getFeedSourceId () {
        return feedVersion.parentFeedSource().id;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -354,9 +354,41 @@ public void validate(MonitorableJob.Status status) {
if (status == null) status = new MonitorableJob.Status();

// VALIDATE GTFS feed
FileReader fr = null;
try {
LOG.info("Beginning validation...");

// FIXME: pass status to validate? Or somehow listen to events?
status.update("Validating feed...", 33);

// Validate the feed version.
// Certain extensions, if enabled, have extra validators
if (isExtensionEnabled("mtc")) {
validationResult = GTFS.validate(feedLoadResult.uniqueIdentifier, DataManager.GTFS_DATA_SOURCE,
RouteTypeValidatorBuilder::buildRouteValidator,
MTCValidator::new
);
} else {
validationResult = GTFS.validate(feedLoadResult.uniqueIdentifier, DataManager.GTFS_DATA_SOURCE,
RouteTypeValidatorBuilder::buildRouteValidator
);
}
} catch (Exception e) {
status.fail(String.format("Unable to validate feed %s", this.id), e);
// FIXME create validation result with new constructor?
validationResult = new ValidationResult();
validationResult.fatalException = "failure!";
}
}

public void validateMobility(MonitorableJob.Status status) {

// Sometimes this method is called when no status object is available.
if (status == null) status = new MonitorableJob.Status();

// VALIDATE GTFS feed
FileReader fr = null;
try {
LOG.info("Beginning MobilityData validation...");
status.update("MobilityData Analysis...", 11);

File gtfsZip = this.retrieveGtfsFile();
Expand All @@ -382,22 +414,6 @@ public void validate(MonitorableJob.Status status) {

// This will persist the document to Mongo
this.mobilityDataResult = Document.parse(json);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "regular" parsing step was previously persisting this to Mongo. If the mobility data parser finishes first, this still happens. If it finishes second, this fails to happen and the data isn't persisted! I think the solution is to add a Mongo persistence line right here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@miles-grant-ibigroup We need to be careful here because we don't want to create a new Mongo object if the other validator is expecting to do that. See ValidateFeedJob -> jobFinished(). Both validators need to be aware of the Mongo object state, so that either can create or update. Let me take a look. Good spot btw.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@miles-grant-ibigroup This should now be addressed. Unit tests look good. Lemme know how you get on.


// FIXME: pass status to validate? Or somehow listen to events?
status.update("Validating feed...", 33);

// Validate the feed version.
// Certain extensions, if enabled, have extra validators
if (isExtensionEnabled("mtc")) {
validationResult = GTFS.validate(feedLoadResult.uniqueIdentifier, DataManager.GTFS_DATA_SOURCE,
RouteTypeValidatorBuilder::buildRouteValidator,
MTCValidator::new
);
} else {
validationResult = GTFS.validate(feedLoadResult.uniqueIdentifier, DataManager.GTFS_DATA_SOURCE,
RouteTypeValidatorBuilder::buildRouteValidator
);
}
} catch (Exception e) {
status.fail(String.format("Unable to validate feed %s", this.id), e);
// FIXME create validation result with new constructor?
Expand Down