Skip to content
This repository has been archived by the owner on Apr 17, 2023. It is now read-only.

Commit

Permalink
de-duplicate TriggerMetricCollection events by detecting whether metr…
Browse files Browse the repository at this point in the history
…ic collection process already started
  • Loading branch information
lfryc committed Mar 17, 2016
1 parent 34e4835 commit c8dd6d6
Show file tree
Hide file tree
Showing 10 changed files with 344 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,19 @@
*/
package org.jboss.aerogear.unifiedpush.message;

import java.util.Collection;

import javax.ejb.Stateless;
import javax.enterprise.event.Event;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;

import org.jboss.aerogear.unifiedpush.api.PushMessageInformation;
import org.jboss.aerogear.unifiedpush.api.Variant;
import org.jboss.aerogear.unifiedpush.api.VariantMetricInformation;
import org.jboss.aerogear.unifiedpush.message.event.TriggerMetricCollection;
import org.jboss.aerogear.unifiedpush.message.event.TriggerVariantMetricCollection;
import org.jboss.aerogear.unifiedpush.message.holder.MessageHolderWithTokens;
import org.jboss.aerogear.unifiedpush.message.jms.Dequeue;
import org.jboss.aerogear.unifiedpush.message.jms.DispatchToQueue;
Expand All @@ -28,14 +37,6 @@
import org.jboss.aerogear.unifiedpush.message.sender.SenderTypeLiteral;
import org.jboss.aerogear.unifiedpush.utils.AeroGearLogger;

import javax.ejb.Stateless;
import javax.enterprise.event.Event;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import java.util.Collection;

/**
* Receives a request for dispatching push notifications to specified devices from {@link TokenLoader}
*
Expand All @@ -56,7 +57,7 @@ public class NotificationDispatcher {

@Inject
@DispatchToQueue
private Event<TriggerMetricCollection> triggerMetricCollection;
private Event<TriggerVariantMetricCollection> triggerVariantMetricCollection;

/**
* Receives a {@link UnifiedPushMessage} and list of device tokens that the message should be sent to, selects appropriate sender implementation that
Expand Down Expand Up @@ -124,6 +125,6 @@ private void updateStatusOfPushMessageInformation(final PushMessageInformation p
variantMetricInformation.setServedBatches(1);

dispatchVariantMetricEvent.fire(variantMetricInformation);
triggerMetricCollection.fire(new TriggerMetricCollection(pushMessageInformation));
triggerVariantMetricCollection.fire(new TriggerVariantMetricCollection(pushMessageInformation.getId(), variantID));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,19 @@
*/
package org.jboss.aerogear.unifiedpush.message;

import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

import javax.ejb.Stateless;
import javax.enterprise.event.Event;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;

import org.jboss.aerogear.unifiedpush.api.PushMessageInformation;
import org.jboss.aerogear.unifiedpush.api.Variant;
import org.jboss.aerogear.unifiedpush.api.VariantMetricInformation;
import org.jboss.aerogear.unifiedpush.api.VariantType;
Expand All @@ -24,7 +37,7 @@
import org.jboss.aerogear.unifiedpush.message.configuration.SenderConfiguration;
import org.jboss.aerogear.unifiedpush.message.event.AllBatchesLoadedEvent;
import org.jboss.aerogear.unifiedpush.message.event.BatchLoadedEvent;
import org.jboss.aerogear.unifiedpush.message.event.TriggerMetricCollection;
import org.jboss.aerogear.unifiedpush.message.event.TriggerVariantMetricCollection;
import org.jboss.aerogear.unifiedpush.message.holder.MessageHolderWithTokens;
import org.jboss.aerogear.unifiedpush.message.holder.MessageHolderWithVariants;
import org.jboss.aerogear.unifiedpush.message.jms.Dequeue;
Expand All @@ -33,17 +46,6 @@
import org.jboss.aerogear.unifiedpush.service.ClientInstallationService;
import org.jboss.aerogear.unifiedpush.utils.AeroGearLogger;

