Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Threaded validation #513

Merged
merged 8 commits into from
Mar 3, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.conveyal.datatools.manager.auth.Auth0UserProfile;
import com.conveyal.datatools.manager.jobs.ProcessSingleFeedJob;
import com.conveyal.datatools.manager.jobs.ValidateFeedJob;
import com.conveyal.datatools.manager.jobs.ValidateMobilityDataFeedJob;
import com.conveyal.datatools.manager.models.Deployment;
import com.conveyal.datatools.manager.models.ExternalFeedSourceProperty;
import com.conveyal.datatools.manager.models.FeedRetrievalMethod;
Expand Down Expand Up @@ -354,18 +355,17 @@ public static boolean validateAll (boolean load, boolean force, String filterFee
// If the force option is not true and the validation result did not fail, re-validate.
continue;
}
MonitorableJob job;
if (filterFeedId != null && !version.feedSourceId.equals(filterFeedId)) {
// Skip all feeds except Cortland for now.
continue;
}
Auth0UserProfile systemUser = Auth0UserProfile.createSystemUser();
if (load) {
job = new ProcessSingleFeedJob(version, systemUser, false);
JobUtils.heavyExecutor.execute(new ProcessSingleFeedJob(version, systemUser, false));
} else {
job = new ValidateFeedJob(version, systemUser, false);
JobUtils.heavyExecutor.execute(new ValidateFeedJob(version, systemUser, false));
JobUtils.heavyExecutor.execute(new ValidateMobilityDataFeedJob(version, systemUser, false));
}
JobUtils.heavyExecutor.execute(job);
}
// ValidateAllFeedsJob validateAllFeedsJob = new ValidateAllFeedsJob("system", force, load);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public void jobLogic() {

// Next, validate the feed.
addNextJob(new ValidateFeedJob(feedVersion, owner, isNewVersion));
addNextJob(new ValidateMobilityDataFeedJob(feedVersion, owner, isNewVersion));

