Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #2833 from matrix-org/rav/pusher_hacks
Browse files Browse the repository at this point in the history
Logging and metrics for the http pusher
  • Loading branch information
richvdh authored Jan 31, 2018
2 parents 421d68c + 03dd745 commit d1fe4db
Showing 1 changed file with 30 additions and 7 deletions.
37 changes: 30 additions & 7 deletions synapse/push/httppusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,30 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.push import PusherConfigException
import logging

from twisted.internet import defer, reactor
from twisted.internet.error import AlreadyCalled, AlreadyCancelled

import logging
import push_rule_evaluator
import push_tools

import synapse
from synapse.push import PusherConfigException
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure

logger = logging.getLogger(__name__)

metrics = synapse.metrics.get_metrics_for(__name__)

http_push_processed_counter = metrics.register_counter(
"http_pushes_processed",
)

http_push_failed_counter = metrics.register_counter(
"http_pushes_failed",
)


class HttpPusher(object):
INITIAL_BACKOFF_SEC = 1 # in seconds because that's what Twisted takes
Expand Down Expand Up @@ -152,9 +161,16 @@ def _unsafe_process(self):
self.user_id, self.last_stream_ordering, self.max_stream_ordering
)

logger.info(
"Processing %i unprocessed push actions for %s starting at "
"stream_ordering %s",
len(unprocessed), self.name, self.last_stream_ordering,
)

for push_action in unprocessed:
processed = yield self._process_one(push_action)
if processed:
http_push_processed_counter.inc()
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.last_stream_ordering = push_action['stream_ordering']
yield self.store.update_pusher_last_stream_ordering_and_success(
Expand All @@ -169,6 +185,7 @@ def _unsafe_process(self):
self.failing_since
)
else:
http_push_failed_counter.inc()
if not self.failing_since:
self.failing_since = self.clock.time_msec()
yield self.store.update_pusher_failing_since(
Expand Down Expand Up @@ -316,7 +333,10 @@ def dispatch_push(self, event, tweaks, badge):
try:
resp = yield self.http_client.post_json_get_json(self.url, notification_dict)
except Exception:
logger.warn("Failed to push %s ", self.url)
logger.warn(
"Failed to push event %s to %s",
event.event_id, self.name, exc_info=True,
)
defer.returnValue(False)
rejected = []
if 'rejected' in resp:
Expand All @@ -325,7 +345,7 @@ def dispatch_push(self, event, tweaks, badge):

@defer.inlineCallbacks
def _send_badge(self, badge):
logger.info("Sending updated badge count %d to %r", badge, self.user_id)
logger.info("Sending updated badge count %d to %s", badge, self.name)
d = {
'notification': {
'id': '',
Expand All @@ -347,7 +367,10 @@ def _send_badge(self, badge):
try:
resp = yield self.http_client.post_json_get_json(self.url, d)
except Exception:
logger.exception("Failed to push %s ", self.url)
logger.warn(
"Failed to send badge count to %s",
self.name, exc_info=True,
)
defer.returnValue(False)
rejected = []
if 'rejected' in resp:
Expand Down

0 comments on commit d1fe4db

Please sign in to comment.