Skip to content

Commit

Permalink
Add logging to all methods
Browse files Browse the repository at this point in the history
  • Loading branch information
PSNAppz committed Aug 7, 2024
1 parent 8c204a1 commit 8e85cfd
Show file tree
Hide file tree
Showing 11 changed files with 75 additions and 12 deletions.
2 changes: 1 addition & 1 deletion openg2p-g2p-bridge-celery-beat-producers/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ G2P_BRIDGE_MAPPER_RESOLVE_FREQUENCY=3600
G2P_BRIDGE_FUNDS_AVAILABLE_CHECK_FREQUENCY=3600
G2P_BRIDGE_FUNDS_BLOCKED_FREQUENCY=3600
G2P_BRIDGE_FUNDS_DISBURSEMENT_FREQUENCY=3600
G2P_BRIDGE_MT940_PROCESSOR_FREQUENCY=3600
G2P_BRIDGE_MT940_PROCESSOR_FREQUENCY=3600
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,5 @@ def block_funds_with_bank_beat_producer():
args=(envelope.disbursement_envelope_id,),
queue="g2p_bridge_celery_worker_tasks",
)

_logger.info("Completed checking for envelopes to block funds with bank")
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

@celery_app.task(name="check_funds_with_bank_beat_producer")
def check_funds_with_bank_beat_producer():
_logger.info("Checking funds with bank")
session_maker = sessionmaker(bind=_engine, expire_on_commit=False)

