Skip to content

Commit

Permalink
feat(realtime_client): Add support for authorized realtime channels w…
Browse files Browse the repository at this point in the history
…ith broadcast and presence. (#970)

* Add private channel support

* remove unused variables

* update test

* chore: add comment to _private in realtime_channel

* Update packages/realtime_client/lib/src/realtime_channel.dart

Co-authored-by: Bruno D'Luka <[email protected]>

* Update packages/realtime_client/lib/src/realtime_channel.dart

Co-authored-by: Bruno D'Luka <[email protected]>

* format code

---------

Co-authored-by: Bruno D'Luka <[email protected]>
  • Loading branch information
dshukertjr and bdlukaa authored Jul 12, 2024
1 parent 74747f0 commit 8305fef
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 17 deletions.
29 changes: 15 additions & 14 deletions packages/realtime_client/lib/src/realtime_channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ class RealtimeChannel {
@internal
final RealtimeClient socket;

/// Defines if the channel is private or not and if RLS policies will be used to check data
late final bool _private;

RealtimeChannel(
this.topic,
this.socket, {
Expand All @@ -40,7 +43,8 @@ class RealtimeChannel {
params = params.toMap(),
subTopic = topic.replaceFirst(
RegExp(r"^realtime:", caseSensitive: false), "") {
broadcastEndpointURL = _broadcastEndpointURL;
broadcastEndpointURL = '${httpEndpointURL(socket.endPoint)}/api/broadcast';
_private = params.private;

joinPush = Push(
this,
Expand Down Expand Up @@ -117,6 +121,7 @@ class RealtimeChannel {
} else {
final broadcast = params['config']['broadcast'];
final presence = params['config']['presence'];
final isPrivate = params['config']['private'];

_onError((e) {
if (callback != null) callback(RealtimeSubscribeStatus.channelError, e);
Expand All @@ -131,6 +136,7 @@ class RealtimeChannel {
'presence': presence,
'postgres_changes':
_bindings['postgres_changes']?.map((r) => r.filter).toList() ?? [],
'private': isPrivate == true,
};

if (socket.accessToken != null) {
Expand Down Expand Up @@ -483,13 +489,20 @@ class RealtimeChannel {
}

if (!canPush && type == RealtimeListenTypes.broadcast) {
final headers = {'Content-Type': 'application/json', ...socket.headers};
final headers = <String, String>{
'Content-Type': 'application/json',
if (socket.params['apikey'] != null) 'apikey': socket.params['apikey']!,
...socket.headers,
if (socket.accessToken != null)
'Authorization': 'Bearer ${socket.accessToken}',
};
final body = {
'messages': [
{
'topic': subTopic,
'payload': payload,
'event': event,
'private': _private,
}
]
};
Expand Down Expand Up @@ -595,18 +608,6 @@ class RealtimeChannel {
return completer.future;
}

String get _broadcastEndpointURL {
var url = socket.endPoint;
url = url.replaceFirst(RegExp(r'^ws', caseSensitive: false), 'http');
url = url.replaceAll(
RegExp(r'(/socket/websocket|/socket|/websocket)/?$',
caseSensitive: false),
'',
);
url = '${url.replaceAll(RegExp(r'/+$'), '')}/api/broadcast';
return url;
}

/// Overridable message hook
///
/// Receives all events for specialized message handling before dispatching to the channel callbacks.
Expand Down
2 changes: 2 additions & 0 deletions packages/realtime_client/lib/src/realtime_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class RealtimeClient {
String? accessToken;
List<RealtimeChannel> channels = [];
final String endPoint;

final Map<String, String> headers;
final Map<String, dynamic> params;
final Duration timeout;
Expand Down Expand Up @@ -85,6 +86,7 @@ class RealtimeClient {
/// Initializes the Socket
///
/// `endPoint` The string WebSocket endpoint, ie, "ws://example.com/socket", "wss://example.com", "/socket" (inherited host & protocol)
/// `httpEndpoint` The string HTTP endpoint, ie, "https://example.com", "/" (inherited host & protocol)
/// `transport` The Websocket Transport, for example WebSocket.
/// `timeout` The default timeout in milliseconds to trigger push timeouts.
/// `params` The optional params to pass when connecting.
Expand Down
17 changes: 17 additions & 0 deletions packages/realtime_client/lib/src/transformers.dart
Original file line number Diff line number Diff line change
Expand Up @@ -332,3 +332,20 @@ Map<String, Map<String, dynamic>> getPayloadRecords(

return records;
}

/// Converts a WebSocket URL to an HTTP URL.
String httpEndpointURL(String socketUrl) {
var url = socketUrl;

// Replace 'ws' or 'wss' with 'http' or 'https' respectively
url = url.replaceFirst(RegExp(r'^ws', caseSensitive: false), 'http');

// Remove WebSocket-specific endings
url = url.replaceFirst(
RegExp(r'(/socket/websocket|/socket|/websocket)/?$', caseSensitive: false),
'',
);

// Remove trailing slashes
return url.replaceAll(RegExp(r'/+$'), '');
}
5 changes: 5 additions & 0 deletions packages/realtime_client/lib/src/types.dart
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,14 @@ class RealtimeChannelConfig {
/// [key] option is used to track presence payload across clients
final String key;

/// Defines if the channel is private or not and if RLS policies will be used to check data
final bool private;

const RealtimeChannelConfig({
this.ack = false,
this.self = false,
this.key = '',
this.private = false,
});

Map<String, dynamic> toMap() {
Expand All @@ -164,6 +168,7 @@ class RealtimeChannelConfig {
'presence': {
'key': key,
},
'private': private,
}
};
}
Expand Down
27 changes: 25 additions & 2 deletions packages/realtime_client/test/channel_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import 'dart:io';

import 'package:realtime_client/realtime_client.dart';
import 'package:realtime_client/src/constants.dart';
import 'package:realtime_client/src/push.dart';
import 'package:realtime_client/src/types.dart';
import 'package:test/test.dart';

Expand Down Expand Up @@ -33,11 +34,31 @@ void main() {
expect(channel.params, {
'config': {
'broadcast': {'ack': false, 'self': false},
'presence': {'key': ''}
'presence': {'key': ''},
'private': false,
}
});
expect(channel.socket, socket);
});

test('sets up joinPush object with private defined', () {
channel = RealtimeChannel(
'topic',
socket,
params: RealtimeChannelConfig(
private: true,
),
);
final Push joinPush = channel.joinPush;

expect(joinPush.payload, {
'config': {
'broadcast': {'ack': false, 'self': false},
'presence': {'key': ''},
'private': true,
},
});
});
});

group('join', () {
Expand Down Expand Up @@ -252,7 +273,7 @@ void main() {
params: {'apikey': 'supabaseKey'},
);

channel = socket.channel('myTopic');
channel = socket.channel('myTopic', RealtimeChannelConfig(private: true));
});

tearDown(() async {
Expand Down Expand Up @@ -311,9 +332,11 @@ void main() {
final body = json.decode(await utf8.decodeStream(req));
final message = body['messages'][0];
final payload = message['payload'];
final private = message['private'];

expect(payload, containsPair('myKey', 'myValue'));
expect(message, containsPair('topic', 'myTopic'));
expect(private, true);

await req.response.close();
break;
Expand Down
3 changes: 2 additions & 1 deletion packages/realtime_client/test/socket_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,8 @@ void main() {
expect(channel.params, {
'config': {
'broadcast': {'ack': false, 'self': false},
'presence': {'key': ''}
'presence': {'key': ''},
'private': false,
}
});
});
Expand Down
15 changes: 15 additions & 0 deletions packages/realtime_client/test/transformers_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -234,4 +234,19 @@ void main() {
expect(enrichedPayload, expectedMap);
});
});

group('httpEndpointURL', () {
test('Converts a hosted Supabase WS URL', () {
expect(
httpEndpointURL('wss://example.supabase.co/realtime/v1'),
equals('https://example.supabase.co/realtime/v1'),
);
});
test('Converts a custom domain WS URL', () {
expect(
httpEndpointURL('wss://custom-domain.com/realtime/v1'),
equals('https://custom-domain.com/realtime/v1'),
);
});
});
}

0 comments on commit 8305fef

Please sign in to comment.