Skip to content

Commit

Permalink
Make sure that socket is flushed before adding new data
Browse files Browse the repository at this point in the history
  • Loading branch information
Achilleas Anagnostopoulos committed Dec 5, 2014
1 parent 34767fc commit 53ecd7a
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions lib/src/connection/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class Connection {
Map<int, Completer> _pendingResponses;
Completer _connected;
Completer _drained;
Future _socketFlushed;

Connection(String this.connId, String this.host, int this.port, {PoolConfiguration config, String this.defaultKeyspace}) {
// If no config is specified, use the default one
Expand Down Expand Up @@ -82,6 +83,7 @@ class Connection {
connectionLogger.info("[${connId}] Trying to connect to ${host}:${port} [attempt ${_connectionAttempt + 1}/${_poolConfig.maxConnectionAttempts}]");
Socket.connect(host, port).then((Socket s) {
_socket = s;
_socketFlushed = new Future.value(true);

// Initialize our writer pool and set the reservation timeout
_reservedFrameWriters.clear();
Expand Down Expand Up @@ -227,15 +229,17 @@ class Connection {
Future _writeMessage(RequestMessage message) {

Completer reply = new Completer();
// Block till we get back a frame writer
//new Future.sync(_frameWriterPool.reserve)
_frameWriterPool.reserve()
// Make sure we have flushed our socket data and then
// block till we get back a frame writer
_socketFlushed
.then( (_) => _frameWriterPool.reserve())
.then((FrameWriter writer) {
_reservedFrameWriters[ writer.getStreamId()] = writer;
_pendingResponses[ writer.getStreamId() ] = reply;
connectionLogger.fine("[${connId}] [stream #${writer.getStreamId()}] Sending message of type ${Opcode.nameOf(message.opcode)} (${message.opcode}) ${message}");
writer.writeMessage(message, _socket, compression : _poolConfig.compression);
return _socket.flush();
_socketFlushed = _socket.flush();
return _socketFlushed;
})
.catchError((e, trace) {
// Wrap SocketExceptions
Expand Down

0 comments on commit 53ecd7a

Please sign in to comment.