diff --git a/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt b/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt index 2692c849..31765c82 100644 --- a/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt +++ b/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt @@ -11,7 +11,6 @@ import java.io.IOException import java.net.URLEncoder import java.nio.charset.StandardCharsets.UTF_8 import java.util.concurrent.TimeUnit -import kotlin.random.Random class ApiService { private val client = OkHttpClient.Builder() @@ -111,7 +110,7 @@ class ApiService { unifiedPushTopics: String, since: String?, user: User?, - notify: (topic: String, Notification) -> Unit, + notify: (Message) -> Unit, fail: (Exception) -> Unit ): Call { val sinceVal = since ?: "all" @@ -128,10 +127,8 @@ class ApiService { val source = response.body?.source() ?: throw Exception("Unexpected response for $url: body is empty") while (!source.exhausted()) { val line = source.readUtf8Line() ?: throw Exception("Unexpected response for $url: line is null") - val notification = parser.parseWithTopic(line, notificationId = Random.nextInt(), subscriptionId = 0) // subscriptionId to be set downstream - if (notification != null) { - notify(notification.topic, notification.notification) - } + val message = parser.parseMessage(line) + if (message != null) notify(message) } } catch (e: Exception) { Log.e(TAG, "Connection to $url failed (1): ${e.message}", e) @@ -175,6 +172,8 @@ class ApiService { // These constants have corresponding values in the server codebase! const val CONTROL_TOPIC = "~control" + const val EVENT_OPEN_PARAM_NEW_TOPIC = "new_topic" + const val EVENT_OPEN = "open" const val EVENT_MESSAGE = "message" const val EVENT_KEEPALIVE = "keepalive" const val EVENT_POLL_REQUEST = "poll_request" diff --git a/app/src/main/java/io/heckel/ntfy/msg/Message.kt b/app/src/main/java/io/heckel/ntfy/msg/Message.kt index 8de0ab85..45735170 100644 --- a/app/src/main/java/io/heckel/ntfy/msg/Message.kt +++ b/app/src/main/java/io/heckel/ntfy/msg/Message.kt @@ -16,7 +16,7 @@ data class Message( val icon: String?, val actions: List?, val title: String?, - val message: String, + val message: String?, val encoding: String?, val attachment: MessageAttachment?, ) diff --git a/app/src/main/java/io/heckel/ntfy/msg/NotificationParser.kt b/app/src/main/java/io/heckel/ntfy/msg/NotificationParser.kt index 81a60dba..93688782 100644 --- a/app/src/main/java/io/heckel/ntfy/msg/NotificationParser.kt +++ b/app/src/main/java/io/heckel/ntfy/msg/NotificationParser.kt @@ -13,13 +13,17 @@ import java.lang.reflect.Type class NotificationParser { private val gson = Gson() + fun parseMessage(s: String) : Message? { + return gson.fromJson(s, Message::class.java) + } + fun parse(s: String, subscriptionId: Long = 0, notificationId: Int = 0): Notification? { - val notificationWithTopic = parseWithTopic(s, subscriptionId = subscriptionId, notificationId = notificationId) + val message = parseMessage(s) ?: return null + val notificationWithTopic = parseNotificationWithTopic(message, subscriptionId = subscriptionId, notificationId = notificationId) return notificationWithTopic?.notification } - fun parseWithTopic(s: String, subscriptionId: Long = 0, notificationId: Int = 0): NotificationWithTopic? { - val message = gson.fromJson(s, Message::class.java) + fun parseNotificationWithTopic(message: Message, subscriptionId: Long = 0, notificationId: Int = 0): NotificationWithTopic? { if (message.event != ApiService.EVENT_MESSAGE) { return null } @@ -56,7 +60,7 @@ class NotificationParser { subscriptionId = subscriptionId, timestamp = message.time, title = message.title ?: "", - message = message.message, + message = message.message?: "", encoding = message.encoding ?: "", priority = toPriority(message.priority), tags = joinTags(message.tags), diff --git a/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt b/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt index 39fa0088..6a7806e9 100644 --- a/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt +++ b/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt @@ -3,6 +3,7 @@ package io.heckel.ntfy.service import io.heckel.ntfy.db.* import io.heckel.ntfy.util.Log import io.heckel.ntfy.msg.ApiService +import io.heckel.ntfy.msg.Message import io.heckel.ntfy.util.topicUrl import kotlinx.coroutines.* import okhttp3.Call @@ -16,7 +17,7 @@ class JsonConnection( private val user: User?, private val sinceId: String?, private val stateChangeListener: (Collection, ConnectionState) -> Unit, - private val notificationListener: (Subscription, Notification) -> Unit, + private val notificationListener: (ConnectionId, Message) -> String?, private val serviceActive: () -> Boolean ) : Connection { private val baseUrl = connectionId.baseUrl @@ -40,12 +41,8 @@ class JsonConnection( while (isActive && serviceActive()) { Log.d(TAG, "[$url] (Re-)starting connection for subscriptions: $topicsToSubscriptionIds") val startTime = System.currentTimeMillis() - val notify = notify@ { topic: String, notification: Notification -> - since = notification.id - val subscriptionId = topicsToSubscriptionIds[topic] ?: return@notify - val subscription = repository.getSubscription(subscriptionId) ?: return@notify - val notificationWithSubscriptionId = notification.copy(subscriptionId = subscription.id) - notificationListener(subscription, notificationWithSubscriptionId) + val notify = { message : Message -> + since = notificationListener(ConnectionId(baseUrl, topicsToSubscriptionIds, topicIsUnifiedPush), message)?: since } val failed = AtomicBoolean(false) val fail = { _: Exception -> diff --git a/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt b/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt index 192cfc9f..ff3a64c9 100644 --- a/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt +++ b/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt @@ -15,9 +15,10 @@ import io.heckel.ntfy.R import io.heckel.ntfy.app.Application import io.heckel.ntfy.db.ConnectionState import io.heckel.ntfy.db.Repository -import io.heckel.ntfy.db.Subscription import io.heckel.ntfy.msg.ApiService +import io.heckel.ntfy.msg.Message import io.heckel.ntfy.msg.NotificationDispatcher +import io.heckel.ntfy.msg.NotificationParser import io.heckel.ntfy.ui.Colors import io.heckel.ntfy.ui.MainActivity import io.heckel.ntfy.util.Log @@ -28,6 +29,7 @@ import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import java.util.concurrent.ConcurrentHashMap +import kotlin.random.Random /** * The subscriber service manages the foreground service for instant delivery. @@ -67,6 +69,7 @@ class SubscriberService : Service() { private var notificationManager: NotificationManager? = null private var serviceNotification: Notification? = null private val refreshMutex = Mutex() // Ensure refreshConnections() is only run one at a time + private val parser = NotificationParser() override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int { Log.d(TAG, "onStartCommand executed with startId: $startId") @@ -247,11 +250,39 @@ class SubscriberService : Service() { } } + private fun onConnectionOpen(connectionId: ConnectionId, message: String?) { + Log.d(TAG, "Received open from connection ${connectionId.baseUrl} with message: $message") + // this check is sufficient for now, and the message can be upgraded to include other parameters in the future + if (message?.contains(ApiService.EVENT_OPEN_PARAM_NEW_TOPIC) == true) { + GlobalScope.launch(Dispatchers.IO) { + for (topic in connectionId.topicsToSubscriptionIds.keys) { + if (connectionId.topicIsUnifiedPush[topic] == true) { + Log.d(TAG, "Attempting to re-register ${connectionId.baseUrl}/$topic") + io.heckel.ntfy.up.BroadcastReceiver.sendRegistration(baseContext, connectionId.baseUrl, topic) + // TODO is that the right context - looks like it works??? + } + } + } + } + } private fun onStateChanged(subscriptionIds: Collection, state: ConnectionState) { repository.updateState(subscriptionIds, state) } - private fun onNotificationReceived(subscription: Subscription, notification: io.heckel.ntfy.db.Notification) { + // Process messages received from the server, and dispatch a notification if required. + // Return the ID of the notification if successfully processed, else null. + private fun onNotificationReceived(connectionId: ConnectionId, message: Message) : String? { + if (message.event == ApiService.EVENT_OPEN) { + onConnectionOpen(connectionId, message.message) + return null + } + + val (topic, notificationWithoutId) = parser.parseNotificationWithTopic(message, notificationId = Random.nextInt(), subscriptionId = 0) + ?: return null // subscriptionId to be set downstream + val subscriptionId = connectionId.topicsToSubscriptionIds[topic] ?: return null + val subscription = repository.getSubscription(subscriptionId) ?: return null + val notification = notificationWithoutId.copy(subscriptionId = subscription.id) + // Wakelock while notifications are being dispatched // Wakelocks are reference counted by default so that should work neatly here wakeLock?.acquire(NOTIFICATION_RECEIVED_WAKELOCK_TIMEOUT_MILLIS) @@ -269,6 +300,7 @@ class SubscriberService : Service() { } } } + return notification.id } private fun createNotificationChannel(): NotificationManager? { diff --git a/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt b/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt index 080e8482..fac744de 100644 --- a/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt +++ b/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt @@ -6,6 +6,7 @@ import android.os.Handler import android.os.Looper import io.heckel.ntfy.db.* import io.heckel.ntfy.msg.ApiService.Companion.requestBuilder +import io.heckel.ntfy.msg.Message import io.heckel.ntfy.msg.NotificationParser import io.heckel.ntfy.util.Log import io.heckel.ntfy.util.topicShortUrl @@ -18,7 +19,6 @@ import java.util.* import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicReference -import kotlin.random.Random /** * Connect to ntfy server via WebSockets. This connection represents a single connection to a server, with @@ -36,7 +36,7 @@ class WsConnection( private val user: User?, private val sinceId: String?, private val stateChangeListener: (Collection, ConnectionState) -> Unit, - private val notificationListener: (Subscription, Notification) -> Unit, + private val notificationListener: (ConnectionId, Message) -> String?, private val alarmManager: AlarmManager ) : Connection { private val parser = NotificationParser() @@ -59,7 +59,8 @@ class WsConnection( private val topicIsUnifiedPush = connectionId.topicIsUnifiedPush private val subscriptionIds = topicsToSubscriptionIds.values private val topicsStr = topicsToSubscriptionIds.keys.joinToString(separator = ",") - private val unifiedPushTopicsStr = topicIsUnifiedPush.filter { entry -> entry.value }.keys.joinToString(separator = ",") + private val unifiedPushTopicsStr = topicIsUnifiedPush.filter { entry -> entry.value +}.keys.joinToString(separator = ",") private val shortUrl = topicShortUrl(baseUrl, topicsStr) init { @@ -137,18 +138,16 @@ class WsConnection( override fun onMessage(webSocket: WebSocket, text: String) { synchronize("onMessage") { Log.d(TAG, "$shortUrl (gid=$globalId, lid=$id): Received message: $text") - val notificationWithTopic = parser.parseWithTopic(text, subscriptionId = 0, notificationId = Random.nextInt()) - if (notificationWithTopic == null) { - Log.d(TAG, "$shortUrl (gid=$globalId, lid=$id): Irrelevant or unknown message. Discarding.") - return@synchronize + val message = parser.parseMessage(text) ?: return@synchronize + val id = notificationListener( + ConnectionId(baseUrl, topicsToSubscriptionIds, topicIsUnifiedPush), + message + ) + if (id != null) { + since.set(id) + } else { + Log.d(WsConnection.TAG,"$shortUrl (gid=$globalId, lid=$id): Irrelevant or unknown message. Discarding.") } - val topic = notificationWithTopic.topic - val notification = notificationWithTopic.notification - val subscriptionId = topicsToSubscriptionIds[topic] ?: return@synchronize - val subscription = repository.getSubscription(subscriptionId) ?: return@synchronize - val notificationWithSubscriptionId = notification.copy(subscriptionId = subscription.id) - notificationListener(subscription, notificationWithSubscriptionId) - since.set(notification.id) } } diff --git a/app/src/main/java/io/heckel/ntfy/up/BroadcastReceiver.kt b/app/src/main/java/io/heckel/ntfy/up/BroadcastReceiver.kt index 46a0bcb1..1f4bbfb7 100644 --- a/app/src/main/java/io/heckel/ntfy/up/BroadcastReceiver.kt +++ b/app/src/main/java/io/heckel/ntfy/up/BroadcastReceiver.kt @@ -94,6 +94,13 @@ class BroadcastReceiver : android.content.BroadcastReceiver() { // Note, this may fail due to a SQL constraint exception, see https://github.com/binwiederhier/ntfy/issues/185 repository.addSubscription(subscription) distributor.sendEndpoint(appId, connectorToken, endpoint) + /* We need to stop sending the endpoint here once everyone has the new server, + the foreground service will do that after registering with the server. + That will avoid a race condition where the application server + is rejected before ntfy even establishes that this topic exists. + This is fine from an application perspective, because other distributors can't even register + without a connection to the push server. + Unless the app registers twice. Then it'll get the endpoint anyway.*/ // Refresh (and maybe start) foreground service SubscriberServiceManager.refresh(app) @@ -143,5 +150,26 @@ class BroadcastReceiver : android.content.BroadcastReceiver() { private const val TOPIC_RANDOM_ID_LENGTH = 12 val mutex = Mutex() // https://github.com/binwiederhier/ntfy/issues/230 + + // TODO Where's the best place to put this function? This seems to be the only place + // with the access to the locks, but also globally accessible + // but also, broadcast receiver is for *receiving Android broadcasts* + public fun sendRegistration(context: Context, baseUrl : String, topic : String) { + val app = context.applicationContext as Application + val repository = app.repository + val distributor = Distributor(app) + GlobalScope.launch(Dispatchers.IO) { + // We're doing all of this inside a critical section, because of possible races. + // See https://github.com/binwiederhier/ntfy/issues/230 for details. + + mutex.withLock { + val existingSubscription = repository.getSubscription(baseUrl, topic) ?: return@launch + val appId = existingSubscription.upAppId ?: return@launch + val connectorToken = existingSubscription.upConnectorToken ?: return@launch + val endpoint = topicUrlUp(existingSubscription.baseUrl, existingSubscription.topic) + distributor.sendEndpoint(appId, connectorToken, endpoint) + } + } + } } }