Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(FSADT1-1483): updated filter of hot/cold requests #1137

Merged
merged 5 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pr-open.yml
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ jobs:
ESCAPED_BRANCH_NAME=$(echo "$BRANCH_NAME" | sed 's/[\/&]/\\&/g')
oc login --token=${{ secrets.OC_TOKEN }} --server=${{ secrets.OC_SERVER }}
oc project ${{ secrets.OC_NAMESPACE }} # Safeguard!
oc create job --from=cronjob/nr-forest-client-tools-migratedb migrate-$(date +%s) --dry-run=client -o yaml | sed "s/value: main/value: ${ESCAPED_BRANCH_NAME}/" | sed "s/value: \"0\"/value: \"${{ github.event.number }}\"/" | oc apply -f -
oc create job --from=cronjob/nr-forest-client-tools-migratedb migrate-pr${{ github.event.number }}-${{ github.run_attempt }}-$(date +%s) --dry-run=client -o yaml | sed "s/value: main/value: ${ESCAPED_BRANCH_NAME}/" | sed "s/value: \"0\"/value: \"${{ github.event.number }}\"/" | oc apply -f -

deploy:
name: Deploy Application
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public final class ApplicationConstant {
public static final String MATCH_PARAM_NAME = "corporationName";
public static final String MATCHING_REASON = "matching-reason";
public static final String MATCHING_INFO = "info";
public static final String MATCHING_KIND = "kind";
public static final String MATCHED_USER = "matched-user";
public static final String IS_DOING_BUSINESS_AS = "isDoingBusinessAs";
public static final String DOING_BUSINESS_AS = "isDoingBusinessAs";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package ca.bc.gov.app.dto;

public enum SubmissionProcessKindEnum {
HOT,
COLD
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import ca.bc.gov.app.ApplicationConstant;
import ca.bc.gov.app.dto.MatcherResult;
import ca.bc.gov.app.dto.MessagingWrapper;
import ca.bc.gov.app.dto.SubmissionProcessKindEnum;
import ca.bc.gov.app.entity.SubmissionMatchDetailEntity;
import ca.bc.gov.app.entity.SubmissionStatusEnum;
import ca.bc.gov.app.entity.SubmissionTypeCodeEnum;
Expand Down Expand Up @@ -106,19 +107,38 @@ public <T> Mono<MessagingWrapper<T>> loadMatchingInfo(MessagingWrapper<T> messag
entity.getMatchingField()
)
)
.filter(not(SubmissionMatchDetailEntity::isBeingProcessed))
//This will add the current date to the processing time to prevent concurrency

.flatMap(entity ->
submissionMatchDetailRepository
.save(entity.withProcessingTime(LocalDateTime.now()))
Mono
.just(
message.withParameter(
ApplicationConstant.MATCHING_INFO,
entity.getMatchers().get(ApplicationConstant.MATCHING_INFO)
)
// This checks if a submission is HOT or COLD
// A HOT submission is one that can be processed
// A COLD submission is one that is being processed already
.withParameter(
ApplicationConstant.MATCHING_KIND,
(entity.isBeingProcessed()) ?
SubmissionProcessKindEnum.COLD :
SubmissionProcessKindEnum.HOT
)
)
//This will add the current date to the processing time to prevent concurrency
.flatMap(msg ->
submissionMatchDetailRepository
.save(entity.withProcessingTime(LocalDateTime.now()))
.thenReturn(msg)
)
)
.map(entity ->
// Submissions without a match details are always HOT
.defaultIfEmpty(
message.withParameter(
ApplicationConstant.MATCHING_INFO,
entity.getMatchers().get(ApplicationConstant.MATCHING_INFO)
ApplicationConstant.MATCHING_KIND,
SubmissionProcessKindEnum.HOT
)
)
.switchIfEmpty(Mono.just(message));
);
}