with session_maker() as session:
envelopes = (
session.execute(
Expand Down Expand Up @@ -62,8 +62,13 @@ def check_funds_with_bank_beat_producer():
)

for envelope in envelopes:
_logger.info(
f"Sending task to check funds with bank for envelope {envelope.disbursement_envelope_id}"
)
celery_app.send_task(
"check_funds_with_bank_worker",
args=(envelope.disbursement_envelope_id,),
queue="g2p_bridge_celery_worker_tasks",
)

_logger.info("Checking funds with bank beat tasks push completed")
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

@celery_app.task(name="disburse_funds_from_bank_beat_producer")
def disburse_funds_from_bank_beat_producer():
_logger.info("Running disburse_funds_from_bank_beat_producer")
session_maker = sessionmaker(bind=_engine, expire_on_commit=False)
with session_maker() as session:
envelopes = (
Expand Down Expand Up @@ -67,8 +68,15 @@ def disburse_funds_from_bank_beat_producer():
)

for batch in pending_batches:
_logger.info(
f"Sending task to disburse funds for batch {batch.bank_disbursement_batch_id}"
)
celery_app.send_task(
"disburse_funds_from_bank_worker",
(batch.bank_disbursement_batch_id,),
queue="g2p_bridge_celery_worker_tasks",
)

_logger.info(
f"Sent tasks to disburse funds for {len(pending_batches)} batches"
)
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

@celery_app.task(name="mapper_resolution_beat_producer")
def mapper_resolution_beat_producer():
_logger.info("Running mapper_resolution_beat_producer")
session_maker = sessionmaker(bind=_engine, expire_on_commit=False)

with session_maker() as session:
mapper_resolution_batch_statuses = (
session.execute(
Expand All @@ -36,8 +36,13 @@ def mapper_resolution_beat_producer():
)

for mapper_resolution_batch_status in mapper_resolution_batch_statuses:
_logger.info(
f"Sending mapper_resolution_worker task for mapper_resolution_batch_id: {mapper_resolution_batch_status.mapper_resolution_batch_id}"
)
celery_app.send_task(
"mapper_resolution_worker",
args=[mapper_resolution_batch_status.mapper_resolution_batch_id],
queue="g2p_bridge_celery_worker_tasks",
)

_logger.info("Finished mapper_resolution_beat_producer")
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

@celery_app.task(name="mt940_processor_beat_producer")
def mt940_processor_beat_producer():
_logger.info("Running mt940_processor_beat_producer")
session_maker = sessionmaker(bind=_engine, expire_on_commit=False)
with session_maker() as session:
account_statements = (
Expand All @@ -35,8 +36,13 @@ def mt940_processor_beat_producer():
)

for statement in account_statements:
_logger.info(
f"Sending mt940_processor_worker task for statement_id: {statement.statement_id}"
)
celery_app.send_task(
"mt940_processor_worker",
args=[statement.statement_id],
queue="g2p_bridge_celery_worker_tasks",
)

_logger.info("Finished mt940_processor_beat_producer")
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import uuid
from datetime import datetime
from typing import List
import logging

from openg2p_fastapi_common.service import BaseService
from openg2p_g2p_bridge_models.models import MapperResolvedFaType
Expand All @@ -17,7 +18,7 @@
from ..config import Settings

_config = Settings.get_config()

_logger = logging.getLogger(_config.logging_default_logger_name)

class FAKeys(enum.Enum):
account_number = "account_number"
Expand All @@ -38,17 +39,20 @@ class KeyValuePair(BaseModel):

class ResolveHelper(BaseService):
def construct_single_resolve_request(self, id: str) -> SingleResolveRequest:
_logger.info(f"Constructing single resolve request for ID: {id}")
single_resolve_request = SingleResolveRequest(
reference_id=str(uuid.uuid4()),
timestamp=datetime.now(),
id=id,
scope="details",
)
_logger.info(f"Constructed single resolve request for ID: {id}")
return single_resolve_request

def construct_resolve_request(
self, single_resolve_requests: List[SingleResolveRequest]
) -> ResolveRequest:
_logger.info(f"Constructing resolve request for {len(single_resolve_requests)} single resolve requests")
resolve_request_message = ResolveRequestMessage(
transaction_id=str(uuid.uuid4()),
resolve_request=single_resolve_requests,
Expand All @@ -66,10 +70,11 @@ def construct_resolve_request(
),
message=resolve_request_message,
)

_logger.info(f"Constructed resolve request for {len(single_resolve_requests)} single resolve requests")
return resolve_request

def _deconstruct(self, value: str, strategy: str) -> List[KeyValuePair]:
_logger.info(f"Deconstructing ID/FA: {value}")
regex_res = re.match(strategy, value)
deconstructed_list = []
if regex_res:
Expand All @@ -79,24 +84,30 @@ def _deconstruct(self, value: str, strategy: str) -> List[KeyValuePair]:
KeyValuePair(key=k, value=v) for k, v in regex_res.items()
]
except Exception as e:
_logger.error(f"Error while deconstructing ID/FA: {e}")
raise ValueError("Error while deconstructing ID/FA") from e
_logger.info(f"Deconstructed ID/FA: {value}")
return deconstructed_list

def deconstruct_fa(self, fa: str) -> dict:
deconstruct_strategy = self.get_deconstruct_strategy(fa)
_logger.info(f"Deconstructing FA: {fa}")
deconstruct_strategy = self._get_deconstruct_strategy(fa)
if deconstruct_strategy:
deconstructed_pairs = self._deconstruct(fa, deconstruct_strategy)
deconstructed_fa = {
pair.key.value: pair.value for pair in deconstructed_pairs
}
_logger.info(f"Deconstructed FA: {fa}")
return deconstructed_fa
return {}

def get_deconstruct_strategy(self, fa: str) -> str:
def _get_deconstruct_strategy(self, fa: str) -> str:
_logger.info(f"Getting deconstruction strategy for FA: {fa}")
if fa.endswith(MapperResolvedFaType.BANK_ACCOUNT.value):
return _config.bank_fa_deconstruct_strategy
elif fa.endswith(MapperResolvedFaType.MOBILE_WALLET.value):
return _config.mobile_wallet_fa_deconstruct_strategy
elif fa.endswith(MapperResolvedFaType.EMAIL_WALLET.value):
return _config.email_wallet_fa_deconstruct_strategy
_logger.info(f"Deconstruction strategy not found for FA: {fa}")
return ""
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

@celery_app.task(name="check_funds_with_bank_worker")
def check_funds_with_bank_worker(disbursement_envelope_id: str):
_logger.info(f"Checking funds with bank for envelope: {disbursement_envelope_id}")
session_maker = sessionmaker(bind=_engine, expire_on_commit=False)

with session_maker() as session:
Expand Down Expand Up @@ -102,5 +103,7 @@ def check_funds_with_bank_worker(disbursement_envelope_id: str):
e
)
disbursement_envelope_batch_status.funds_available_attempts += 1

_logger.info(
f"Checked funds with bank for envelope: {disbursement_envelope_id}"
)
session.commit()
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

@celery_app.task(name="disburse_funds_from_bank_worker")
def disburse_funds_from_bank_worker(bank_disbursement_batch_id: str):
_logger.info(f"Disbursing funds with bank for batch: {bank_disbursement_batch_id}")
session_maker = sessionmaker(bind=_engine, expire_on_commit=False)

with session_maker() as session:
Expand All @@ -42,6 +43,9 @@ def disburse_funds_from_bank_worker(bank_disbursement_batch_id: str):
)

if not batch_status:
_logger.error(
f"Bank Disbursement Batch Status not found for batch: {bank_disbursement_batch_id}"
)
return

disbursement_envelope_id = batch_status.disbursement_envelope_id
Expand All @@ -55,6 +59,9 @@ def disburse_funds_from_bank_worker(bank_disbursement_batch_id: str):
)

