Skip to content

Commit

Permalink
RUBY-550 socket util module and refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
TylerBrock authored and Brandon Black committed Feb 14, 2013
1 parent 04fbff3 commit a666f84
Show file tree
Hide file tree
Showing 12 changed files with 65 additions and 52 deletions.
1 change: 1 addition & 0 deletions lib/mongo.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ module Constants
require 'mongo/util/pool_manager'
require 'mongo/util/sharding_pool_manager'
require 'mongo/util/server_version'
require 'mongo/util/socket_util'
require 'mongo/util/ssl_socket'
require 'mongo/util/tcp_socket'
require 'mongo/util/unix_socket'
Expand Down
36 changes: 24 additions & 12 deletions lib/mongo/cursor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ def initialize(collection, opts={})
@options = 0

# Use this socket for the query
@socket = opts[:socket]
@socket_provided = !!@socket
@socket = opts[:socket]
@pool = nil

@closed = false
@query_run = false
Expand Down Expand Up @@ -324,7 +324,16 @@ def close
message.put_int(1)
message.put_long(@cursor_id)
log(:debug, "Cursor#close #{@cursor_id}")
@connection.send_message(Mongo::Constants::OP_KILL_CURSORS, message, :socket => @socket)
begin
socket = @pool.checkout
@connection.send_message(
Mongo::Constants::OP_KILL_CURSORS,
message,
:socket => socket
)
ensure
socket.checkin
end
end
@cursor_id = 0
@closed = true
Expand Down Expand Up @@ -461,22 +470,23 @@ def send_initial_query
instrument(:find, instrument_payload) do
begin
message = construct_query_message
@socket ||= checkout_socket_from_connection
socket = @socket || checkout_socket_from_connection
results, @n_received, @cursor_id = @connection.receive_message(
Mongo::Constants::OP_QUERY, message, nil, @socket, @command,
Mongo::Constants::OP_QUERY, message, nil, socket, @command,
nil, @options & OP_QUERY_EXHAUST != 0)
rescue ConnectionFailure => ex
socket.close if socket
@connection.refresh
if tries < 3 && !@socket && (!@command || Mongo::Support::secondary_ok?(@selector))
tries += 1
retry
else
raise
raise ex
end
rescue OperationFailure, OperationTimeout => ex
raise ex
ensure
@socket.pool.checkin(@socket) if @socket && @socket.pool && !@socket_provided
socket.checkin unless @socket || socket.nil?
end
@returned += @n_received
@cache += results
Expand Down Expand Up @@ -506,13 +516,13 @@ def send_get_more
message.put_long(@cursor_id)
log(:debug, "cursor.refresh() for cursor #{@cursor_id}") if @logger

@socket.pool.checkout if @socket.pool
socket = checkout_socket_from_connection

begin
results, @n_received, @cursor_id = @connection.receive_message(
Mongo::Constants::OP_GET_MORE, message, nil, @socket, @command, nil)
Mongo::Constants::OP_GET_MORE, message, nil, socket, @command, nil)
ensure
@socket.pool.checkin(@socket) if @socket.pool
socket.checkin
end

@returned += @n_received
Expand All @@ -523,14 +533,16 @@ def send_get_more
def checkout_socket_from_connection
begin
if @command && !Mongo::Support::secondary_ok?(@selector)
@connection.checkout_reader(:primary)
socket = @connection.checkout_reader(:primary)
else
@connection.checkout_reader(@read, @tag_sets, @acceptable_latency)
socket = @connection.checkout_reader(@read, @tag_sets, @acceptable_latency)
end
rescue SystemStackError, NoMemoryError, SystemCallError => ex
@connection.close
raise ex
end
@pool = socket.pool
socket
end

def checkin_socket(sock)
Expand Down
2 changes: 1 addition & 1 deletion lib/mongo/db.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def authenticate(username, password, save_auth=true)
socket = @connection.checkout_reader(:primary_preferred)
issue_authentication(username, password, save_auth, :socket => socket)
ensure
socket.pool.checkin(socket) if socket
socket.checkin if socket
end

