-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
replace direct access of hidden indices with system indices api #12279
Changes from 7 commits
9da547f
19bb1e1
9dd7c73
ddc1b70
ad24c5f
4b0a134
96006cb
329e37a
cba4387
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,7 +18,6 @@ class ElasticsearchSource < LogStash::Config::Source::Base | |
|
||
class RemoteConfigError < LogStash::Error; end | ||
|
||
PIPELINE_INDEX = ".logstash" | ||
# exclude basic | ||
VALID_LICENSES = %w(trial standard gold platinum enterprise) | ||
FEATURE_INTERNAL = 'management' | ||
|
@@ -50,6 +49,21 @@ def config_conflict? | |
false | ||
end | ||
|
||
# decide using system indices api (7.10+) or legacy api (< 7.10) base on elasticsearch server version | ||
def pipeline_fetcher_factory | ||
response = client.get("/") | ||
|
||
if response["error"] | ||
raise RemoteConfigError, "Cannot find elasticsearch version, server returned status: `#{response["status"]}`, message: `#{response["error"]}`" | ||
end | ||
|
||
logger.debug("Elasticsearch version ", response["version"]["number"]) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As it stands, this line won't output the value of `response["version"]["number"] - you can update this by either: You might also want to add some context as to what is happening, such as "Reading configuration from Elasticsearch version..." |
||
version_number = response["version"]["number"].split(".") | ||
first = version_number[0].to_i | ||
second = version_number[1].to_i | ||
(first >= 8 || (first == 7 && second >= 10))? SystemIndicesFetcher.new: LegacyHiddenIndicesFetcher.new | ||
roaksoax marked this conversation as resolved.
Show resolved
Hide resolved
|
||
end | ||
|
||
def pipeline_configs | ||
logger.trace("Fetch remote config pipeline", :pipeline_ids => pipeline_ids) | ||
|
||
|
@@ -63,33 +77,27 @@ def pipeline_configs | |
end | ||
end | ||
|
||
response = fetch_config(pipeline_ids) | ||
fetcher = pipeline_fetcher_factory | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: Maybe call this method |
||
response = fetcher.fetch_config(pipeline_ids, client) | ||
|
||
if response["error"] | ||
raise RemoteConfigError, "Cannot find find configuration for pipeline_id: #{pipeline_ids}, server returned status: `#{response["status"]}`, message: `#{response["error"]}`" | ||
end | ||
|
||
if response["docs"].nil? | ||
logger.debug("Server returned an unknown or malformed document structure", :response => response) | ||
raise RemoteConfigError, "Elasticsearch returned an unknown or malformed document structure" | ||
end | ||
response = fetcher.format_response(response) | ||
|
||
# Cache pipelines to handle the case where a remote configuration error can render a pipeline unusable | ||
# it is not reloadable | ||
@cached_pipelines = response["docs"].collect do |document| | ||
get_pipeline(document) | ||
@cached_pipelines = pipeline_ids.collect do |pid| | ||
get_pipeline(pid, response, fetcher) | ||
end.compact | ||
end | ||
|
||
def get_pipeline(response) | ||
pipeline_id = response["_id"] | ||
|
||
if response["found"] == false | ||
def get_pipeline(pipeline_id, response, fetcher) | ||
if response.has_key?(pipeline_id) == false | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: Consider using |
||
logger.debug("Could not find a remote configuration for a specific `pipeline_id`", :pipeline_id => pipeline_id) | ||
return nil | ||
end | ||
|
||
config_string = response.fetch("_source", {})["pipeline"] | ||
config_string = fetcher.get_single_pipeline_setting(response, pipeline_id)["pipeline"] | ||
|
||
raise RemoteConfigError, "Empty configuration for pipeline_id: #{pipeline_id}" if config_string.nil? || config_string.empty? | ||
|
||
|
@@ -100,7 +108,7 @@ def get_pipeline(response) | |
settings.set("pipeline.id", pipeline_id) | ||
|
||
# override global settings with pipeline settings from ES, if any | ||
pipeline_settings = response["_source"]["pipeline_settings"] | ||
pipeline_settings = fetcher.get_single_pipeline_setting(response, pipeline_id)["pipeline_settings"] | ||
unless pipeline_settings.nil? | ||
pipeline_settings.each do |setting, value| | ||
if SUPPORTED_PIPELINE_SETTINGS.include? setting | ||
|
@@ -127,15 +135,6 @@ def build_client | |
es.build_client | ||
end | ||
|
||
def fetch_config(pipeline_ids) | ||
request_body_string = LogStash::Json.dump({ "docs" => pipeline_ids.collect { |pipeline_id| { "_id" => pipeline_id } } }) | ||
client.post(config_path, {}, request_body_string) | ||
end | ||
|
||
def config_path | ||
"#{PIPELINE_INDEX}/_mget" | ||
end | ||
|
||
def populate_license_state(xpack_info) | ||
if xpack_info.failed? | ||
{ | ||
|
@@ -193,5 +192,55 @@ def client | |
@client ||= build_client | ||
end | ||
end | ||
|
||
class SystemIndicesFetcher | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if this might be simpler if we kept the response object in this class, and had methods like 'config_exists?(pipeline_id)', There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This question to me is if we want to further refactor the existing code or the goal is to apply the new API in a manageable way. This involved thirty lines of code, mainly moving There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure there is a huge amount of refactoring to the existing code either way, beyond what is already present; you already have the method I'm comfortable either way, what you have appears to be functionally correct after running this code locally against Elasticsearch |
||
include LogStash::Util::Loggable | ||
|
||
SYSTEM_INDICES_API_PATH = "_logstash/pipeline" | ||
|
||
def fetch_config(pipeline_ids, client) | ||
path_ids = pipeline_ids.join(",") | ||
client.get("#{SYSTEM_INDICES_API_PATH}/#{path_ids}") | ||
end | ||
|
||
def format_response(response) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to expose this method, which is always required by the caller? Or could the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will put There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to expose this method, or could it be done as part of |
||
# for backwards compatibility | ||
response | ||
end | ||
|
||
def get_single_pipeline_setting(response, pipeline_id) | ||
response.fetch(pipeline_id, {}) | ||
end | ||
end | ||
|
||
# TODO clean up LegacyHiddenIndicesFetcher when 7.9.* is deprecated | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's create an issue to remove this, and put a link in here |
||
class LegacyHiddenIndicesFetcher | ||
include LogStash::Util::Loggable | ||
|
||
PIPELINE_INDEX = ".logstash" | ||
|
||
def fetch_config(pipeline_ids, client) | ||
request_body_string = LogStash::Json.dump({ "docs" => pipeline_ids.collect { |pipeline_id| { "_id" => pipeline_id } } }) | ||
client.post("#{PIPELINE_INDEX}/_mget", {}, request_body_string) | ||
end | ||
|
||
# transform legacy response to be similar to system indices response | ||
def format_response(response) | ||
if response["docs"].nil? | ||
logger.debug("Server returned an unknown or malformed document structure", :response => response) | ||
raise ElasticsearchSource::RemoteConfigError, "Elasticsearch returned an unknown or malformed document structure" | ||
end | ||
|
||
response["docs"].map { |pipeline| | ||
{pipeline["_id"] => pipeline} if pipeline.fetch("found", false) | ||
}.compact | ||
.reduce({}, :merge) | ||
roaksoax marked this conversation as resolved.
Show resolved
Hide resolved
|
||
end | ||
|
||
def get_single_pipeline_setting(response, pipeline_id) | ||
response.fetch(pipeline_id, {}).fetch("_source", {}) | ||
end | ||
end | ||
|
||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Maybe a different name for the method, such as
get_pipeline_fetcher
?