From 985c27c115174c17637af5996f0287c949c2e573 Mon Sep 17 00:00:00 2001 From: Paulo Gomes da Cruz Junior Date: Tue, 3 Sep 2024 15:38:57 -0700 Subject: [PATCH] fix(FSADT1-1483): updated filter of hot/cold requests (#1137) * chore: updating code * fix(FSADT1-1483): updated filter of hot/cold requests - Added filter to only allow hot requests to proceed - Added hot/cold filter to the processor This change will check if a submission is HOT or COLD. A HOT submission is one that can be processed, by checking if the timing of processing is bigger than 2 minutes. A COLD submission is one that is being processed already, and we check that by the processing time if it is smaller than 2 minutes. * chore: updating migration info * chore: renamed job --- .github/workflows/pr-open.yml | 2 +- .../ca/bc/gov/app/ApplicationConstant.java | 1 + .../app/dto/SubmissionProcessKindEnum.java | 6 ++ ...ClientSubmissionAutoProcessingService.java | 38 ++++++-- .../service/processor/ProcessorService.java | 96 ++++++++++++++----- 5 files changed, 108 insertions(+), 35 deletions(-) create mode 100644 processor/src/main/java/ca/bc/gov/app/dto/SubmissionProcessKindEnum.java diff --git a/.github/workflows/pr-open.yml b/.github/workflows/pr-open.yml index 30bae1e9af..8a5801ee7f 100644 --- a/.github/workflows/pr-open.yml +++ b/.github/workflows/pr-open.yml @@ -159,7 +159,7 @@ jobs: ESCAPED_BRANCH_NAME=$(echo "$BRANCH_NAME" | sed 's/[\/&]/\\&/g') oc login --token=${{ secrets.OC_TOKEN }} --server=${{ secrets.OC_SERVER }} oc project ${{ secrets.OC_NAMESPACE }} # Safeguard! - oc create job --from=cronjob/nr-forest-client-tools-migratedb migrate-$(date +%s) --dry-run=client -o yaml | sed "s/value: main/value: ${ESCAPED_BRANCH_NAME}/" | sed "s/value: \"0\"/value: \"${{ github.event.number }}\"/" | oc apply -f - + oc create job --from=cronjob/nr-forest-client-tools-migratedb migrate-pr${{ github.event.number }}-${{ github.run_attempt }}-$(date +%s) --dry-run=client -o yaml | sed "s/value: main/value: ${ESCAPED_BRANCH_NAME}/" | sed "s/value: \"0\"/value: \"${{ github.event.number }}\"/" | oc apply -f - deploy: name: Deploy Application diff --git a/processor/src/main/java/ca/bc/gov/app/ApplicationConstant.java b/processor/src/main/java/ca/bc/gov/app/ApplicationConstant.java index 72a70c0f65..588106bb1c 100644 --- a/processor/src/main/java/ca/bc/gov/app/ApplicationConstant.java +++ b/processor/src/main/java/ca/bc/gov/app/ApplicationConstant.java @@ -33,6 +33,7 @@ public final class ApplicationConstant { public static final String MATCH_PARAM_NAME = "corporationName"; public static final String MATCHING_REASON = "matching-reason"; public static final String MATCHING_INFO = "info"; + public static final String MATCHING_KIND = "kind"; public static final String MATCHED_USER = "matched-user"; public static final String IS_DOING_BUSINESS_AS = "isDoingBusinessAs"; public static final String DOING_BUSINESS_AS = "isDoingBusinessAs"; diff --git a/processor/src/main/java/ca/bc/gov/app/dto/SubmissionProcessKindEnum.java b/processor/src/main/java/ca/bc/gov/app/dto/SubmissionProcessKindEnum.java new file mode 100644 index 0000000000..e84f4bcaf1 --- /dev/null +++ b/processor/src/main/java/ca/bc/gov/app/dto/SubmissionProcessKindEnum.java @@ -0,0 +1,6 @@ +package ca.bc.gov.app.dto; + +public enum SubmissionProcessKindEnum { + HOT, + COLD +} diff --git a/processor/src/main/java/ca/bc/gov/app/service/client/ClientSubmissionAutoProcessingService.java b/processor/src/main/java/ca/bc/gov/app/service/client/ClientSubmissionAutoProcessingService.java index 37cbf7ab92..f60de02bc8 100644 --- a/processor/src/main/java/ca/bc/gov/app/service/client/ClientSubmissionAutoProcessingService.java +++ b/processor/src/main/java/ca/bc/gov/app/service/client/ClientSubmissionAutoProcessingService.java @@ -5,6 +5,7 @@ import ca.bc.gov.app.ApplicationConstant; import ca.bc.gov.app.dto.MatcherResult; import ca.bc.gov.app.dto.MessagingWrapper; +import ca.bc.gov.app.dto.SubmissionProcessKindEnum; import ca.bc.gov.app.entity.SubmissionMatchDetailEntity; import ca.bc.gov.app.entity.SubmissionStatusEnum; import ca.bc.gov.app.entity.SubmissionTypeCodeEnum; @@ -106,19 +107,38 @@ public Mono> loadMatchingInfo(MessagingWrapper messag entity.getMatchingField() ) ) - .filter(not(SubmissionMatchDetailEntity::isBeingProcessed)) - //This will add the current date to the processing time to prevent concurrency + .flatMap(entity -> - submissionMatchDetailRepository - .save(entity.withProcessingTime(LocalDateTime.now())) + Mono + .just( + message.withParameter( + ApplicationConstant.MATCHING_INFO, + entity.getMatchers().get(ApplicationConstant.MATCHING_INFO) + ) + // This checks if a submission is HOT or COLD + // A HOT submission is one that can be processed + // A COLD submission is one that is being processed already + .withParameter( + ApplicationConstant.MATCHING_KIND, + (entity.isBeingProcessed()) ? + SubmissionProcessKindEnum.COLD : + SubmissionProcessKindEnum.HOT + ) + ) + //This will add the current date to the processing time to prevent concurrency + .flatMap(msg -> + submissionMatchDetailRepository + .save(entity.withProcessingTime(LocalDateTime.now())) + .thenReturn(msg) + ) ) - .map(entity -> + // Submissions without a match details are always HOT + .defaultIfEmpty( message.withParameter( - ApplicationConstant.MATCHING_INFO, - entity.getMatchers().get(ApplicationConstant.MATCHING_INFO) + ApplicationConstant.MATCHING_KIND, + SubmissionProcessKindEnum.HOT ) - ) - .switchIfEmpty(Mono.just(message)); + ); } private void updateEntityMatchers( diff --git a/processor/src/main/java/ca/bc/gov/app/service/processor/ProcessorService.java b/processor/src/main/java/ca/bc/gov/app/service/processor/ProcessorService.java index f08308a1d9..0d73785155 100644 --- a/processor/src/main/java/ca/bc/gov/app/service/processor/ProcessorService.java +++ b/processor/src/main/java/ca/bc/gov/app/service/processor/ProcessorService.java @@ -2,6 +2,7 @@ import ca.bc.gov.app.ApplicationConstant; import ca.bc.gov.app.dto.MessagingWrapper; +import ca.bc.gov.app.dto.SubmissionProcessKindEnum; import ca.bc.gov.app.dto.SubmissionProcessTypeEnum; import ca.bc.gov.app.entity.SubmissionStatusEnum; import ca.bc.gov.app.repository.SubmissionRepository; @@ -80,36 +81,87 @@ public void submissionMessages() { } /** - * Process external approved and rejected submissions each 30 seconds, - * but with an initial delay after the application starts of 10 seconds + * Process external approved and rejected submissions each 30 seconds, but with an initial delay + * after the application starts of 10 seconds */ @Scheduled(fixedDelay = 30_000, initialDelay = 10_000) public void processedMessages() { //Load the submissions that were processed submissionRepository .loadProcessedSubmissions() + .map(submissionId -> + new MessagingWrapper<>( + submissionId, + Map.of( + ApplicationConstant.SUBMISSION_ID, submissionId, + ApplicationConstant.SUBMISSION_STARTER, SubmissionProcessTypeEnum.EXTERNAL + ) + ) + ) + .flatMap(autoProcessingService::loadMatchingInfo) //Call the processedMessage method - .flatMap(submissionId -> processedMessage(submissionId, SubmissionProcessTypeEnum.EXTERNAL)) + .flatMap(this::processedMessage) .subscribe(); } /** - * Process staff submitted submissions that failed to complete each 10 seconds, - * but with an initial delay after the application starts of 5 seconds. + * Process staff submitted submissions that failed to complete each 10 seconds, but with an + * initial delay after the application starts of 5 seconds. *

The failed to complete part here is important

- * It means that the submission was processed, but failed in the middle of the process, - * so it needs to be reprocessed. + * It means that the submission was processed, but failed in the middle of the process, so it + * needs to be reprocessed. */ @Scheduled(fixedDelay = 10_000, initialDelay = 5_000) public void processStaffSubmitted() { //Load the submissions that were processed submissionRepository .loadStaffSubmissions() + .map(submissionId -> + new MessagingWrapper<>( + submissionId, + Map.of( + ApplicationConstant.SUBMISSION_ID, submissionId, + ApplicationConstant.SUBMISSION_STARTER, SubmissionProcessTypeEnum.STAFF + ) + ) + ) + .flatMap(autoProcessingService::loadMatchingInfo) //Call the processedMessage method - .flatMap(submissionId -> processedMessage(submissionId, SubmissionProcessTypeEnum.STAFF)) + .flatMap(this::processedMessage) .subscribe(); } + /** + * Processes a submission message based on its ID and type. + *

+ * This method wraps the submission ID and type into a {@link MessagingWrapper} object, then calls + * the overloaded {@code processedMessage} method to handle the processing. + *

+ * + * @param submissionId the ID of the submission to be processed + * @param submissionType the type of the submission process, indicating the origin or nature of + * the submission + * @return a {@link Mono} that completes when the processing is done. The Mono emits the + * ID of the processed submission or an error signal if an error occurs during processing. + */ + public Mono processedMessage( + Integer submissionId, + SubmissionProcessTypeEnum submissionType + ) { + return Mono.just( + new MessagingWrapper<>( + submissionId, + Map.of( + ApplicationConstant.SUBMISSION_ID, submissionId, + ApplicationConstant.SUBMISSION_STARTER, submissionType + ) + ) + ) + .flatMap(autoProcessingService::loadMatchingInfo) + //Call the processedMessage method + .flatMap(this::processedMessage); + } + /** * Processes a submission message based on its ID and type. *

@@ -123,30 +175,24 @@ public void processStaffSubmitted() { * If any error occurs during the processing, it logs the error and continues with the next * operations. * - * @param submissionId the ID of the submission to be processed - * @param submissionType the type of the submission process, indicating the origin or nature of - * the submission + * @param submissionWrapper a {@link MessagingWrapper} object containing the submission ID and + * type * @return a {@link Mono} that completes when the processing is done. The Mono emits the * ID of the processed submission or an error signal if an error occurs during processing. */ - public Mono processedMessage( - Integer submissionId, - SubmissionProcessTypeEnum submissionType - ) { - log.info("Processing submission {} of type {}", submissionId, submissionType); + private Mono processedMessage(MessagingWrapper submissionWrapper) { + log.info("Processing submission {} of type {}", + submissionWrapper.payload(), + submissionWrapper.getParameter(ApplicationConstant.SUBMISSION_STARTER, String.class) + ); return //Little wrapper to make it easier to pass around Mono - .just( - new MessagingWrapper<>( - submissionId, - Map.of( - ApplicationConstant.SUBMISSION_ID, submissionId, - ApplicationConstant.SUBMISSION_STARTER, submissionType - ) - ) + .just(submissionWrapper) + //Only process HOT submissions, see ClientSubmissionAutoProcessingService.loadMatchingInfo for info + .filter(submission -> submission.getParameter(ApplicationConstant.MATCHING_KIND, + SubmissionProcessKindEnum.class) == SubmissionProcessKindEnum.HOT ) - .flatMap(autoProcessingService::loadMatchingInfo) .doOnNext(submission -> log.info("Loaded submission for processing {}", submission)) //Process the submission by loading some information .flatMap(submissionProcessingService::processSubmission)