Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(realtime_client): Add support for authorized realtime channels with broadcast and presence. #970

Merged
merged 7 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions packages/realtime_client/lib/src/realtime_channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ class RealtimeChannel {
@internal
final RealtimeClient socket;

late final bool _private;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this attribute should be documented as well like in RealtimeChannelConfig


RealtimeChannel(
this.topic,
this.socket, {
Expand All @@ -40,7 +42,8 @@ class RealtimeChannel {
params = params.toMap(),
subTopic = topic.replaceFirst(
RegExp(r"^realtime:", caseSensitive: false), "") {
broadcastEndpointURL = _broadcastEndpointURL;
broadcastEndpointURL = '${httpEndpointURL(socket.endPoint)}/api/broadcast';
_private = params.private;

joinPush = Push(
this,
Expand Down Expand Up @@ -117,6 +120,7 @@ class RealtimeChannel {
} else {
final broadcast = params['config']['broadcast'];
final presence = params['config']['presence'];
final isPrivate = params['config']['private'];

_onError((e) {
if (callback != null) callback(RealtimeSubscribeStatus.channelError, e);
Expand All @@ -131,6 +135,7 @@ class RealtimeChannel {
'presence': presence,
'postgres_changes':
_bindings['postgres_changes']?.map((r) => r.filter).toList() ?? [],
'private': isPrivate == true,
};

if (socket.accessToken != null) {
Expand Down Expand Up @@ -483,13 +488,20 @@ class RealtimeChannel {
}

if (!canPush && type == RealtimeListenTypes.broadcast) {
final headers = {'Content-Type': 'application/json', ...socket.headers};
final headers = <String, String>{
'Content-Type': 'application/json',
'apikey': socket.params['apikey'] ?? '',
dshukertjr marked this conversation as resolved.
Show resolved Hide resolved
...socket.headers,
'Authorization':
socket.accessToken != null ? 'Bearer ${socket.accessToken}' : '',
dshukertjr marked this conversation as resolved.
Show resolved Hide resolved
};
final body = {
'messages': [
{
'topic': subTopic,
'payload': payload,
'event': event,
'private': _private,
}
]
};
Expand Down Expand Up @@ -595,18 +607,6 @@ class RealtimeChannel {
return completer.future;
}

String get _broadcastEndpointURL {
var url = socket.endPoint;
url = url.replaceFirst(RegExp(r'^ws', caseSensitive: false), 'http');
url = url.replaceAll(
RegExp(r'(/socket/websocket|/socket|/websocket)/?$',
caseSensitive: false),
'',
);
url = '${url.replaceAll(RegExp(r'/+$'), '')}/api/broadcast';
return url;
}

/// Overridable message hook
///
/// Receives all events for specialized message handling before dispatching to the channel callbacks.
Expand Down
2 changes: 2 additions & 0 deletions packages/realtime_client/lib/src/realtime_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class RealtimeClient {
String? accessToken;
List<RealtimeChannel> channels = [];
final String endPoint;

final Map<String, String> headers;
final Map<String, dynamic> params;
final Duration timeout;
Expand Down Expand Up @@ -85,6 +86,7 @@ class RealtimeClient {
/// Initializes the Socket
///
/// `endPoint` The string WebSocket endpoint, ie, "ws://example.com/socket", "wss://example.com", "/socket" (inherited host & protocol)
/// `httpEndpoint` The string HTTP endpoint, ie, "https://example.com", "/" (inherited host & protocol)
/// `transport` The Websocket Transport, for example WebSocket.
/// `timeout` The default timeout in milliseconds to trigger push timeouts.
/// `params` The optional params to pass when connecting.
Expand Down
17 changes: 17 additions & 0 deletions packages/realtime_client/lib/src/transformers.dart
Original file line number Diff line number Diff line change
Expand Up @@ -332,3 +332,20 @@ Map<String, Map<String, dynamic>> getPayloadRecords(

return records;
}

/// Converts a WebSocket URL to an HTTP URL.
String httpEndpointURL(String socketUrl) {
var url = socketUrl;

// Replace 'ws' or 'wss' with 'http' or 'https' respectively
url = url.replaceFirst(RegExp(r'^ws', caseSensitive: false), 'http');

// Remove WebSocket-specific endings
url = url.replaceFirst(
RegExp(r'(/socket/websocket|/socket|/websocket)/?$', caseSensitive: false),
'',
);

// Remove trailing slashes
return url.replaceAll(RegExp(r'/+$'), '');
}
5 changes: 5 additions & 0 deletions packages/realtime_client/lib/src/types.dart
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,14 @@ class RealtimeChannelConfig {
/// [key] option is used to track presence payload across clients
final String key;

/// defines if the channel is private or not and if RLS policies will be used to check data
final bool private;

const RealtimeChannelConfig({
this.ack = false,
this.self = false,
this.key = '',
this.private = false,
});

Map<String, dynamic> toMap() {
Expand All @@ -164,6 +168,7 @@ class RealtimeChannelConfig {
'presence': {
'key': key,
},
'private': private,
}
};
}
Expand Down
27 changes: 25 additions & 2 deletions packages/realtime_client/test/channel_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import 'dart:io';

import 'package:realtime_client/realtime_client.dart';
import 'package:realtime_client/src/constants.dart';
import 'package:realtime_client/src/push.dart';
import 'package:realtime_client/src/types.dart';
import 'package:test/test.dart';

Expand Down Expand Up @@ -33,11 +34,31 @@ void main() {
expect(channel.params, {
'config': {
'broadcast': {'ack': false, 'self': false},
'presence': {'key': ''}
'presence': {'key': ''},
'private': false,
}
});
expect(channel.socket, socket);
});

test('sets up joinPush object with private defined', () {
channel = RealtimeChannel(
'topic',
socket,
params: RealtimeChannelConfig(
private: true,
),
);
final Push joinPush = channel.joinPush;

expect(joinPush.payload, {
'config': {
'broadcast': {'ack': false, 'self': false},
'presence': {'key': ''},
'private': true,
},
});
});
});

group('join', () {
Expand Down Expand Up @@ -252,7 +273,7 @@ void main() {
params: {'apikey': 'supabaseKey'},
);

channel = socket.channel('myTopic');
channel = socket.channel('myTopic', RealtimeChannelConfig(private: true));
});

tearDown(() async {
Expand Down Expand Up @@ -311,9 +332,11 @@ void main() {
final body = json.decode(await utf8.decodeStream(req));
final message = body['messages'][0];
final payload = message['payload'];
final private = message['private'];

expect(payload, containsPair('myKey', 'myValue'));
expect(message, containsPair('topic', 'myTopic'));
expect(private, true);

await req.response.close();
break;
Expand Down
3 changes: 2 additions & 1 deletion packages/realtime_client/test/socket_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,8 @@ void main() {
expect(channel.params, {
'config': {
'broadcast': {'ack': false, 'self': false},
'presence': {'key': ''}
'presence': {'key': ''},
'private': false,
}
});
});
Expand Down
15 changes: 15 additions & 0 deletions packages/realtime_client/test/transformers_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -234,4 +234,19 @@ void main() {
expect(enrichedPayload, expectedMap);
});
});

group('httpEndpointURL', () {
test('Converts a hosted Supabase WS URL', () {
expect(
httpEndpointURL('wss://example.supabase.co/realtime/v1'),
equals('https://example.supabase.co/realtime/v1'),
);
});
test('Converts a custom domain WS URL', () {
expect(
httpEndpointURL('wss://custom-domain.com/realtime/v1'),
equals('https://custom-domain.com/realtime/v1'),
);
});
});
}
Loading