private void updateEntityMatchers(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import ca.bc.gov.app.ApplicationConstant;
import ca.bc.gov.app.dto.MessagingWrapper;
import ca.bc.gov.app.dto.SubmissionProcessKindEnum;
import ca.bc.gov.app.dto.SubmissionProcessTypeEnum;
import ca.bc.gov.app.entity.SubmissionStatusEnum;
import ca.bc.gov.app.repository.SubmissionRepository;
Expand Down Expand Up @@ -80,36 +81,87 @@ public void submissionMessages() {
}

/**
* Process external approved and rejected submissions each 30 seconds,
* but with an initial delay after the application starts of 10 seconds
* Process external approved and rejected submissions each 30 seconds, but with an initial delay
* after the application starts of 10 seconds
*/
@Scheduled(fixedDelay = 30_000, initialDelay = 10_000)
public void processedMessages() {
//Load the submissions that were processed
submissionRepository
.loadProcessedSubmissions()
.map(submissionId ->
new MessagingWrapper<>(
submissionId,
Map.of(
ApplicationConstant.SUBMISSION_ID, submissionId,
ApplicationConstant.SUBMISSION_STARTER, SubmissionProcessTypeEnum.EXTERNAL
)
)
)
.flatMap(autoProcessingService::loadMatchingInfo)
//Call the processedMessage method
.flatMap(submissionId -> processedMessage(submissionId, SubmissionProcessTypeEnum.EXTERNAL))
.flatMap(this::processedMessage)
.subscribe();
}

/**
* Process staff submitted submissions that failed to complete each 10 seconds,
* but with an initial delay after the application starts of 5 seconds.
* Process staff submitted submissions that failed to complete each 10 seconds, but with an
* initial delay after the application starts of 5 seconds.
* <p>The failed to complete part here is important</p>
* It means that the submission was processed, but failed in the middle of the process,
* so it needs to be reprocessed.
* It means that the submission was processed, but failed in the middle of the process, so it
* needs to be reprocessed.
*/
@Scheduled(fixedDelay = 10_000, initialDelay = 5_000)
public void processStaffSubmitted() {
//Load the submissions that were processed
submissionRepository
.loadStaffSubmissions()
.map(submissionId ->
new MessagingWrapper<>(
submissionId,
Map.of(
ApplicationConstant.SUBMISSION_ID, submissionId,
ApplicationConstant.SUBMISSION_STARTER, SubmissionProcessTypeEnum.STAFF
)
)
)
.flatMap(autoProcessingService::loadMatchingInfo)
//Call the processedMessage method
.flatMap(submissionId -> processedMessage(submissionId, SubmissionProcessTypeEnum.STAFF))
.flatMap(this::processedMessage)
.subscribe();
}

/**
* Processes a submission message based on its ID and type.
* <p>
* This method wraps the submission ID and type into a {@link MessagingWrapper} object, then calls
* the overloaded {@code processedMessage} method to handle the processing.
* </p>
*
* @param submissionId the ID of the submission to be processed
* @param submissionType the type of the submission process, indicating the origin or nature of
* the submission
* @return a {@link Mono<String>} that completes when the processing is done. The Mono emits the
* ID of the processed submission or an error signal if an error occurs during processing.
*/
public Mono<String> processedMessage(
Integer submissionId,
SubmissionProcessTypeEnum submissionType
) {
return Mono.just(
new MessagingWrapper<>(
submissionId,
Map.of(
ApplicationConstant.SUBMISSION_ID, submissionId,
ApplicationConstant.SUBMISSION_STARTER, submissionType
)
)
)
.flatMap(autoProcessingService::loadMatchingInfo)
//Call the processedMessage method
.flatMap(this::processedMessage);
}

/**
* Processes a submission message based on its ID and type.
* <p>
Expand All @@ -123,30 +175,24 @@ public void processStaffSubmitted() {
* If any error occurs during the processing, it logs the error and continues with the next
* operations.
*
* @param submissionId the ID of the submission to be processed
* @param submissionType the type of the submission process, indicating the origin or nature of
* the submission
* @param submissionWrapper a {@link MessagingWrapper} object containing the submission ID and
* type
* @return a {@link Mono<String>} that completes when the processing is done. The Mono emits the
* ID of the processed submission or an error signal if an error occurs during processing.
*/
public Mono<String> processedMessage(
Integer submissionId,
SubmissionProcessTypeEnum submissionType
) {
log.info("Processing submission {} of type {}", submissionId, submissionType);
private Mono<String> processedMessage(MessagingWrapper<Integer> submissionWrapper) {
log.info("Processing submission {} of type {}",
submissionWrapper.payload(),
submissionWrapper.getParameter(ApplicationConstant.SUBMISSION_STARTER, String.class)
);
return
//Little wrapper to make it easier to pass around
Mono
.just(
new MessagingWrapper<>(
submissionId,
Map.of(
ApplicationConstant.SUBMISSION_ID, submissionId,
ApplicationConstant.SUBMISSION_STARTER, submissionType
)
)
.just(submissionWrapper)
//Only process HOT submissions, see ClientSubmissionAutoProcessingService.loadMatchingInfo for info
.filter(submission -> submission.getParameter(ApplicationConstant.MATCHING_KIND,
SubmissionProcessKindEnum.class) == SubmissionProcessKindEnum.HOT
)
.flatMap(autoProcessingService::loadMatchingInfo)
.doOnNext(submission -> log.info("Loaded submission for processing {}", submission))
//Process the submission by loading some information
.flatMap(submissionProcessingService::processSubmission)
Expand Down
Loading