From 9da547f814ced9e24cd9c13fb270b58653f316ef Mon Sep 17 00:00:00 2001 From: Kaise Cheng Date: Mon, 28 Sep 2020 18:42:32 +0200 Subject: [PATCH 1/6] replace direct hidden indices access with system indices api --- .../config_management/elasticsearch_source.rb | 31 ++----- .../management/multiple_pipelines_spec.rb | 2 +- .../management/read_configuration_spec.rb | 2 +- x-pack/qa/integration/support/helpers.rb | 16 +++- .../elasticsearch_source_spec.rb | 90 ++++++++----------- 5 files changed, 65 insertions(+), 76 deletions(-) diff --git a/x-pack/lib/config_management/elasticsearch_source.rb b/x-pack/lib/config_management/elasticsearch_source.rb index 9e04121fce5..bd842850a2e 100644 --- a/x-pack/lib/config_management/elasticsearch_source.rb +++ b/x-pack/lib/config_management/elasticsearch_source.rb @@ -18,7 +18,7 @@ class ElasticsearchSource < LogStash::Config::Source::Base class RemoteConfigError < LogStash::Error; end - PIPELINE_INDEX = ".logstash" + SYSTEM_INDICES_API_PATH = "_logstash/pipeline" # exclude basic VALID_LICENSES = %w(trial standard gold platinum enterprise) FEATURE_INTERNAL = 'management' @@ -69,27 +69,18 @@ def pipeline_configs raise RemoteConfigError, "Cannot find find configuration for pipeline_id: #{pipeline_ids}, server returned status: `#{response["status"]}`, message: `#{response["error"]}`" end - if response["docs"].nil? - logger.debug("Server returned an unknown or malformed document structure", :response => response) - raise RemoteConfigError, "Elasticsearch returned an unknown or malformed document structure" - end - - # Cache pipelines to handle the case where a remote configuration error can render a pipeline unusable - # it is not reloadable - @cached_pipelines = response["docs"].collect do |document| - get_pipeline(document) + @cached_pipelines = pipeline_ids.collect do |pid| + get_pipeline(pid, response) end.compact end - def get_pipeline(response) - pipeline_id = response["_id"] - - if response["found"] == false + def get_pipeline(pipeline_id, response) + if response.has_key?(pipeline_id) == false logger.debug("Could not find a remote configuration for a specific `pipeline_id`", :pipeline_id => pipeline_id) return nil end - config_string = response.fetch("_source", {})["pipeline"] + config_string = response.fetch(pipeline_id, {})["pipeline"] raise RemoteConfigError, "Empty configuration for pipeline_id: #{pipeline_id}" if config_string.nil? || config_string.empty? @@ -100,7 +91,7 @@ def get_pipeline(response) settings.set("pipeline.id", pipeline_id) # override global settings with pipeline settings from ES, if any - pipeline_settings = response["_source"]["pipeline_settings"] + pipeline_settings = response[pipeline_id]["pipeline_settings"] unless pipeline_settings.nil? pipeline_settings.each do |setting, value| if SUPPORTED_PIPELINE_SETTINGS.include? 
setting @@ -128,12 +119,8 @@ def build_client end def fetch_config(pipeline_ids) - request_body_string = LogStash::Json.dump({ "docs" => pipeline_ids.collect { |pipeline_id| { "_id" => pipeline_id } } }) - client.post(config_path, {}, request_body_string) - end - - def config_path - "#{PIPELINE_INDEX}/_mget" + path_ids = pipeline_ids.join(",") + client.get("#{SYSTEM_INDICES_API_PATH}/#{path_ids}") end def populate_license_state(xpack_info) diff --git a/x-pack/qa/integration/management/multiple_pipelines_spec.rb b/x-pack/qa/integration/management/multiple_pipelines_spec.rb index b069fa89a36..680f04976cf 100644 --- a/x-pack/qa/integration/management/multiple_pipelines_spec.rb +++ b/x-pack/qa/integration/management/multiple_pipelines_spec.rb @@ -17,7 +17,7 @@ "hello" => nil } - cleanup_elasticsearch(".logstash*") + cleanup_system_indices(@pipelines.keys) cleanup_elasticsearch(".monitoring-logstash*") @pipelines.each do |pipeline_id, config| diff --git a/x-pack/qa/integration/management/read_configuration_spec.rb b/x-pack/qa/integration/management/read_configuration_spec.rb index 809d70c89df..c60537f39cf 100644 --- a/x-pack/qa/integration/management/read_configuration_spec.rb +++ b/x-pack/qa/integration/management/read_configuration_spec.rb @@ -31,7 +31,7 @@ def logstash_options(pipeline_id, wait_condition) def start_services(elasticsearch_options, logstash_options) @elasticsearch_service = elasticsearch(elasticsearch_options) - cleanup_elasticsearch(".logstash*") + cleanup_system_indices([PIPELINE_ID]) config = "input { generator { count => 100 } tcp { port => 6000 } } output { null {} }" push_elasticsearch_config(PIPELINE_ID, config) diff --git a/x-pack/qa/integration/support/helpers.rb b/x-pack/qa/integration/support/helpers.rb index 217ebc9d2bc..374d9836a71 100644 --- a/x-pack/qa/integration/support/helpers.rb +++ b/x-pack/qa/integration/support/helpers.rb @@ -8,6 +8,7 @@ require "fileutils" require "stud/try" require "open3" +require "time" VERSIONS_YML_PATH = File.join(File.dirname(__FILE__), "..", "..", "..", "..", "versions.yml") VERSION_PATH = File.join(File.dirname(__FILE__), "..", "..", "..", "VERSION") @@ -95,7 +96,9 @@ def elasticsearch_client(options = { :url => "http://elastic:#{elastic_password} end def push_elasticsearch_config(pipeline_id, config) - elasticsearch_client.index :index => '.logstash', :type => "_doc", id: pipeline_id, :body => { :pipeline => config } + elasticsearch_client.perform_request(:put, "_logstash/pipeline/#{pipeline_id}", {}, + { :pipeline => config, :username => "log.stash", :pipeline_metadata => {:version => "1" }, + :pipeline_settings => {"pipeline.batch.delay": "50"}, :last_modified => Time.now.utc.iso8601}) end def cleanup_elasticsearch(index = MONITORING_INDEXES) @@ -103,6 +106,17 @@ def cleanup_elasticsearch(index = MONITORING_INDEXES) elasticsearch_client.indices.refresh end +def cleanup_system_indices(pipeline_ids) + pipeline_ids.each do |id| + begin + elasticsearch_client.perform_request(:delete, "_logstash/pipeline/#{id}") + rescue Elasticsearch::Transport::Transport::Errors::NotFound => e + puts ".logstash can be empty #{e.message}" + end + end + elasticsearch_client.indices.refresh +end + def logstash_command_append(cmd, argument, value) if cmd !~ /#{Regexp.escape(argument)}/ cmd << " #{argument} #{value}" diff --git a/x-pack/spec/config_management/elasticsearch_source_spec.rb b/x-pack/spec/config_management/elasticsearch_source_spec.rb index 927d853b241..d7a18d70916 100644 --- a/x-pack/spec/config_management/elasticsearch_source_spec.rb 
+++ b/x-pack/spec/config_management/elasticsearch_source_spec.rb @@ -162,22 +162,18 @@ end end - describe "#config_path" do - before do - # we are testing the arguments here, not the behavior of the elasticsearch output - allow_any_instance_of(described_class).to receive(:build_client).and_return(nil) - end + describe "#fetch_config" do + let(:mock_client) { double("http_client") } - let(:pipeline_id) { "foobar" } - let(:settings) do - { - "xpack.management.pipeline.id" => pipeline_id, - "xpack.management.elasticsearch.password" => "testpassword" - } + before do + expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) + allow(mock_license_client).to receive(:get).with('_xpack').and_return(valid_xpack_response) + allow_any_instance_of(LogStash::LicenseChecker::LicenseReader).to receive(:client).and_return(mock_license_client) end it "generates the path to get the configuration" do - expect(subject.config_path).to eq("#{described_class::PIPELINE_INDEX}/_mget") + expect(mock_client).to receive(:get).with("#{described_class::SYSTEM_INDICES_API_PATH}/apache,nginx").and_return(LogStash::Json.load("{}")) + subject.fetch_config(["apache", "nginx"]) end end @@ -216,11 +212,9 @@ let(:pipeline_id) { "apache" } let(:mock_client) { double("http_client") } let(:settings) { super.merge({ "xpack.management.pipeline.id" => pipeline_id }) } - let(:es_path) { "#{described_class::PIPELINE_INDEX}/_mget" } - let(:request_body_string) { LogStash::Json.dump({ "docs" => [{ "_id" => pipeline_id }] }) } before do - allow(mock_client).to receive(:post).with(es_path, {}, request_body_string).and_return(LogStash::Json.load(elasticsearch_response)) + allow(mock_client).to receive(:get).with(instance_of(String)).and_return(LogStash::Json.load(elasticsearch_response)) allow(mock_license_client).to receive(:get).with('_xpack').and_return(valid_xpack_response) allow_any_instance_of(LogStash::LicenseChecker::LicenseReader).to receive(:client).and_return(mock_license_client) @@ -241,29 +235,22 @@ let(:invalid_pipeline_setting) {"nonsensical.invalid.setting"} let(:elasticsearch_response) { %{ - { "docs": [{ - "_index":".logstash", - "_type":"pipelines", - "_id":"#{pipeline_id}", - "_version":8, - "found":true, - "_source": { - "id":"apache", - "description":"Process apache logs", - "modified_timestamp":"2017-02-28T23:02:17.023Z", - "pipeline_metadata":{ - "version":5, - "type":"logstash_pipeline", - "username":"elastic" - }, - "pipeline":"#{config}", - "pipeline_settings": { - "#{whitelisted_pipeline_setting_name}":#{whitelisted_pipeline_setting_value}, - "#{non_whitelisted_pipeline_setting_name}":#{non_whitelisted_pipeline_setting_value}, - "#{invalid_pipeline_setting}":-9999 - } + { + "#{pipeline_id}" : { + "username": "log.stash", + "modified_timestamp":"2017-02-28T23:02:17.023Z", + "pipeline_metadata":{ + "version":5, + "type":"logstash_pipeline", + "username":"elastic" + }, + "pipeline":"#{config}", + "pipeline_settings": { + "#{whitelisted_pipeline_setting_name}":#{whitelisted_pipeline_setting_value}, + "#{non_whitelisted_pipeline_setting_name}":#{non_whitelisted_pipeline_setting_value}, + "#{invalid_pipeline_setting}":-9999 } - }] + } } } } @@ -287,7 +274,7 @@ context 'when the license has expired' do let(:config) { "input { generator {} } filter { mutate {} } output { }" } - let(:elasticsearch_response) { "{ \"docs\":[{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"#{pipeline_id}\",\"_version\":8,\"found\":true,\"_source\":{\"id\":\"apache\",\"description\":\"Process 
apache logs\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\",\"username\":\"elastic\"},\"pipeline\":\"#{config}\"}}]}" } + let(:elasticsearch_response) { "{\"#{pipeline_id}\":{\"username\":\"log.stash\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\"},\"pipeline\":\"#{config}\",\"pipeline_settings\":{\"pipeline.batch.delay\":\"50\"}}}" } let(:license_status) { 'expired'} let(:license_expiry_date) { Time.now - (60 * 60 * 24)} @@ -305,7 +292,7 @@ context 'when the license server is not available' do let(:config) { "input { generator {} } filter { mutate {} } output { }" } - let(:elasticsearch_response) { "{ \"docs\":[{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"#{pipeline_id}\",\"_version\":8,\"found\":true,\"_source\":{\"id\":\"apache\",\"description\":\"Process apache logs\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\",\"username\":\"elastic\"},\"pipeline\":\"#{config}\"}}]}" } + let(:elasticsearch_response) { "{\"#{pipeline_id}\":{\"username\":\"log.stash\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\"},\"pipeline\":\"#{config}\",\"pipeline_settings\":{\"pipeline.batch.delay\":\"50\"}}}" } before :each do allow(mock_license_client).to receive(:get).with('_xpack').and_raise("An error is here") @@ -319,7 +306,7 @@ context 'when the xpack is not installed' do let(:config) { "input { generator {} } filter { mutate {} } output { }" } - let(:elasticsearch_response) { "{ \"docs\":[{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"#{pipeline_id}\",\"_version\":8,\"found\":true,\"_source\":{\"id\":\"apache\",\"description\":\"Process apache logs\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\",\"username\":\"elastic\"},\"pipeline\":\"#{config}\"}}]}" } + let(:elasticsearch_response) { "{\"#{pipeline_id}\":{\"username\":\"log.stash\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\"},\"pipeline\":\"#{config}\",\"pipeline_settings\":{\"pipeline.batch.delay\":\"50\"}}}" } before :each do expect(mock_license_client).to receive(:get).with('_xpack').and_return(no_xpack_response) @@ -333,7 +320,7 @@ describe 'security enabled/disabled in Elasticsearch' do let(:config) { "input { generator {} } filter { mutate {} } output { }" } - let(:elasticsearch_response) { "{ \"docs\":[{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"#{pipeline_id}\",\"_version\":8,\"found\":true,\"_source\":{\"id\":\"apache\",\"description\":\"Process apache logs\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\",\"username\":\"elastic\"},\"pipeline\":\"#{config}\"}}]}" } + let(:elasticsearch_response) { "{\"#{pipeline_id}\":{\"username\":\"log.stash\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\"},\"pipeline\":\"#{config}\",\"pipeline_settings\":{\"pipeline.batch.delay\":\"50\"}}}" } let(:xpack_response) do { @@ -377,7 +364,7 @@ context "With an invalid basic license, it should raise an error" do let(:config) { "input { generator {} } filter { mutate {} } output { }" } - let(:elasticsearch_response) { "{ 
\"docs\":[{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"#{pipeline_id}\",\"_version\":8,\"found\":true,\"_source\":{\"id\":\"apache\",\"description\":\"Process apache logs\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\",\"username\":\"elastic\"},\"pipeline\":\"#{config}\"}}]}" } + let(:elasticsearch_response) { "{\"#{pipeline_id}\":{\"username\":\"log.stash\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\"},\"pipeline\":\"#{config}\",\"pipeline_settings\":{\"pipeline.batch.delay\":\"50\"}}}" } let(:license_type) { 'basic' } it 'should raise an error' do @@ -394,7 +381,7 @@ end let(:config) { "input { generator {} } filter { mutate {} } output { }" } - let(:elasticsearch_response) { "{ \"docs\":[{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"#{pipeline_id}\",\"_version\":8,\"found\":true,\"_source\":{\"id\":\"apache\",\"description\":\"Process apache logs\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\",\"username\":\"elastic\"},\"pipeline\":\"#{config}\"}}]}" } + let(:elasticsearch_response) { "{\"#{pipeline_id}\":{\"username\":\"log.stash\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\"},\"pipeline\":\"#{config}\",\"pipeline_settings\":{\"pipeline.batch.delay\":\"50\"}}}" } let(:license_type) { license_type } it "returns a valid pipeline config" do @@ -410,7 +397,7 @@ context "with multiples `pipeline_id` configured" do before do - allow(mock_client).to receive(:post).with(es_path, {}, request_body_string).and_return(LogStash::Json.load(elasticsearch_response)) + allow(mock_client).to receive(:get).with(instance_of(String)).and_return(LogStash::Json.load(elasticsearch_response)) expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) end @@ -421,15 +408,16 @@ "firewall" => config_firewall } end + let(:pipeline_id) { pipelines.keys } let(:config_apache) { "input { generator { id => '123'} } filter { mutate {} } output { }" } let(:config_firewall) { "input { generator { id => '321' } } filter { mutate {} } output { }" } let(:elasticsearch_response) do - content = "{ \"docs\":[" + content = "{" content << pipelines.collect do |pipeline_id, config| - "{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"#{pipeline_id}\",\"_version\":8,\"found\":true,\"_source\":{\"id\":\"apache\",\"description\":\"Process apache logs\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\",\"username\":\"elastic\"},\"pipeline\":\"#{config}\"}}" + "\"#{pipeline_id}\":{\"username\":\"log.stash\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\"},\"pipeline\":\"#{config}\",\"pipeline_settings\":{\"pipeline.batch.delay\":\"50\"}}" end.join(",") - content << "]}" + content << "}" content end @@ -443,7 +431,7 @@ end context "when the configuration is not found" do - let(:elasticsearch_response) { "{ \"docs\": [{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"donotexist\",\"found\":false}]}" } + let(:elasticsearch_response) { "{}" } before do expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) @@ -455,14 +443,14 @@ end context "when any error returned from elasticsearch" do - 
let(:elasticsearch_response){'{ "error":{"root_cause":[{"type":"illegal_argument_exception","reason":"No endpoint or operation is available at [testing_ph]"}],"type":"illegal_argument_exception","reason":"No endpoint or operation is available at [testing_ph]"},"status":400}' } + let(:elasticsearch_response){"{\"error\" : \"no handler found for uri [/_logstash/pipelines?pretty] and method [GET]\"}" } before do expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) end it "raises a `RemoteConfigError`" do - expect { subject.pipeline_configs }.to raise_error /illegal_argument_exception/ + expect { subject.pipeline_configs }.to raise_error /no handler found/ end end @@ -474,7 +462,7 @@ end it "raises the exception upstream" do - expect(mock_client).to receive(:post).with(es_path, {}, request_body_string).and_raise("Something bad") + expect(mock_client).to receive(:get).with(instance_of(String)).and_raise("Something bad") expect { subject.pipeline_configs }.to raise_error /Something bad/ end end From 9dd7c7303ae07f576b03fe2facabe26fde2ec6c8 Mon Sep 17 00:00:00 2001 From: Kaise Cheng Date: Thu, 1 Oct 2020 01:12:54 +0200 Subject: [PATCH 2/6] fulfill backward compatibility --- .../config_management/elasticsearch_source.rb | 81 ++- x-pack/qa/integration/support/helpers.rb | 17 +- .../elasticsearch_source_spec.rb | 518 +++++++++++------- 3 files changed, 406 insertions(+), 210 deletions(-) diff --git a/x-pack/lib/config_management/elasticsearch_source.rb b/x-pack/lib/config_management/elasticsearch_source.rb index bd842850a2e..2a228886218 100644 --- a/x-pack/lib/config_management/elasticsearch_source.rb +++ b/x-pack/lib/config_management/elasticsearch_source.rb @@ -18,7 +18,6 @@ class ElasticsearchSource < LogStash::Config::Source::Base class RemoteConfigError < LogStash::Error; end - SYSTEM_INDICES_API_PATH = "_logstash/pipeline" # exclude basic VALID_LICENSES = %w(trial standard gold platinum enterprise) FEATURE_INTERNAL = 'management' @@ -50,6 +49,19 @@ def config_conflict? false end + def pipeline_fetcher_factory + response = client.get("/") + + if response["error"] + raise RemoteConfigError, "Cannot find elasticsearch version, server returned status: `#{response["status"]}`, message: `#{response["error"]}`" + end + + version_number = response["version"]["number"].split(".") + first = version_number[0].to_i + second = version_number[1].to_i + (first >= 8 || (first == 7 && second >= 10))? 
SystemIndicesFetcher.new: LegacyHiddenIndicesFetcher.new + end + def pipeline_configs logger.trace("Fetch remote config pipeline", :pipeline_ids => pipeline_ids) @@ -63,24 +75,27 @@ def pipeline_configs end end - response = fetch_config(pipeline_ids) + fetcher = pipeline_fetcher_factory + response = fetcher.fetch_config(pipeline_ids, client) if response["error"] raise RemoteConfigError, "Cannot find find configuration for pipeline_id: #{pipeline_ids}, server returned status: `#{response["status"]}`, message: `#{response["error"]}`" end + response = fetcher.format_response(response) + @cached_pipelines = pipeline_ids.collect do |pid| - get_pipeline(pid, response) + get_pipeline(pid, response, fetcher) end.compact end - def get_pipeline(pipeline_id, response) + def get_pipeline(pipeline_id, response, fetcher) if response.has_key?(pipeline_id) == false logger.debug("Could not find a remote configuration for a specific `pipeline_id`", :pipeline_id => pipeline_id) return nil end - config_string = response.fetch(pipeline_id, {})["pipeline"] + config_string = fetcher.get_single_pipeline_setting(response, pipeline_id)["pipeline"] raise RemoteConfigError, "Empty configuration for pipeline_id: #{pipeline_id}" if config_string.nil? || config_string.empty? @@ -91,7 +106,7 @@ def get_pipeline(pipeline_id, response) settings.set("pipeline.id", pipeline_id) # override global settings with pipeline settings from ES, if any - pipeline_settings = response[pipeline_id]["pipeline_settings"] + pipeline_settings = fetcher.get_single_pipeline_setting(response, pipeline_id)["pipeline_settings"] unless pipeline_settings.nil? pipeline_settings.each do |setting, value| if SUPPORTED_PIPELINE_SETTINGS.include? setting @@ -118,11 +133,6 @@ def build_client es.build_client end - def fetch_config(pipeline_ids) - path_ids = pipeline_ids.join(",") - client.get("#{SYSTEM_INDICES_API_PATH}/#{path_ids}") - end - def populate_license_state(xpack_info) if xpack_info.failed? { @@ -180,5 +190,54 @@ def client @client ||= build_client end end + + class SystemIndicesFetcher + include LogStash::Util::Loggable + + SYSTEM_INDICES_API_PATH = "_logstash/pipeline" + + def fetch_config(pipeline_ids, client) + path_ids = pipeline_ids.join(",") + client.get("#{SYSTEM_INDICES_API_PATH}/#{path_ids}") + end + + def format_response(response) + # for backwards compatibility + response + end + + def get_single_pipeline_setting(response, pipeline_id) + response.fetch(pipeline_id, {}) + end + end + + # TODO clean up LegacyHiddenIndicesFetcher when 7.9.* is deprecated + class LegacyHiddenIndicesFetcher + include LogStash::Util::Loggable + + PIPELINE_INDEX = ".logstash" + + def fetch_config(pipeline_ids, client) + request_body_string = LogStash::Json.dump({ "docs" => pipeline_ids.collect { |pipeline_id| { "_id" => pipeline_id } } }) + client.post("#{PIPELINE_INDEX}/_mget", {}, request_body_string) + end + + def format_response(response) + if response["docs"].nil? 
+ logger.debug("Server returned an unknown or malformed document structure", :response => response) + raise LogStash::ConfigManagement::ElasticsearchSource::RemoteConfigError, "Elasticsearch returned an unknown or malformed document structure" + end + + response["docs"].map { |pipeline| + {pipeline["_id"] => pipeline} if pipeline.fetch("found", false) + }.compact + .reduce({}, :merge) + end + + def get_single_pipeline_setting(response, pipeline_id) + response.fetch(pipeline_id, {}).fetch("_source", {}) + end + end + end end diff --git a/x-pack/qa/integration/support/helpers.rb b/x-pack/qa/integration/support/helpers.rb index 374d9836a71..cd05f72cc5b 100644 --- a/x-pack/qa/integration/support/helpers.rb +++ b/x-pack/qa/integration/support/helpers.rb @@ -95,10 +95,19 @@ def elasticsearch_client(options = { :url => "http://elastic:#{elastic_password} Elasticsearch::Client.new(options) end +def es_version + response = elasticsearch_client.perform_request(:get, "") + response.body["version"]["number"].gsub(/(\d+\.\d+)\..+/, '\1').to_f +end + def push_elasticsearch_config(pipeline_id, config) - elasticsearch_client.perform_request(:put, "_logstash/pipeline/#{pipeline_id}", {}, - { :pipeline => config, :username => "log.stash", :pipeline_metadata => {:version => "1" }, - :pipeline_settings => {"pipeline.batch.delay": "50"}, :last_modified => Time.now.utc.iso8601}) + if es_version >= 7.10 + elasticsearch_client.perform_request(:put, "_logstash/pipeline/#{pipeline_id}", {}, + { :pipeline => config, :username => "log.stash", :pipeline_metadata => {:version => "1" }, + :pipeline_settings => {"pipeline.batch.delay": "50"}, :last_modified => Time.now.utc.iso8601}) + else + elasticsearch_client.index :index => '.logstash', :type => "_doc", id: pipeline_id, :body => { :pipeline => config } + end end def cleanup_elasticsearch(index = MONITORING_INDEXES) @@ -150,4 +159,4 @@ def verify_response!(cmd, response) unless response.successful? 
raise "Something went wrong when installing xpack,\ncmd: #{cmd}\nresponse: #{response}" end -end +end \ No newline at end of file diff --git a/x-pack/spec/config_management/elasticsearch_source_spec.rb b/x-pack/spec/config_management/elasticsearch_source_spec.rb index d7a18d70916..ed2adfb4751 100644 --- a/x-pack/spec/config_management/elasticsearch_source_spec.rb +++ b/x-pack/spec/config_management/elasticsearch_source_spec.rb @@ -12,6 +12,8 @@ require "stud/temporary" describe LogStash::ConfigManagement::ElasticsearchSource do + let(:system_indices_api) { LogStash::ConfigManagement::SystemIndicesFetcher::SYSTEM_INDICES_API_PATH } + let(:system_indices_url_regex) { Regexp.new("^#{system_indices_api}") } let(:elasticsearch_url) { ["https://localhost:9898"] } let(:elasticsearch_username) { "elastictest" } let(:elasticsearch_password) { "testchangeme" } @@ -41,7 +43,6 @@ }" } - let(:valid_xpack_response) { { "license" => { @@ -83,7 +84,6 @@ }") } - let(:settings) do { "xpack.management.enabled" => true, @@ -94,6 +94,10 @@ } end + let(:es_version_response) { es_version_8_response } + let(:es_version_8_response) { generate_es_version_response("8.0.0-SNAPSHOT") } + let(:es_version_7_9_response) { generate_es_version_response("7.9.1") } + before do extension.additionals_settings(system_settings) apply_settings(settings, system_settings) @@ -162,18 +166,89 @@ end end - describe "#fetch_config" do - let(:mock_client) { double("http_client") } + describe LogStash::ConfigManagement::SystemIndicesFetcher do + subject { described_class.new } - before do - expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) - allow(mock_license_client).to receive(:get).with('_xpack').and_return(valid_xpack_response) - allow_any_instance_of(LogStash::LicenseChecker::LicenseReader).to receive(:client).and_return(mock_license_client) + describe "system indices api" do + let(:mock_client) { double("http_client") } + let(:config) { "input { generator { count => 100 } tcp { port => 6005 } } output { }}" } + let(:pipeline_id) { "super_generator" } + let(:elasticsearch_response) { {"#{pipeline_id}"=> {"pipeline"=> "#{config}"}} } + + it "#fetch_config" do + expect(mock_client).to receive(:get).with("#{described_class::SYSTEM_INDICES_API_PATH}/apache,nginx").and_return(LogStash::Json.load("{}")) + subject.fetch_config(["apache", "nginx"], mock_client) + end + + it "#get_single_pipeline_setting" do + expect(subject.get_single_pipeline_setting(elasticsearch_response, pipeline_id)).to eq({"pipeline"=> "#{config}"}) + end end + end - it "generates the path to get the configuration" do - expect(mock_client).to receive(:get).with("#{described_class::SYSTEM_INDICES_API_PATH}/apache,nginx").and_return(LogStash::Json.load("{}")) - subject.fetch_config(["apache", "nginx"]) + describe LogStash::ConfigManagement::LegacyHiddenIndicesFetcher do + subject { described_class.new } + + describe "legacy api" do + let(:mock_client) { double("http_client") } + let(:config) { "input { generator { count => 100 } tcp { port => 6005 } } output { }}" } + let(:pipeline_id) { "super_generator" } + let(:another_pipeline_id) { "another_generator" } + let(:elasticsearch_response) { + {"docs"=> + [{"_index"=>".logstash", + "_id"=>"#{pipeline_id}", + "_version"=>2, + "_seq_no"=>2, + "_primary_term"=>1, + "found"=>true, + "_source"=> + {"pipeline"=> + "#{config}"}}, + {"_index"=>".logstash", + "_id"=>"#{another_pipeline_id}", + "_version"=>2, + "_seq_no"=>3, + "_primary_term"=>1, + "found"=>true, + "_source"=> + 
{"pipeline"=> + "input { generator { count => 100 } tcp { port => 6006 } } output { }}"}}, + {"_index"=>".logstash", "_id"=>"not_exists", "found"=>false}]} + } + let(:elasticsearch_err_response) { + {"error"=> + {"root_cause"=> + [{"type"=>"parse_exception", + "reason"=>"request body or source parameter is required"}], + "type"=>"parse_exception", + "reason"=>"request body or source parameter is required"}, + "status"=>400} + } + let(:formatted_es_response) { + {"super_generator"=>{"_index"=>".logstash", "_id"=>"super_generator", "_version"=>2, "_seq_no"=>2, "_primary_term"=>1, "found"=>true, "_source"=>{"pipeline"=>"input { generator { count => 100 } tcp { port => 6005 } } output { }}"}}} + } + + it "#fetch_config" do + expect(mock_client).to receive(:post).with("#{described_class::PIPELINE_INDEX}/_mget", {}, "{\"docs\":[{\"_id\":\"apache\"},{\"_id\":\"nginx\"}]}").and_return(LogStash::Json.load("{}")) + subject.fetch_config(["apache", "nginx"], mock_client) + end + + it "#format_response should raise error" do + expect{ subject.format_response(elasticsearch_err_response) }.to raise_error(LogStash::ConfigManagement::ElasticsearchSource::RemoteConfigError) + end + + it "#format_response should return pipelines" do + result = subject.format_response(elasticsearch_response) + expect(result.size).to eq(2) + expect(result.has_key?(pipeline_id)).to be_truthy + expect(result.has_key?(another_pipeline_id)).to be_truthy + end + + it "#get_single_pipeline_setting" do + result = subject.get_single_pipeline_setting(formatted_es_response, pipeline_id) + expect(result).to eq({"pipeline" => "#{config}"}) + end end end @@ -212,259 +287,312 @@ let(:pipeline_id) { "apache" } let(:mock_client) { double("http_client") } let(:settings) { super.merge({ "xpack.management.pipeline.id" => pipeline_id }) } + let(:config) { "input { generator {} } filter { mutate {} } output { }" } + let(:elasticsearch_response) { elasticsearch_8_response } + let(:elasticsearch_8_response) { + "{\"#{pipeline_id}\":{ + \"username\":\"log.stash\", + \"modified_timestamp\":\"2017-02-28T23:02:17.023Z\", + \"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\"}, + \"pipeline\":\"#{config}\", + \"pipeline_settings\":{\"pipeline.batch.delay\":\"50\", \"pipeline.workers\":\"99\", \"pipeline.output.workers\":\"99\", \"nonsensical.invalid.setting\":\"-9999\"}}}" } + + let(:elasticsearch_7_9_response) { + "{ \"docs\":[{ + \"_index\":\".logstash\", + \"_type\":\"pipelines\", + \"_id\":\"#{pipeline_id}\", + \"_version\":8, + \"found\":true, + \"_source\":{ + \"id\":\"apache\", + \"description\":\"Process apache logs\", + \"modified_timestamp\":\"2017-02-28T23:02:17.023Z\", + \"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\",\"username\":\"elastic\"}, + \"pipeline\":\"#{config}\", + \"pipeline_settings\":{\"pipeline.workers\":\"99\", \"pipeline.output.workers\":\"99\", \"nonsensical.invalid.setting\":\"-9999\"}}}]}" } + let(:es_path) { ".logstash/_mget" } + let(:request_body_string) { LogStash::Json.dump({ "docs" => [{ "_id" => pipeline_id }] }) } before do - allow(mock_client).to receive(:get).with(instance_of(String)).and_return(LogStash::Json.load(elasticsearch_response)) + allow(mock_client).to receive(:get).with(system_indices_url_regex).and_return(LogStash::Json.load(elasticsearch_response)) + allow(mock_client).to receive(:get).with("/").and_return(es_version_response) + allow(mock_client).to receive(:post).with(es_path, {}, request_body_string).and_return(LogStash::Json.load(elasticsearch_7_9_response)) 
allow(mock_license_client).to receive(:get).with('_xpack').and_return(valid_xpack_response) allow_any_instance_of(LogStash::LicenseChecker::LicenseReader).to receive(:client).and_return(mock_license_client) - end - context "with one `pipeline_id` configured" do - context "when successfully fetching a remote configuration" do + describe "system indices [8] and legacy api [7.9]" do + [8, 7.9].each { |es_version| + let(:elasticsearch_response) { (es_version >= 8)? elasticsearch_8_response: elasticsearch_7_9_response } before :each do - expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) + allow(mock_client).to receive(:get).with("/").and_return(es_version >= 8? es_version_response: es_version_7_9_response) end - let(:config) { "input { generator {} } filter { mutate {} } output { }" } - let(:whitelisted_pipeline_setting_name) {"pipeline.workers"} - let(:whitelisted_pipeline_setting_value) {"99"} - let(:non_whitelisted_pipeline_setting_name) {"pipeline.output.workers"} - let(:non_whitelisted_pipeline_setting_value) {"99"} - let(:invalid_pipeline_setting) {"nonsensical.invalid.setting"} - let(:elasticsearch_response) { - %{ - { - "#{pipeline_id}" : { - "username": "log.stash", - "modified_timestamp":"2017-02-28T23:02:17.023Z", - "pipeline_metadata":{ - "version":5, - "type":"logstash_pipeline", - "username":"elastic" - }, - "pipeline":"#{config}", - "pipeline_settings": { - "#{whitelisted_pipeline_setting_name}":#{whitelisted_pipeline_setting_value}, - "#{non_whitelisted_pipeline_setting_name}":#{non_whitelisted_pipeline_setting_value}, - "#{invalid_pipeline_setting}":-9999 - } - } - } - } - } + context "with one `pipeline_id` configured [#{es_version}]" do + context "when successfully fetching a remote configuration" do + before :each do + expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) + allow(mock_client).to receive(:post).with(es_path, {}, request_body_string).and_return(LogStash::Json.load(elasticsearch_7_9_response)) + end - it "returns a valid pipeline config" do - pipeline_config = subject.pipeline_configs + let(:config) { "input { generator {} } filter { mutate {} } output { }" } - expect(pipeline_config.first.config_string).to match(config) - expect(pipeline_config.first.pipeline_id.to_sym).to eq(pipeline_id.to_sym) - end + it "returns a valid pipeline config" do + pipeline_config = subject.pipeline_configs - it "ignores non-whitelisted and invalid settings" do - pipeline_config = subject.pipeline_configs - settings_hash = pipeline_config[0].settings.to_hash + expect(pipeline_config.first.config_string).to match(config) + expect(pipeline_config.first.pipeline_id.to_sym).to eq(pipeline_id.to_sym) + end - expect(settings_hash[whitelisted_pipeline_setting_name]).to eq(whitelisted_pipeline_setting_value.to_i) - expect(settings_hash[non_whitelisted_pipeline_setting_name]).not_to eq(non_whitelisted_pipeline_setting_value.to_i) - expect(settings_hash[invalid_pipeline_setting]).to be_falsey - end - end + it "ignores non-whitelisted and invalid settings" do + pipeline_config = subject.pipeline_configs + settings_hash = pipeline_config[0].settings.to_hash - context 'when the license has expired' do - let(:config) { "input { generator {} } filter { mutate {} } output { }" } - let(:elasticsearch_response) { 
"{\"#{pipeline_id}\":{\"username\":\"log.stash\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\"},\"pipeline\":\"#{config}\",\"pipeline_settings\":{\"pipeline.batch.delay\":\"50\"}}}" } - let(:license_status) { 'expired'} - let(:license_expiry_date) { Time.now - (60 * 60 * 24)} + expect(settings_hash["pipeline.workers"]).to eq(99) + expect(settings_hash["pipeline.output.workers"]).not_to eq(99) + expect(settings_hash["nonsensical.invalid.setting"]).to be_falsey + end + end - before :each do - expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) - end + context "when the license has expired [#{es_version}]" do + let(:license_status) { 'expired'} + let(:license_expiry_date) { Time.now - (60 * 60 * 24)} - it "returns a valid pipeline config" do - pipeline_config = subject.pipeline_configs + before :each do + expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) + end - expect(pipeline_config.first.config_string).to match(config) - expect(pipeline_config.first.pipeline_id.to_sym).to eq(pipeline_id.to_sym) - end - end + it "returns a valid pipeline config" do + pipeline_config = subject.pipeline_configs - context 'when the license server is not available' do - let(:config) { "input { generator {} } filter { mutate {} } output { }" } - let(:elasticsearch_response) { "{\"#{pipeline_id}\":{\"username\":\"log.stash\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\"},\"pipeline\":\"#{config}\",\"pipeline_settings\":{\"pipeline.batch.delay\":\"50\"}}}" } + expect(pipeline_config.first.config_string).to match(config) + expect(pipeline_config.first.pipeline_id.to_sym).to eq(pipeline_id.to_sym) + end + end - before :each do - allow(mock_license_client).to receive(:get).with('_xpack').and_raise("An error is here") - allow_any_instance_of(LogStash::LicenseChecker::LicenseReader).to receive(:build_client).and_return(mock_license_client) - end + context "when the license server is not available [#{es_version}]" do + before :each do + allow(mock_license_client).to receive(:get).with('_xpack').and_raise("An error is here") + allow_any_instance_of(LogStash::LicenseChecker::LicenseReader).to receive(:build_client).and_return(mock_license_client) + end - it 'should raise an error' do - expect{subject.pipeline_configs}.to raise_error(LogStash::LicenseChecker::LicenseError) - end - end + it 'should raise an error' do + expect{subject.pipeline_configs}.to raise_error(LogStash::LicenseChecker::LicenseError) + end + end - context 'when the xpack is not installed' do - let(:config) { "input { generator {} } filter { mutate {} } output { }" } - let(:elasticsearch_response) { "{\"#{pipeline_id}\":{\"username\":\"log.stash\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\"},\"pipeline\":\"#{config}\",\"pipeline_settings\":{\"pipeline.batch.delay\":\"50\"}}}" } + context "when the xpack is not installed [#{es_version}]" do + before :each do + expect(mock_license_client).to receive(:get).with('_xpack').and_return(no_xpack_response) + allow_any_instance_of(LogStash::LicenseChecker::LicenseReader).to receive(:build_client).and_return(mock_license_client) + end - before :each do - expect(mock_license_client).to receive(:get).with('_xpack').and_return(no_xpack_response) - allow_any_instance_of(LogStash::LicenseChecker::LicenseReader).to 
receive(:build_client).and_return(mock_license_client) - end + it 'should raise an error' do + expect{subject.pipeline_configs}.to raise_error(LogStash::LicenseChecker::LicenseError) + end + end - it 'should raise an error' do - expect{subject.pipeline_configs}.to raise_error(LogStash::LicenseChecker::LicenseError) - end - end + describe "security enabled/disabled in Elasticsearch [#{es_version}]" do + let(:xpack_response) do + { + "license"=> { + "status"=> license_status, + "uid"=> "9a48c67c-ce2c-4169-97bf-37d324b8ab80", + "type"=> license_type, + "expiry_date_in_millis"=> license_expiry_in_millis + }, + "features" => { + "security" => { + "description" => "Security for the Elastic Stack", + "available" => true, + "enabled" => security_enabled + } + } + } + end + + before :each do + allow(mock_license_client).to receive(:get).with('_xpack').and_return(xpack_response) + allow_any_instance_of(LogStash::LicenseChecker::LicenseReader).to receive(:build_client).and_return(mock_license_client) + end + + context "when security is disabled in Elasticsearch [#{es_version}]" do + let(:security_enabled) { false } + it 'should raise an error' do + expect { subject.pipeline_configs }.to raise_error(LogStash::LicenseChecker::LicenseError) + end + end + + context "when security is enabled in Elasticsearch [#{es_version}]" do + let(:security_enabled) { true } + it 'should not raise an error' do + expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) + expect { subject.pipeline_configs }.not_to raise_error + end + end + end - describe 'security enabled/disabled in Elasticsearch' do - let(:config) { "input { generator {} } filter { mutate {} } output { }" } - let(:elasticsearch_response) { "{\"#{pipeline_id}\":{\"username\":\"log.stash\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\"},\"pipeline\":\"#{config}\",\"pipeline_settings\":{\"pipeline.batch.delay\":\"50\"}}}" } - let(:xpack_response) do - { - "license"=> { - "status"=> license_status, - "uid"=> "9a48c67c-ce2c-4169-97bf-37d324b8ab80", - "type"=> license_type, - "expiry_date_in_millis"=> license_expiry_in_millis - }, - "features" => { - "security" => { - "description" => "Security for the Elastic Stack", - "available" => true, - "enabled" => security_enabled - } - } - } - end + context "With an invalid basic license, it should raise an error [#{es_version}]" do + let(:license_type) { 'basic' } - before :each do - allow(mock_license_client).to receive(:get).with('_xpack').and_return(xpack_response) - allow_any_instance_of(LogStash::LicenseChecker::LicenseReader).to receive(:build_client).and_return(mock_license_client) - end + it 'should raise an error' do + expect{subject.pipeline_configs}.to raise_error(LogStash::LicenseChecker::LicenseError) + end + end + + # config management can be used with any license type except basic + (::LogStash::LicenseChecker::LICENSE_TYPES - ["basic"]).each do |license_type| + context "With a valid #{license_type} license, it should return a pipeline [#{es_version}]" do + before do + expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) + end + + let(:license_type) { license_type } - context 'when security is disabled in Elasticsearch' do - let(:security_enabled) { false } - it 'should raise an error' do - expect { subject.pipeline_configs }.to raise_error(LogStash::LicenseChecker::LicenseError) + it "returns a valid pipeline config" do + pipeline_config = 
subject.pipeline_configs + + expect(pipeline_config.first.config_string).to match(config) + expect(pipeline_config.first.pipeline_id.to_sym).to eq(pipeline_id.to_sym) + end + end end end - context 'when security is enabled in Elasticsearch' do - let(:security_enabled) { true } - it 'should not raise an error' do + context "with multiples `pipeline_id` configured [#{es_version}]" do + before do expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) - expect { subject.pipeline_configs }.not_to raise_error end - end - end - - - context "With an invalid basic license, it should raise an error" do - let(:config) { "input { generator {} } filter { mutate {} } output { }" } - let(:elasticsearch_response) { "{\"#{pipeline_id}\":{\"username\":\"log.stash\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\"},\"pipeline\":\"#{config}\",\"pipeline_settings\":{\"pipeline.batch.delay\":\"50\"}}}" } - let(:license_type) { 'basic' } - it 'should raise an error' do - expect{subject.pipeline_configs}.to raise_error(LogStash::LicenseChecker::LicenseError) + context "when successfully fetching multiple remote configuration" do + let(:pipelines) do + { + "apache" => config_apache, + "firewall" => config_firewall + } + end + let(:pipeline_id) { pipelines.keys } + + let(:config_apache) { "input { generator { id => '123'} } filter { mutate {} } output { }" } + let(:config_firewall) { "input { generator { id => '321' } } filter { mutate {} } output { }" } + let(:elasticsearch_response) do + content = "{" + content << pipelines.collect do |pipeline_id, config| + "\"#{pipeline_id}\":{\"username\":\"log.stash\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\"},\"pipeline\":\"#{config}\",\"pipeline_settings\":{\"pipeline.batch.delay\":\"50\"}}" + end.join(",") + content << "}" + content + end + + let(:elasticsearch_7_9_response) do + content = "{ \"docs\":[" + content << pipelines.collect do |pipeline_id, config| + "{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"#{pipeline_id}\",\"_version\":8,\"found\":true,\"_source\":{\"id\":\"apache\",\"description\":\"Process apache logs\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\",\"username\":\"elastic\"},\"pipeline\":\"#{config}\"}}" + end.join(",") + content << "]}" + content + end + let(:request_body_string) { LogStash::Json.dump({ "docs" => pipeline_id.collect { |pipeline_id| { "_id" => pipeline_id } } }) } + + it "returns a valid pipeline config" do + pipeline_config = subject.pipeline_configs + + expect(pipeline_config.collect(&:config_string)).to include(*pipelines.values) + expect(pipeline_config.map(&:pipeline_id).collect(&:to_sym)).to include(*pipelines.keys.collect(&:to_sym)) + end + end end - end - # config management can be used with any license type except basic - (::LogStash::LicenseChecker::LICENSE_TYPES - ["basic"]).each do |license_type| - context "With a valid #{license_type} license, it should return a pipeline" do + context "when the configuration is not found [#{es_version}]" do + let(:elasticsearch_8_response) { "{}" } + let(:elasticsearch_7_9_response) { "{ \"docs\": [{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"donotexist\",\"found\":false}]}" } before do expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) end - let(:config) { "input { 
generator {} } filter { mutate {} } output { }" } - let(:elasticsearch_response) { "{\"#{pipeline_id}\":{\"username\":\"log.stash\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\"},\"pipeline\":\"#{config}\",\"pipeline_settings\":{\"pipeline.batch.delay\":\"50\"}}}" } - let(:license_type) { license_type } - - it "returns a valid pipeline config" do - pipeline_config = subject.pipeline_configs - - expect(pipeline_config.first.config_string).to match(config) - expect(pipeline_config.first.pipeline_id.to_sym).to eq(pipeline_id.to_sym) + it "returns no pipeline config" do + expect(subject.pipeline_configs).to be_empty end end - end - end - context "with multiples `pipeline_id` configured" do + context "when any error returned from elasticsearch [#{es_version}]" do + let(:elasticsearch_8_response){"{\"error\" : \"no handler found for uri [/_logstash/pipelines?pretty] and method [GET]\"}" } + let(:elasticsearch_7_9_response) { '{ "error":{"root_cause":[{"type":"illegal_argument_exception","reason":"No endpoint or operation is available at [testing_ph]"}],"type":"illegal_argument_exception","reason":"No endpoint or operation is available at [testing_ph]"},"status":400}' } - before do - allow(mock_client).to receive(:get).with(instance_of(String)).and_return(LogStash::Json.load(elasticsearch_response)) - expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) - end + before do + expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) + end - context "when successfully fetching multiple remote configuration" do - let(:pipelines) do - { - "apache" => config_apache, - "firewall" => config_firewall - } - end - let(:pipeline_id) { pipelines.keys } - - let(:config_apache) { "input { generator { id => '123'} } filter { mutate {} } output { }" } - let(:config_firewall) { "input { generator { id => '321' } } filter { mutate {} } output { }" } - let(:elasticsearch_response) do - content = "{" - content << pipelines.collect do |pipeline_id, config| - "\"#{pipeline_id}\":{\"username\":\"log.stash\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\"},\"pipeline\":\"#{config}\",\"pipeline_settings\":{\"pipeline.batch.delay\":\"50\"}}" - end.join(",") - content << "}" - content + it "raises a `RemoteConfigError`" do + expect { subject.pipeline_configs }.to raise_error LogStash::ConfigManagement::ElasticsearchSource::RemoteConfigError + end end - it "returns a valid pipeline config" do - pipeline_config = subject.pipeline_configs - - expect(pipeline_config.collect(&:config_string)).to include(*pipelines.values) - expect(pipeline_config.map(&:pipeline_id).collect(&:to_sym)).to include(*pipelines.keys.collect(&:to_sym)) - end - end + } end - context "when the configuration is not found" do - let(:elasticsearch_response) { "{}" } - + describe "create pipeline fetcher by es version" do before do expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) end - it "returns no pipeline config" do - expect(subject.pipeline_configs).to be_empty + it "should give SystemIndicesFetcher in [8]" do + allow(mock_client).to receive(:get).with("/").and_return(es_version_response) + expect(subject.pipeline_fetcher_factory).to be_an_instance_of LogStash::ConfigManagement::SystemIndicesFetcher end - end - - context "when any error returned from elasticsearch" do - 
let(:elasticsearch_response){"{\"error\" : \"no handler found for uri [/_logstash/pipelines?pretty] and method [GET]\"}" } - before do - expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) + it "should give SystemIndicesFetcher in [7.10]" do + allow(mock_client).to receive(:get).with("/").and_return(generate_es_version_response("7.10.0-SNAPSHOT")) + expect(subject.pipeline_fetcher_factory).to be_an_instance_of LogStash::ConfigManagement::SystemIndicesFetcher end - it "raises a `RemoteConfigError`" do - expect { subject.pipeline_configs }.to raise_error /no handler found/ + it "should give LegacyHiddenIndicesFetcher in [7.9]" do + allow(mock_client).to receive(:get).with("/").and_return(es_version_7_9_response) + expect(subject.pipeline_fetcher_factory).to be_an_instance_of LogStash::ConfigManagement::LegacyHiddenIndicesFetcher end end - context "when exception occur" do + describe "when exception occur" do let(:elasticsearch_response) { "" } before do expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) end - it "raises the exception upstream" do - expect(mock_client).to receive(:get).with(instance_of(String)).and_raise("Something bad") + it "raises the exception upstream in [8]" do + allow(mock_client).to receive(:get).with("/").and_return(es_version_response) + allow(mock_client).to receive(:get).with(system_indices_url_regex).and_raise("Something bad") + expect { subject.pipeline_configs }.to raise_error /Something bad/ + end + + + it "raises the exception upstream in [7.9]" do + allow(mock_client).to receive(:get).with("/").and_return(es_version_7_9_response) + expect(mock_client).to receive(:post).with(es_path, {}, request_body_string).and_raise("Something bad") expect { subject.pipeline_configs }.to raise_error /Something bad/ end end + + end + + def generate_es_version_response(version) + {"name"=>"MacBook-Pro", + "cluster_name"=>"elasticsearch", + "cluster_uuid"=>"YgpKq8VkTJuGTSb9aidlIA", + "version"=> + {"number"=>"#{version}", + "build_flavor"=>"default", + "build_type"=>"tar", + "build_hash"=>"26eb422dc55236a1c5625e8a73e5d866e54610a2", + "build_date"=>"2020-09-24T09:37:06.459350Z", + "build_snapshot"=>true, + "lucene_version"=>"8.7.0", + "minimum_wire_compatibility_version"=>"7.10.0", + "minimum_index_compatibility_version"=>"7.0.0"}, + "tagline"=>"You Know, for Search"} end end From ddc1b7052a687bcde35c681084bbc56f1a6e3a8f Mon Sep 17 00:00:00 2001 From: Kaise Cheng Date: Thu, 1 Oct 2020 11:47:00 +0200 Subject: [PATCH 3/6] add log --- x-pack/lib/config_management/elasticsearch_source.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/lib/config_management/elasticsearch_source.rb b/x-pack/lib/config_management/elasticsearch_source.rb index 2a228886218..bf769a8842d 100644 --- a/x-pack/lib/config_management/elasticsearch_source.rb +++ b/x-pack/lib/config_management/elasticsearch_source.rb @@ -56,6 +56,7 @@ def pipeline_fetcher_factory raise RemoteConfigError, "Cannot find elasticsearch version, server returned status: `#{response["status"]}`, message: `#{response["error"]}`" end + logger.debug("Elasticsearch version ", response["version"]["number"]) version_number = response["version"]["number"].split(".") first = version_number[0].to_i second = version_number[1].to_i @@ -225,7 +226,7 @@ def fetch_config(pipeline_ids, client) def format_response(response) if response["docs"].nil? 
logger.debug("Server returned an unknown or malformed document structure", :response => response) - raise LogStash::ConfigManagement::ElasticsearchSource::RemoteConfigError, "Elasticsearch returned an unknown or malformed document structure" + raise ElasticsearchSource::RemoteConfigError, "Elasticsearch returned an unknown or malformed document structure" end response["docs"].map { |pipeline| From 4b0a134d8b69d0318c553fae9f958710123cc152 Mon Sep 17 00:00:00 2001 From: Kaise Cheng Date: Fri, 2 Oct 2020 12:21:51 +0200 Subject: [PATCH 4/6] add comment --- x-pack/lib/config_management/elasticsearch_source.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/x-pack/lib/config_management/elasticsearch_source.rb b/x-pack/lib/config_management/elasticsearch_source.rb index bf769a8842d..dd434fd27c0 100644 --- a/x-pack/lib/config_management/elasticsearch_source.rb +++ b/x-pack/lib/config_management/elasticsearch_source.rb @@ -49,6 +49,7 @@ def config_conflict? false end + # decide using system indices api (7.10+) or legacy api (< 7.10) base on elasticsearch server version def pipeline_fetcher_factory response = client.get("/") @@ -223,6 +224,7 @@ def fetch_config(pipeline_ids, client) client.post("#{PIPELINE_INDEX}/_mget", {}, request_body_string) end + # transform legacy response to be similar to system indices response def format_response(response) if response["docs"].nil? logger.debug("Server returned an unknown or malformed document structure", :response => response) From 329e37aa584aa21a1d5c4b26a824bcbc17e16b99 Mon Sep 17 00:00:00 2001 From: Kaise Cheng Date: Tue, 6 Oct 2020 14:41:48 +0200 Subject: [PATCH 5/6] fix log msg, rename class, simplify response handling --- .../config_management/elasticsearch_source.rb | 46 +++++++------ .../elasticsearch_source_spec.rb | 69 ++++++++++++++----- 2 files changed, 74 insertions(+), 41 deletions(-) diff --git a/x-pack/lib/config_management/elasticsearch_source.rb b/x-pack/lib/config_management/elasticsearch_source.rb index dd434fd27c0..d097db048ce 100644 --- a/x-pack/lib/config_management/elasticsearch_source.rb +++ b/x-pack/lib/config_management/elasticsearch_source.rb @@ -50,14 +50,14 @@ def config_conflict? 
 end
 # decide using system indices api (7.10+) or legacy api (< 7.10) base on elasticsearch server version
- def pipeline_fetcher_factory
+ def get_pipeline_fetcher
 response = client.get("/")
 if response["error"]
 raise RemoteConfigError, "Cannot find elasticsearch version, server returned status: `#{response["status"]}`, message: `#{response["error"]}`"
 end
- logger.debug("Elasticsearch version ", response["version"]["number"])
+ logger.debug("Reading configuration from Elasticsearch version {}", response["version"]["number"])
 version_number = response["version"]["number"].split(".")
 first = version_number[0].to_i
 second = version_number[1].to_i
@@ -77,15 +77,9 @@ def pipeline_configs
 end
 end
- fetcher = pipeline_fetcher_factory
+ fetcher = get_pipeline_fetcher
 response = fetcher.fetch_config(pipeline_ids, client)
- if response["error"]
- raise RemoteConfigError, "Cannot find find configuration for pipeline_id: #{pipeline_ids}, server returned status: `#{response["status"]}`, message: `#{response["error"]}`"
- end
-
- response = fetcher.format_response(response)
-
 @cached_pipelines = pipeline_ids.collect do |pid|
 get_pipeline(pid, response, fetcher)
 end.compact
@@ -200,11 +194,12 @@ class SystemIndicesFetcher
 def fetch_config(pipeline_ids, client)
 path_ids = pipeline_ids.join(",")
- client.get("#{SYSTEM_INDICES_API_PATH}/#{path_ids}")
- end
+ response = client.get("#{SYSTEM_INDICES_API_PATH}/#{path_ids}")
+
+ if response["error"]
+ raise ElasticsearchSource::RemoteConfigError, "Cannot find find configuration for pipeline_id: #{pipeline_ids}, server returned status: `#{response["status"]}`, message: `#{response["error"]}`"
+ end
- def format_response(response)
- # for backwards compatibility
 response
 end
@@ -213,7 +208,7 @@ def get_single_pipeline_setting(response, pipeline_id)
 end
 end
- # TODO clean up LegacyHiddenIndicesFetcher when 7.9.* is deprecated
+ # clean up LegacyHiddenIndicesFetcher https://github.com/elastic/logstash/issues/12291
 class LegacyHiddenIndicesFetcher
 include LogStash::Util::Loggable
@@ -221,25 +216,32 @@ class LegacyHiddenIndicesFetcher
 def fetch_config(pipeline_ids, client)
 request_body_string = LogStash::Json.dump({ "docs" => pipeline_ids.collect { |pipeline_id| { "_id" => pipeline_id } } })
- client.post("#{PIPELINE_INDEX}/_mget", {}, request_body_string)
- end
+ response = client.post("#{PIPELINE_INDEX}/_mget", {}, request_body_string)
+
+ if response["error"]
+ raise ElasticsearchSource::RemoteConfigError, "Cannot find find configuration for pipeline_id: #{pipeline_ids}, server returned status: `#{response["status"]}`, message: `#{response["error"]}`"
+ end
- # transform legacy response to be similar to system indices response
- def format_response(response)
 if response["docs"].nil?
logger.debug("Server returned an unknown or malformed document structure", :response => response) raise ElasticsearchSource::RemoteConfigError, "Elasticsearch returned an unknown or malformed document structure" end - response["docs"].map { |pipeline| - {pipeline["_id"] => pipeline} if pipeline.fetch("found", false) - }.compact - .reduce({}, :merge) + format_response(response) end def get_single_pipeline_setting(response, pipeline_id) response.fetch(pipeline_id, {}).fetch("_source", {}) end + + private + # transform legacy response to be similar to system indices response + def format_response(response) + response["docs"].map { |pipeline| + {pipeline["_id"] => pipeline} if pipeline.fetch("found", false) + }.compact + .reduce({}, :merge) + end end end diff --git a/x-pack/spec/config_management/elasticsearch_source_spec.rb b/x-pack/spec/config_management/elasticsearch_source_spec.rb index ed2adfb4751..1e6e72e2ec7 100644 --- a/x-pack/spec/config_management/elasticsearch_source_spec.rb +++ b/x-pack/spec/config_management/elasticsearch_source_spec.rb @@ -98,6 +98,34 @@ let(:es_version_8_response) { generate_es_version_response("8.0.0-SNAPSHOT") } let(:es_version_7_9_response) { generate_es_version_response("7.9.1") } + let(:elasticsearch_7_9_err_response) { + {"error"=> + {"root_cause"=> + [{"type"=>"parse_exception", + "reason"=>"request body or source parameter is required"}], + "type"=>"parse_exception", + "reason"=>"request body or source parameter is required"}, + "status"=>400} + } + + let(:elasticsearch_8_err_response) { + {"error"=> + {"root_cause"=> + [{"type"=>"index_not_found_exception", + "reason"=>"no such index [.logstash]", + "resource.type"=>"index_expression", + "resource.id"=>".logstash", + "index_uuid"=>"_na_", + "index"=>".logstash"}], + "type"=>"index_not_found_exception", + "reason"=>"no such index [.logstash]", + "resource.type"=>"index_expression", + "resource.id"=>".logstash", + "index_uuid"=>"_na_", + "index"=>".logstash"}, + "status"=>404} + } + before do extension.additionals_settings(system_settings) apply_settings(settings, system_settings) @@ -176,8 +204,13 @@ let(:elasticsearch_response) { {"#{pipeline_id}"=> {"pipeline"=> "#{config}"}} } it "#fetch_config" do - expect(mock_client).to receive(:get).with("#{described_class::SYSTEM_INDICES_API_PATH}/apache,nginx").and_return(LogStash::Json.load("{}")) - subject.fetch_config(["apache", "nginx"], mock_client) + expect(mock_client).to receive(:get).with("#{described_class::SYSTEM_INDICES_API_PATH}/apache,nginx").and_return(elasticsearch_response) + expect(subject.fetch_config(["apache", "nginx"], mock_client)).to eq(elasticsearch_response) + end + + it "#fetch_config should raise error" do + expect(mock_client).to receive(:get).with("#{described_class::SYSTEM_INDICES_API_PATH}/apache,nginx").and_return(elasticsearch_8_err_response) + expect{ subject.fetch_config(["apache", "nginx"], mock_client) }.to raise_error(LogStash::ConfigManagement::ElasticsearchSource::RemoteConfigError) end it "#get_single_pipeline_setting" do @@ -216,30 +249,28 @@ "input { generator { count => 100 } tcp { port => 6006 } } output { }}"}}, {"_index"=>".logstash", "_id"=>"not_exists", "found"=>false}]} } - let(:elasticsearch_err_response) { - {"error"=> - {"root_cause"=> - [{"type"=>"parse_exception", - "reason"=>"request body or source parameter is required"}], - "type"=>"parse_exception", - "reason"=>"request body or source parameter is required"}, - "status"=>400} - } + let(:formatted_es_response) { 
{"super_generator"=>{"_index"=>".logstash", "_id"=>"super_generator", "_version"=>2, "_seq_no"=>2, "_primary_term"=>1, "found"=>true, "_source"=>{"pipeline"=>"input { generator { count => 100 } tcp { port => 6005 } } output { }}"}}} } it "#fetch_config" do - expect(mock_client).to receive(:post).with("#{described_class::PIPELINE_INDEX}/_mget", {}, "{\"docs\":[{\"_id\":\"apache\"},{\"_id\":\"nginx\"}]}").and_return(LogStash::Json.load("{}")) - subject.fetch_config(["apache", "nginx"], mock_client) + expect(mock_client).to receive(:post).with("#{described_class::PIPELINE_INDEX}/_mget", {}, "{\"docs\":[{\"_id\":\"apache\"},{\"_id\":\"nginx\"}]}").and_return(elasticsearch_response) + expect(subject.fetch_config(["apache", "nginx"], mock_client).size).to eq(2) end - it "#format_response should raise error" do - expect{ subject.format_response(elasticsearch_err_response) }.to raise_error(LogStash::ConfigManagement::ElasticsearchSource::RemoteConfigError) + it "#fetch_config should raise error" do + expect(mock_client).to receive(:post).with("#{described_class::PIPELINE_INDEX}/_mget", {}, "{\"docs\":[{\"_id\":\"apache\"},{\"_id\":\"nginx\"}]}").and_return(elasticsearch_7_9_err_response) + expect{ subject.fetch_config(["apache", "nginx"], mock_client) }.to raise_error(LogStash::ConfigManagement::ElasticsearchSource::RemoteConfigError) + end + + it "#fetch_config should raise error when response is empty" do + expect(mock_client).to receive(:post).with("#{described_class::PIPELINE_INDEX}/_mget", {}, "{\"docs\":[{\"_id\":\"apache\"},{\"_id\":\"nginx\"}]}").and_return(LogStash::Json.load("{}")) + expect{ subject.fetch_config(["apache", "nginx"], mock_client) }.to raise_error(LogStash::ConfigManagement::ElasticsearchSource::RemoteConfigError) end it "#format_response should return pipelines" do - result = subject.format_response(elasticsearch_response) + result = subject.send(:format_response, elasticsearch_response) expect(result.size).to eq(2) expect(result.has_key?(pipeline_id)).to be_truthy expect(result.has_key?(another_pipeline_id)).to be_truthy @@ -542,17 +573,17 @@ it "should give SystemIndicesFetcher in [8]" do allow(mock_client).to receive(:get).with("/").and_return(es_version_response) - expect(subject.pipeline_fetcher_factory).to be_an_instance_of LogStash::ConfigManagement::SystemIndicesFetcher + expect(subject.get_pipeline_fetcher).to be_an_instance_of LogStash::ConfigManagement::SystemIndicesFetcher end it "should give SystemIndicesFetcher in [7.10]" do allow(mock_client).to receive(:get).with("/").and_return(generate_es_version_response("7.10.0-SNAPSHOT")) - expect(subject.pipeline_fetcher_factory).to be_an_instance_of LogStash::ConfigManagement::SystemIndicesFetcher + expect(subject.get_pipeline_fetcher).to be_an_instance_of LogStash::ConfigManagement::SystemIndicesFetcher end it "should give LegacyHiddenIndicesFetcher in [7.9]" do allow(mock_client).to receive(:get).with("/").and_return(es_version_7_9_response) - expect(subject.pipeline_fetcher_factory).to be_an_instance_of LogStash::ConfigManagement::LegacyHiddenIndicesFetcher + expect(subject.get_pipeline_fetcher).to be_an_instance_of LogStash::ConfigManagement::LegacyHiddenIndicesFetcher end end From cba438708303e403c7b6050c819613df06ca89c1 Mon Sep 17 00:00:00 2001 From: Kaise Cheng Date: Tue, 6 Oct 2020 20:17:25 +0200 Subject: [PATCH 6/6] modularise fetcher --- .../config_management/elasticsearch_source.rb | 37 ++++++++++++------- .../elasticsearch_source_spec.rb | 35 +++++++----------- 2 files changed, 37 insertions(+), 35 
deletions(-) diff --git a/x-pack/lib/config_management/elasticsearch_source.rb b/x-pack/lib/config_management/elasticsearch_source.rb index d097db048ce..fa5c3736cff 100644 --- a/x-pack/lib/config_management/elasticsearch_source.rb +++ b/x-pack/lib/config_management/elasticsearch_source.rb @@ -78,20 +78,20 @@ def pipeline_configs end fetcher = get_pipeline_fetcher - response = fetcher.fetch_config(pipeline_ids, client) + fetcher.fetch_config(pipeline_ids, client) @cached_pipelines = pipeline_ids.collect do |pid| - get_pipeline(pid, response, fetcher) + get_pipeline(pid, fetcher) end.compact end - def get_pipeline(pipeline_id, response, fetcher) - if response.has_key?(pipeline_id) == false + def get_pipeline(pipeline_id, fetcher) + unless fetcher.config_exist?(pipeline_id) logger.debug("Could not find a remote configuration for a specific `pipeline_id`", :pipeline_id => pipeline_id) return nil end - config_string = fetcher.get_single_pipeline_setting(response, pipeline_id)["pipeline"] + config_string = fetcher.get_single_pipeline_setting(pipeline_id)["pipeline"] raise RemoteConfigError, "Empty configuration for pipeline_id: #{pipeline_id}" if config_string.nil? || config_string.empty? @@ -102,7 +102,7 @@ def get_pipeline(pipeline_id, response, fetcher) settings.set("pipeline.id", pipeline_id) # override global settings with pipeline settings from ES, if any - pipeline_settings = fetcher.get_single_pipeline_setting(response, pipeline_id)["pipeline_settings"] + pipeline_settings = fetcher.get_single_pipeline_setting(pipeline_id)["pipeline_settings"] unless pipeline_settings.nil? pipeline_settings.each do |setting, value| if SUPPORTED_PIPELINE_SETTINGS.include? setting @@ -187,8 +187,17 @@ def client end end + module Fetcher + def config_exist?(pipeline_id) + @response.has_key?(pipeline_id) + end + + def fetch_config(pipeline_ids, client) end + def get_single_pipeline_setting(pipeline_id) end + end + class SystemIndicesFetcher - include LogStash::Util::Loggable + include LogStash::Util::Loggable, Fetcher SYSTEM_INDICES_API_PATH = "_logstash/pipeline" @@ -200,17 +209,17 @@ def fetch_config(pipeline_ids, client) raise ElasticsearchSource::RemoteConfigError, "Cannot find find configuration for pipeline_id: #{pipeline_ids}, server returned status: `#{response["status"]}`, message: `#{response["error"]}`" end - response + @response = response end - def get_single_pipeline_setting(response, pipeline_id) - response.fetch(pipeline_id, {}) + def get_single_pipeline_setting(pipeline_id) + @response.fetch(pipeline_id, {}) end end # clean up LegacyHiddenIndicesFetcher https://github.com/elastic/logstash/issues/12291 class LegacyHiddenIndicesFetcher - include LogStash::Util::Loggable + include LogStash::Util::Loggable, Fetcher PIPELINE_INDEX = ".logstash" @@ -227,11 +236,11 @@ def fetch_config(pipeline_ids, client) raise ElasticsearchSource::RemoteConfigError, "Elasticsearch returned an unknown or malformed document structure" end - format_response(response) + @response = format_response(response) end - def get_single_pipeline_setting(response, pipeline_id) - response.fetch(pipeline_id, {}).fetch("_source", {}) + def get_single_pipeline_setting(pipeline_id) + @response.fetch(pipeline_id, {}).fetch("_source", {}) end private diff --git a/x-pack/spec/config_management/elasticsearch_source_spec.rb b/x-pack/spec/config_management/elasticsearch_source_spec.rb index 1e6e72e2ec7..28d10593a33 100644 --- a/x-pack/spec/config_management/elasticsearch_source_spec.rb +++ 
b/x-pack/spec/config_management/elasticsearch_source_spec.rb @@ -204,18 +204,15 @@ let(:elasticsearch_response) { {"#{pipeline_id}"=> {"pipeline"=> "#{config}"}} } it "#fetch_config" do - expect(mock_client).to receive(:get).with("#{described_class::SYSTEM_INDICES_API_PATH}/apache,nginx").and_return(elasticsearch_response) - expect(subject.fetch_config(["apache", "nginx"], mock_client)).to eq(elasticsearch_response) + expect(mock_client).to receive(:get).with("#{described_class::SYSTEM_INDICES_API_PATH}/#{pipeline_id}").and_return(elasticsearch_response) + expect(subject.fetch_config([pipeline_id], mock_client)).to eq(elasticsearch_response) + expect(subject.get_single_pipeline_setting(pipeline_id)).to eq({"pipeline"=> "#{config}"}) end it "#fetch_config should raise error" do expect(mock_client).to receive(:get).with("#{described_class::SYSTEM_INDICES_API_PATH}/apache,nginx").and_return(elasticsearch_8_err_response) expect{ subject.fetch_config(["apache", "nginx"], mock_client) }.to raise_error(LogStash::ConfigManagement::ElasticsearchSource::RemoteConfigError) end - - it "#get_single_pipeline_setting" do - expect(subject.get_single_pipeline_setting(elasticsearch_response, pipeline_id)).to eq({"pipeline"=> "#{config}"}) - end end end @@ -225,6 +222,7 @@ describe "legacy api" do let(:mock_client) { double("http_client") } let(:config) { "input { generator { count => 100 } tcp { port => 6005 } } output { }}" } + let(:another_config) { "input { generator { count => 100 } tcp { port => 6006 } } output { }}" } let(:pipeline_id) { "super_generator" } let(:another_pipeline_id) { "another_generator" } let(:elasticsearch_response) { @@ -236,8 +234,7 @@ "_primary_term"=>1, "found"=>true, "_source"=> - {"pipeline"=> - "#{config}"}}, + {"pipeline"=> "#{config}"}}, {"_index"=>".logstash", "_id"=>"#{another_pipeline_id}", "_version"=>2, @@ -245,8 +242,7 @@ "_primary_term"=>1, "found"=>true, "_source"=> - {"pipeline"=> - "input { generator { count => 100 } tcp { port => 6006 } } output { }}"}}, + {"pipeline"=> "#{another_config}"}}, {"_index"=>".logstash", "_id"=>"not_exists", "found"=>false}]} } @@ -255,18 +251,20 @@ } it "#fetch_config" do - expect(mock_client).to receive(:post).with("#{described_class::PIPELINE_INDEX}/_mget", {}, "{\"docs\":[{\"_id\":\"apache\"},{\"_id\":\"nginx\"}]}").and_return(elasticsearch_response) - expect(subject.fetch_config(["apache", "nginx"], mock_client).size).to eq(2) + expect(mock_client).to receive(:post).with("#{described_class::PIPELINE_INDEX}/_mget", {}, "{\"docs\":[{\"_id\":\"#{pipeline_id}\"},{\"_id\":\"#{another_pipeline_id}\"}]}").and_return(elasticsearch_response) + expect(subject.fetch_config([pipeline_id, another_pipeline_id], mock_client).size).to eq(2) + expect(subject.get_single_pipeline_setting(pipeline_id)).to eq({"pipeline" => "#{config}"}) + expect(subject.get_single_pipeline_setting(another_pipeline_id)).to eq({"pipeline" => "#{another_config}"}) end it "#fetch_config should raise error" do - expect(mock_client).to receive(:post).with("#{described_class::PIPELINE_INDEX}/_mget", {}, "{\"docs\":[{\"_id\":\"apache\"},{\"_id\":\"nginx\"}]}").and_return(elasticsearch_7_9_err_response) - expect{ subject.fetch_config(["apache", "nginx"], mock_client) }.to raise_error(LogStash::ConfigManagement::ElasticsearchSource::RemoteConfigError) + expect(mock_client).to receive(:post).with("#{described_class::PIPELINE_INDEX}/_mget", {}, "{\"docs\":[{\"_id\":\"#{pipeline_id}\"},{\"_id\":\"#{another_pipeline_id}\"}]}").and_return(elasticsearch_7_9_err_response) + expect{ 
subject.fetch_config([pipeline_id, another_pipeline_id], mock_client) }.to raise_error(LogStash::ConfigManagement::ElasticsearchSource::RemoteConfigError) end it "#fetch_config should raise error when response is empty" do - expect(mock_client).to receive(:post).with("#{described_class::PIPELINE_INDEX}/_mget", {}, "{\"docs\":[{\"_id\":\"apache\"},{\"_id\":\"nginx\"}]}").and_return(LogStash::Json.load("{}")) - expect{ subject.fetch_config(["apache", "nginx"], mock_client) }.to raise_error(LogStash::ConfigManagement::ElasticsearchSource::RemoteConfigError) + expect(mock_client).to receive(:post).with("#{described_class::PIPELINE_INDEX}/_mget", {}, "{\"docs\":[{\"_id\":\"#{pipeline_id}\"},{\"_id\":\"#{another_pipeline_id}\"}]}").and_return(LogStash::Json.load("{}")) + expect{ subject.fetch_config([pipeline_id, another_pipeline_id], mock_client) }.to raise_error(LogStash::ConfigManagement::ElasticsearchSource::RemoteConfigError) end it "#format_response should return pipelines" do @@ -275,11 +273,6 @@ expect(result.has_key?(pipeline_id)).to be_truthy expect(result.has_key?(another_pipeline_id)).to be_truthy end - - it "#get_single_pipeline_setting" do - result = subject.get_single_pipeline_setting(formatted_es_response, pipeline_id) - expect(result).to eq({"pipeline" => "#{config}"}) - end end end
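Taken together, PATCH 4-6 leave ElasticsearchSource with two interchangeable fetchers behind a small shared Fetcher contract: SystemIndicesFetcher drives the `_logstash/pipeline` system indices API on Elasticsearch 7.10 and later, while LegacyHiddenIndicesFetcher keeps the `.logstash/_mget` path for older clusters and reshapes its `docs` array into the same `{pipeline_id => document}` hash. The standalone Ruby sketch below restates that flow outside the Logstash codebase; the FetcherSketch module name, the fetcher_for helper, the trimmed error messages and the stubbed client in the usage lines are illustrative assumptions, not code from these commits. Only the request paths, the 7.10 version cut-off and the normalised response shape mirror what the diffs introduce.

# Standalone sketch (not the shipped Logstash code) of the fetcher split
# introduced in PATCH 4-6. `client` stands for any object that answers
# #get(path) and #post(path, headers, body) with parsed-JSON hashes, much
# like the "http_client" double in elasticsearch_source_spec.rb.
require "json"

module FetcherSketch
  class RemoteConfigError < StandardError; end

  # Shared contract: #fetch_config stores a { pipeline_id => doc } hash,
  # which the readers below interrogate afterwards.
  module Fetcher
    def config_exist?(pipeline_id)
      @response.key?(pipeline_id)
    end
  end

  # Elasticsearch >= 7.10: GET _logstash/pipeline/<id1>,<id2> already
  # returns a hash keyed by pipeline id.
  class SystemIndicesFetcher
    include Fetcher
    API_PATH = "_logstash/pipeline"

    def fetch_config(pipeline_ids, client)
      response = client.get("#{API_PATH}/#{pipeline_ids.join(',')}")
      raise RemoteConfigError, "status `#{response['status']}`: `#{response['error']}`" if response["error"]
      @response = response
    end

    def get_single_pipeline_setting(pipeline_id)
      @response.fetch(pipeline_id, {})
    end
  end

  # Elasticsearch < 7.10: POST .logstash/_mget, then normalise the "docs"
  # array into the same { id => doc } shape the system indices API uses.
  class LegacyHiddenIndicesFetcher
    include Fetcher
    PIPELINE_INDEX = ".logstash"

    def fetch_config(pipeline_ids, client)
      body = JSON.dump({ "docs" => pipeline_ids.map { |id| { "_id" => id } } })
      response = client.post("#{PIPELINE_INDEX}/_mget", {}, body)
      raise RemoteConfigError, "status `#{response['status']}`: `#{response['error']}`" if response["error"]
      raise RemoteConfigError, "malformed document structure" if response["docs"].nil?
      @response = response["docs"]
                    .select { |doc| doc.fetch("found", false) } # drop pipelines that were not found
                    .map    { |doc| [doc["_id"], doc] }
                    .to_h
    end

    def get_single_pipeline_setting(pipeline_id)
      @response.fetch(pipeline_id, {}).fetch("_source", {})
    end
  end

  # Pick a fetcher from the version string reported by GET / ,
  # e.g. "7.9.1", "7.10.0-SNAPSHOT" or "8.0.0-SNAPSHOT".
  def self.fetcher_for(version)
    major, minor = version.split(".").map(&:to_i)
    if major > 7 || (major == 7 && minor >= 10)
      SystemIndicesFetcher.new
    else
      LegacyHiddenIndicesFetcher.new
    end
  end
end

# Minimal usage against a stubbed client, shaped like the spec's fixtures.
stub_client = Object.new
def stub_client.get(_path)
  { "super_generator" => { "pipeline" => "input { generator {} } output { stdout {} }" } }
end

fetcher = FetcherSketch.fetcher_for("7.10.0")
fetcher.fetch_config(["super_generator"], stub_client)
p fetcher.config_exist?("super_generator")               # => true
p fetcher.get_single_pipeline_setting("super_generator") # => {"pipeline"=>"input { generator {} } output { stdout {} }"}

Keeping the version probe in one place and hiding the legacy _mget normalisation behind the common config_exist? / get_single_pipeline_setting readers is what lets pipeline_configs and get_pipeline stay identical for both API generations, which is the point of the "modularise fetcher" patch.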