Skip to content

Commit

Permalink
Allow subscriptions for complete sitemaps (not limited to a single pa…
Browse files Browse the repository at this point in the history
…ge) (#3652)

* allow subscriptions for the complete sitemap

Signed-off-by: Tassilo Karge <[email protected]>
  • Loading branch information
TAKeanice authored May 1, 2024
1 parent 551c06b commit c430e6f
Show file tree
Hide file tree
Showing 4 changed files with 437 additions and 236 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -29,8 +30,8 @@
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.events.Event;
import org.openhab.core.events.EventSubscriber;
import org.openhab.core.io.rest.sitemap.internal.PageChangeListener;
import org.openhab.core.io.rest.sitemap.internal.SitemapEvent;
import org.openhab.core.io.rest.sitemap.internal.WidgetsChangeListener;
import org.openhab.core.items.GroupItem;
import org.openhab.core.items.Item;
import org.openhab.core.items.events.ItemStatePredictedEvent;
Expand Down Expand Up @@ -72,6 +73,7 @@ public class SitemapSubscriptionService implements ModelRepositoryChangeListener
private static final String SITEMAP_PAGE_SEPARATOR = "#";
private static final String SITEMAP_SUFFIX = ".sitemap";
private static final int DEFAULT_MAX_SUBSCRIPTIONS = 50;
private static final Duration WAIT_AFTER_CREATE_SECONDS = Duration.ofSeconds(30);

private final Logger logger = LoggerFactory.getLogger(SitemapSubscriptionService.class);
private final BundleContext bundleContext;
Expand All @@ -88,7 +90,7 @@ public interface SitemapSubscriptionCallback {
private final List<SitemapProvider> sitemapProviders = new ArrayList<>();

/* subscription id -> sitemap+page */
private final Map<String, String> pageOfSubscription = new ConcurrentHashMap<>();
private final Map<String, String> scopeOfSubscription = new ConcurrentHashMap<>();

/* subscription id -> callback */
private final Map<String, SitemapSubscriptionCallback> callbacks = new ConcurrentHashMap<>();
Expand All @@ -112,7 +114,7 @@ public SitemapSubscriptionService(Map<String, Object> config, final @Reference I

@Deactivate
protected void deactivate() {
pageOfSubscription.clear();
scopeOfSubscription.clear();
callbacks.clear();
creationInstants.clear();
pageChangeListeners.values().forEach(l -> l.serviceRegistration.unregister());
Expand All @@ -139,12 +141,12 @@ private void applyConfig(Map<String, Object> config) {
}

@Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC)
protected void addSitemapProvider(SitemapProvider provider) {
public void addSitemapProvider(SitemapProvider provider) {
sitemapProviders.add(provider);
provider.addModelChangeListener(this);
}

protected void removeSitemapProvider(SitemapProvider provider) {
public void removeSitemapProvider(SitemapProvider provider) {
sitemapProviders.remove(provider);
provider.removeModelChangeListener(this);
}
Expand Down Expand Up @@ -176,10 +178,10 @@ protected void removeSitemapProvider(SitemapProvider provider) {
public void removeSubscription(String subscriptionId) {
creationInstants.remove(subscriptionId);
callbacks.remove(subscriptionId);
String sitemapPage = pageOfSubscription.remove(subscriptionId);
if (sitemapPage != null && !pageOfSubscription.containsValue(sitemapPage)) {
String sitemapWithPageId = scopeOfSubscription.remove(subscriptionId);
if (sitemapWithPageId != null && !scopeOfSubscription.containsValue(sitemapWithPageId)) {
// this was the only subscription listening on this page, so we can dispose the listener
ListenerRecord listener = pageChangeListeners.remove(sitemapPage);
ListenerRecord listener = pageChangeListeners.remove(sitemapWithPageId);
if (listener != null) {
listener.serviceRegistration().unregister();
}
Expand All @@ -204,7 +206,7 @@ public boolean exists(String subscriptionId) {
* @return the id of the currently active page or null if no page is currently set for the subscription
*/
public @Nullable String getPageId(String subscriptionId) {
String sitemapWithPageId = pageOfSubscription.get(subscriptionId);
String sitemapWithPageId = scopeOfSubscription.get(subscriptionId);
return (sitemapWithPageId == null) ? null : extractPageId(sitemapWithPageId);
}

Expand All @@ -215,86 +217,111 @@ public boolean exists(String subscriptionId) {
* @return the name of the current sitemap or null if no sitemap is currently set for the subscription
*/
public @Nullable String getSitemapName(String subscriptionId) {
String sitemapWithPageId = pageOfSubscription.get(subscriptionId);
String sitemapWithPageId = scopeOfSubscription.get(subscriptionId);
return (sitemapWithPageId == null) ? null : extractSitemapName(sitemapWithPageId);
}

private String extractSitemapName(String sitemapWithPageId) {
return sitemapWithPageId.split(SITEMAP_PAGE_SEPARATOR)[0];
}

private boolean isPageListener(String sitemapWithPageId) {
return sitemapWithPageId.contains(SITEMAP_PAGE_SEPARATOR);
}

private String extractPageId(String sitemapWithPageId) {
return sitemapWithPageId.split(SITEMAP_PAGE_SEPARATOR)[1];
}

/**
* Updates the subscription to send events for the provided page id.
* Updates the subscription to send events for the provided page id (or whole sitemap if pageId is null).
*
* @param subscriptionId the subscription to update
* @param sitemapName the current sitemap name
* @param pageId the current page id
* @param pageId the current page id or null for whole sitemap subscription
*/
public void setPageId(String subscriptionId, String sitemapName, String pageId) {
public void updateSubscriptionLocation(String subscriptionId, String sitemapName, @Nullable String pageId) {
SitemapSubscriptionCallback callback = callbacks.get(subscriptionId);
if (callback != null) {
String oldSitemapPage = pageOfSubscription.remove(subscriptionId);
if (oldSitemapPage != null) {
removeCallbackFromListener(oldSitemapPage, callback);
String oldSitemapWithPage = scopeOfSubscription.remove(subscriptionId);
if (oldSitemapWithPage != null) {
removeCallbackFromListener(oldSitemapWithPage, callback);
}
addCallbackToListener(sitemapName, pageId, callback);
pageOfSubscription.put(subscriptionId, getValue(sitemapName, pageId));
String scopeIdentifier = getScopeIdentifier(sitemapName, pageId);
scopeOfSubscription.put(subscriptionId, scopeIdentifier);

logger.debug("Subscription {} changed to page {} of sitemap {} ({} active subscriptions}", subscriptionId,
pageId, sitemapName, callbacks.size());
logger.debug("Subscription {} changed to {} ({} active subscriptions}", subscriptionId, scopeIdentifier,
callbacks.size());
} else {
throw new IllegalArgumentException("Subscription " + subscriptionId + " does not exist!");
}
}

private void addCallbackToListener(String sitemapName, String pageId, SitemapSubscriptionCallback callback) {
ListenerRecord listener = pageChangeListeners.computeIfAbsent(getValue(sitemapName, pageId), v -> {
PageChangeListener newListener = new PageChangeListener(sitemapName, pageId, itemUIRegistry,
private void addCallbackToListener(String sitemapName, @Nullable String pageId,
SitemapSubscriptionCallback callback) {
String sitemapWithPageId = getScopeIdentifier(sitemapName, pageId);
ListenerRecord listener = pageChangeListeners.computeIfAbsent(sitemapWithPageId, v -> {
WidgetsChangeListener newListener = new WidgetsChangeListener(sitemapName, pageId, itemUIRegistry,
collectWidgets(sitemapName, pageId));
ServiceRegistration<?> registration = bundleContext.registerService(EventSubscriber.class.getName(),
newListener, null);
return new ListenerRecord(newListener, registration);
});
listener.pageChangeListener().addCallback(callback);
listener.widgetsChangeListener().addCallback(callback);
}

private EList<Widget> collectWidgets(String sitemapName, String pageId) {
public EList<Widget> collectWidgets(String sitemapName, @Nullable String pageId) {
EList<Widget> widgets = new BasicEList<>();

Sitemap sitemap = getSitemap(sitemapName);
if (sitemap != null) {
if (pageId.equals(sitemap.getName())) {
widgets = itemUIRegistry.getChildren(sitemap);
} else {
Widget pageWidget = itemUIRegistry.getWidget(sitemap, pageId);
if (pageWidget instanceof LinkableWidget widget) {
widgets = itemUIRegistry.getChildren(widget);
// We add the page widget. It will help any UI to update the page title.
widgets.add(pageWidget);
if (sitemap == null) {
// no sitemap found with the given name
return widgets;
}

if (pageId != null && !pageId.equals(sitemap.getName())) {
// subscribing to subpage of sitemap --> get all widgets from that page
Widget pageWidget = itemUIRegistry.getWidget(sitemap, pageId);
if (pageWidget instanceof LinkableWidget widget) {
widgets.addAll(itemUIRegistry.getChildren(widget));
// We add the page widget. It will help any UI to update the page title.
widgets.add(pageWidget);
}
} else {
// subscribing to main page --> get immediate children of sitemap
widgets.addAll(itemUIRegistry.getChildren(sitemap));
if (pageId == null) {
// subscribing to whole sitemap --> get items for all subpages as well
LinkedList<Widget> childrenQueue = new LinkedList<>(widgets);
while (!childrenQueue.isEmpty()) {
Widget child = childrenQueue.remove(0);
if (child instanceof LinkableWidget) {
List<Widget> subWidgets = itemUIRegistry.getChildren((LinkableWidget) child);
widgets.addAll(subWidgets);
childrenQueue.addAll(subWidgets);
}
}
}
}
logger.debug("Collected {} widgets for sitemap: {}, page id {}", widgets.size(), sitemapName, pageId);
return widgets;
}

private void removeCallbackFromListener(String sitemapPage, SitemapSubscriptionCallback callback) {
ListenerRecord oldListener = pageChangeListeners.get(sitemapPage);
if (oldListener != null) {
oldListener.pageChangeListener().removeCallback(callback);
if (!pageOfSubscription.containsValue(sitemapPage)) {
oldListener.widgetsChangeListener().removeCallback(callback);
if (!scopeOfSubscription.containsValue(sitemapPage)) {
// no other callbacks are left here, so we can safely dispose the listener
oldListener.serviceRegistration().unregister();
pageChangeListeners.remove(sitemapPage);
}
}
}

private String getValue(String sitemapName, String pageId) {
return sitemapName + SITEMAP_PAGE_SEPARATOR + pageId;
private String getScopeIdentifier(String sitemapName, @Nullable String pageId) {
return pageId == null ? sitemapName : sitemapName + SITEMAP_PAGE_SEPARATOR + pageId;
}

private @Nullable Sitemap getSitemap(String sitemapName) {
Expand All @@ -318,11 +345,16 @@ public void modelChanged(String modelName, EventType type) {
for (Entry<String, ListenerRecord> listenerEntry : pageChangeListeners.entrySet()) {
String sitemapWithPage = listenerEntry.getKey();
String sitemapName = extractSitemapName(sitemapWithPage);
String pageId = extractPageId(sitemapWithPage);

EList<Widget> widgets;
if (sitemapName.equals(changedSitemapName)) {
EList<Widget> widgets = collectWidgets(sitemapName, pageId);
listenerEntry.getValue().pageChangeListener().sitemapContentChanged(widgets);
if (isPageListener(sitemapWithPage)) {
String pageId = extractPageId(sitemapWithPage);
widgets = collectWidgets(sitemapName, pageId);
} else {
widgets = collectWidgets(sitemapName, null);
}
listenerEntry.getValue().widgetsChangeListener().sitemapContentChanged(widgets);
}
}
}
Expand All @@ -332,15 +364,16 @@ public void checkAliveClients() {
for (Entry<String, Instant> creationEntry : creationInstants.entrySet()) {
String subscriptionId = creationEntry.getKey();
SitemapSubscriptionCallback callback = callbacks.get(subscriptionId);
if (getPageId(subscriptionId) == null && callback != null
&& (creationEntry.getValue().plus(Duration.ofSeconds(30)).isBefore(Instant.now()))) {
logger.debug("Release subscription {} as sitemap page is not set", subscriptionId);
if (!scopeOfSubscription.containsKey(subscriptionId) && callback != null
&& (creationEntry.getValue().plus(WAIT_AFTER_CREATE_SECONDS).isBefore(Instant.now()))) {
logger.debug("Release subscription {} as it was not queried within {} seconds", subscriptionId,
WAIT_AFTER_CREATE_SECONDS);
removeSubscription(subscriptionId);
callback.onRelease(subscriptionId);
}
}
// Send an ALIVE event to all subscribers to trigger an exception for dead subscribers
pageChangeListeners.values().forEach(l -> l.pageChangeListener().sendAliveEvent());
pageChangeListeners.values().forEach(l -> l.widgetsChangeListener().sendAliveEvent());
}

@Override
Expand All @@ -359,20 +392,21 @@ public void receive(Event event) {
}
for (ListenerRecord listener : pageChangeListeners.values()) {
if (prediction.isConfirmation()) {
listener.pageChangeListener().keepCurrentState(item);
listener.widgetsChangeListener().keepCurrentState(item);
} else {
listener.pageChangeListener().changeStateTo(item, prediction.getPredictedState());
listener.widgetsChangeListener().changeStateTo(item, prediction.getPredictedState());
}
}
} else if (event instanceof ChannelDescriptionChangedEvent channelDescriptionChangedEvent) {
channelDescriptionChangedEvent.getLinkedItemNames().forEach(itemName -> {
for (ListenerRecord listener : pageChangeListeners.values()) {
listener.pageChangeListener().descriptionChanged(itemName);
listener.widgetsChangeListener().descriptionChanged(itemName);
}
});
}
}

private record ListenerRecord(PageChangeListener pageChangeListener, ServiceRegistration<?> serviceRegistration) {
private record ListenerRecord(WidgetsChangeListener widgetsChangeListener,
ServiceRegistration<?> serviceRegistration) {
}
}
Loading

0 comments on commit c430e6f

Please sign in to comment.