From 926852ad1e33b776ebcf87981d6bc570785792a4 Mon Sep 17 00:00:00 2001 From: PSNAppZ Date: Mon, 5 Aug 2024 11:14:43 +0530 Subject: [PATCH 01/14] Fix Celery Worker and Beat Producer Queue issue --- .../celerybeat-schedule.db | Bin 16384 -> 16384 bytes .../app.py | 2 +- .../openg2p_g2p_bridge_celery_workers/app.py | 2 +- 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/openg2p-g2p-bridge-celery-beat-producers/celerybeat-schedule.db b/openg2p-g2p-bridge-celery-beat-producers/celerybeat-schedule.db index a092bf0a7d3b1f0e0b4cc6c392af1089a609089e..f9c5157799c2ee25fe750a454f67e88ff2a844e9 100644 GIT binary patch delta 451 zcmZo@U~Fh$+#n#p7R%$#6U(#lqCJmD0|>Bm=ryoR^=D=PgWAcBvKo^+<+|8eS+xXM z7u=nEUN(?XWU`{1;^ba=jrtz7l+5C!(xT$j__Wfzl;ZfbqWs+Wq{O`J_@vatlK6t6 z{FKt<)S@XFEE+_sC0b#I6sI#6`wI@R+s^4s$q;Ls5;Vn|FGD&*cJdr~$@(7o+{A){ z)S~#J)Z+Y{(vr;lJS^@asE7z-kQ{PB9p(^LxJy`jl#?@3Cp*e2a)3et%28s0iArZ6 f8F0A~W&m72XQ^IlUP)1AYVqU-8O6=_Bm=uN&WuRM95yduj~e`W?4hj}VHGmOnO z^#BtC7}QSQAfq{Xl8hTu2J7U1GP3L}9EJj{O{XR+$p$hqPA-vERAcMmEXmAGO-aou zNt`lyiZ_Edmv;lp)G$V<2~&EMlXCJW3(6}_o+#4=G{9PksWt#?04vY{PsZZS+vL3z E0Lz0j{{R30 diff --git a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/app.py b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/app.py index 3bc1216..3844c10 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/app.py +++ b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/app.py @@ -31,7 +31,7 @@ def get_engine(): celery_app = Celery( - "g2p_bridge_celery_tasks", + "g2p_bridge_celery_beat_producer", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0", include=["openg2p_g2p_bridge_celery_beat_producers.tasks"], diff --git a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/app.py b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/app.py index d2a12e6..f41f6d7 100644 --- a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/app.py +++ b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/app.py @@ -34,7 +34,7 @@ def get_engine(): celery_app = Celery( - "g2p_bridge_celery_worker_tasks", + "g2p_bridge_celery_worker", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0", include=["openg2p_g2p_bridge_celery_workers.tasks"], From d610a198ed47f807cd0211f562b6a36b65ed055e Mon Sep 17 00:00:00 2001 From: PSNAppZ Date: Tue, 6 Aug 2024 00:19:20 +0530 Subject: [PATCH 02/14] Minor code refactoring --- .../bank_connectors/example_bank_connector.py | 8 ++++---- .../bank_interface/bank_connector_interface.py | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_connectors/example_bank_connector.py b/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_connectors/example_bank_connector.py index 33fd60b..e7e9d1f 100644 --- a/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_connectors/example_bank_connector.py +++ b/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_connectors/example_bank_connector.py @@ -51,11 +51,11 @@ class BankPaymentPayload(BaseModel): class ExampleBankConnector(BankConnectorInterface): - def check_funds(self, account_no, currency, amount) -> CheckFundsResponse: + def check_funds(self, account_number, currency, amount) -> CheckFundsResponse: try: with httpx.Client() as client: request_data = { - "account_number": account_no, + "account_number": account_number, "account_currency": currency, "total_funds_needed": amount, } @@ -77,11 +77,11 @@ def check_funds(self, account_no, currency, amount) -> CheckFundsResponse: status=FundsAvailableWithBankEnum.PENDING_CHECK, error_code=str(e) ) - def block_funds(self, account_no, currency, amount) -> BlockFundsResponse: + def block_funds(self, account_number, currency, amount) -> BlockFundsResponse: try: with httpx.Client() as client: request_data = { - "account_no": account_no, + "account_number": account_number, "currency": currency, "amount": amount, } diff --git a/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_interface/bank_connector_interface.py b/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_interface/bank_connector_interface.py index 34e7064..54171d3 100644 --- a/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_interface/bank_connector_interface.py +++ b/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_interface/bank_connector_interface.py @@ -60,10 +60,10 @@ class PaymentResponse(BaseModel): class BankConnectorInterface(BaseService): - def check_funds(self, account_no, currency, amount) -> CheckFundsResponse: + def check_funds(self, account_number, currency, amount) -> CheckFundsResponse: raise NotImplementedError() - def block_funds(self, account_no, currency, amount) -> BlockFundsResponse: + def block_funds(self, account_number, currency, amount) -> BlockFundsResponse: raise NotImplementedError() def initiate_payment( From 6cdf692c528392b8d754ae4628b374cceaa94e31 Mon Sep 17 00:00:00 2001 From: PSNAppZ Date: Tue, 6 Aug 2024 00:22:57 +0530 Subject: [PATCH 03/14] Minor code refactoring --- .../.gitignore => .gitignore | 0 .../.gitignore | 1 + .../celerybeat-schedule.db | Bin 16384 -> 0 bytes .../app.py | 4 ++++ .../config.py | 2 +- 5 files changed, 6 insertions(+), 1 deletion(-) rename openg2p-g2p-bridge-celery-workers/.gitignore => .gitignore (100%) delete mode 100644 openg2p-g2p-bridge-celery-beat-producers/celerybeat-schedule.db diff --git a/openg2p-g2p-bridge-celery-workers/.gitignore b/.gitignore similarity index 100% rename from openg2p-g2p-bridge-celery-workers/.gitignore rename to .gitignore diff --git a/openg2p-g2p-bridge-celery-beat-producers/.gitignore b/openg2p-g2p-bridge-celery-beat-producers/.gitignore index f5e0368..c1c2ea5 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/.gitignore +++ b/openg2p-g2p-bridge-celery-beat-producers/.gitignore @@ -79,3 +79,4 @@ docs/_build/ # Ignore secret files and env .secrets.* .env +*.db \ No newline at end of file diff --git a/openg2p-g2p-bridge-celery-beat-producers/celerybeat-schedule.db b/openg2p-g2p-bridge-celery-beat-producers/celerybeat-schedule.db deleted file mode 100644 index f9c5157799c2ee25fe750a454f67e88ff2a844e9..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 16384 zcmeI%O>5LZ7zgmF&AQpTnmwo&5f24jMS=(Y1Vuyw3cg%~GUR2}gv}<)WTLdNP%pl~ z+&aDcExh>=MC!Q*@#4X+;L9^>N{c9)Q^E2dn4KrdvolYA$y|2X$#WrN>;z-ZE@J}< zS)Z{b8f<576VUbfiRES-(&*TDr%I5hdF%Z<{dM{U01W~VfB*y_009U<00IzzKr4YQ+nm?_ zma}u;{$ogOW`=q-%FB4nM|F`@{CTEQ9)-o2M8efP~h`+0R{W=2tPVK8zoDkiuytIOA|}AFs45Zk3+L!WYyQ z*ebDI*5N`wy=Yfx?F-rsnW$)k^?vU!raQO%oBj>sbN)=o%EEHH2Wu5`QG`)063XuG zXU|_sUu`zW7$5)v2tWV=5P$##AOHafKmY;|IK~3sf1*JE0uX=z1Rwwb2tWV=5P$## JAaL9TegPjpV|V}n diff --git a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/app.py b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/app.py index 3844c10..440a052 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/app.py +++ b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/app.py @@ -54,5 +54,9 @@ def get_engine(): "task": "disburse_funds_from_bank_beat_producer", "schedule": _config.funds_disbursement_frequency, }, + "mt940_processor_beat_producer": { + "task": "mt940_processor_beat_producer", + "schedule": _config.mt940_processor_frequency, + }, } celery_app.conf.timezone = "UTC" diff --git a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/config.py b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/config.py index a6ebba1..9e3915e 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/config.py +++ b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/config.py @@ -32,7 +32,7 @@ class Settings(BaseSettings): funds_available_check_frequency: int = 10 funds_blocked_frequency: int = 10 funds_disbursement_frequency: int = 10 - statement_process_frequency: int = 3600 + mt940_processor_frequency: int = 10 bank_fa_deconstruct_strategy: str = "" mobile_wallet_deconstruct_strategy: str = "" From 9dc1c75946204b4e2e9349c192b1c09bc9e7fb1f Mon Sep 17 00:00:00 2001 From: PSNAppZ Date: Tue, 6 Aug 2024 17:22:19 +0530 Subject: [PATCH 04/14] Decode statement_file before storing as text --- .../src/openg2p_g2p_bridge_api/services/account_statement.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/account_statement.py b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/account_statement.py index 6869345..ff4eb78 100644 --- a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/account_statement.py +++ b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/account_statement.py @@ -39,7 +39,7 @@ async def upload_mt940(self, statement_file: UploadFile) -> str: statement_lob = AccountStatementLob( statement_id=statement_id, - statement_lob=str(statement_file), + statement_lob=str(statement_file.decode('utf-8')), active=True, ) session.add(statement_lob) From bfe8b5bbefd82bb523bc46793e6087e4effc824d Mon Sep 17 00:00:00 2001 From: PSNAppZ Date: Tue, 6 Aug 2024 17:24:08 +0530 Subject: [PATCH 05/14] Fix Mt940 processor --- .../tasks/mt940_processor.py | 126 +++++++++--------- 1 file changed, 60 insertions(+), 66 deletions(-) diff --git a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mt940_processor.py b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mt940_processor.py index 328424d..0ac75c3 100644 --- a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mt940_processor.py +++ b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mt940_processor.py @@ -28,6 +28,7 @@ @celery_app.task(name="mt940_processor_worker") def mt940_processor_worker(statement_id: str): + _logger.info(f"Processing account statement with statement_id: {statement_id}") session_maker = sessionmaker(bind=_engine, expire_on_commit=False) with session_maker() as session: @@ -50,16 +51,12 @@ def mt940_processor_worker(statement_id: str): return try: - # Set BalanceBase scope to Transaction - mt940.tags.BalanceBase.scope = mt940.models.Transaction - # Parsing header section account_number_parser = mt940.tags.AccountIdentification() statement_number_parser = mt940.tags.StatementNumber() transaction_reference_parser = mt940.tags.TransactionReferenceNumber() statement_parser = mt940.tags.Statement() - mt940_statement = mt940.models.Transactions( processors={ "pre_statement": [mt940.processors.add_currency_pre_processor("")], @@ -75,27 +72,18 @@ def mt940_processor_worker(statement_id: str): mt940_statement.parse(lob.statement_lob) account_statement.account_number = mt940_statement.data.get( - "account_number", "" + "account_identification", "" ) account_statement.reference_number = mt940_statement.data.get( - "reference", "" + "transaction_reference", "" ) - statement_number_and_sequence = mt940_statement.data.get( - "number", "" - ).split("/") - account_statement.statement_number = ( - statement_number_and_sequence[0] - if statement_number_and_sequence - else "" + account_statement.statement_number = mt940_statement.data.get( + "statement_number", "" ) - account_statement.sequence_number = ( - statement_number_and_sequence[1] - if len(statement_number_and_sequence) > 1 - else "" + account_statement.sequence_number = mt940_statement.data.get( + "sequence_number", "" ) - - # TODO: Refactor code - + _logger.info("Parsed account statement header") # Get the benefit program configuration benefit_program_configuration = ( session.query(BenefitProgramConfiguration) @@ -105,11 +93,10 @@ def mt940_processor_worker(statement_id: str): ) .first() ) - if not benefit_program_configuration: account_statement.statement_process_status = ProcessStatus.ERROR account_statement.statement_process_error_code = ( - G2PBridgeErrorCodes.INVALID_ACCOUNT_NUMBER + G2PBridgeErrorCodes.INVALID_ACCOUNT_NUMBER.value ) account_statement.statement_process_timestamp = datetime.utcnow() account_statement.statement_process_attempts += 1 @@ -125,7 +112,7 @@ def mt940_processor_worker(statement_id: str): # Parsing transactions parsed_transactions = [] entry_sequence = 0 - for transaction in mt940_statement.transactions: + for transaction in mt940_statement: entry_sequence += 1 debit_credit_indicator = transaction.data["status"] @@ -139,7 +126,6 @@ def mt940_processor_worker(statement_id: str): parsed_transactions.append(parsed_transaction) # End of for loop of mt940 statement transactions - disbursement_error_recons = [] disbursement_recons = [] for parsed_transaction in parsed_transactions: @@ -198,11 +184,18 @@ def mt940_processor_worker(statement_id: str): disbursement_recon = construct_new_disbursement_recon( bank_disbursement_batch_id, parsed_transaction, + statement_id, + account_statement.statement_number, + account_statement.sequence_number, ) disbursement_recons.append(disbursement_recon) elif parsed_transaction["debit_credit_indicator"] == "RD": update_existing_disbursement_recon( - disbursement_recon, parsed_transaction + disbursement_recon, + parsed_transaction, + statement_id, + account_statement.statement_number, + account_statement.sequence_number, ) disbursement_recons.append(disbursement_recon) @@ -218,8 +211,15 @@ def mt940_processor_worker(statement_id: str): session.add_all(disbursement_recons) session.add_all(disbursement_error_recons) session.commit() + _logger.info( + f"Processed account statement for account number: {account_statement.account_number}" + ) except Exception as e: + _logger.error( + f"Error processing account statement for statement id: {statement_id}" + f" with error: {str(e)}", + ) account_statement.statement_process_status = ProcessStatus.PENDING account_statement.statement_process_error_code = str(e) account_statement.statement_process_timestamp = datetime.utcnow() @@ -238,20 +238,21 @@ def construct_disbursement_error_recon(parsed_transaction, g2p_bridge_error_code entry_date=parsed_transaction["remittance_entry_date"], value_date=parsed_transaction["remittance_value_date"], error_reason=g2p_bridge_error_code, + active=True, ) -def update_existing_disbursement_recon(disbursement_recon, parsed_transaction): +def update_existing_disbursement_recon( + disbursement_recon, + parsed_transaction, + statement_id, + statement_number, + statement_sequence, +): disbursement_recon.reversal_found = True - disbursement_recon.reversal_statement_id = parsed_transaction[ - "reversal_statement_number" - ] - disbursement_recon.reversal_statement_number = parsed_transaction[ - "reversal_statement_number" - ] - disbursement_recon.reversal_statement_sequence = parsed_transaction[ - "reversal_statement_sequence" - ] + disbursement_recon.reversal_statement_id = statement_id + disbursement_recon.reversal_statement_number = statement_number + disbursement_recon.reversal_statement_sequence = statement_sequence disbursement_recon.reversal_entry_sequence = parsed_transaction[ "reversal_entry_sequence" ] @@ -260,20 +261,25 @@ def update_existing_disbursement_recon(disbursement_recon, parsed_transaction): disbursement_recon.reversal_reason = parsed_transaction["reversal_reason"] -def construct_new_disbursement_recon(bank_disbursement_batch_id, parsed_transaction): +def construct_new_disbursement_recon( + bank_disbursement_batch_id, + parsed_transaction, + statement_id, + statement_number, + statement_sequence, +): disbursement_recon = DisbursementRecon( bank_disbursement_batch_id=bank_disbursement_batch_id, disbursement_id=parsed_transaction["disbursement_id"], beneficiary_name_from_bank=parsed_transaction["beneficiary_name_from_bank"], remittance_reference_number=parsed_transaction["remittance_reference_number"], - remittance_statement_id=parsed_transaction["remittance_statement_number"], - remittance_statement_number=parsed_transaction["remittance_statement_number"], - remittance_statement_sequence=parsed_transaction[ - "remittance_statement_sequence" - ], + remittance_statement_id=statement_id, + remittance_statement_number=statement_number, + remittance_statement_sequence=statement_sequence, remittance_entry_sequence=parsed_transaction["remittance_entry_sequence"], remittance_entry_date=parsed_transaction["remittance_entry_date"], remittance_value_date=parsed_transaction["remittance_value_date"], + active=True, ) return disbursement_recon @@ -293,15 +299,11 @@ def construct_parsed_transaction( remittance_reference_number, customer_reference, narratives ) beneficiary_name_from_bank = None - remittance_statement_number = None - remittance_statement_sequence = None remittance_entry_sequence = None remittance_entry_date = None remittance_value_date = None reversal_found = False - reversal_statement_number = None - reversal_statement_sequence = None reversal_entry_sequence = None reversal_entry_date = None reversal_value_date = None @@ -312,16 +314,12 @@ def construct_parsed_transaction( beneficiary_name_from_bank = bank_connector.retrieve_beneficiary_name( narratives ) - remittance_statement_number = transaction.data["statement_number"] - remittance_statement_sequence = transaction.data["sequence_number"] remittance_entry_sequence = entry_sequence remittance_entry_date = transaction.data["entry_date"] remittance_value_date = transaction.data["date"] if debit_credit_indicator == "RD": reversal_found = True - reversal_statement_number = transaction.data["statement_number"] - reversal_statement_sequence = transaction.data["sequence_number"] reversal_entry_sequence = entry_sequence reversal_entry_date = transaction.data["entry_date"] reversal_value_date = transaction.data["date"] @@ -329,23 +327,19 @@ def construct_parsed_transaction( parsed_transaction.update( { - disbursement_id: disbursement_id, - transaction_amount: transaction_amount, - debit_credit_indicator: debit_credit_indicator, - beneficiary_name_from_bank: beneficiary_name_from_bank, - remittance_reference_number: remittance_reference_number, - remittance_statement_number: remittance_statement_number, - remittance_statement_sequence: remittance_statement_sequence, - remittance_entry_sequence: remittance_entry_sequence, - remittance_entry_date: remittance_entry_date, - remittance_value_date: remittance_value_date, - reversal_found: reversal_found, - reversal_statement_number: reversal_statement_number, - reversal_statement_sequence: reversal_statement_sequence, - reversal_entry_sequence: reversal_entry_sequence, - reversal_entry_date: reversal_entry_date, - reversal_value_date: reversal_value_date, - reversal_reason: reversal_reason, + "disbursement_id": disbursement_id, + "transaction_amount": transaction_amount, + "debit_credit_indicator": debit_credit_indicator, + "beneficiary_name_from_bank": beneficiary_name_from_bank, + "remittance_reference_number": remittance_reference_number, + "remittance_entry_sequence": remittance_entry_sequence, + "remittance_entry_date": remittance_entry_date, + "remittance_value_date": remittance_value_date, + "reversal_found": reversal_found, + "reversal_entry_sequence": reversal_entry_sequence, + "reversal_entry_date": reversal_entry_date, + "reversal_value_date": reversal_value_date, + "reversal_reason": reversal_reason, } ) return parsed_transaction From 3d964979989c8306c1471e65a83da7d308afcbb2 Mon Sep 17 00:00:00 2001 From: PSNAppZ Date: Tue, 6 Aug 2024 17:24:29 +0530 Subject: [PATCH 06/14] Pre-commit fixes --- .../src/openg2p_g2p_bridge_api/services/account_statement.py | 2 +- openg2p-g2p-bridge-celery-beat-producers/.gitignore | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/account_statement.py b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/account_statement.py index ff4eb78..15ffa31 100644 --- a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/account_statement.py +++ b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/account_statement.py @@ -39,7 +39,7 @@ async def upload_mt940(self, statement_file: UploadFile) -> str: statement_lob = AccountStatementLob( statement_id=statement_id, - statement_lob=str(statement_file.decode('utf-8')), + statement_lob=str(statement_file.decode("utf-8")), active=True, ) session.add(statement_lob) diff --git a/openg2p-g2p-bridge-celery-beat-producers/.gitignore b/openg2p-g2p-bridge-celery-beat-producers/.gitignore index c1c2ea5..380911c 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/.gitignore +++ b/openg2p-g2p-bridge-celery-beat-producers/.gitignore @@ -79,4 +79,4 @@ docs/_build/ # Ignore secret files and env .secrets.* .env -*.db \ No newline at end of file +*.db From 5f1450f9345386967f5aa9d59f408554e08858e1 Mon Sep 17 00:00:00 2001 From: PSNAppZ Date: Wed, 7 Aug 2024 17:13:21 +0530 Subject: [PATCH 07/14] Fix disbursement id not being unique inside the loop --- .../src/openg2p_g2p_bridge_api/services/disbursement.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/disbursement.py b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/disbursement.py index b17ed81..19185a4 100644 --- a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/disbursement.py +++ b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/disbursement.py @@ -141,7 +141,7 @@ async def construct_disbursements( disbursements: List[Disbursement] = [] for disbursement_payload in disbursement_payloads: disbursement = Disbursement( - disbursement_id=str(int(time.time() * 1000)), + disbursement_id=str(int(time.time() * 1000000)), disbursement_envelope_id=str( disbursement_payload.disbursement_envelope_id ), From 1a57fd97ad49855c03fef3715597be5c9a3a3f62 Mon Sep 17 00:00:00 2001 From: PSNAppZ Date: Wed, 7 Aug 2024 17:13:46 +0530 Subject: [PATCH 08/14] Fix initiate payment request data --- .../bank_connectors/example_bank_connector.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_connectors/example_bank_connector.py b/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_connectors/example_bank_connector.py index e7e9d1f..cbc25c1 100644 --- a/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_connectors/example_bank_connector.py +++ b/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_connectors/example_bank_connector.py @@ -143,7 +143,7 @@ def initiate_payment( ) bank_payment_payloads.append(bank_payment_payload.model_dump()) - request_data = {"initiate_payment_payloads": bank_payment_payloads} + request_data = bank_payment_payloads response = client.post( _config.funds_disbursement_url_example_bank, json=request_data @@ -165,7 +165,7 @@ def retrieve_disbursement_id( return customer_reference def retrieve_beneficiary_name(self, narratives: str) -> str: - return narratives[0] + return narratives[3] def retrieve_reversal_reason(self, narratives: str) -> str: - return narratives[1] + return narratives[5] From ac5789a03eaf060ed137dfad29b810e124012517 Mon Sep 17 00:00:00 2001 From: PSNAppZ Date: Wed, 7 Aug 2024 17:14:37 +0530 Subject: [PATCH 09/14] Process debit and debit reversals separately --- .../tasks/mt940_processor.py | 150 ++++++++++++------ 1 file changed, 102 insertions(+), 48 deletions(-) diff --git a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mt940_processor.py b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mt940_processor.py index 0ac75c3..5195d97 100644 --- a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mt940_processor.py +++ b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mt940_processor.py @@ -110,86 +110,107 @@ def mt940_processor_worker(statement_id: str): ) # Parsing transactions - parsed_transactions = [] + parsed_transactions_d = [] + parsed_transactions_rd = [] entry_sequence = 0 for transaction in mt940_statement: entry_sequence += 1 debit_credit_indicator = transaction.data["status"] - if debit_credit_indicator in ["D", "RD"]: + if debit_credit_indicator in ["D"]: parsed_transaction = construct_parsed_transaction( bank_connector, debit_credit_indicator, entry_sequence, transaction, ) - parsed_transactions.append(parsed_transaction) + parsed_transactions_d.append(parsed_transaction) + + if debit_credit_indicator in ["RD"]: + parsed_transaction = construct_parsed_transaction( + bank_connector, + debit_credit_indicator, + entry_sequence, + transaction, + ) + parsed_transactions_rd.append(parsed_transaction) # End of for loop of mt940 statement transactions disbursement_error_recons = [] disbursement_recons = [] - for parsed_transaction in parsed_transactions: - bank_disbursement_batch_id = ( - session.query(DisbursementBatchControl) - .filter( - DisbursementBatchControl.disbursement_id - == parsed_transaction["disbursement_id"] - ) - .first() - .bank_disbursement_batch_id - ) + + # Process only debit transactions + for parsed_transaction in parsed_transactions_d: + bank_disbursement_batch_id = get_bank_batch_id(parsed_transaction, session) if not bank_disbursement_batch_id: disbursement_error_recons.append( construct_disbursement_error_recon( + statement_id, + account_statement.statement_number, + account_statement.sequence_number, parsed_transaction, G2PBridgeErrorCodes.INVALID_DISBURSEMENT_ID, ) ) continue - disbursement_recon = ( - session.query(DisbursementRecon) - .filter( - DisbursementRecon.disbursement_id - == parsed_transaction["disbursement_id"] - ) - .first() - ) + disbursement_recon = get_disbursement_recon(parsed_transaction, session) - if ( - disbursement_recon - and parsed_transaction["debit_credit_indicator"] == "D" - ): + if disbursement_recon: disbursement_error_recons.append( construct_disbursement_error_recon( + statement_id, + account_statement.statement_number, + account_statement.sequence_number, parsed_transaction, G2PBridgeErrorCodes.DUPLICATE_DISBURSEMENT, ) ) continue - if ( - not disbursement_recon - and parsed_transaction["debit_credit_indicator"] == "RD" - ): + disbursement_recon = construct_new_disbursement_recon( + bank_disbursement_batch_id, + parsed_transaction, + statement_id, + account_statement.statement_number, + account_statement.sequence_number, + ) + disbursement_recons.append(disbursement_recon) + + # End of for loop for parsed transactions - debit + session.add_all(disbursement_recons) + session.add_all(disbursement_error_recons) + + # Start processing reversal transactions - rd + for parsed_transaction in parsed_transactions_rd: + bank_disbursement_batch_id = get_bank_batch_id(parsed_transaction, session) + + if not bank_disbursement_batch_id: disbursement_error_recons.append( construct_disbursement_error_recon( - parsed_transaction, G2PBridgeErrorCodes.INVALID_REVERSAL + statement_id, + account_statement.statement_number, + account_statement.sequence_number, + parsed_transaction, + G2PBridgeErrorCodes.INVALID_DISBURSEMENT_ID, ) ) continue - if parsed_transaction["debit_credit_indicator"] == "D": - disbursement_recon = construct_new_disbursement_recon( - bank_disbursement_batch_id, - parsed_transaction, - statement_id, - account_statement.statement_number, - account_statement.sequence_number, + disbursement_recon = get_disbursement_recon(parsed_transaction, session) + + if not disbursement_recon: + disbursement_error_recons.append( + construct_disbursement_error_recon( + statement_id, + account_statement.statement_number, + account_statement.sequence_number, + parsed_transaction, + G2PBridgeErrorCodes.INVALID_REVERSAL, + ) ) - disbursement_recons.append(disbursement_recon) - elif parsed_transaction["debit_credit_indicator"] == "RD": + else: update_existing_disbursement_recon( disbursement_recon, parsed_transaction, @@ -199,7 +220,9 @@ def mt940_processor_worker(statement_id: str): ) disbursement_recons.append(disbursement_recon) - # End of for loop for parsed transactions + # End of for loop for parsed transactions - rd + session.add_all(disbursement_recons) + session.add_all(disbursement_error_recons) # Update account statement with parsed data account_statement.statement_process_status = ProcessStatus.PROCESSED @@ -208,8 +231,7 @@ def mt940_processor_worker(statement_id: str): account_statement.statement_process_attempts += 1 session.add(account_statement) - session.add_all(disbursement_recons) - session.add_all(disbursement_error_recons) + session.commit() _logger.info( f"Processed account statement for account number: {account_statement.account_number}" @@ -225,19 +247,51 @@ def mt940_processor_worker(statement_id: str): account_statement.statement_process_timestamp = datetime.utcnow() account_statement.statement_process_attempts += 1 session.commit() + raise e + + +def get_disbursement_recon(parsed_transaction, session): + disbursement_recon = ( + session.query(DisbursementRecon) + .filter( + DisbursementRecon.disbursement_id + == parsed_transaction["disbursement_id"] + ) + .first() + ) + return disbursement_recon + +def get_bank_batch_id(parsed_transaction, session): + bank_disbursement_batch_id = ( + session.query(DisbursementBatchControl) + .filter( + DisbursementBatchControl.disbursement_id + == parsed_transaction["disbursement_id"] + ) + .first() + .bank_disbursement_batch_id + ) + return bank_disbursement_batch_id -def construct_disbursement_error_recon(parsed_transaction, g2p_bridge_error_code): + +def construct_disbursement_error_recon( + statement_id, + statement_number, + statement_sequence, + parsed_transaction, + g2p_bridge_error_code, +): return DisbursementErrorRecon( - disbursement_id="", - bank_reference_number=parsed_transaction["remittance_reference_number"], - statement_id=parsed_transaction["remittance_statement_number"], - statement_number=parsed_transaction["remittance_statement_number"], - statement_sequence=parsed_transaction["remittance_statement_sequence"], + statement_id=statement_id, + statement_number=statement_number, + statement_sequence=statement_sequence, entry_sequence=parsed_transaction["remittance_entry_sequence"], entry_date=parsed_transaction["remittance_entry_date"], value_date=parsed_transaction["remittance_value_date"], error_reason=g2p_bridge_error_code, + disbursement_id=parsed_transaction["disbursement_id"], + bank_reference_number=parsed_transaction["remittance_reference_number"], active=True, ) From df71fa6c8d839d33f0a4c938b75db5a0d73ad2f8 Mon Sep 17 00:00:00 2001 From: PSNAppZ Date: Wed, 7 Aug 2024 17:14:55 +0530 Subject: [PATCH 10/14] Add disbursement_id to DisbursementPaymentPayload --- .../tasks/disburse_funds_from_bank.py | 1 + 1 file changed, 1 insertion(+) diff --git a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/disburse_funds_from_bank.py b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/disburse_funds_from_bank.py index 5975706..029fd55 100644 --- a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/disburse_funds_from_bank.py +++ b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/disburse_funds_from_bank.py @@ -110,6 +110,7 @@ def disburse_funds_from_bank_worker(bank_disbursement_batch_id: str): payment_payloads.append( DisbursementPaymentPayload( + disbursement_id=disbursement.disbursement_id, remitting_account=benefit_program_configuration.sponsor_bank_account_number, remitting_account_currency=benefit_program_configuration.sponsor_bank_account_currency, payment_amount=disbursement.disbursement_amount, From 0a0e0ba812e7e41e8d752ece4ae09f744e8eb5d6 Mon Sep 17 00:00:00 2001 From: PSNAppZ Date: Wed, 7 Aug 2024 17:15:33 +0530 Subject: [PATCH 11/14] Remove unique constraints in DisbursementBatchControl bank_disbursement_batch_id and mapper_resolution_batch_id --- .../src/openg2p_g2p_bridge_models/models/disbursement.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/models/disbursement.py b/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/models/disbursement.py index f6ffe14..dedd13e 100644 --- a/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/models/disbursement.py +++ b/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/models/disbursement.py @@ -49,10 +49,10 @@ class DisbursementBatchControl(BaseORMModelWithTimes): disbursement_envelope_id: Mapped[str] = mapped_column(String, index=True) beneficiary_id: Mapped[str] = mapped_column(String) bank_disbursement_batch_id = mapped_column( - UUID, nullable=True, default=None, index=True, unique=True + UUID, nullable=True, default=None, index=True ) mapper_resolution_batch_id = mapped_column( - UUID, nullable=True, default=None, index=True, unique=True + UUID, nullable=True, default=None, index=True ) From 8c204a125de1080fda45be2485a1f736056dbf77 Mon Sep 17 00:00:00 2001 From: PSNAppZ Date: Wed, 7 Aug 2024 17:16:19 +0530 Subject: [PATCH 12/14] Update .env and config.py --- .../.env.example | 10 ++++++++++ .../config.py | 16 +++++----------- .../tasks/disburse_funds_from_bank.py | 1 - openg2p-g2p-bridge-celery-workers/.env.example | 9 ++++++--- .../openg2p_g2p_bridge_celery_workers/config.py | 6 ------ 5 files changed, 21 insertions(+), 21 deletions(-) diff --git a/openg2p-g2p-bridge-celery-beat-producers/.env.example b/openg2p-g2p-bridge-celery-beat-producers/.env.example index 6d3466b..95f705f 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/.env.example +++ b/openg2p-g2p-bridge-celery-beat-producers/.env.example @@ -8,3 +8,13 @@ G2P_BRIDGE_CELERY_TASKS_DB_DBNAME=openg2p_g2p_bridge_db G2P_BRIDGE_BANK_DECONSTRUCT_STRATEGY="bank_(?P\d+)_(?P\d+)_(?P\d+)_(?P\w+)" G2P_BRIDGE_MOBILE_WALLET_DECONSTRUCT_STRATEGY="mobile_(?P\d+)_(?P\w+)" G2P_BRIDGE_EMAIL_WALLET_DECONSTRUCT_STRATEGY="email_(?P\w+)_(?P\w+)" +G2P_BRIDGE_MAPPER_RESOLVE_ATTEMPTS=3 +G2P_BRIDGE_FUNDS_AVAILABLE_CHECK_ATTEMPTS=3 +G2P_BRIDGE_FUNDS_BLOCKED_ATTEMPTS=3 +G2P_BRIDGE_FUNDS_DISBURSEMENT_ATTEMPTS=3 +G2P_BRIDGE_STATEMENT_PROCESS_ATTEMPTS=3 +G2P_BRIDGE_MAPPER_RESOLVE_FREQUENCY=3600 +G2P_BRIDGE_FUNDS_AVAILABLE_CHECK_FREQUENCY=3600 +G2P_BRIDGE_FUNDS_BLOCKED_FREQUENCY=3600 +G2P_BRIDGE_FUNDS_DISBURSEMENT_FREQUENCY=3600 +G2P_BRIDGE_MT940_PROCESSOR_FREQUENCY=3600 \ No newline at end of file diff --git a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/config.py b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/config.py index 9e3915e..c3ce2a4 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/config.py +++ b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/config.py @@ -20,20 +20,14 @@ class Settings(BaseSettings): db_dbname: str = "openg2p_g2p_bridge_db" db_driver: str = "postgresql" - mapper_resolve_api_url: str = "" - mapper_resolve_attempts: int = 3 funds_available_check_attempts: int = 3 funds_blocked_attempts: int = 3 funds_disbursement_attempts: int = 3 statement_process_attempts: int = 3 - mapper_resolve_frequency: int = 10 - funds_available_check_frequency: int = 10 - funds_blocked_frequency: int = 10 - funds_disbursement_frequency: int = 10 - mt940_processor_frequency: int = 10 - - bank_fa_deconstruct_strategy: str = "" - mobile_wallet_deconstruct_strategy: str = "" - email_wallet_deconstruct_strategy: str = "" + mapper_resolve_frequency: int = 3600 + funds_available_check_frequency: int = 3600 + funds_blocked_frequency: int = 3600 + funds_disbursement_frequency: int = 3600 + mt940_processor_frequency: int = 3600 diff --git a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/disburse_funds_from_bank.py b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/disburse_funds_from_bank.py index ae17c81..e91ece9 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/disburse_funds_from_bank.py +++ b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/disburse_funds_from_bank.py @@ -48,7 +48,6 @@ def disburse_funds_from_bank_beat_producer(): .scalars() .all() ) - for envelope in envelopes: pending_batches = ( session.execute( diff --git a/openg2p-g2p-bridge-celery-workers/.env.example b/openg2p-g2p-bridge-celery-workers/.env.example index 6d3466b..d729b54 100644 --- a/openg2p-g2p-bridge-celery-workers/.env.example +++ b/openg2p-g2p-bridge-celery-workers/.env.example @@ -5,6 +5,9 @@ G2P_BRIDGE_CELERY_TASKS_PORT=8001 G2P_BRIDGE_CELERY_TASKS_WORKER_TYPE=gunicorn G2P_BRIDGE_CELERY_TASKS_NO_OF_WORKERS=1 G2P_BRIDGE_CELERY_TASKS_DB_DBNAME=openg2p_g2p_bridge_db -G2P_BRIDGE_BANK_DECONSTRUCT_STRATEGY="bank_(?P\d+)_(?P\d+)_(?P\d+)_(?P\w+)" -G2P_BRIDGE_MOBILE_WALLET_DECONSTRUCT_STRATEGY="mobile_(?P\d+)_(?P\w+)" -G2P_BRIDGE_EMAIL_WALLET_DECONSTRUCT_STRATEGY="email_(?P\w+)_(?P\w+)" +G2P_BRIDGE_CELERY_TASKS_BANK_FA_DECONSTRUCT_STRATEGY="bank_(?P\d+)_(?P\d+)_(?P\d+)_(?P\w+)" +G2P_BRIDGE_CELERY_MOBILE_WALLET_DECONSTRUCT_STRATEGY="mobile_(?P\d+)_(?P\w+)" +G2P_BRIDGE_CELERY_EMAIL_WALLET_DECONSTRUCT_STRATEGY="email_(?P\w+)_(?P\w+)" +G2P_BRIDGE_CELERY_PRODUCERS_FUNDS_AVAILABLE_CHECK_URL_EXAMPLE_BANK="http://127.0.0.1:8003/check_funds" +G2P_BRIDGE_CELERY_PRODUCERS_FUNDS_BLOCK_URL_EXAMPLE_BANK="http://127.0.0.1:8003/block_funds" +G2P_BRIDGE_CELERY_PRODUCERS_FUNDS_DISBURSEMENT_URL_EXAMPLE_BANK="http://127.0.0.1:8003/initiate_payment" diff --git a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/config.py b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/config.py index 6eea2fd..19e6504 100644 --- a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/config.py +++ b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/config.py @@ -22,12 +22,6 @@ class Settings(BaseSettings): mapper_resolve_api_url: str = "" - mapper_resolve_attempts: int = 3 - funds_available_check_attempts: int = 3 - funds_blocked_attempts: int = 3 - funds_disbursement_attempts: int = 3 - statement_process_attempts: int = 3 - bank_fa_deconstruct_strategy: str = "" mobile_wallet_deconstruct_strategy: str = "" email_wallet_deconstruct_strategy: str = "" From 8e85cfd1bb8d60eebb6bf23be46dcc197757c6e4 Mon Sep 17 00:00:00 2001 From: PSNAppZ Date: Wed, 7 Aug 2024 17:27:49 +0530 Subject: [PATCH 13/14] Add logging to all methods --- .../.env.example | 2 +- .../tasks/block_funds_with_bank.py | 1 + .../tasks/check_funds_with_bank_task.py | 7 ++++++- .../tasks/disburse_funds_from_bank.py | 8 ++++++++ .../tasks/mapper_resolution_task.py | 7 ++++++- .../tasks/mt940_processor.py | 6 ++++++ .../helpers/resolve_helper.py | 19 +++++++++++++++---- .../tasks/check_funds_with_bank_task.py | 5 ++++- .../tasks/disburse_funds_from_bank.py | 12 ++++++++++++ .../tasks/mapper_resolution_task.py | 6 ++++++ .../tasks/mt940_processor.py | 14 ++++++++++---- 11 files changed, 75 insertions(+), 12 deletions(-) diff --git a/openg2p-g2p-bridge-celery-beat-producers/.env.example b/openg2p-g2p-bridge-celery-beat-producers/.env.example index 95f705f..389735a 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/.env.example +++ b/openg2p-g2p-bridge-celery-beat-producers/.env.example @@ -17,4 +17,4 @@ G2P_BRIDGE_MAPPER_RESOLVE_FREQUENCY=3600 G2P_BRIDGE_FUNDS_AVAILABLE_CHECK_FREQUENCY=3600 G2P_BRIDGE_FUNDS_BLOCKED_FREQUENCY=3600 G2P_BRIDGE_FUNDS_DISBURSEMENT_FREQUENCY=3600 -G2P_BRIDGE_MT940_PROCESSOR_FREQUENCY=3600 \ No newline at end of file +G2P_BRIDGE_MT940_PROCESSOR_FREQUENCY=3600 diff --git a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/block_funds_with_bank.py b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/block_funds_with_bank.py index e424be8..fbcd519 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/block_funds_with_bank.py +++ b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/block_funds_with_bank.py @@ -73,4 +73,5 @@ def block_funds_with_bank_beat_producer(): args=(envelope.disbursement_envelope_id,), queue="g2p_bridge_celery_worker_tasks", ) + _logger.info("Completed checking for envelopes to block funds with bank") diff --git a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/check_funds_with_bank_task.py b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/check_funds_with_bank_task.py index 2d855d6..8383c82 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/check_funds_with_bank_task.py +++ b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/check_funds_with_bank_task.py @@ -20,8 +20,8 @@ @celery_app.task(name="check_funds_with_bank_beat_producer") def check_funds_with_bank_beat_producer(): + _logger.info("Checking funds with bank") session_maker = sessionmaker(bind=_engine, expire_on_commit=False) - with session_maker() as session: envelopes = ( session.execute( @@ -62,8 +62,13 @@ def check_funds_with_bank_beat_producer(): ) for envelope in envelopes: + _logger.info( + f"Sending task to check funds with bank for envelope {envelope.disbursement_envelope_id}" + ) celery_app.send_task( "check_funds_with_bank_worker", args=(envelope.disbursement_envelope_id,), queue="g2p_bridge_celery_worker_tasks", ) + + _logger.info("Checking funds with bank beat tasks push completed") diff --git a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/disburse_funds_from_bank.py b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/disburse_funds_from_bank.py index e91ece9..930a03a 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/disburse_funds_from_bank.py +++ b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/disburse_funds_from_bank.py @@ -22,6 +22,7 @@ @celery_app.task(name="disburse_funds_from_bank_beat_producer") def disburse_funds_from_bank_beat_producer(): + _logger.info("Running disburse_funds_from_bank_beat_producer") session_maker = sessionmaker(bind=_engine, expire_on_commit=False) with session_maker() as session: envelopes = ( @@ -67,8 +68,15 @@ def disburse_funds_from_bank_beat_producer(): ) for batch in pending_batches: + _logger.info( + f"Sending task to disburse funds for batch {batch.bank_disbursement_batch_id}" + ) celery_app.send_task( "disburse_funds_from_bank_worker", (batch.bank_disbursement_batch_id,), queue="g2p_bridge_celery_worker_tasks", ) + + _logger.info( + f"Sent tasks to disburse funds for {len(pending_batches)} batches" + ) diff --git a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/mapper_resolution_task.py b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/mapper_resolution_task.py index f4fa723..0c8d928 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/mapper_resolution_task.py +++ b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/mapper_resolution_task.py @@ -17,8 +17,8 @@ @celery_app.task(name="mapper_resolution_beat_producer") def mapper_resolution_beat_producer(): + _logger.info("Running mapper_resolution_beat_producer") session_maker = sessionmaker(bind=_engine, expire_on_commit=False) - with session_maker() as session: mapper_resolution_batch_statuses = ( session.execute( @@ -36,8 +36,13 @@ def mapper_resolution_beat_producer(): ) for mapper_resolution_batch_status in mapper_resolution_batch_statuses: + _logger.info( + f"Sending mapper_resolution_worker task for mapper_resolution_batch_id: {mapper_resolution_batch_status.mapper_resolution_batch_id}" + ) celery_app.send_task( "mapper_resolution_worker", args=[mapper_resolution_batch_status.mapper_resolution_batch_id], queue="g2p_bridge_celery_worker_tasks", ) + + _logger.info("Finished mapper_resolution_beat_producer") diff --git a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/mt940_processor.py b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/mt940_processor.py index e8d0ff3..c21d38d 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/mt940_processor.py +++ b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/mt940_processor.py @@ -17,6 +17,7 @@ @celery_app.task(name="mt940_processor_beat_producer") def mt940_processor_beat_producer(): + _logger.info("Running mt940_processor_beat_producer") session_maker = sessionmaker(bind=_engine, expire_on_commit=False) with session_maker() as session: account_statements = ( @@ -35,8 +36,13 @@ def mt940_processor_beat_producer(): ) for statement in account_statements: + _logger.info( + f"Sending mt940_processor_worker task for statement_id: {statement.statement_id}" + ) celery_app.send_task( "mt940_processor_worker", args=[statement.statement_id], queue="g2p_bridge_celery_worker_tasks", ) + + _logger.info("Finished mt940_processor_beat_producer") diff --git a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/helpers/resolve_helper.py b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/helpers/resolve_helper.py index 4c4bf2e..d3db5d0 100644 --- a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/helpers/resolve_helper.py +++ b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/helpers/resolve_helper.py @@ -3,6 +3,7 @@ import uuid from datetime import datetime from typing import List +import logging from openg2p_fastapi_common.service import BaseService from openg2p_g2p_bridge_models.models import MapperResolvedFaType @@ -17,7 +18,7 @@ from ..config import Settings _config = Settings.get_config() - +_logger = logging.getLogger(_config.logging_default_logger_name) class FAKeys(enum.Enum): account_number = "account_number" @@ -38,17 +39,20 @@ class KeyValuePair(BaseModel): class ResolveHelper(BaseService): def construct_single_resolve_request(self, id: str) -> SingleResolveRequest: + _logger.info(f"Constructing single resolve request for ID: {id}") single_resolve_request = SingleResolveRequest( reference_id=str(uuid.uuid4()), timestamp=datetime.now(), id=id, scope="details", ) + _logger.info(f"Constructed single resolve request for ID: {id}") return single_resolve_request def construct_resolve_request( self, single_resolve_requests: List[SingleResolveRequest] ) -> ResolveRequest: + _logger.info(f"Constructing resolve request for {len(single_resolve_requests)} single resolve requests") resolve_request_message = ResolveRequestMessage( transaction_id=str(uuid.uuid4()), resolve_request=single_resolve_requests, @@ -66,10 +70,11 @@ def construct_resolve_request( ), message=resolve_request_message, ) - + _logger.info(f"Constructed resolve request for {len(single_resolve_requests)} single resolve requests") return resolve_request def _deconstruct(self, value: str, strategy: str) -> List[KeyValuePair]: + _logger.info(f"Deconstructing ID/FA: {value}") regex_res = re.match(strategy, value) deconstructed_list = [] if regex_res: @@ -79,24 +84,30 @@ def _deconstruct(self, value: str, strategy: str) -> List[KeyValuePair]: KeyValuePair(key=k, value=v) for k, v in regex_res.items() ] except Exception as e: + _logger.error(f"Error while deconstructing ID/FA: {e}") raise ValueError("Error while deconstructing ID/FA") from e + _logger.info(f"Deconstructed ID/FA: {value}") return deconstructed_list def deconstruct_fa(self, fa: str) -> dict: - deconstruct_strategy = self.get_deconstruct_strategy(fa) + _logger.info(f"Deconstructing FA: {fa}") + deconstruct_strategy = self._get_deconstruct_strategy(fa) if deconstruct_strategy: deconstructed_pairs = self._deconstruct(fa, deconstruct_strategy) deconstructed_fa = { pair.key.value: pair.value for pair in deconstructed_pairs } + _logger.info(f"Deconstructed FA: {fa}") return deconstructed_fa return {} - def get_deconstruct_strategy(self, fa: str) -> str: + def _get_deconstruct_strategy(self, fa: str) -> str: + _logger.info(f"Getting deconstruction strategy for FA: {fa}") if fa.endswith(MapperResolvedFaType.BANK_ACCOUNT.value): return _config.bank_fa_deconstruct_strategy elif fa.endswith(MapperResolvedFaType.MOBILE_WALLET.value): return _config.mobile_wallet_fa_deconstruct_strategy elif fa.endswith(MapperResolvedFaType.EMAIL_WALLET.value): return _config.email_wallet_fa_deconstruct_strategy + _logger.info(f"Deconstruction strategy not found for FA: {fa}") return "" diff --git a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/check_funds_with_bank_task.py b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/check_funds_with_bank_task.py index 80433b9..b8cd118 100644 --- a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/check_funds_with_bank_task.py +++ b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/check_funds_with_bank_task.py @@ -22,6 +22,7 @@ @celery_app.task(name="check_funds_with_bank_worker") def check_funds_with_bank_worker(disbursement_envelope_id: str): + _logger.info(f"Checking funds with bank for envelope: {disbursement_envelope_id}") session_maker = sessionmaker(bind=_engine, expire_on_commit=False) with session_maker() as session: @@ -102,5 +103,7 @@ def check_funds_with_bank_worker(disbursement_envelope_id: str): e ) disbursement_envelope_batch_status.funds_available_attempts += 1 - + _logger.info( + f"Checked funds with bank for envelope: {disbursement_envelope_id}" + ) session.commit() diff --git a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/disburse_funds_from_bank.py b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/disburse_funds_from_bank.py index 029fd55..5c2bb71 100644 --- a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/disburse_funds_from_bank.py +++ b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/disburse_funds_from_bank.py @@ -29,6 +29,7 @@ @celery_app.task(name="disburse_funds_from_bank_worker") def disburse_funds_from_bank_worker(bank_disbursement_batch_id: str): + _logger.info(f"Disbursing funds with bank for batch: {bank_disbursement_batch_id}") session_maker = sessionmaker(bind=_engine, expire_on_commit=False) with session_maker() as session: @@ -42,6 +43,9 @@ def disburse_funds_from_bank_worker(bank_disbursement_batch_id: str): ) if not batch_status: + _logger.error( + f"Bank Disbursement Batch Status not found for batch: {bank_disbursement_batch_id}" + ) return disbursement_envelope_id = batch_status.disbursement_envelope_id @@ -55,6 +59,9 @@ def disburse_funds_from_bank_worker(bank_disbursement_batch_id: str): ) if not envelope: + _logger.error( + f"Disbursement Envelope not found for envelope: {disbursement_envelope_id}" + ) return envelope_batch_status = ( @@ -67,6 +74,9 @@ def disburse_funds_from_bank_worker(bank_disbursement_batch_id: str): ) if not envelope_batch_status: + _logger.error( + f"Disbursement Envelope Batch Status not found for envelope: {disbursement_envelope_id}" + ) return disbursement_batch_controls = ( @@ -167,9 +177,11 @@ def disburse_funds_from_bank_worker(bank_disbursement_batch_id: str): batch_status.disbursement_attempts += 1 except Exception as e: + _logger.error(f"Error disbursing funds with bank: {str(e)}") batch_status.disbursement_status = ProcessStatus.PENDING.value batch_status.disbursement_timestamp = datetime.utcnow() batch_status.latest_error_code = str(e) batch_status.disbursement_attempts += 1 + _logger.info(f"Disbursing funds with bank for batch: {bank_disbursement_batch_id} completed") session.commit() diff --git a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mapper_resolution_task.py b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mapper_resolution_task.py index 1e6a59b..942a956 100644 --- a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mapper_resolution_task.py +++ b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mapper_resolution_task.py @@ -24,6 +24,7 @@ @celery_app.task(name="mapper_resolution_worker") def mapper_resolution_worker(mapper_resolution_batch_id: str): + _logger.info(f"Resolving the mapper resolution batch: {mapper_resolution_batch_id}") session_maker = sessionmaker(bind=_engine, expire_on_commit=False) with session_maker() as session: @@ -52,6 +53,7 @@ def mapper_resolution_worker(mapper_resolution_batch_id: str): loop.close() if not resolve_response: + _logger.error(f"Failed to resolve the request: {error_msg}") session.query(MapperResolutionBatchStatus).filter( MapperResolutionBatchStatus.mapper_resolution_batch_id == mapper_resolution_batch_id @@ -72,6 +74,7 @@ def mapper_resolution_worker(mapper_resolution_batch_id: str): async def make_resolve_request(disbursement_batch_controls): + _logger.info("Making resolve request") resolve_helper = ResolveHelper.get_component() single_resolve_requests = [ @@ -88,6 +91,7 @@ async def make_resolve_request(disbursement_batch_controls): ) return resolve_response, None except Exception as e: + _logger.error(f"Failed to resolve the request: {e}") error_msg = f"Failed to resolve the request: {e}" return None, error_msg @@ -95,6 +99,7 @@ async def make_resolve_request(disbursement_batch_controls): def process_and_store_resolution( mapper_resolution_batch_id, resolve_response, beneficiary_disbursement_map ): + _logger.info("Processing and storing resolution") resolve_helper = ResolveHelper.get_component() session_maker = sessionmaker(bind=_engine, expire_on_commit=False) with session_maker() as session: @@ -157,4 +162,5 @@ def process_and_store_resolution( + 1, } ) + _logger.info("Stored the resolution") session.commit() diff --git a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mt940_processor.py b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mt940_processor.py index 5195d97..146521f 100644 --- a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mt940_processor.py +++ b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mt940_processor.py @@ -94,6 +94,9 @@ def mt940_processor_worker(statement_id: str): .first() ) if not benefit_program_configuration: + _logger.error( + f"Benefit program configuration not found for account number: {account_statement.account_number}" + ) account_statement.statement_process_status = ProcessStatus.ERROR account_statement.statement_process_error_code = ( G2PBridgeErrorCodes.INVALID_ACCOUNT_NUMBER.value @@ -141,7 +144,9 @@ def mt940_processor_worker(statement_id: str): # Process only debit transactions for parsed_transaction in parsed_transactions_d: - bank_disbursement_batch_id = get_bank_batch_id(parsed_transaction, session) + bank_disbursement_batch_id = get_bank_batch_id( + parsed_transaction, session + ) if not bank_disbursement_batch_id: disbursement_error_recons.append( @@ -184,7 +189,9 @@ def mt940_processor_worker(statement_id: str): # Start processing reversal transactions - rd for parsed_transaction in parsed_transactions_rd: - bank_disbursement_batch_id = get_bank_batch_id(parsed_transaction, session) + bank_disbursement_batch_id = get_bank_batch_id( + parsed_transaction, session + ) if not bank_disbursement_batch_id: disbursement_error_recons.append( @@ -254,8 +261,7 @@ def get_disbursement_recon(parsed_transaction, session): disbursement_recon = ( session.query(DisbursementRecon) .filter( - DisbursementRecon.disbursement_id - == parsed_transaction["disbursement_id"] + DisbursementRecon.disbursement_id == parsed_transaction["disbursement_id"] ) .first() ) From 5d6f13e8fa782bb737fbed87e9a085b7ba5a4aea Mon Sep 17 00:00:00 2001 From: PSNAppZ Date: Wed, 7 Aug 2024 17:31:44 +0530 Subject: [PATCH 14/14] Add logging to all methods --- .../bank_connectors/example_bank_connector.py | 31 ++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_connectors/example_bank_connector.py b/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_connectors/example_bank_connector.py index cbc25c1..6a82900 100644 --- a/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_connectors/example_bank_connector.py +++ b/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_connectors/example_bank_connector.py @@ -1,4 +1,5 @@ from typing import List, Optional +import logging import httpx from openg2p_g2p_bridge_models.models import ( @@ -18,7 +19,7 @@ from ..config import Settings _config = Settings.get_config() - +_logger = logging.getLogger(_config.logging_default_logger_name) class BankPaymentPayload(BaseModel): payment_reference_number: str @@ -52,6 +53,9 @@ class BankPaymentPayload(BaseModel): class ExampleBankConnector(BankConnectorInterface): def check_funds(self, account_number, currency, amount) -> CheckFundsResponse: + _logger.info( + f"Checking funds availability for account_number: {account_number}, currency: {currency}, amount: {amount}" + ) try: with httpx.Client() as client: request_data = { @@ -66,18 +70,30 @@ def check_funds(self, account_number, currency, amount) -> CheckFundsResponse: data = response.json() if data["status"] == "success": + _logger.info( + f"Funds available for account_number: {account_number}, currency: {currency}, amount: {amount}" + ) return CheckFundsResponse( status=FundsAvailableWithBankEnum.FUNDS_AVAILABLE, error_code="" ) + _logger.info( + f"Funds not available for account_number: {account_number}, currency: {currency}, amount: {amount}" + ) return CheckFundsResponse( status=FundsAvailableWithBankEnum.FUNDS_NOT_AVAILABLE, error_code="" ) except httpx.HTTPStatusError as e: + _logger.error( + f"Error checking funds availability for account_number: {account_number}, currency: {currency}, amount: {amount}" + ) return CheckFundsResponse( status=FundsAvailableWithBankEnum.PENDING_CHECK, error_code=str(e) ) def block_funds(self, account_number, currency, amount) -> BlockFundsResponse: + _logger.info( + f"Blocking funds for account_number: {account_number}, currency: {currency}, amount: {amount}" + ) try: with httpx.Client() as client: request_data = { @@ -92,17 +108,26 @@ def block_funds(self, account_number, currency, amount) -> BlockFundsResponse: data = response.json() if data["status"] == "success": + _logger.info( + f"Funds blocked for account_number: {account_number}, currency: {currency}, amount: {amount}" + ) return BlockFundsResponse( status=FundsBlockedWithBankEnum.FUNDS_BLOCK_SUCCESS, block_reference_no=data["block_reference_no"], error_code="", ) + _logger.error( + f"Funds block failed for account_number: {account_number}, currency: {currency}, amount: {amount}" + ) return BlockFundsResponse( status=FundsBlockedWithBankEnum.FUNDS_BLOCK_FAILURE, block_reference_no="", error_code=data.get("error_code", ""), ) except httpx.HTTPStatusError as e: + _logger.error( + f"Error blocking funds for account_number: {account_number}, currency: {currency}, amount: {amount}" + ) return BlockFundsResponse( status=FundsBlockedWithBankEnum.FUNDS_BLOCK_FAILURE, block_reference_no="", @@ -112,6 +137,7 @@ def block_funds(self, account_number, currency, amount) -> BlockFundsResponse: def initiate_payment( self, disbursement_payment_payloads: List[DisbursementPaymentPayload] ) -> PaymentResponse: + _logger.info(f"Initiating payment for {len(disbursement_payment_payloads)} disbursements") try: with httpx.Client() as client: bank_payment_payloads = [] @@ -152,11 +178,14 @@ def initiate_payment( data = response.json() if data["status"] == "success": + _logger.info(f"Payment initiated successfully") return PaymentResponse(status=PaymentStatus.SUCCESS, error_code="") + _logger.error(f"Payment initiation failed") return PaymentResponse( status=PaymentStatus.ERROR, error_code=data.get("error_message", "") ) except httpx.HTTPStatusError as e: + _logger.error(f"Error initiating payment: {e}") return PaymentResponse(status=PaymentStatus.ERROR, error_code=str(e)) def retrieve_disbursement_id(