-
Notifications
You must be signed in to change notification settings - Fork 36
/
pubsub.dart
90 lines (77 loc) · 2.38 KB
/
pubsub.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
part of redis;
class _WarrningPubSubInProgress extends RedisConnection {
RedisConnection _connection;
_WarrningPubSubInProgress(this._connection) {}
_err() => throw "PubSub on this connaction in progress"
"It is not allowed to issue commands trough this handler";
// swap this relevant methods in Conenction with exception
// ignore: unused_element
Future _sendraw(Parser parser, List<int> data) => _err();
// ignore: unused_element
Future _getdummy() => _err();
Future _senddummy(Parser parser) => _err();
// this fake PubSub connection can be closed
Future close() {
return this._connection.close();
}
}
class PubSub {
late Command _command;
StreamController<List> _stream_controler = StreamController<List>();
PubSub(Command command) {
_command = Command.from(command);
command.send_nothing()!.then((_) {
//override socket with warrning
command._connection = _WarrningPubSubInProgress(_command._connection);
// listen and process forever
return Future.doWhile(() {
return _command._connection
._senddummy(_command.parser)
.then<bool>((var data) {
try {
_stream_controler.add(data);
return true; // run doWhile more
} catch (e) {
try {
_stream_controler.addError(e);
} catch (_) {
// we could not notfy stream that we have eror
}
// stop doWhile()
_stream_controler.close();
return false;
}
}).catchError((e) {
try {
_stream_controler.addError(e);
} catch (_) {
// we could not notfy stream that we have eror
}
// stop doWhile()
_stream_controler.close();
return false;
});
});
});
}
Stream getStream() {
return _stream_controler.stream;
}
void subscribe(List<String> s) {
_sendcmd_and_list("SUBSCRIBE", s);
}
void psubscribe(List<String> s) {
_sendcmd_and_list("PSUBSCRIBE", s);
}
void unsubscribe(List<String> s) {
_sendcmd_and_list("UNSUBSCRIBE", s);
}
void punsubscribe(List<String> s) {
_sendcmd_and_list("PUNSUBSCRIBE", s);
}
void _sendcmd_and_list(String cmd, List<String> s) {
List list = [cmd];
list.addAll(s);
_command._connection._socket.add(_command.serializer.serialize(list));
}
}