// We only need to snapshot the feed if there are transformations at the database level. In the case that there
// are, the snapshot namespace will be the target of these modifications. If we were to apply the modifications
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,7 @@ public void jobFinished () {
// such as BuildTransportNetwork, to finish. If those subsequent jobs fail,
// the version won't get loaded into MongoDB (even though it exists in postgres).
feedVersion.storeUser(owner);
if (isNewVersion) {
int count = feedVersion.parentFeedSource().feedVersionCount();
feedVersion.version = count + 1;
Persistence.feedVersions.create(feedVersion);
} else {
Persistence.feedVersions.replace(feedVersion.id, feedVersion);
}
feedVersion.persistFeedVersionAfterValidation(isNewVersion);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As written, it looks like the new version is shown in the UI (on the mobility-data-validator branch) after the first validator completes. Is that the desired effect?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's ok as long as we refresh the UI when the mobility data results come in, which is something we can do on the client.

// Schedule expiration notification jobs.
Scheduler.scheduleExpirationNotifications(feedVersion.parentFeedSource());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.conveyal.datatools.manager.jobs;

import com.conveyal.datatools.common.status.FeedVersionJob;
import com.conveyal.datatools.manager.auth.Auth0UserProfile;
import com.conveyal.datatools.manager.models.FeedVersion;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This job handles the MobilityData validation of a given feed version. If the version is not new, it will simply
* replace the existing version with the version object that has updated validation info.
*/
public class ValidateMobilityDataFeedJob extends FeedVersionJob {
public static final Logger LOG = LoggerFactory.getLogger(ValidateMobilityDataFeedJob.class);

private final FeedVersion feedVersion;
private final boolean isNewVersion;

public ValidateMobilityDataFeedJob(FeedVersion version, Auth0UserProfile owner, boolean isNewVersion) {
super(owner, "Validating Feed using MobilityData", JobType.VALIDATE_FEED);
feedVersion = version;
this.isNewVersion = isNewVersion;
status.update("Waiting to begin MobilityData validation...", 0);
}

@Override
public void jobLogic () {
LOG.info("Running ValidateMobilityDataFeedJob for {}", feedVersion.id);
feedVersion.validateMobility(status);
}

@Override
public void jobFinished () {
if (!status.error) {
if (parentJobId != null && JobType.PROCESS_FEED.equals(parentJobType)) {
// Validate stage is happening as part of an overall process feed job.
// At this point all GTFS data has been loaded and validated, so we record
// the FeedVersion into mongo.
// This happens here because otherwise we would have to wait for other jobs,
// such as BuildTransportNetwork, to finish. If those subsequent jobs fail,
// the version won't get loaded into MongoDB (even though it exists in postgres).
feedVersion.persistFeedVersionAfterValidation(isNewVersion);
}
status.completeSuccessfully("MobilityData validation finished!");
} else {
// If the version was not stored successfully, call FeedVersion#delete to reset things to before the version
// was uploaded/fetched. Note: delete calls made to MongoDB on the version ID will not succeed, but that is
// expected.
feedVersion.delete();
}
}

/**
* Getter that allows a client to know the ID of the feed version that will be created as soon as the upload is
* initiated; however, we will not store the FeedVersion in the mongo application database until the upload and
* processing is completed. This prevents clients from manipulating GTFS data before it is entirely imported.
*/
@JsonProperty
public String getFeedVersionId () {
return feedVersion.id;
}

@JsonProperty
public String getFeedSourceId () {
return feedVersion.parentFeedSource().id;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.conveyal.datatools.common.status.MonitorableJob;
import com.conveyal.datatools.common.utils.Scheduler;
import com.conveyal.datatools.manager.DataManager;
import com.conveyal.datatools.manager.jobs.ValidateFeedJob;
import com.conveyal.datatools.manager.jobs.ValidateMobilityDataFeedJob;
import com.conveyal.datatools.manager.jobs.validation.RouteTypeValidatorBuilder;
import com.conveyal.datatools.manager.persistence.FeedStore;
import com.conveyal.datatools.manager.persistence.Persistence;
Expand Down Expand Up @@ -354,9 +356,41 @@ public void validate(MonitorableJob.Status status) {
if (status == null) status = new MonitorableJob.Status();

// VALIDATE GTFS feed
FileReader fr = null;
try {
LOG.info("Beginning validation...");

// FIXME: pass status to validate? Or somehow listen to events?
status.update("Validating feed...", 33);

// Validate the feed version.
// Certain extensions, if enabled, have extra validators
if (isExtensionEnabled("mtc")) {
validationResult = GTFS.validate(feedLoadResult.uniqueIdentifier, DataManager.GTFS_DATA_SOURCE,
RouteTypeValidatorBuilder::buildRouteValidator,
MTCValidator::new
);
} else {
validationResult = GTFS.validate(feedLoadResult.uniqueIdentifier, DataManager.GTFS_DATA_SOURCE,
RouteTypeValidatorBuilder::buildRouteValidator
);
}
} catch (Exception e) {
status.fail(String.format("Unable to validate feed %s", this.id), e);
// FIXME create validation result with new constructor?
validationResult = new ValidationResult();
validationResult.fatalException = "failure!";
}
}

public void validateMobility(MonitorableJob.Status status) {

// Sometimes this method is called when no status object is available.
if (status == null) status = new MonitorableJob.Status();

// VALIDATE GTFS feed
FileReader fr = null;
try {
LOG.info("Beginning MobilityData validation...");
status.update("MobilityData Analysis...", 11);

File gtfsZip = this.retrieveGtfsFile();
Expand All @@ -382,22 +416,6 @@ public void validate(MonitorableJob.Status status) {

// This will persist the document to Mongo
this.mobilityDataResult = Document.parse(json);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "regular" parsing step was previously persisting the this to mongo. If the mobility data parser finishes first, this still happens. If it finishes second, this fails to happen and the data isn't persisted! I think the solution is to add a mongo persistence line right here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@miles-grant-ibigroup We need to be careful here because we don't want to create a new Mongo object if the other validator is expecting to do that. See ValidateFeedJob -> jobFinished(). Both validators need to be aware of the Mongo object state, so that either can create or update. Let me take a look. Good spot btw.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@miles-grant-ibigroup This should now be addressed. Unit tests look good. Lemme know how you get on.


// FIXME: pass status to validate? Or somehow listen to events?
status.update("Validating feed...", 33);

// Validate the feed version.
// Certain extensions, if enabled, have extra validators
if (isExtensionEnabled("mtc")) {
validationResult = GTFS.validate(feedLoadResult.uniqueIdentifier, DataManager.GTFS_DATA_SOURCE,
RouteTypeValidatorBuilder::buildRouteValidator,
MTCValidator::new
);
} else {
validationResult = GTFS.validate(feedLoadResult.uniqueIdentifier, DataManager.GTFS_DATA_SOURCE,
RouteTypeValidatorBuilder::buildRouteValidator
);
}
} catch (Exception e) {
status.fail(String.format("Unable to validate feed %s", this.id), e);
// FIXME create validation result with new constructor?
Expand Down Expand Up @@ -582,4 +600,19 @@ public void assignGtfsFileAttributes(File newGtfsFile) {
public boolean isSameAs(FeedVersion otherVersion) {
return otherVersion != null && this.hash.equals(otherVersion.hash);
}

/**
* {@link ValidateFeedJob} and {@link ValidateMobilityDataFeedJob} both require to save a feed version after their
* subsequent validation checks have completed. Either could finish first, therefore this method makes sure that
* only one instance is saved (the last to finish updates).
*/
public void persistFeedVersionAfterValidation(boolean isNewVersion) {
if (isNewVersion && Persistence.feedVersions.getById(id) == null) {
int count = parentFeedSource().feedVersionCount();
version = count + 1;
Persistence.feedVersions.create(this);
} else {
Persistence.feedVersions.replace(id, this);
}
}
}