diff --git a/lib/src/connection/connection.dart b/lib/src/connection/connection.dart index 36a532d..cf68bb1 100644 --- a/lib/src/connection/connection.dart +++ b/lib/src/connection/connection.dart @@ -27,6 +27,7 @@ class Connection { Map _pendingResponses; Completer _connected; Completer _drained; + Future _socketFlushed; Connection(String this.connId, String this.host, int this.port, {PoolConfiguration config, String this.defaultKeyspace}) { // If no config is specified, use the default one @@ -82,6 +83,7 @@ class Connection { connectionLogger.info("[${connId}] Trying to connect to ${host}:${port} [attempt ${_connectionAttempt + 1}/${_poolConfig.maxConnectionAttempts}]"); Socket.connect(host, port).then((Socket s) { _socket = s; + _socketFlushed = new Future.value(true); // Initialize our writer pool and set the reservation timeout _reservedFrameWriters.clear(); @@ -227,15 +229,17 @@ class Connection { Future _writeMessage(RequestMessage message) { Completer reply = new Completer(); - // Block till we get back a frame writer - //new Future.sync(_frameWriterPool.reserve) - _frameWriterPool.reserve() + // Make sure we have flushed our socket data and then + // block till we get back a frame writer + _socketFlushed + .then( (_) => _frameWriterPool.reserve()) .then((FrameWriter writer) { _reservedFrameWriters[ writer.getStreamId()] = writer; _pendingResponses[ writer.getStreamId() ] = reply; connectionLogger.fine("[${connId}] [stream #${writer.getStreamId()}] Sending message of type ${Opcode.nameOf(message.opcode)} (${message.opcode}) ${message}"); writer.writeMessage(message, _socket, compression : _poolConfig.compression); - return _socket.flush(); + _socketFlushed = _socket.flush(); + return _socketFlushed; }) .catchError((e, trace) { // Wrap SocketExceptions