From 267a4d0e98bf37cef6241d697023ff6208cfcac9 Mon Sep 17 00:00:00 2001 From: Caspian Baska Date: Tue, 14 Jul 2020 16:39:58 +1000 Subject: [PATCH] Add retry logic to writes --- shard.yml | 4 ++-- spec/spec_helper.cr | 6 ++++++ src/rethinkdb/connection.cr | 43 ++++++++++++++++++++++++++----------- 3 files changed, 39 insertions(+), 14 deletions(-) diff --git a/shard.yml b/shard.yml index 8505c95..781132a 100644 --- a/shard.yml +++ b/shard.yml @@ -1,6 +1,6 @@ name: rethinkdb -version: 0.2.0 -crystal: 0.34.0 +version: 0.2.1 +crystal: ~> 0.34 license: MIT dependencies: diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index 76c859b..561f5f2 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -1,7 +1,13 @@ require "spec" +require "log" + require "../src/crystal-rethinkdb" include RethinkDB::Shortcuts +Spec.before_suite do + ::Log.setup "*", Log::Severity::Debug +end + module Generators @@i = 0 diff --git a/src/rethinkdb/connection.cr b/src/rethinkdb/connection.cr index 04f96c3..2079a5c 100644 --- a/src/rethinkdb/connection.cr +++ b/src/rethinkdb/connection.cr @@ -1,4 +1,5 @@ require "json" +require "log" require "retriable" require "socket" require "socket/tcp_socket" @@ -13,6 +14,8 @@ require "./serialization" module RethinkDB class Connection + Log = ::Log.for(self) + # Authentication getter user private getter password @@ -65,16 +68,9 @@ module RethinkDB end sock.close rescue e + Log.error(exception: e) { "reconnecting" } sock.close - write_lock.synchronize do - reset_channels - reset_id - # Create a new socket - @sock = TCPSocket.new(host, port) - sock.sync = false - connect - authorise(user, password) - end + write_lock.synchronize { reconnect } raise e end end @@ -128,14 +124,37 @@ module RethinkDB protected getter write_lock = Mutex.new(Mutex::Protection::Reentrant) protected def write(data) - write_lock.synchronize { + try_write do sock.write(data) sock.flush + end + end + + protected def reconnect + reset_channels + reset_id + # Create a new socket + @sock = TCPSocket.new(host, port) + sock.sync = false + connect + authorise(user, password) + end + + protected def try_write + write_lock.synchronize { + yield } + rescue e + # Retry in the read loop + sock.close + raise e end protected def read sock.gets('\0', true).not_nil! + rescue e : NilAssertionError + sock.close + raise ConnectionException.new("Socket closed") end @next_id : UInt64 = 1_u64 @@ -323,12 +342,12 @@ module RethinkDB end query_slice = query.to_slice - conn.write_lock.synchronize { + conn.try_write do conn.sock.write_bytes(id, IO::ByteFormat::LittleEndian) conn.sock.write_bytes(query_slice.size, IO::ByteFormat::LittleEndian) conn.sock.write(query_slice) conn.sock.flush - } + end end private def read_response