import javax.ejb.Stateless;
import javax.enterprise.event.Event;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/**
* Receives a request for sending a push message to given variants from {@link NotificationRouter}.
*
Expand Down Expand Up @@ -77,7 +79,7 @@ public class TokenLoader {

@Inject
@DispatchToQueue
private Event<TriggerMetricCollection> triggerMetricCollection;
private Event<TriggerVariantMetricCollection> triggerVariantMetricCollection;

@Inject
@DispatchToQueue
Expand All @@ -104,6 +106,7 @@ public void loadAndQueueTokenBatch(@Observes @Dequeue MessageHolderWithVariants
final Collection<Variant> variants = msg.getVariants();
final String lastTokenFromPreviousBatch = msg.getLastTokenFromPreviousBatch();
final SenderConfiguration configuration = senderConfiguration.select(new SenderTypeLiteral(variantType)).get();
final PushMessageInformation pushMessageInformation = msg.getPushMessageInformation();
int serialId = msg.getLastSerialId();

logger.fine("Received message from queue: " + message.getMessage().getAlert());
Expand Down Expand Up @@ -136,7 +139,7 @@ public void loadAndQueueTokenBatch(@Observes @Dequeue MessageHolderWithVariants

// using combined key of variant and PMI (AGPUSH-1585):
batchLoaded.fire(new BatchLoadedEvent(variant.getVariantID()+":"+msg.getPushMessageInformation().getId()));
triggerMetricCollection.fire(new TriggerMetricCollection(msg.getPushMessageInformation()));
triggerVariantMetricCollection.fire(new TriggerVariantMetricCollection(msg.getPushMessageInformation(), variant));
} else {
break;
}
Expand All @@ -146,11 +149,11 @@ public void loadAndQueueTokenBatch(@Observes @Dequeue MessageHolderWithVariants
logger.fine(String.format("Ending token loading transaction for %s variant (%s)", variant.getType().getTypeName(), variant.getVariantID()));
nextBatchEvent.fire(new MessageHolderWithVariants(msg.getPushMessageInformation(), message, msg.getVariantType(), variants, serialId, lastTokenInBatch));
} else {
logger.fine(String.format("All batches for %s variant were loaded (%s)", variant.getType().getTypeName(), msg.getPushMessageInformation().getId()));
logger.fine(String.format("All batches for %s variant were loaded (%s)", variant.getType().getTypeName(), pushMessageInformation.getId()));

// using combined key of variant and PMI (AGPUSH-1585):
allBatchesLoaded.fire(new AllBatchesLoadedEvent(variant.getVariantID()+":"+msg.getPushMessageInformation().getId()));
triggerMetricCollection.fire(new TriggerMetricCollection(msg.getPushMessageInformation()));
triggerVariantMetricCollection.fire(new TriggerVariantMetricCollection(pushMessageInformation, variant));

if (tokensLoaded == 0 && lastTokenFromPreviousBatch == null) {
// no tokens were loaded at all!
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* JBoss, Home of Professional Open Source
* Copyright Red Hat, Inc., and individual contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jboss.aerogear.unifiedpush.message.event;

import java.io.Serializable;

public class MetricsProcessingStarted implements Serializable {

private static final long serialVersionUID = -5192336478703267019L;

private String pushMessageInformationId;

public MetricsProcessingStarted(String pushMessageInformationId) {
this.pushMessageInformationId = pushMessageInformationId;
}

public String getPushMessageInformationId() {
return pushMessageInformationId;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* JBoss, Home of Professional Open Source
* Copyright Red Hat, Inc., and individual contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jboss.aerogear.unifiedpush.message.event;

import java.io.Serializable;

import org.jboss.aerogear.unifiedpush.api.PushMessageInformation;
import org.jboss.aerogear.unifiedpush.api.Variant;
import org.jboss.aerogear.unifiedpush.message.MetricsCollector;

/**
* Event that triggers {@link MetricsCollector} processing.
*
* @see {@link org.jboss.aerogear.unifiedpush.message.jms.TriggerMetricCollectionConsumer}
*/
public class TriggerVariantMetricCollection implements Serializable {

private static final long serialVersionUID = 1036025116554796512L;

public static final long REDELIVERY_DELAY_MS = 1000L;

private String pushMessageInformationId;
private String variantID;

public TriggerVariantMetricCollection(PushMessageInformation pushMessageInformation, Variant variant) {
this.pushMessageInformationId = pushMessageInformation.getId();
this.variantID = variant.getVariantID();
}

public TriggerVariantMetricCollection(String pushMessageInformationId, String variantID) {
this.pushMessageInformationId = pushMessageInformationId;
this.variantID = variantID;
}

public String getPushMessageInformationId() {
return pushMessageInformationId;
}

public String getVariantID() {
return variantID;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.jms.Queue;
import javax.jms.Topic;

import org.jboss.aerogear.unifiedpush.message.event.AllBatchesLoadedEvent;
import org.jboss.aerogear.unifiedpush.message.event.BatchLoadedEvent;
import org.jboss.aerogear.unifiedpush.message.event.MetricsProcessingStarted;
import org.jboss.aerogear.unifiedpush.message.event.TriggerMetricCollection;
import org.jboss.aerogear.unifiedpush.message.event.TriggerVariantMetricCollection;
import org.jboss.aerogear.unifiedpush.message.util.JmsClient;

/**
Expand All @@ -40,6 +43,12 @@ public class CdiJmsBridge {
@Resource(mappedName = "java:/queue/TriggerMetricCollectionQueue")
private Queue triggerMetricCollectionQueue;

@Resource(mappedName = "java:/queue/TriggerVariantMetricCollectionQueue")
private Queue triggerVariantMetricCollectionQueue;

@Resource(mappedName = "java:/topic/MetricsProcessingStartedTopic")
private Topic metricsProcessingStartedTopic;

@Inject
private JmsClient jmsClient;

Expand All @@ -63,4 +72,14 @@ public void queueMessage(@Observes @DispatchToQueue TriggerMetricCollection msg)
.withDelayedDelivery(500L)
.to(triggerMetricCollectionQueue);
}

public void queueMessage(@Observes @DispatchToQueue TriggerVariantMetricCollection msg) {
jmsClient.send(msg)
.to(triggerVariantMetricCollectionQueue);
}

public void broadcastMessage(@Observes @DispatchToQueue MetricsProcessingStarted msg) {
jmsClient.send(msg)
.to(metricsProcessingStartedTopic);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/**
* JBoss, Home of Professional Open Source
* Copyright Red Hat, Inc., and individual contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jboss.aerogear.unifiedpush.message.jms;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import javax.ejb.Stateless;
import javax.enterprise.event.Event;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.jms.JMSException;

import org.jboss.aerogear.unifiedpush.api.PushMessageInformation;
import org.jboss.aerogear.unifiedpush.api.VariantMetricInformation;
import org.jboss.aerogear.unifiedpush.message.event.MetricsProcessingStarted;
import org.jboss.aerogear.unifiedpush.message.event.TriggerMetricCollection;
import org.jboss.aerogear.unifiedpush.message.event.TriggerVariantMetricCollection;
import org.jboss.aerogear.unifiedpush.service.metrics.PushMessageMetricsService;
import org.jboss.aerogear.unifiedpush.utils.AeroGearLogger;

@Stateless
public class MetricCollectionTrigger {

private final AeroGearLogger logger = AeroGearLogger.getInstance(MetricCollectionTrigger.class);

private static final Set<String> METRICS_PROCESSING_STARTED = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

@Inject @DispatchToQueue
private Event<MetricsProcessingStarted> broadcastMetricsProcessingStarted;

@Inject @DispatchToQueue
private Event<TriggerMetricCollection> triggerMetricCollection;

@Inject
private PushMessageMetricsService metricsService;

public void tryToStartMetricCollection(@Observes @Dequeue TriggerVariantMetricCollection event) throws JMSException {
final String pushMessageInformationId = event.getPushMessageInformationId();

if (!METRICS_PROCESSING_STARTED.contains(pushMessageInformationId)) {
if (detectMetricsProcessingStartedFromDB(pushMessageInformationId)) {
logger.fine(String.format("Detected that metrics collection already started from DB state for push message %s", pushMessageInformationId));
METRICS_PROCESSING_STARTED.add(pushMessageInformationId);
} else {
if (!METRICS_PROCESSING_STARTED.contains(pushMessageInformationId)) { // re-check after DB read
METRICS_PROCESSING_STARTED.add(pushMessageInformationId);
logger.fine(String.format("Broadcasting information that metrics processing started for push message %s", pushMessageInformationId));
broadcastMetricsProcessingStarted.fire(new MetricsProcessingStarted(pushMessageInformationId));
logger.fine(String.format("Trigger metric collection process for push message %s", pushMessageInformationId));
triggerMetricCollection.fire(new TriggerMetricCollection(pushMessageInformationId));
}
}
}
}

private boolean detectMetricsProcessingStartedFromDB(String pushMessageInformationId) {
PushMessageInformation pmi = metricsService.getPushMessageInformation(pushMessageInformationId);
if (pmi.getServedVariants() > 0) {
return true;
}
for (VariantMetricInformation vmi : pmi.getVariantInformations()) {
if (vmi.getServedBatches() > 0 || vmi.getTotalBatches() > 0) {
return true;
}
}
return false;
}

public void markMetricsProcessingAsStarted(@Observes @Dequeue MetricsProcessingStarted event) throws JMSException {
logger.fine(String.format("Received signal that metrics collection started for push message %s", event.getPushMessageInformationId()));
METRICS_PROCESSING_STARTED.add(event.getPushMessageInformationId());
}
}
Loading

0 comments on commit c8dd6d6

Please sign in to comment.