if not envelope:
_logger.error(
f"Disbursement Envelope not found for envelope: {disbursement_envelope_id}"
)
return

envelope_batch_status = (
Expand All @@ -67,6 +74,9 @@ def disburse_funds_from_bank_worker(bank_disbursement_batch_id: str):
)

if not envelope_batch_status:
_logger.error(
f"Disbursement Envelope Batch Status not found for envelope: {disbursement_envelope_id}"
)
return

disbursement_batch_controls = (
Expand Down Expand Up @@ -167,9 +177,11 @@ def disburse_funds_from_bank_worker(bank_disbursement_batch_id: str):
batch_status.disbursement_attempts += 1

except Exception as e:
_logger.error(f"Error disbursing funds with bank: {str(e)}")
batch_status.disbursement_status = ProcessStatus.PENDING.value
batch_status.disbursement_timestamp = datetime.utcnow()
batch_status.latest_error_code = str(e)
batch_status.disbursement_attempts += 1

_logger.info(f"Disbursing funds with bank for batch: {bank_disbursement_batch_id} completed")
session.commit()
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

@celery_app.task(name="mapper_resolution_worker")
def mapper_resolution_worker(mapper_resolution_batch_id: str):
_logger.info(f"Resolving the mapper resolution batch: {mapper_resolution_batch_id}")
session_maker = sessionmaker(bind=_engine, expire_on_commit=False)

with session_maker() as session:
Expand Down Expand Up @@ -52,6 +53,7 @@ def mapper_resolution_worker(mapper_resolution_batch_id: str):
loop.close()

if not resolve_response:
_logger.error(f"Failed to resolve the request: {error_msg}")
session.query(MapperResolutionBatchStatus).filter(
MapperResolutionBatchStatus.mapper_resolution_batch_id
== mapper_resolution_batch_id
Expand All @@ -72,6 +74,7 @@ def mapper_resolution_worker(mapper_resolution_batch_id: str):


async def make_resolve_request(disbursement_batch_controls):
_logger.info("Making resolve request")
resolve_helper = ResolveHelper.get_component()

single_resolve_requests = [
Expand All @@ -88,13 +91,15 @@ async def make_resolve_request(disbursement_batch_controls):
)
return resolve_response, None
except Exception as e:
_logger.error(f"Failed to resolve the request: {e}")
error_msg = f"Failed to resolve the request: {e}"
return None, error_msg


def process_and_store_resolution(
mapper_resolution_batch_id, resolve_response, beneficiary_disbursement_map
):
_logger.info("Processing and storing resolution")
resolve_helper = ResolveHelper.get_component()
session_maker = sessionmaker(bind=_engine, expire_on_commit=False)
with session_maker() as session:
Expand Down Expand Up @@ -157,4 +162,5 @@ def process_and_store_resolution(
+ 1,
}
)
_logger.info("Stored the resolution")
session.commit()
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ def mt940_processor_worker(statement_id: str):
.first()
)
if not benefit_program_configuration:
_logger.error(
f"Benefit program configuration not found for account number: {account_statement.account_number}"
)
account_statement.statement_process_status = ProcessStatus.ERROR
account_statement.statement_process_error_code = (
G2PBridgeErrorCodes.INVALID_ACCOUNT_NUMBER.value
Expand Down Expand Up @@ -141,7 +144,9 @@ def mt940_processor_worker(statement_id: str):

# Process only debit transactions
for parsed_transaction in parsed_transactions_d:
bank_disbursement_batch_id = get_bank_batch_id(parsed_transaction, session)
bank_disbursement_batch_id = get_bank_batch_id(
parsed_transaction, session
)

if not bank_disbursement_batch_id:
disbursement_error_recons.append(
Expand Down Expand Up @@ -184,7 +189,9 @@ def mt940_processor_worker(statement_id: str):

# Start processing reversal transactions - rd
for parsed_transaction in parsed_transactions_rd:
bank_disbursement_batch_id = get_bank_batch_id(parsed_transaction, session)
bank_disbursement_batch_id = get_bank_batch_id(
parsed_transaction, session
)

if not bank_disbursement_batch_id:
disbursement_error_recons.append(
Expand Down Expand Up @@ -254,8 +261,7 @@ def get_disbursement_recon(parsed_transaction, session):
disbursement_recon = (
session.query(DisbursementRecon)
.filter(
DisbursementRecon.disbursement_id
== parsed_transaction["disbursement_id"]
DisbursementRecon.disbursement_id == parsed_transaction["disbursement_id"]
)
.first()
)
Expand Down

0 comments on commit 8e85cfd

Please sign in to comment.