diff --git a/lib/command.dart b/lib/command.dart index bb68f10..1e9c090 100644 --- a/lib/command.dart +++ b/lib/command.dart @@ -11,8 +11,17 @@ part of redis; class Command { /*RedisConnection*/ var _connection; + // parser is somthing that transfer data from redis database to object + Parser parser = Parser(); + // serialiser is somehing that transform object to redis + Serialiser serialise = Serialiser(); Command(this._connection) {} + Command.from(Command other) { + this._connection = other._connection; + this.parser = other.parser; + this.serialise = other.serialise; + } /// Serialise and send data to server /// @@ -24,7 +33,7 @@ class Command { /// send_object(["SET","key","value"]); Future send_object(Object obj) { try { - return _connection._sendraw(RedisSerialise.Serialise(obj)).then((v) { + return _connection._sendraw(parser,serialise.serialise(obj)).then((v) { // turn RedisError into exception if (v is RedisError) { return Future.error(v); diff --git a/lib/connection.dart b/lib/connection.dart index 0935a16..6120d69 100644 --- a/lib/connection.dart +++ b/lib/connection.dart @@ -53,9 +53,9 @@ class RedisConnection { //this doesnt send anything //it just wait something to come from socket //it parse it and execute future - Future _senddummy() { + Future _senddummy(Parser parser) { _future = _future.then((_) { - return RedisParser.parseredisresponse(_stream!); + return parser.parse(_stream!); }); return _future; } @@ -71,9 +71,9 @@ class RedisConnection { } // ignore: unused_element - Future _sendraw(List data) { + Future _sendraw(Parser parser,List data) { _socket!.add(data); - return _senddummy(); + return _senddummy(parser); } void disable_nagle(bool v) { diff --git a/lib/pubsub.dart b/lib/pubsub.dart index 7b6a38d..2d920b2 100644 --- a/lib/pubsub.dart +++ b/lib/pubsub.dart @@ -10,13 +10,13 @@ class PubSub { StreamController _stream_controler = StreamController(); PubSub(Command command) { - _command = Command(command._connection); + _command = Command.from(command); command.send_nothing()!.then((_) { //override socket with warrning command._connection = _WarrningPubSubInProgress(); // listen and process forever return Future.doWhile(() { - return _command._connection._senddummy().then((var data) { + return _command._connection._senddummy(_command.parser).then((var data) { _stream_controler.add(data); return true; }); diff --git a/lib/redisparser.dart b/lib/redisparser.dart index 72db062..6a0ee71 100644 --- a/lib/redisparser.dart +++ b/lib/redisparser.dart @@ -9,6 +9,12 @@ part of redis; +class Parser { + Future parse(LazyStream s){ + return RedisParser.parseredisresponse(s); + } +} + class RedisParser { static final UTF8 = const Utf8Codec(); static const int CR = 13; diff --git a/lib/redisserialise.dart b/lib/redisserialise.dart index af9a61e..7d4011b 100644 --- a/lib/redisserialise.dart +++ b/lib/redisserialise.dart @@ -21,6 +21,13 @@ class RedisBulk { typedef Consumer = void Function(Iterable s); + +class Serialiser { + List serialise(Object? object){ + return RedisSerialise.Serialise(object); + } +} + class RedisSerialise { static final ASCII = const AsciiCodec(); static final UTF8 = const Utf8Codec();