Skip to content

Commit

Permalink
Merge pull request #1192 from pat/realtime-cleaning
Browse files Browse the repository at this point in the history
Clean up orphaned real-time records after reindexing.
  • Loading branch information
pat authored Mar 13, 2021
2 parents 4e6eda0 + 08086ff commit d1f0940
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 3 deletions.
12 changes: 12 additions & 0 deletions lib/thinking_sphinx/real_time/index/template.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ def apply
add_attribute primary_key, :sphinx_internal_id, :bigint
add_attribute class_column, :sphinx_internal_class, :string, :facet => true
add_attribute 0, :sphinx_deleted, :integer

if tidying?
add_attribute -> (_) { Time.current.to_i }, :sphinx_updated_at, :timestamp
end
end

private
Expand All @@ -34,7 +38,15 @@ def class_column
[:class, :name]
end

def config
ThinkingSphinx::Configuration.instance
end

def primary_key
index.primary_key.to_sym
end

def tidying?
config.settings["real_time_tidy"]
end
end
5 changes: 4 additions & 1 deletion lib/thinking_sphinx/real_time/populator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ def self.populate(index)

def initialize(index)
@index = index
@started_at = Time.current
end

def populate
Expand All @@ -17,12 +18,14 @@ def populate
instrument 'populated', :instances => instances
end

transcriber.clear_before(started_at) if configuration.settings["real_time_tidy"]

instrument 'finish_populating'
end

private

attr_reader :index
attr_reader :index, :started_at

delegate :controller, :batch_size, :to => :configuration
delegate :scope, :to => :index
Expand Down
6 changes: 6 additions & 0 deletions lib/thinking_sphinx/real_time/transcriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ def initialize(index)
@index = index
end

def clear_before(time)
execute <<~SQL.strip
DELETE FROM #{@index.name} WHERE sphinx_updated_at < #{time.to_i}
SQL
end

def copy(*instances)
items = instances.select { |instance|
instance.persisted? && copy?(instance)
Expand Down
1 change: 1 addition & 0 deletions lib/thinking_sphinx/real_time/translator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ def initialize(object, column)
end

def call
return name.call(object) if name.is_a?(Proc)
return name unless name.is_a?(Symbol)
return result unless result.is_a?(String)

Expand Down
3 changes: 2 additions & 1 deletion lib/thinking_sphinx/settings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ class ThinkingSphinx::Settings
"binlog_path" => "tmp/binlog/ENVIRONMENT",
"workers" => "threads",
"mysql_encoding" => "utf8",
"maximum_statement_length" => (2 ** 23) - 5
"maximum_statement_length" => (2 ** 23) - 5,
"real_time_tidy" => false
}.freeze

def self.call(configuration)
Expand Down
14 changes: 13 additions & 1 deletion spec/thinking_sphinx/real_time/index_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
describe ThinkingSphinx::RealTime::Index do
let(:index) { ThinkingSphinx::RealTime::Index.new :user }
let(:config) { double('config', :settings => {},
:indices_location => 'location', :next_offset => 8) }
:indices_location => 'location', :next_offset => 8,
:index_set_class => index_set_class) }
let(:index_set_class) { double(:index_set_class, :reference_name => :user) }

before :each do
allow(ThinkingSphinx::Configuration).to receive_messages :instance => config
Expand Down Expand Up @@ -61,6 +63,16 @@
it "has the internal deleted attribute by default" do
expect(index.attributes.collect(&:name)).to include('sphinx_deleted')
end

it "does not have an internal updated_at attribute by default" do
expect(index.attributes.collect(&:name)).to_not include('sphinx_updated_at')
end

it "has an internal updated_at attribute if real_time_tidy is true" do
config.settings["real_time_tidy"] = true

expect(index.attributes.collect(&:name)).to include('sphinx_updated_at')
end
end

describe '#delta?' do
Expand Down

0 comments on commit d1f0940

Please sign in to comment.