Skip to content
This repository has been archived by the owner on May 17, 2021. It is now read-only.

Fixes a thread leak in the polling delay code. Fixes a memory issues fo... #2213

Merged
merged 2 commits into from
Mar 2, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.openhab.io.rest.internal.cache;

import org.atmosphere.cache.BroadcastMessage;
import org.atmosphere.cache.CacheMessage;
import org.atmosphere.cache.UUIDBroadcasterCache;
import org.openhab.io.rest.internal.resources.beans.PageBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* UUIDBroadcasterCache that will enforce that only a single sitemap will exist
* in the cache for any given resource. This prevents leaks and other bad things
* from happening.
* @author Dan Cunningham
* @since 1.7.0
*/
public class SingleMessageBroadcastCache extends UUIDBroadcasterCache {

private final static Logger logger = LoggerFactory.getLogger(SingleMessageBroadcastCache.class);

@Override
public CacheMessage addToCache(String broadcasterId, String uuid, BroadcastMessage message) {
if(uuid != null && message.message() instanceof PageBean){
//remove previous message
retrieveFromCache(broadcasterId, uuid);
//add the new message
return super.addToCache(broadcasterId, uuid, message);
} else {
logger.trace("Not caching {}", message.message().getClass().getName());
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package org.openhab.io.rest.internal.filter;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import javax.servlet.http.HttpServletRequest;

Expand All @@ -34,6 +36,8 @@
public class PollingDelayFilter implements PerRequestBroadcastFilter {
private static final Logger logger = LoggerFactory.getLogger(PollingDelayFilter.class);

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

@Override
public BroadcastAction filter(String broadcasterId, Object originalMessage, Object message) {
return new BroadcastAction(message);
Expand All @@ -47,12 +51,18 @@ public BroadcastAction filter(String broadcasterId, final AtmosphereResource res
boolean isItemMessage = originalMessage instanceof Item || originalMessage instanceof GroupItem;
boolean isStreamingTransport = ResponseTypeHelper.isStreamingTransport(request);

//strange atmosphere bug, seems harmless, but pollutes the logs
//so lets see if this fails or not first before we call it again.
try {
resource.getRequest().getPathInfo();
} catch (Exception e) {
return new BroadcastAction(ACTION.ABORT, message);
}
if(!isStreamingTransport && message instanceof PageBean && isItemMessage) {
final String delayedBroadcasterName = resource.getRequest().getPathInfo();
Executors.newSingleThreadExecutor().submit(new Runnable() {
executor.schedule(new Runnable() {
public void run() {
try {
Thread.sleep(300);
BroadcasterFactory broadcasterFactory = resource.getAtmosphereConfig().getBroadcasterFactory();
GeneralBroadcaster delayedBroadcaster = broadcasterFactory.lookup(GeneralBroadcaster.class, delayedBroadcasterName);
if(delayedBroadcaster != null)
Expand All @@ -61,7 +71,7 @@ public void run() {
logger.error("Could not broadcast message", e);
}
}
});
}, 300, TimeUnit.MILLISECONDS);
} else {
//pass message to next filter
return new BroadcastAction(ACTION.CONTINUE, message);
Expand All @@ -73,4 +83,4 @@ public void run() {
return new BroadcastAction(ACTION.ABORT, message);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,21 @@ public static ConcurrentMap<String, CacheEntry> getCachedEntries() {
return cachedEntries;
}

/**
* Configure what cache we want to use
* @param config
*/
public void configureCache(BroadcasterConfig config){
config.setBroadcasterCache(new UUIDBroadcasterCache());
config.getBroadcasterCache().configure(broadcaster.getBroadcasterConfig());
config.getBroadcasterCache().start();
}

public void registerItems(){
StartCacheExecutor();
BroadcasterConfig config = broadcaster.getBroadcasterConfig();

config.setBroadcasterCache(new UUIDBroadcasterCache());
config.getBroadcasterCache().configure(broadcaster.getBroadcasterConfig());
config.getBroadcasterCache().start();
configureCache(config);

addBroadcastFilter(config, new PerRequestBroadcastFilter() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.UriBuilder;

import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.BroadcasterCache;
import org.atmosphere.cpr.PerRequestBroadcastFilter;
import org.atmosphere.cpr.BroadcastFilter.BroadcastAction.ACTION;
import org.atmosphere.cpr.BroadcasterConfig;
import org.openhab.core.items.Item;
import org.openhab.io.rest.RESTApplication;
import org.openhab.io.rest.internal.cache.SingleMessageBroadcastCache;
import org.openhab.io.rest.internal.resources.ResponseTypeHelper;
import org.openhab.io.rest.internal.resources.SitemapResource;
import org.openhab.io.rest.internal.resources.beans.PageBean;
Expand Down Expand Up @@ -50,29 +48,10 @@ public class SitemapStateChangeListener extends ResourceStateChangeListener {
private static final Logger logger = LoggerFactory.getLogger(SitemapStateChangeListener.class);

@Override
public void registerItems() {
super.registerItems();
//if other filters have let this through then clear out any cached messages for the client.
//There should at most be only one message (version of the sitemap) in the cache for a client.
broadcaster.getBroadcasterConfig().addFilter(new PerRequestBroadcastFilter() {

@Override
public BroadcastAction filter(String broadcasterId,
Object originalMessage, Object message) {
return new BroadcastAction(message);
}

@Override
public BroadcastAction filter(String broadcasterId,
AtmosphereResource resource, Object originalMessage, Object message) {
//this will clear any cached messages before we add the new one
BroadcasterCache uuidCache = broadcaster.getBroadcasterConfig().getBroadcasterCache();
List<Object> entries = uuidCache.retrieveFromCache(broadcasterId, resource.uuid());
if(entries != null)
logger.trace("UUID {} had {} previous messages", resource.uuid(), entries.size());
return new BroadcastAction(ACTION.CONTINUE, message);
}
});
public void configureCache(BroadcasterConfig config){
config.setBroadcasterCache(new SingleMessageBroadcastCache());
config.getBroadcasterCache().configure(broadcaster.getBroadcasterConfig());
config.getBroadcasterCache().start();
}

@Override
Expand Down