Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance for state update handling #3635

Merged
merged 4 commits into from
Jul 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.openhab.core.model.sitemap.sitemap.Widget;
import org.openhab.core.thing.events.ChannelDescriptionChangedEvent;
import org.openhab.core.ui.items.ItemUIRegistry;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
Expand Down Expand Up @@ -72,6 +74,7 @@ public class SitemapSubscriptionService implements ModelRepositoryChangeListener
private static final int DEFAULT_MAX_SUBSCRIPTIONS = 50;

private final Logger logger = LoggerFactory.getLogger(SitemapSubscriptionService.class);
private final BundleContext bundleContext;

public interface SitemapSubscriptionCallback {

Expand All @@ -94,25 +97,25 @@ public interface SitemapSubscriptionCallback {
private final Map<String, Instant> creationInstants = new ConcurrentHashMap<>();

/* sitemap+page -> listener */
private final Map<String, PageChangeListener> pageChangeListeners = new ConcurrentHashMap<>();
private final Map<String, ListenerRecord> pageChangeListeners = new ConcurrentHashMap<>();

/* Max number of subscriptions at the same time */
private int maxSubscriptions = DEFAULT_MAX_SUBSCRIPTIONS;

@Activate
public SitemapSubscriptionService(Map<String, Object> config, final @Reference ItemUIRegistry itemUIRegistry) {
applyConfig(config);
public SitemapSubscriptionService(Map<String, Object> config, final @Reference ItemUIRegistry itemUIRegistry,
BundleContext bundleContext) {
this.itemUIRegistry = itemUIRegistry;
this.bundleContext = bundleContext;
applyConfig(config);
}

@Deactivate
protected void deactivate() {
pageOfSubscription.clear();
callbacks.clear();
creationInstants.clear();
for (PageChangeListener listener : pageChangeListeners.values()) {
listener.dispose();
}
pageChangeListeners.values().forEach(l -> l.serviceRegistration.unregister());
pageChangeListeners.clear();
}

Expand Down Expand Up @@ -150,7 +153,7 @@ protected void removeSitemapProvider(SitemapProvider provider) {
* Creates a new subscription with the given id.
*
* @param callback an instance that should receive the events
* @returns a unique id that identifies the subscription or null if the limit of subscriptions is already reached
* @return a unique id that identifies the subscription or null if the limit of subscriptions is already reached
*/
public @Nullable String createSubscription(SitemapSubscriptionCallback callback) {
if (maxSubscriptions >= 0 && callbacks.size() >= maxSubscriptions) {
Expand All @@ -176,9 +179,9 @@ public void removeSubscription(String subscriptionId) {
String sitemapPage = pageOfSubscription.remove(subscriptionId);
if (sitemapPage != null && !pageOfSubscription.values().contains(sitemapPage)) {
// this was the only subscription listening on this page, so we can dispose the listener
PageChangeListener listener = pageChangeListeners.remove(sitemapPage);
ListenerRecord listener = pageChangeListeners.remove(sitemapPage);
if (listener != null) {
listener.dispose();
listener.serviceRegistration().unregister();
}
}
logger.debug("Removed subscription with id {} ({} active subscriptions)", subscriptionId, callbacks.size());
Expand Down Expand Up @@ -249,13 +252,14 @@ public void setPageId(String subscriptionId, String sitemapName, String pageId)
}

private void addCallbackToListener(String sitemapName, String pageId, SitemapSubscriptionCallback callback) {
PageChangeListener listener = pageChangeListeners.get(getValue(sitemapName, pageId));
if (listener == null) {
// there is no listener for this page yet, so let's try to create one
listener = new PageChangeListener(sitemapName, pageId, itemUIRegistry, collectWidgets(sitemapName, pageId));
pageChangeListeners.put(getValue(sitemapName, pageId), listener);
}
listener.addCallback(callback);
ListenerRecord listener = pageChangeListeners.computeIfAbsent(getValue(sitemapName, pageId), v -> {
PageChangeListener newListener = new PageChangeListener(sitemapName, pageId, itemUIRegistry,
collectWidgets(sitemapName, pageId));
ServiceRegistration<?> registration = bundleContext.registerService(EventSubscriber.class.getName(),
newListener, null);
return new ListenerRecord(newListener, registration);
});
listener.pageChangeListener().addCallback(callback);
}

private EList<Widget> collectWidgets(String sitemapName, String pageId) {
Expand All @@ -278,12 +282,12 @@ private EList<Widget> collectWidgets(String sitemapName, String pageId) {
}

private void removeCallbackFromListener(String sitemapPage, SitemapSubscriptionCallback callback) {
PageChangeListener oldListener = pageChangeListeners.get(sitemapPage);
ListenerRecord oldListener = pageChangeListeners.get(sitemapPage);
if (oldListener != null) {
oldListener.removeCallback(callback);
if (!pageOfSubscription.values().contains(sitemapPage)) {
oldListener.pageChangeListener().removeCallback(callback);
if (!pageOfSubscription.containsValue(sitemapPage)) {
// no other callbacks are left here, so we can safely dispose the listener
oldListener.dispose();
oldListener.serviceRegistration().unregister();
pageChangeListeners.remove(sitemapPage);
}
}
Expand Down Expand Up @@ -311,14 +315,14 @@ public void modelChanged(String modelName, EventType type) {

String changedSitemapName = modelName.substring(0, modelName.length() - SITEMAP_SUFFIX.length());

for (Entry<String, PageChangeListener> listenerEntry : pageChangeListeners.entrySet()) {
for (Entry<String, ListenerRecord> listenerEntry : pageChangeListeners.entrySet()) {
String sitemapWithPage = listenerEntry.getKey();
String sitemapName = extractSitemapName(sitemapWithPage);
String pageId = extractPageId(sitemapWithPage);

if (sitemapName.equals(changedSitemapName)) {
EList<Widget> widgets = collectWidgets(sitemapName, pageId);
listenerEntry.getValue().sitemapContentChanged(widgets);
listenerEntry.getValue().pageChangeListener().sitemapContentChanged(widgets);
}
}
}
Expand All @@ -336,9 +340,7 @@ public void checkAliveClients() {
}
}
// Send an ALIVE event to all subscribers to trigger an exception for dead subscribers
for (Entry<String, PageChangeListener> listenerEntry : pageChangeListeners.entrySet()) {
listenerEntry.getValue().sendAliveEvent();
}
pageChangeListeners.values().forEach(l -> l.pageChangeListener().sendAliveEvent());
}

@Override
Expand All @@ -355,19 +357,22 @@ public void receive(Event event) {
// members and predictions aren't really possible in that case (or at least would be highly complex).
return;
}
for (PageChangeListener pageChangeListener : pageChangeListeners.values()) {
for (ListenerRecord listener : pageChangeListeners.values()) {
if (prediction.isConfirmation()) {
pageChangeListener.keepCurrentState(item);
listener.pageChangeListener().keepCurrentState(item);
} else {
pageChangeListener.changeStateTo(item, prediction.getPredictedState());
listener.pageChangeListener().changeStateTo(item, prediction.getPredictedState());
}
}
} else if (event instanceof ChannelDescriptionChangedEvent channelDescriptionChangedEvent) {
channelDescriptionChangedEvent.getLinkedItemNames().forEach(itemName -> {
for (PageChangeListener pageChangeListener : pageChangeListeners.values()) {
pageChangeListener.descriptionChanged(itemName);
for (ListenerRecord listener : pageChangeListeners.values()) {
listener.pageChangeListener().descriptionChanged(itemName);
}
});
}
}

private record ListenerRecord(PageChangeListener pageChangeListener, ServiceRegistration<?> serviceRegistration) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.eclipse.emf.common.util.EList;
import org.openhab.core.common.ThreadPoolManager;
import org.openhab.core.events.Event;
import org.openhab.core.events.EventSubscriber;
import org.openhab.core.io.rest.core.item.EnrichedItemDTOMapper;
import org.openhab.core.io.rest.sitemap.SitemapSubscriptionService.SitemapSubscriptionCallback;
import org.openhab.core.items.GenericItem;
import org.openhab.core.items.GroupItem;
import org.openhab.core.items.Item;
import org.openhab.core.items.ItemNotFoundException;
import org.openhab.core.items.StateChangeListener;
import org.openhab.core.items.events.GroupStateUpdatedEvent;
import org.openhab.core.items.events.ItemEvent;
import org.openhab.core.items.events.ItemStateChangedEvent;
import org.openhab.core.library.CoreItemFactory;
import org.openhab.core.model.sitemap.sitemap.Chart;
import org.openhab.core.model.sitemap.sitemap.ColorArray;
Expand All @@ -45,7 +48,7 @@
* @author Kai Kreuzer - Initial contribution
* @author Laurent Garnier - Added support for icon color
*/
public class PageChangeListener implements StateChangeListener {
public class PageChangeListener implements EventSubscriber {

private static final int REVERT_INTERVAL = 300;
private final ScheduledExecutorService scheduler = ThreadPoolManager
Expand All @@ -55,6 +58,7 @@ public class PageChangeListener implements StateChangeListener {
private final ItemUIRegistry itemUIRegistry;
private EList<Widget> widgets;
private Set<Item> items;
private final HashSet<String> filterItems = new HashSet<>();
private final List<SitemapSubscriptionCallback> callbacks = Collections.synchronizedList(new ArrayList<>());
private Set<SitemapSubscriptionCallback> distinctCallbacks = Collections.emptySet();

Expand All @@ -75,23 +79,10 @@ public PageChangeListener(String sitemapName, String pageId, ItemUIRegistry item
}

private void updateItemsAndWidgets(EList<Widget> widgets) {
if (this.widgets != null) {
// cleanup statechange listeners in case widgets were removed
items = getAllItems(this.widgets);
for (Item item : items) {
if (item instanceof GenericItem genericItem) {
genericItem.removeStateChangeListener(this);
}
}
}

this.widgets = widgets;
items = getAllItems(widgets);
for (Item item : items) {
if (item instanceof GenericItem genericItem) {
genericItem.addStateChangeListener(this);
}
}
filterItems.clear();
filterItems.addAll(items.stream().map(Item::getName).collect(Collectors.toSet()));
}

public String getSitemapName() {
Expand All @@ -113,19 +104,6 @@ public void removeCallback(SitemapSubscriptionCallback callback) {
distinctCallbacks = new HashSet<>(callbacks);
}

/**
* Disposes this instance and releases all resources.
*/
public void dispose() {
for (Item item : items) {
if (item instanceof GenericItem genericItem) {
genericItem.removeStateChangeListener(this);
} else if (item instanceof GroupItem groupItem) {
groupItem.removeStateChangeListener(this);
}
}
}

/**
* Collects all items that are represented by a given list of widgets
*
Expand Down Expand Up @@ -182,26 +160,6 @@ private void constructAndSendEvents(Item item, State newState) {
}
}

@Override
public void stateChanged(Item item, State oldState, State newState) {
// For all items except group, send an event only when the event state is changed.
if (item instanceof GroupItem) {
return;
}
constructAndSendEvents(item, newState);
}

@Override
public void stateUpdated(Item item, State state) {
// For group item only, send an event each time the event state is updated.
// It allows updating the group label while the group state is unchanged,
// for example the count in label for Group:Switch:OR
if (!(item instanceof GroupItem)) {
return;
}
constructAndSendEvents(item, state);
}

public void keepCurrentState(Item item) {
scheduler.schedule(() -> {
constructAndSendEvents(item, item.getState());
Expand Down Expand Up @@ -338,4 +296,24 @@ private Set<SitemapEvent> constructSitemapEventsForUpdatedDescr(Item item, List<
}
return events;
}

@Override
public Set<String> getSubscribedEventTypes() {
return Set.of(ItemStateChangedEvent.TYPE, GroupStateUpdatedEvent.TYPE);
}

@Override
public void receive(Event event) {
if (event instanceof ItemEvent itemEvent && filterItems.contains(itemEvent.getItemName())) {
Item item = itemUIRegistry.get(itemEvent.getItemName());
if (item == null) {
return;
}
if (event instanceof GroupStateUpdatedEvent groupStateUpdatedEvent) {
constructAndSendEvents(item, groupStateUpdatedEvent.getItemState());
} else if (event instanceof ItemStateChangedEvent itemStateChangedEvent) {
constructAndSendEvents(item, itemStateChangedEvent.getItemState());
}
}
}
}
Loading