@connection.authenticate_pools
Expand Down
2 changes: 1 addition & 1 deletion lib/mongo/mongo_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ def checkout_writer
# Note: this is overridden in MongoReplicaSetClient.
def checkin(socket)
if @primary_pool && socket && socket.pool
socket.pool.checkin(socket)
socket.checkin
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/mongo/mongo_replica_set_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ def checkout_writer
# Checkin a socket used for reading.
def checkin(socket)
if socket && socket.pool
socket.pool.checkin(socket)
socket.checkin
end
sync_refresh
end
Expand Down
3 changes: 2 additions & 1 deletion lib/mongo/networking.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def send_message(operation, message, opts={})
raise ex
ensure
if sock
sock.pool.checkin(sock)
sock.checkin
end
end
end
Expand Down Expand Up @@ -116,6 +116,7 @@ def receive_message(operation, message, log_message=nil, socket=nil, command=fal
send_message_on_socket(packed_message, socket)
result = receive(socket, request_id, exhaust)
rescue ConnectionFailure => ex
socket.close
checkin(socket)
raise ex
rescue SystemStackError, NoMemoryError, SystemCallError => ex
Expand Down
20 changes: 20 additions & 0 deletions lib/mongo/util/socket_util.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
module SocketUtil

attr_accessor :pool, :pid

def checkout
@pool.checkout if @pool
end

def checkin
@pool.checkin(self) if @pool
end

def close
@socket.close unless closed?
end

def closed?
@socket.closed?
end
end
29 changes: 10 additions & 19 deletions lib/mongo/util/ssl_socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,53 +8,44 @@ module Mongo
# a TCP connection over SSL and then provides an basic interface
# mirroring Ruby's TCPSocket, vis., TCPSocket#send and TCPSocket#read.
class SSLSocket

attr_accessor :pool, :pid
include SocketUtil

def initialize(host, port, op_timeout=nil, connect_timeout=nil)
@op_timeout = op_timeout
@connect_timeout = connect_timeout
@pid = Process.pid

@socket = ::TCPSocket.new(host, port)
@socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
@tcp_socket = ::TCPSocket.new(host, port)
@tcp_socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

@ssl = OpenSSL::SSL::SSLSocket.new(@socket)
@ssl.sync_close = true
@socket = OpenSSL::SSL::SSLSocket.new(@tcp_socket)
@socket.sync_close = true

connect
end

def connect
if @connect_timeout
Timeout::timeout(@connect_timeout, ConnectionTimeoutError) do
@ssl.connect
@socket.connect
end
else
@ssl.connect
@socket.connect
end
end

def send(data)
@ssl.syswrite(data)
@socket.syswrite(data)
end

def read(length, buffer)
if @op_timeout
Timeout::timeout(@op_timeout, OperationTimeout) do
@ssl.sysread(length, buffer)
@socket.sysread(length, buffer)
end
else
@ssl.sysread(length, buffer)
@socket.sysread(length, buffer)
end
end

def close
@ssl.close
end

def closed?
@ssl.closed?
end
end
end
10 changes: 1 addition & 9 deletions lib/mongo/util/tcp_socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ module Mongo
# sans Timeout::timeout
#
class TCPSocket
attr_accessor :pool, :pid
include SocketUtil

def initialize(host, port, op_timeout=nil, connect_timeout=nil)
@op_timeout = op_timeout
Expand Down Expand Up @@ -58,13 +58,5 @@ def read(maxlen, buffer)
raise ConnectionFailure, ex
end
end

def close
@socket.close
end

def closed?
@socket.closed?
end
end
end
8 changes: 1 addition & 7 deletions test/functional/cursor_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,18 @@ def setup
@@coll_full_name = "#{MONGO_TEST_DB}.test"
end

def test_alive_and_cursor_socket_affinity
def test_alive
batch = []
5000.times do |n|
batch << {:a => n}
end

@@coll.insert(batch)
cursor = @@coll.find
assert !cursor.instance_variable_get(:@socket)
assert !cursor.alive?
cursor.next
socket = cursor.instance_variable_get(:@socket)
assert socket
assert cursor.alive?
200.times { cursor.next }
assert_equal socket, cursor.instance_variable_get(:@socket)
cursor.close
assert_equal socket, cursor.instance_variable_get(:@socket)
assert !cursor.alive?
@@coll.remove
end
Expand Down
2 changes: 2 additions & 0 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ def new_mock_socket(host='localhost', port=27017)
socket.stubs(:setsockopt).with(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
socket.stubs(:close)
socket.stubs(:closed?)
socket.stubs(:checkin)
socket.stubs(:pool)
socket
end

Expand Down
2 changes: 1 addition & 1 deletion test/unit/collection_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class CollectionTest < Test::Unit::TestCase
@client = MongoClient.new('localhost', 27017, :logger => @logger, :connect => false)
@db = @client['testing']
@coll = @db.collection('books')
@client.expects(:checkout_reader).returns(mock(:pool))
@client.expects(:checkout_reader).returns(new_mock_socket)
@client.expects(:receive_message).with do |op, msg, log, sock|
op == 2004
end.returns([[], 0, 0])
Expand Down

0 comments on commit a666f84

Please sign in to comment.