From 135b0df0d6394f69162f496674c5b2f2a3dcba4c Mon Sep 17 00:00:00 2001 From: Syphax bouazzouni Date: Sun, 21 Apr 2024 04:23:54 +0200 Subject: [PATCH] Feature: Add index all data step (#136) * move the submission_all_data concern to a submission process service * add index_all step to submission parsing steps * add index all data submission status --- .../submission_index_all_data.rb | 161 ------------------ .../models/concerns/submission_process.rb | 64 ++----- .../models/ontology_submission.rb | 11 +- .../models/submission_status.rb | 3 + .../operations/submission_all_data_indexer.rb | 154 +++++++++++++++++ .../submission_processor.rb | 10 +- test/mappings/test_mappings_bulk_load.rb | 10 +- test/models/test_search.rb | 24 +-- 8 files changed, 205 insertions(+), 232 deletions(-) delete mode 100644 lib/ontologies_linked_data/concerns/ontology_submissions/submission_index_all_data.rb create mode 100644 lib/ontologies_linked_data/services/submission_process/operations/submission_all_data_indexer.rb diff --git a/lib/ontologies_linked_data/concerns/ontology_submissions/submission_index_all_data.rb b/lib/ontologies_linked_data/concerns/ontology_submissions/submission_index_all_data.rb deleted file mode 100644 index 58be6165..00000000 --- a/lib/ontologies_linked_data/concerns/ontology_submissions/submission_index_all_data.rb +++ /dev/null @@ -1,161 +0,0 @@ -require 'parallel' -module LinkedData - module Concerns - module OntologySubmission - module IndexAllData - - module ClassMethods - def clear_indexed_content(ontology) - conn = Goo.init_search_connection(:ontology_data) - begin - conn.delete_by_query("ontology_t:\"#{ontology}\"") - rescue StandardError => e - puts e.message - end - conn - end - - end - - def self.included(base) - base.extend(ClassMethods) - end - - def index_sorted_ids(ids, ontology, conn, logger, commit = true) - total_triples = Parallel.map(ids.each_slice(1000), in_threads: 10) do |ids_slice| - index_ids = 0 - triples_count = 0 - documents = {} - time = Benchmark.realtime do - documents, triples_count = fetch_triples(ids_slice, ontology) - end - - return if documents.empty? - - logger.info("Worker #{Parallel.worker_number} > Fetched #{triples_count} triples of #{id} in #{time} sec.") if triples_count.positive? - - time = Benchmark.realtime do - conn.index_document(documents.values, commit: false) - conn.index_commit if commit - index_ids = documents.size - documents = {} - end - logger.info("Worker #{Parallel.worker_number} > Indexed #{index_ids} ids of #{id} in #{time} sec. Total #{documents.size} ids.") - triples_count - end - total_triples.sum - end - - def index_all_data(logger, commit = true) - page = 1 - size = 10_000 - count_ids = 0 - total_time = 0 - total_triples = 0 - old_count = -1 - - ontology = self.bring(:ontology).ontology - .bring(:acronym).acronym - conn = init_search_collection(ontology) - - ids = {} - - while count_ids != old_count - old_count = count_ids - count = 0 - time = Benchmark.realtime do - ids = fetch_sorted_ids(size, page) - count = ids.size - end - - count_ids += count - total_time += time - page += 1 - - next unless count.positive? - - logger.info("Fetched #{count} ids of #{id} page: #{page} in #{time} sec.") - - total_triples += index_sorted_ids(ids, ontology, conn, logger, commit) - - end - logger.info("Completed indexing all ontology data: #{self.id} in #{total_time} sec. 
(#{count_ids} ids / #{total_triples} triples)") - logger.flush - end - - private - - def fetch_sorted_ids(size, page) - query = Goo.sparql_query_client.select(:id) - .distinct - .from(RDF::URI.new(self.id)) - .where(%i[id p v]) - .limit(size) - .offset((page - 1) * size) - - query.each_solution.map(&:id).sort - end - - def update_doc(doc, property, new_val) - unescaped_prop = property.gsub('___', '://') - - unescaped_prop = unescaped_prop.gsub('_', '/') - existent_val = doc["#{unescaped_prop}_t"] || doc["#{unescaped_prop}_txt"] - - if !existent_val && !property['#'] - unescaped_prop = unescaped_prop.sub(%r{/([^/]+)$}, '#\1') # change latest '/' with '#' - existent_val = doc["#{unescaped_prop}_t"] || doc["#{unescaped_prop}_txt"] - end - - if existent_val && new_val || new_val.is_a?(Array) - doc.delete("#{unescaped_prop}_t") - doc["#{unescaped_prop}_txt"] = Array(existent_val) + Array(new_val).map(&:to_s) - elsif existent_val.nil? && new_val - doc["#{unescaped_prop}_t"] = new_val.to_s - end - doc - end - - def init_search_collection(ontology) - self.class.clear_indexed_content(ontology) - end - - def fetch_triples(ids_slice, ontology) - documents = {} - count = 0 - filter = ids_slice.map { |x| "?id = <#{x}>" }.join(' || ') - query = Goo.sparql_query_client.select(:id, :p, :v) - .from(RDF::URI.new(self.id)) - .where(%i[id p v]) - .filter(filter) - query.each_solution do |sol| - count += 1 - doc = documents[sol[:id].to_s] - doc ||= { - id: "#{sol[:id]}_#{ontology}", submission_id_t: self.id.to_s, - ontology_t: ontology, resource_model: self.class.model_name, - resource_id: sol[:id].to_s - } - property = sol[:p].to_s - value = sol[:v] - - if property.to_s.eql?(RDF.type.to_s) - update_doc(doc, 'type', value) - else - update_doc(doc, property, value) - end - documents[sol[:id].to_s] = doc - end - [documents, count] - end - - end - end - end -end - - - - - - diff --git a/lib/ontologies_linked_data/models/concerns/submission_process.rb b/lib/ontologies_linked_data/models/concerns/submission_process.rb index 65518fcc..c5b03983 100644 --- a/lib/ontologies_linked_data/models/concerns/submission_process.rb +++ b/lib/ontologies_linked_data/models/concerns/submission_process.rb @@ -7,83 +7,47 @@ def process_submission(logger, options = {}) end def generate_missing_labels(logger) - puts 'Start generate_mission_label' - time = Benchmark.realtime do - LinkedData::Services::GenerateMissingLabels.new(self).process(logger, file_path: self.master_file_path) - end - puts "generate_mission_label ended in #{time}" + LinkedData::Services::GenerateMissingLabels.new(self).process(logger, file_path: self.master_file_path) end def generate_obsolete_classes(logger) - puts 'Start submission_obsolete_classes' - time = Benchmark.realtime do - LinkedData::Services::ObsoleteClassesGenerator.new(self).process(logger, file_path: self.master_file_path) - end - puts "submission_obsolete_classes ended in #{time}" + LinkedData::Services::ObsoleteClassesGenerator.new(self).process(logger, file_path: self.master_file_path) end def extract_metadata(logger, options = {}) - puts 'Start extract metadata' - time = Benchmark.realtime do - LinkedData::Services::SubmissionMetadataExtractor.new(self).process(logger, options) - end - puts "Extract metadata ended in #{time}" + LinkedData::Services::SubmissionMetadataExtractor.new(self).process(logger, options) end def diff(logger, older) - puts 'Start diff' - time = Benchmark.realtime do - LinkedData::Services::SubmissionDiffGenerator.new(self).diff(logger, older) - end - puts "Diff ended in 
#{time}"
+      LinkedData::Services::SubmissionDiffGenerator.new(self).diff(logger, older)
     end
 
     def generate_diff(logger)
-      puts 'Start diff'
-      time = Benchmark.realtime do
-        LinkedData::Services::SubmissionDiffGenerator.new(self).process(logger)
-      end
-      puts "Diff ended in #{time}"
+      LinkedData::Services::SubmissionDiffGenerator.new(self).process(logger)
+    end
+
+    def index_all(logger, commit: true)
+      LinkedData::Services::OntologySubmissionAllDataIndexer.new(self).process(logger, commit: commit)
     end
 
     def index_terms(logger, commit: true, optimize: true)
-      puts 'Start index terms'
-      time = Benchmark.realtime do
-        LinkedData::Services::OntologySubmissionIndexer.new(self).process(logger, commit: commit, optimize: optimize)
-      end
-      puts "Index terms ended in #{time}"
+      LinkedData::Services::OntologySubmissionIndexer.new(self).process(logger, commit: commit, optimize: optimize)
     end
 
     def index_properties(logger, commit: true, optimize: true)
-      puts 'Start index properties'
-      time = Benchmark.realtime do
-        LinkedData::Services::SubmissionPropertiesIndexer.new(self).process(logger, commit: commit, optimize: optimize)
-      end
-      puts "Index properties ended in #{time}"
+      LinkedData::Services::SubmissionPropertiesIndexer.new(self).process(logger, commit: commit, optimize: optimize)
     end
 
     def archive
-      puts 'Start archive'
-      time = Benchmark.realtime do
-        LinkedData::Services::OntologySubmissionArchiver.new(self).process
-      end
-      puts "Archive ended in #{time}"
+      LinkedData::Services::OntologySubmissionArchiver.new(self).process
     end
 
     def generate_rdf(logger, reasoning: true)
-      puts 'Start generate RDF'
-      time = Benchmark.realtime do
-        LinkedData::Services::SubmissionRDFGenerator.new(self).process(logger, reasoning: reasoning)
-      end
-      puts "Generate RDF ended in #{time}"
+      LinkedData::Services::SubmissionRDFGenerator.new(self).process(logger, reasoning: reasoning)
     end
 
     def generate_metrics(logger)
-      puts 'Start generate metrics'
-      time = Benchmark.realtime do
-        LinkedData::Services::SubmissionMetricsCalculator.new(self).process(logger)
-      end
-      puts "Generate metrics ended in #{time}"
+      LinkedData::Services::SubmissionMetricsCalculator.new(self).process(logger)
     end
   end
diff --git a/lib/ontologies_linked_data/models/ontology_submission.rb b/lib/ontologies_linked_data/models/ontology_submission.rb
index 024250f9..9e2808c5 100644
--- a/lib/ontologies_linked_data/models/ontology_submission.rb
+++ b/lib/ontologies_linked_data/models/ontology_submission.rb
@@ -12,7 +12,6 @@ module Models
 
     class OntologySubmission < LinkedData::Models::Base
 
-      include LinkedData::Concerns::OntologySubmission::IndexAllData
       include LinkedData::Concerns::SubmissionProcessable
       include LinkedData::Concerns::OntologySubmission::Validators
       include LinkedData::Concerns::OntologySubmission::UpdateCallbacks
@@ -295,6 +294,16 @@ def self.copy_file_repository(acronym, submissionId, src, filename = nil)
       return dst
     end
 
+    def self.clear_indexed_content(ontology)
+      conn = Goo.init_search_connection(:ontology_data)
+      begin
+        conn.delete_by_query("ontology_t:\"#{ontology}\"")
+      rescue StandardError
+        # deletion failures are non-fatal here (e.g. the collection may not exist yet)
+      end
+      conn
+    end
+
     def valid?
      valid_result = super
      return false unless valid_result
 
diff --git a/lib/ontologies_linked_data/models/submission_status.rb b/lib/ontologies_linked_data/models/submission_status.rb
index 2e810277..d7c74363 100644
--- a/lib/ontologies_linked_data/models/submission_status.rb
+++ b/lib/ontologies_linked_data/models/submission_status.rb
@@ -7,6 +7,7 @@ class SubmissionStatus < LinkedData::Models::Base
         "RDF_LABELS", "ERROR_RDF_LABELS",
         "OBSOLETE", "ERROR_OBSOLETE",
         "INDEXED", "ERROR_INDEXED",
+        "INDEXED_ALL_DATA", "ERROR_INDEXED_ALL_DATA",
         "INDEXED_PROPERTIES", "ERROR_INDEXED_PROPERTIES",
         "METRICS", "ERROR_METRICS",
         "ANNOTATOR", "ERROR_ANNOTATOR",
@@ -18,6 +19,8 @@ class SubmissionStatus < LinkedData::Models::Base
       "RDF" => "Parsed successfully",
       "RDF_ERROR" => "Error parsing",
       "INDEXED" => "Indexed terms for search",
+      "INDEXED_ALL_DATA" => "Indexed all the data of the resource",
+      "ERROR_INDEXED_ALL_DATA" => "Error indexing all the data of the resource",
       "ERROR_INDEXED" => "Error indexing terms for search",
       "INDEXED_PROPERTIES" => "Indexed properties for search",
       "ERROR_INDEXED_PROPERTIES" => "Error indexing properties for search",
diff --git a/lib/ontologies_linked_data/services/submission_process/operations/submission_all_data_indexer.rb b/lib/ontologies_linked_data/services/submission_process/operations/submission_all_data_indexer.rb
new file mode 100644
index 00000000..0a3e46eb
--- /dev/null
+++ b/lib/ontologies_linked_data/services/submission_process/operations/submission_all_data_indexer.rb
@@ -0,0 +1,154 @@
+require 'parallel'
+module LinkedData
+  module Services
+    class OntologySubmissionAllDataIndexer < OntologySubmissionProcess
+
+      def process(logger, options = {})
+        status = LinkedData::Models::SubmissionStatus.find('INDEXED_ALL_DATA').first
+        begin
+          index_all_data(logger, **options)
+          @submission.add_submission_status(status)
+        rescue StandardError
+          @submission.add_submission_status(status.get_error_status)
+        ensure
+          @submission.save
+        end
+      end
+
+      private
+
+      def index_sorted_ids(ids, ontology, conn, logger, commit = true)
+        total_triples = Parallel.map(ids.each_slice(1000), in_threads: 10) do |ids_slice|
+          index_ids = 0
+          triples_count = 0
+          documents = {}
+          time = Benchmark.realtime do
+            documents, triples_count = fetch_triples(ids_slice, ontology)
+          end
+
+          next 0 if documents.empty?
+
+          logger.info("Worker #{Parallel.worker_number} > Fetched #{triples_count} triples of #{@submission.id} in #{time} sec.") if triples_count.positive?
+
+          time = Benchmark.realtime do
+            conn.index_document(documents.values, commit: false)
+            conn.index_commit if commit
+            index_ids = documents.size
+            documents = {}
+          end
+          logger.info("Worker #{Parallel.worker_number} > Indexed #{index_ids} ids of #{@submission.id} in #{time} sec.")
+          triples_count
+        end
+        total_triples.sum
+      end
+
+      def index_all_data(logger, commit: true)
+        page = 1
+        size = 10_000
+        count_ids = 0
+        total_time = 0
+        total_triples = 0
+        old_count = -1
+
+        ontology = @submission.bring(:ontology).ontology
+                              .bring(:acronym).acronym
+        conn = init_search_collection(ontology)
+
+        ids = []
+
+        while count_ids != old_count
+          old_count = count_ids
+          count = 0
+          time = Benchmark.realtime do
+            ids = fetch_sorted_ids(size, page)
+            count = ids.size
+          end
+
+          count_ids += count
+          total_time += time
+          page += 1
+
+          next unless count.positive?
+
+          logger.info("Fetched #{count} ids of #{@submission.id} page: #{page - 1} in #{time} sec.")
+
+          time = Benchmark.realtime do
+            total_triples += index_sorted_ids(ids, ontology, conn, logger, commit)
+          end
+          logger.info("Indexed #{total_triples} triples of #{@submission.id} page: #{page - 1} in #{time} sec.")
+
+          total_time += time
+        end
+        logger.info("Completed indexing all ontology data: #{@submission.id} in #{total_time} sec. (#{count_ids} ids / #{total_triples} triples)")
+        logger.flush
+      end
+
+      def fetch_sorted_ids(size, page)
+        query = Goo.sparql_query_client.select(:id)
+                    .distinct
+                    .from(RDF::URI.new(@submission.id))
+                    .where(%i[id p v])
+                    .limit(size)
+                    .offset((page - 1) * size)
+
+        query.each_solution.map(&:id).sort
+      end
+
+      def update_doc(doc, property, new_val)
+        unescaped_prop = property.gsub('___', '://')
+
+        unescaped_prop = unescaped_prop.gsub('_', '/')
+        existent_val = doc["#{unescaped_prop}_t"] || doc["#{unescaped_prop}_txt"]
+
+        if !existent_val && !property['#']
+          unescaped_prop = unescaped_prop.sub(%r{/([^/]+)$}, '#\1') # replace the last '/' with '#'
+          existent_val = doc["#{unescaped_prop}_t"] || doc["#{unescaped_prop}_txt"]
+        end
+
+        if existent_val && new_val || new_val.is_a?(Array)
+          doc.delete("#{unescaped_prop}_t")
+          doc["#{unescaped_prop}_txt"] = Array(existent_val) + Array(new_val).map(&:to_s)
+        elsif existent_val.nil? && new_val
+          doc["#{unescaped_prop}_t"] = new_val.to_s
+        end
+        doc
+      end
+
+      def init_search_collection(ontology)
+        @submission.class.clear_indexed_content(ontology)
+      end
+
+      def fetch_triples(ids_slice, ontology)
+        documents = {}
+        count = 0
+        filter = ids_slice.map { |x| "?id = <#{x}>" }.join(' || ')
+        query = Goo.sparql_query_client.select(:id, :p, :v)
+                    .from(RDF::URI.new(@submission.id))
+                    .where(%i[id p v])
+                    .filter(filter)
+        query.each_solution do |sol|
+          count += 1
+          doc = documents[sol[:id].to_s]
+          doc ||= {
+            id: "#{sol[:id]}_#{ontology}", submission_id_t: @submission.id.to_s,
+            ontology_t: ontology, resource_model: @submission.class.model_name,
+            resource_id: sol[:id].to_s
+          }
+          property = sol[:p].to_s
+          value = sol[:v]
+
+          if property.to_s.eql?(RDF.type.to_s)
+            update_doc(doc, 'type', value)
+          else
+            update_doc(doc, property, value)
+          end
+          documents[sol[:id].to_s] = doc
+        end
+        [documents, count]
+      end
+
+    end
+  end
+end
+
+
diff --git a/lib/ontologies_linked_data/services/submission_process/submission_processor.rb b/lib/ontologies_linked_data/services/submission_process/submission_processor.rb
index 7cab614d..9fc03020 100644
--- a/lib/ontologies_linked_data/services/submission_process/submission_processor.rb
+++ b/lib/ontologies_linked_data/services/submission_process/submission_processor.rb
@@ -44,11 +44,13 @@ def process_submission(logger, options = {})
 
         @submission.generate_obsolete_classes(logger) if generate_obsolete_classes?(options)
 
-        if !parsed && (index_search?(options) || index_properties?(options))
+        if !parsed && (index_search?(options) || index_properties?(options) || index_all_data?(options))
           raise StandardError, "The submission #{@submission.ontology.acronym}/submissions/#{@submission.submissionId} cannot be indexed because it has not been successfully parsed"
         end
 
+        @submission.index_all(logger, commit: process_index_commit?(options)) if index_all_data?(options)
+
         @submission.index_terms(logger, commit: process_index_commit?(options)) if index_search?(options)
 
         @submission.index_properties(logger, commit: process_index_commit?(options)) if index_properties?(options)
@@ -88,6 +90,10 @@ def generate_missing_labels?(options)
def generate_obsolete_classes?(options) options[:generate_obsolete_classes].nil? && process_rdf?(options) || options[:generate_obsolete_classes].eql?(true) end + + def index_all_data?(options) + options.empty? || options[:index_all_data].eql?(true) + end def index_search?(options) options.empty? || options[:index_search].eql?(true) @@ -98,7 +104,7 @@ def index_properties?(options) end def process_index_commit?(options) - index_search?(options) || index_properties?(options) + index_search?(options) || index_properties?(options) || index_all_data?(options) end def process_diff?(options) diff --git a/test/mappings/test_mappings_bulk_load.rb b/test/mappings/test_mappings_bulk_load.rb index fe0739d9..d035a71c 100644 --- a/test/mappings/test_mappings_bulk_load.rb +++ b/test/mappings/test_mappings_bulk_load.rb @@ -12,17 +12,11 @@ def self.before_suite helper.submission_parse(ONT_ACR1, 'MappingOntTest1', './test/data/ontology_files/BRO_v3.3.owl', 11, - process_rdf: true, extract_metadata: false, - generate_missing_label: false, - index_search: true, index_properties: false, - diff: false, run_metrics: false) + process_rdf: true, extract_metadata: false, index_search: true) helper.submission_parse(ONT_ACR2, 'MappingOntTest2', './test/data/ontology_files/CNO_05.owl', 22, - process_rdf: true, extract_metadata: false, - generate_missing_label: false, - index_search: true, index_properties: false, - diff: false, run_metrics: false) + process_rdf: true, extract_metadata: false, index_search: true) end diff --git a/test/models/test_search.rb b/test/models/test_search.rb index 7445bdd1..bdf5bc21 100644 --- a/test/models/test_search.rb +++ b/test/models/test_search.rb @@ -6,6 +6,7 @@ def self.after_suite backend_4s_delete LinkedData::Models::Ontology.indexClear LinkedData::Models::Agent.indexClear + Goo.search_client(:ontology_data)&.clear_all_data end def setup @@ -15,17 +16,20 @@ def setup def test_search_ontology ont_count, ont_acronyms, created_ontologies = create_ontologies_and_submissions({ process_submission: true, - process_options: {process_rdf: true, extract_metadata: false, run_metrics: true}, + process_options: { + process_rdf: true, + generate_missing_labels: false, + extract_metadata: false, run_metrics: true }, acronym: 'BROTEST', name: 'ontTEST Bla', file_path: '../../../../test/data/ontology_files/BRO_v3.2.owl', - ont_count: 3, - submission_count: 3 + ont_count: 2, + submission_count: 2 }) ontologies = LinkedData::Models::Ontology.search('*:*', { fq: 'resource_model: "ontology"' })['response']['docs'] - assert_equal 3, ontologies.size + assert_equal 2, ontologies.size ontologies.each do |ont| select_ont = created_ontologies.select { |ont_created| ont_created.id.to_s.eql?(ont['id']) }.first refute_nil select_ont @@ -37,7 +41,7 @@ def test_search_ontology end submissions = LinkedData::Models::Ontology.search('*:*', { fq: 'resource_model: "ontology_submission"' })['response']['docs'] - assert_equal 9, submissions.size + assert_equal 4, submissions.size submissions.each do |sub| created_sub = LinkedData::Models::OntologySubmission.find(RDF::URI.new(sub['id'])).first&.bring_remaining refute_nil created_sub @@ -56,7 +60,7 @@ def test_search_ontology assert_equal sub['openSearchDescription_t'], created_sub.openSearchDescription assert_equal sub['endpoint_txt'], created_sub.endpoint assert_equal sub['uploadFilePath_t'], created_sub.uploadFilePath - assert_equal sub['submissionStatus_txt'].sort, created_sub.submissionStatus.map{|x| x.id.to_s}.sort + assert_equal sub['submissionStatus_txt'].sort, 
created_sub.submissionStatus.map { |x| x.id.to_s }.sort created_sub.metrics.bring_remaining @@ -129,9 +133,10 @@ def test_search_ontology_data ont_count, ont_acronyms, created_ontologies = create_ontologies_and_submissions({ process_submission: true, process_options: { - process_rdf: true, extract_metadata: false, + process_rdf: true, + extract_metadata: false, generate_missing_labels: false, - index_search: false, + index_all_data: true }, acronym: 'BROTEST', name: 'ontTEST Bla', @@ -143,10 +148,9 @@ def test_search_ontology_data ont_sub = LinkedData::Models::Ontology.find('BROTEST-0').first ont_sub = ont_sub.latest_submission - ont_sub.index_all_data(Logger.new($stdout)) + refute_empty(ont_sub.submissionStatus.select { |x| x.id['INDEXED_ALL_DATA'] }) conn = Goo.search_client(:ontology_data) - response = conn.search('*') count = Goo.sparql_query_client.query("SELECT (COUNT( DISTINCT ?id) as ?c) FROM <#{ont_sub.id}> WHERE {?id ?p ?v}")
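
Usage note (illustrative sketch, not part of the diff): with this patch applied, the new INDEXED_ALL_DATA step runs through the regular processing pipeline whenever the index_all_data option is enabled; an empty options hash also enables it, since index_all_data? treats no options as "run everything". The acronym, logger, and flag values below are borrowed from this patch's own tests, and the flags mirror the predicates checked in submission_processor.rb:

    require 'logger'

    logger = Logger.new($stdout)
    sub = LinkedData::Models::Ontology.find('BROTEST-0').first.latest_submission

    # Triggers the INDEXED_ALL_DATA step alongside RDF parsing.
    sub.process_submission(logger,
                           process_rdf: true,
                           extract_metadata: false,
                           generate_missing_labels: false,
                           index_all_data: true)

    # Each subject in the submission graph becomes one document in the
    # :ontology_data search collection, keyed "<resource id>_<acronym>".
    conn = Goo.search_client(:ontology_data)
    response = conn.search('ontology_t:"BROTEST-0"')

Alternatively, the step can be run on its own via sub.index_all(logger), which wraps the new OntologySubmissionAllDataIndexer service.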