Skip to content

Commit

Permalink
fix(FSADT1-1483): updated filter of hot/cold requests (#1137)
Browse files Browse the repository at this point in the history
* chore: updating code

* fix(FSADT1-1483): updated filter of hot/cold requests

- Added filter to only allow hot requests to proceed
- Added hot/cold filter to the processor

 This change will check if a submission is HOT or COLD.

 A HOT submission is one that can be processed, by checking if the timing of processing is bigger than 2 minutes.

 A COLD submission is one that is being processed already, and we check that by the processing time if it is smaller than 2 minutes.

* chore: updating migration info

* chore: renamed job
  • Loading branch information
paulushcgcj authored Sep 3, 2024
1 parent eaf15cb commit 985c27c
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 35 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr-open.yml
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ jobs:
ESCAPED_BRANCH_NAME=$(echo "$BRANCH_NAME" | sed 's/[\/&]/\\&/g')
oc login --token=${{ secrets.OC_TOKEN }} --server=${{ secrets.OC_SERVER }}
oc project ${{ secrets.OC_NAMESPACE }} # Safeguard!
oc create job --from=cronjob/nr-forest-client-tools-migratedb migrate-$(date +%s) --dry-run=client -o yaml | sed "s/value: main/value: ${ESCAPED_BRANCH_NAME}/" | sed "s/value: \"0\"/value: \"${{ github.event.number }}\"/" | oc apply -f -
oc create job --from=cronjob/nr-forest-client-tools-migratedb migrate-pr${{ github.event.number }}-${{ github.run_attempt }}-$(date +%s) --dry-run=client -o yaml | sed "s/value: main/value: ${ESCAPED_BRANCH_NAME}/" | sed "s/value: \"0\"/value: \"${{ github.event.number }}\"/" | oc apply -f -
deploy:
name: Deploy Application
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public final class ApplicationConstant {
public static final String MATCH_PARAM_NAME = "corporationName";
public static final String MATCHING_REASON = "matching-reason";
public static final String MATCHING_INFO = "info";
public static final String MATCHING_KIND = "kind";
public static final String MATCHED_USER = "matched-user";
public static final String IS_DOING_BUSINESS_AS = "isDoingBusinessAs";
public static final String DOING_BUSINESS_AS = "isDoingBusinessAs";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package ca.bc.gov.app.dto;

public enum SubmissionProcessKindEnum {
HOT,
COLD
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import ca.bc.gov.app.ApplicationConstant;
import ca.bc.gov.app.dto.MatcherResult;
import ca.bc.gov.app.dto.MessagingWrapper;
import ca.bc.gov.app.dto.SubmissionProcessKindEnum;
import ca.bc.gov.app.entity.SubmissionMatchDetailEntity;
import ca.bc.gov.app.entity.SubmissionStatusEnum;
import ca.bc.gov.app.entity.SubmissionTypeCodeEnum;
Expand Down Expand Up @@ -106,19 +107,38 @@ public <T> Mono<MessagingWrapper<T>> loadMatchingInfo(MessagingWrapper<T> messag
entity.getMatchingField()
)
)
.filter(not(SubmissionMatchDetailEntity::isBeingProcessed))
//This will add the current date to the processing time to prevent concurrency

.flatMap(entity ->
submissionMatchDetailRepository
.save(entity.withProcessingTime(LocalDateTime.now()))
Mono
.just(
message.withParameter(
ApplicationConstant.MATCHING_INFO,
entity.getMatchers().get(ApplicationConstant.MATCHING_INFO)
)
// This checks if a submission is HOT or COLD
// A HOT submission is one that can be processed
// A COLD submission is one that is being processed already
.withParameter(
ApplicationConstant.MATCHING_KIND,
(entity.isBeingProcessed()) ?
SubmissionProcessKindEnum.COLD :
SubmissionProcessKindEnum.HOT
)
)
//This will add the current date to the processing time to prevent concurrency
.flatMap(msg ->
submissionMatchDetailRepository
.save(entity.withProcessingTime(LocalDateTime.now()))
.thenReturn(msg)
)
)
.map(entity ->
// Submissions without a match details are always HOT
.defaultIfEmpty(
message.withParameter(
ApplicationConstant.MATCHING_INFO,
entity.getMatchers().get(ApplicationConstant.MATCHING_INFO)
ApplicationConstant.MATCHING_KIND,
SubmissionProcessKindEnum.HOT
)
)
.switchIfEmpty(Mono.just(message));
);
}

private void updateEntityMatchers(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import ca.bc.gov.app.ApplicationConstant;
import ca.bc.gov.app.dto.MessagingWrapper;
import ca.bc.gov.app.dto.SubmissionProcessKindEnum;
import ca.bc.gov.app.dto.SubmissionProcessTypeEnum;
import ca.bc.gov.app.entity.SubmissionStatusEnum;
import ca.bc.gov.app.repository.SubmissionRepository;
Expand Down Expand Up @@ -80,36 +81,87 @@ public void submissionMessages() {
}

/**
* Process external approved and rejected submissions each 30 seconds,
* but with an initial delay after the application starts of 10 seconds
* Process external approved and rejected submissions each 30 seconds, but with an initial delay
* after the application starts of 10 seconds
*/
@Scheduled(fixedDelay = 30_000, initialDelay = 10_000)
public void processedMessages() {
//Load the submissions that were processed
submissionRepository
.loadProcessedSubmissions()
.map(submissionId ->
new MessagingWrapper<>(
submissionId,
Map.of(
ApplicationConstant.SUBMISSION_ID, submissionId,
ApplicationConstant.SUBMISSION_STARTER, SubmissionProcessTypeEnum.EXTERNAL
)
)
)
.flatMap(autoProcessingService::loadMatchingInfo)
//Call the processedMessage method
.flatMap(submissionId -> processedMessage(submissionId, SubmissionProcessTypeEnum.EXTERNAL))
.flatMap(this::processedMessage)
.subscribe();
}

/**
* Process staff submitted submissions that failed to complete each 10 seconds,
* but with an initial delay after the application starts of 5 seconds.
* Process staff submitted submissions that failed to complete each 10 seconds, but with an
* initial delay after the application starts of 5 seconds.
* <p>The failed to complete part here is important</p>
* It means that the submission was processed, but failed in the middle of the process,
* so it needs to be reprocessed.
* It means that the submission was processed, but failed in the middle of the process, so it
* needs to be reprocessed.
*/
@Scheduled(fixedDelay = 10_000, initialDelay = 5_000)
public void processStaffSubmitted() {
//Load the submissions that were processed
submissionRepository
.loadStaffSubmissions()
.map(submissionId ->
new MessagingWrapper<>(
submissionId,
Map.of(
ApplicationConstant.SUBMISSION_ID, submissionId,
ApplicationConstant.SUBMISSION_STARTER, SubmissionProcessTypeEnum.STAFF
)
)
)
.flatMap(autoProcessingService::loadMatchingInfo)
//Call the processedMessage method
.flatMap(submissionId -> processedMessage(submissionId, SubmissionProcessTypeEnum.STAFF))
.flatMap(this::processedMessage)
.subscribe();
}

/**
* Processes a submission message based on its ID and type.
* <p>
* This method wraps the submission ID and type into a {@link MessagingWrapper} object, then calls
* the overloaded {@code processedMessage} method to handle the processing.
* </p>
*
* @param submissionId the ID of the submission to be processed
* @param submissionType the type of the submission process, indicating the origin or nature of
* the submission
* @return a {@link Mono<String>} that completes when the processing is done. The Mono emits the
* ID of the processed submission or an error signal if an error occurs during processing.
*/
public Mono<String> processedMessage(
Integer submissionId,
SubmissionProcessTypeEnum submissionType
) {
return Mono.just(
new MessagingWrapper<>(
submissionId,
Map.of(
ApplicationConstant.SUBMISSION_ID, submissionId,
ApplicationConstant.SUBMISSION_STARTER, submissionType
)
)
)
.flatMap(autoProcessingService::loadMatchingInfo)
//Call the processedMessage method
.flatMap(this::processedMessage);
}

/**
* Processes a submission message based on its ID and type.
* <p>
Expand All @@ -123,30 +175,24 @@ public void processStaffSubmitted() {
* If any error occurs during the processing, it logs the error and continues with the next
* operations.
*
* @param submissionId the ID of the submission to be processed
* @param submissionType the type of the submission process, indicating the origin or nature of
* the submission
* @param submissionWrapper a {@link MessagingWrapper} object containing the submission ID and
* type
* @return a {@link Mono<String>} that completes when the processing is done. The Mono emits the
* ID of the processed submission or an error signal if an error occurs during processing.
*/
public Mono<String> processedMessage(
Integer submissionId,
SubmissionProcessTypeEnum submissionType
) {
log.info("Processing submission {} of type {}", submissionId, submissionType);
private Mono<String> processedMessage(MessagingWrapper<Integer> submissionWrapper) {
log.info("Processing submission {} of type {}",
submissionWrapper.payload(),
submissionWrapper.getParameter(ApplicationConstant.SUBMISSION_STARTER, String.class)
);
return
//Little wrapper to make it easier to pass around
Mono
.just(
new MessagingWrapper<>(
submissionId,
Map.of(
ApplicationConstant.SUBMISSION_ID, submissionId,
ApplicationConstant.SUBMISSION_STARTER, submissionType
)
)
.just(submissionWrapper)
//Only process HOT submissions, see ClientSubmissionAutoProcessingService.loadMatchingInfo for info
.filter(submission -> submission.getParameter(ApplicationConstant.MATCHING_KIND,
SubmissionProcessKindEnum.class) == SubmissionProcessKindEnum.HOT
)
.flatMap(autoProcessingService::loadMatchingInfo)
.doOnNext(submission -> log.info("Loaded submission for processing {}", submission))
//Process the submission by loading some information
.flatMap(submissionProcessingService::processSubmission)
Expand Down

0 comments on commit 985c27c

Please sign in to comment.