diff --git a/lib/mongo.rb b/lib/mongo.rb index fed0f856ba..7c57e7fa10 100644 --- a/lib/mongo.rb +++ b/lib/mongo.rb @@ -45,6 +45,7 @@ module Constants require 'mongo/util/pool_manager' require 'mongo/util/sharding_pool_manager' require 'mongo/util/server_version' +require 'mongo/util/socket_util' require 'mongo/util/ssl_socket' require 'mongo/util/tcp_socket' require 'mongo/util/unix_socket' diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index 847c435488..f09f3b220d 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -50,8 +50,8 @@ def initialize(collection, opts={}) @options = 0 # Use this socket for the query - @socket = opts[:socket] - @socket_provided = !!@socket + @socket = opts[:socket] + @pool = nil @closed = false @query_run = false @@ -324,7 +324,16 @@ def close message.put_int(1) message.put_long(@cursor_id) log(:debug, "Cursor#close #{@cursor_id}") - @connection.send_message(Mongo::Constants::OP_KILL_CURSORS, message, :socket => @socket) + begin + socket = @pool.checkout + @connection.send_message( + Mongo::Constants::OP_KILL_CURSORS, + message, + :socket => socket + ) + ensure + socket.checkin + end end @cursor_id = 0 @closed = true @@ -461,22 +470,23 @@ def send_initial_query instrument(:find, instrument_payload) do begin message = construct_query_message - @socket ||= checkout_socket_from_connection + socket = @socket || checkout_socket_from_connection results, @n_received, @cursor_id = @connection.receive_message( - Mongo::Constants::OP_QUERY, message, nil, @socket, @command, + Mongo::Constants::OP_QUERY, message, nil, socket, @command, nil, @options & OP_QUERY_EXHAUST != 0) rescue ConnectionFailure => ex + socket.close if socket @connection.refresh if tries < 3 && !@socket && (!@command || Mongo::Support::secondary_ok?(@selector)) tries += 1 retry else - raise + raise ex end rescue OperationFailure, OperationTimeout => ex raise ex ensure - @socket.pool.checkin(@socket) if @socket && @socket.pool && !@socket_provided + socket.checkin unless @socket || socket.nil? end @returned += @n_received @cache += results @@ -506,13 +516,13 @@ def send_get_more message.put_long(@cursor_id) log(:debug, "cursor.refresh() for cursor #{@cursor_id}") if @logger - @socket.pool.checkout if @socket.pool + socket = checkout_socket_from_connection begin results, @n_received, @cursor_id = @connection.receive_message( - Mongo::Constants::OP_GET_MORE, message, nil, @socket, @command, nil) + Mongo::Constants::OP_GET_MORE, message, nil, socket, @command, nil) ensure - @socket.pool.checkin(@socket) if @socket.pool + socket.checkin end @returned += @n_received @@ -523,14 +533,16 @@ def send_get_more def checkout_socket_from_connection begin if @command && !Mongo::Support::secondary_ok?(@selector) - @connection.checkout_reader(:primary) + socket = @connection.checkout_reader(:primary) else - @connection.checkout_reader(@read, @tag_sets, @acceptable_latency) + socket = @connection.checkout_reader(@read, @tag_sets, @acceptable_latency) end rescue SystemStackError, NoMemoryError, SystemCallError => ex @connection.close raise ex end + @pool = socket.pool + socket end def checkin_socket(sock) diff --git a/lib/mongo/db.rb b/lib/mongo/db.rb index fc6e04f47a..41f5fc9d57 100644 --- a/lib/mongo/db.rb +++ b/lib/mongo/db.rb @@ -109,7 +109,7 @@ def authenticate(username, password, save_auth=true) socket = @connection.checkout_reader(:primary_preferred) issue_authentication(username, password, save_auth, :socket => socket) ensure - socket.pool.checkin(socket) if socket + socket.checkin if socket end @connection.authenticate_pools diff --git a/lib/mongo/mongo_client.rb b/lib/mongo/mongo_client.rb index d6941547aa..cbaa7331af 100644 --- a/lib/mongo/mongo_client.rb +++ b/lib/mongo/mongo_client.rb @@ -537,7 +537,7 @@ def checkout_writer # Note: this is overridden in MongoReplicaSetClient. def checkin(socket) if @primary_pool && socket && socket.pool - socket.pool.checkin(socket) + socket.checkin end end diff --git a/lib/mongo/mongo_replica_set_client.rb b/lib/mongo/mongo_replica_set_client.rb index 7ac2fbaa12..ba36f12bf6 100644 --- a/lib/mongo/mongo_replica_set_client.rb +++ b/lib/mongo/mongo_replica_set_client.rb @@ -352,7 +352,7 @@ def checkout_writer # Checkin a socket used for reading. def checkin(socket) if socket && socket.pool - socket.pool.checkin(socket) + socket.checkin end sync_refresh end diff --git a/lib/mongo/networking.rb b/lib/mongo/networking.rb index 9a3ba354d3..ef1eaead3b 100644 --- a/lib/mongo/networking.rb +++ b/lib/mongo/networking.rb @@ -39,7 +39,7 @@ def send_message(operation, message, opts={}) raise ex ensure if sock - sock.pool.checkin(sock) + sock.checkin end end end @@ -116,6 +116,7 @@ def receive_message(operation, message, log_message=nil, socket=nil, command=fal send_message_on_socket(packed_message, socket) result = receive(socket, request_id, exhaust) rescue ConnectionFailure => ex + socket.close checkin(socket) raise ex rescue SystemStackError, NoMemoryError, SystemCallError => ex diff --git a/lib/mongo/util/socket_util.rb b/lib/mongo/util/socket_util.rb new file mode 100644 index 0000000000..90a27459a6 --- /dev/null +++ b/lib/mongo/util/socket_util.rb @@ -0,0 +1,20 @@ +module SocketUtil + + attr_accessor :pool, :pid + + def checkout + @pool.checkout if @pool + end + + def checkin + @pool.checkin(self) if @pool + end + + def close + @socket.close unless closed? + end + + def closed? + @socket.closed? + end +end diff --git a/lib/mongo/util/ssl_socket.rb b/lib/mongo/util/ssl_socket.rb index f068b4088c..a640b50b83 100644 --- a/lib/mongo/util/ssl_socket.rb +++ b/lib/mongo/util/ssl_socket.rb @@ -8,19 +8,18 @@ module Mongo # a TCP connection over SSL and then provides an basic interface # mirroring Ruby's TCPSocket, vis., TCPSocket#send and TCPSocket#read. class SSLSocket - - attr_accessor :pool, :pid + include SocketUtil def initialize(host, port, op_timeout=nil, connect_timeout=nil) @op_timeout = op_timeout @connect_timeout = connect_timeout @pid = Process.pid - @socket = ::TCPSocket.new(host, port) - @socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) + @tcp_socket = ::TCPSocket.new(host, port) + @tcp_socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) - @ssl = OpenSSL::SSL::SSLSocket.new(@socket) - @ssl.sync_close = true + @socket = OpenSSL::SSL::SSLSocket.new(@tcp_socket) + @socket.sync_close = true connect end @@ -28,33 +27,25 @@ def initialize(host, port, op_timeout=nil, connect_timeout=nil) def connect if @connect_timeout Timeout::timeout(@connect_timeout, ConnectionTimeoutError) do - @ssl.connect + @socket.connect end else - @ssl.connect + @socket.connect end end def send(data) - @ssl.syswrite(data) + @socket.syswrite(data) end def read(length, buffer) if @op_timeout Timeout::timeout(@op_timeout, OperationTimeout) do - @ssl.sysread(length, buffer) + @socket.sysread(length, buffer) end else - @ssl.sysread(length, buffer) + @socket.sysread(length, buffer) end end - - def close - @ssl.close - end - - def closed? - @ssl.closed? - end end end diff --git a/lib/mongo/util/tcp_socket.rb b/lib/mongo/util/tcp_socket.rb index 8d8d9a9021..8c1498c2fe 100644 --- a/lib/mongo/util/tcp_socket.rb +++ b/lib/mongo/util/tcp_socket.rb @@ -8,7 +8,7 @@ module Mongo # sans Timeout::timeout # class TCPSocket - attr_accessor :pool, :pid + include SocketUtil def initialize(host, port, op_timeout=nil, connect_timeout=nil) @op_timeout = op_timeout @@ -58,13 +58,5 @@ def read(maxlen, buffer) raise ConnectionFailure, ex end end - - def close - @socket.close - end - - def closed? - @socket.closed? - end end end diff --git a/test/functional/cursor_test.rb b/test/functional/cursor_test.rb index 3a9643e8cb..c84a9bc1c9 100644 --- a/test/functional/cursor_test.rb +++ b/test/functional/cursor_test.rb @@ -16,7 +16,7 @@ def setup @@coll_full_name = "#{MONGO_TEST_DB}.test" end - def test_alive_and_cursor_socket_affinity + def test_alive batch = [] 5000.times do |n| batch << {:a => n} @@ -24,16 +24,10 @@ def test_alive_and_cursor_socket_affinity @@coll.insert(batch) cursor = @@coll.find - assert !cursor.instance_variable_get(:@socket) assert !cursor.alive? cursor.next - socket = cursor.instance_variable_get(:@socket) - assert socket assert cursor.alive? - 200.times { cursor.next } - assert_equal socket, cursor.instance_variable_get(:@socket) cursor.close - assert_equal socket, cursor.instance_variable_get(:@socket) assert !cursor.alive? @@coll.remove end diff --git a/test/test_helper.rb b/test/test_helper.rb index fdb673b32d..84898245ab 100755 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -166,6 +166,8 @@ def new_mock_socket(host='localhost', port=27017) socket.stubs(:setsockopt).with(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) socket.stubs(:close) socket.stubs(:closed?) + socket.stubs(:checkin) + socket.stubs(:pool) socket end diff --git a/test/unit/collection_test.rb b/test/unit/collection_test.rb index 9965e14f53..82d62dbdde 100644 --- a/test/unit/collection_test.rb +++ b/test/unit/collection_test.rb @@ -37,7 +37,7 @@ class CollectionTest < Test::Unit::TestCase @client = MongoClient.new('localhost', 27017, :logger => @logger, :connect => false) @db = @client['testing'] @coll = @db.collection('books') - @client.expects(:checkout_reader).returns(mock(:pool)) + @client.expects(:checkout_reader).returns(new_mock_socket) @client.expects(:receive_message).with do |op, msg, log, sock| op == 2004 end.returns([[], 0, 0])