Skip to content

Commit

Permalink
Fix queue_service connection issues (#1843)
Browse files Browse the repository at this point in the history
* Fix queue_service connection issues

* Apply suggestions from code review

Co-authored-by: Uxío <[email protected]>

---------

Co-authored-by: Uxío <[email protected]>
  • Loading branch information
moisses89 and Uxio0 authored Jan 26, 2024
1 parent 5489845 commit f7bac16
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 20 deletions.
4 changes: 3 additions & 1 deletion config/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,9 @@
"handlers": ["console", "mail_admins"],
"propagate": True,
},
"pika": {
"propagate": True if DEBUG else False,
},
},
}

Expand Down Expand Up @@ -518,7 +521,6 @@
# Events
# ------------------------------------------------------------------------------
EVENTS_QUEUE_URL = env("EVENTS_QUEUE_URL", default=None)
EVENTS_QUEUE_ASYNC_CONNECTION = env("EVENTS_QUEUE_ASYNC_CONNECTION", default=False)
EVENTS_QUEUE_EXCHANGE_NAME = env("EVENTS_QUEUE_EXCHANGE_NAME", default="amq.fanout")

# Cache
Expand Down
2 changes: 0 additions & 2 deletions config/settings/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,3 @@
"level": "DEBUG",
}
}

EVENTS_QUEUE_ASYNC_CONNECTION = False
29 changes: 12 additions & 17 deletions safe_transaction_service/events/services/queue_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,34 +37,29 @@ def connect(self) -> Optional[BlockingConnection]:
exchange_type=ExchangeType.fanout,
durable=True,
)
# Send messages if there was any missing
# self.send_unsent_events()
logger.debug("Opened connection to RabbitMQ")
return self.connection
except pika.exceptions.AMQPConnectionError:
except pika.exceptions.AMQPError:
logger.error("Cannot open connection to RabbitMQ")
return None

def is_connected(self) -> bool:
"""
:return: `True` if connected, `False` otherwise
"""
return self.connection and self.connection.is_open

def publish(self, message: str) -> bool:
def publish(self, message: str, retry: Optional[bool] = True) -> bool:
"""
:param message:
:param retry:
:return: `True` if message was published, `False` otherwise
"""
# Check if is still connected if not try to reconnect
if not self.is_connected() and not self.connect():
return False
try:
self.channel.basic_publish(
exchange=self.exchange_name, routing_key="", body=message
)
return True
except pika.exceptions.AMQPConnectionError:
except pika.exceptions.AMQPError:
if retry:
logger.info("The connection has been terminated, trying again.")
# One more chance
self.connect()
return self.publish(message, retry=False)
return False


Expand Down Expand Up @@ -116,7 +111,7 @@ def send_event(self, payload: Dict[str, Any]) -> int:
self.release_connection(broker_connection)
return self.send_unsent_events() + 1

logger.warning("Event can not be sent due any connection error")
logger.warning("Unable to send the event due to a connection error")
logger.debug("Adding %s to unsent messages", payload)
self.unsent_events.append(event)
return 0
Expand All @@ -137,15 +132,15 @@ def send_unsent_events(self) -> int:
self.unsent_events = []

total_sent_events = 0
logger.info("Sending %i not sent messages", len(unsent_events))
logger.info("Sending previously unsent messages: %i", len(unsent_events))
for unsent_message in unsent_events:
if broker_connection.publish(unsent_message):
total_sent_events += 1
else:
self.unsent_events.append(unsent_message)

self.release_connection(broker_connection)
logger.info("Sent %i not sent messages", total_sent_events)
logger.info("Correctly sent messages: %i", total_sent_events)
return total_sent_events

def clear_unsent_events(self):
Expand Down

0 comments on commit f7bac16

Please sign in to comment.