Skip to content

Commit

Permalink
hash function of pipeline config with metadata (#12389)
Browse files Browse the repository at this point in the history
add metadata in the hash function

Fixed #12387
  • Loading branch information
kaisecheng authored Nov 9, 2020
1 parent 244a9f4 commit 606cfe5
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class SourceWithMetadata implements HashableWithSource {
private final Integer column;
private final String text;
private int linesCount;
private final String metadata;

public String getProtocol() {
return this.protocol;
Expand All @@ -60,14 +61,17 @@ public String getText() {
return text;
}

public String getMetadata() { return metadata; }

private static final Pattern emptyString = Pattern.compile("^\\s*$");

public SourceWithMetadata(String protocol, String id, Integer line, Integer column, String text) throws IncompleteSourceWithMetadataException {
public SourceWithMetadata(String protocol, String id, Integer line, Integer column, String text, String metadata) throws IncompleteSourceWithMetadataException {
this.protocol = protocol;
this.id = id;
this.line = line;
this.column = column;
this.text = text;
this.metadata = metadata;

List<Object> badAttributes = this.attributes().stream().filter(a -> {
if (a == null) return true;
Expand All @@ -82,16 +86,23 @@ public SourceWithMetadata(String protocol, String id, Integer line, Integer colu
}

if (!badAttributes.isEmpty()) {
String message = "Missing attributes in SourceWithMetadata: (" + badAttributes + ") "
+ this.toString();
String message = "Missing attributes in SourceWithMetadata: (" + badAttributes + ") " + this.toString();
throw new IncompleteSourceWithMetadataException(message);
}

this.linesCount = text.split("\\n").length;
}

public SourceWithMetadata(String protocol, String id, String text) throws IncompleteSourceWithMetadataException {
this(protocol, id, 0, 0, text);
this(protocol, id, 0, 0, text, "");
}

public SourceWithMetadata(String protocol, String id, String text, String metadata) throws IncompleteSourceWithMetadataException {
this(protocol, id, 0, 0, text, metadata);
}

public SourceWithMetadata(String protocol, String id, Integer line, Integer column, String text) throws IncompleteSourceWithMetadataException {
this(protocol, id, line, column, text, "");
}

public int hashCode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ boolean includeLine(int lineNumber) {
private LocalDateTime readAt;
private String configHash;
private volatile String configString;
private volatile String metadata;
private List<LineToSource> sourceRefs;

private static final String NEWLINE = "\n";
Expand Down Expand Up @@ -104,7 +105,7 @@ public RubyObject getSettings() {

public String configHash() {
if (configHash == null) {
configHash = DigestUtils.sha1Hex(configString());
configHash = DigestUtils.sha1Hex(configString() + metadataString());
}
return configHash;
}
Expand All @@ -129,6 +130,17 @@ public String configString() {
return this.configString;
}

public String metadataString() {
if (this.metadata == null) {
synchronized(this) {
if (this.metadata == null) {
this.metadata = confParts.stream().map(SourceWithMetadata::getMetadata).collect(Collectors.joining());
}
}
}
return this.metadata;
}

public boolean isSystem() {
return this.settings.callMethod(RUBY.getCurrentContext(), "get_value",
RubyString.newString(RUBY, "pipeline.system"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ static class SourceCollector {
private final StringBuilder compositeSource = new StringBuilder();
private final List<SourceWithMetadata> orderedConfigParts = new ArrayList<>();

void appendSource(final String protocol, final String id, final int line, final int column, final String text) throws IncompleteSourceWithMetadataException {
orderedConfigParts.add(new SourceWithMetadata(protocol, id, line, column, text));
void appendSource(final String protocol, final String id, final int line, final int column, final String text, final String metadata) throws IncompleteSourceWithMetadataException {
orderedConfigParts.add(new SourceWithMetadata(protocol, id, line, column, text, metadata));

if (compositeSource.length() > 0 && !compositeSource.toString().endsWith("\n")) {
compositeSource.append("\n");
Expand All @@ -106,13 +106,13 @@ public void setUp() throws IncompleteSourceWithMetadataException {
pipelineIdSym = RubySymbol.newSymbol(RubyUtil.RUBY, PIPELINE_ID);

final SourceCollector sourceCollector = new SourceCollector();
sourceCollector.appendSource("file", "/tmp/1", 0, 0, "input { generator1 }\n");
sourceCollector.appendSource("file", "/tmp/2", 0, 0, "input { generator2 }");
sourceCollector.appendSource("file", "/tmp/3", 0, 0, "input { generator3 }\n");
sourceCollector.appendSource("file", "/tmp/4", 0, 0, "input { generator4 }");
sourceCollector.appendSource("file", "/tmp/5", 0, 0, "input { generator5 }\n");
sourceCollector.appendSource("file", "/tmp/6", 0, 0, "input { generator6 }");
sourceCollector.appendSource("string", "config_string", 0, 0, "input { generator1 }");
sourceCollector.appendSource("file", "/tmp/1", 0, 0, "input { generator1 }\n", "{\"version\": \"1\"}");
sourceCollector.appendSource("file", "/tmp/2", 0, 0, "input { generator2 }", "{\"version\": \"1\"}");
sourceCollector.appendSource("file", "/tmp/3", 0, 0, "input { generator3 }\n", "{\"version\": \"1\"}");
sourceCollector.appendSource("file", "/tmp/4", 0, 0, "input { generator4 }", "{\"version\": \"1\"}");
sourceCollector.appendSource("file", "/tmp/5", 0, 0, "input { generator5 }\n", "{\"version\": \"1\"}");
sourceCollector.appendSource("file", "/tmp/6", 0, 0, "input { generator6 }", "{\"version\": \"1\"}");
sourceCollector.appendSource("string", "config_string", 0, 0, "input { generator1 }", "{\"version\": \"1\"}");

orderedConfigParts = sourceCollector.orderedConfigParts();
configMerged = sourceCollector.compositeSource();
Expand All @@ -129,6 +129,7 @@ public void testReturnsTheSource() {
assertEquals("returns the source", source, sut.getSource());
assertEquals("returns the pipeline id", PIPELINE_ID, sut.getPipelineId());
assertNotNull("returns the config_hash", sut.configHash());
assertNotNull("returns the config_hash", sut.metadataString());
assertEquals("returns the merged `ConfigPart#config_string`", configMerged, sut.configString());
assertThat("records when the config was read", sut.getReadAt(), isBeforeOrSame(LocalDateTime.now()));
}
Expand Down
3 changes: 2 additions & 1 deletion x-pack/lib/config_management/elasticsearch_source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,11 @@ def pipeline_configs

def get_pipeline(pipeline_id, fetcher)
config_string = fetcher.get_single_pipeline_setting(pipeline_id)["pipeline"]
pipeline_metadata_str = (fetcher.get_single_pipeline_setting(pipeline_id)["pipeline_metadata"] || "").to_s

raise RemoteConfigError, "Empty configuration for pipeline_id: #{pipeline_id}" if config_string.nil? || config_string.empty?

config_part = org.logstash.common.SourceWithMetadata.new("x-pack-config-management", pipeline_id.to_s, config_string)
config_part = org.logstash.common.SourceWithMetadata.new("x-pack-config-management", pipeline_id.to_s, config_string, pipeline_metadata_str)

# We don't support multiple pipelines, so use the global settings from the logstash.yml file
settings = @settings.clone
Expand Down
23 changes: 23 additions & 0 deletions x-pack/qa/integration/management/multiple_pipelines_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,29 @@ def count_hashes(pipelines)
end
end

it "should pick up recreated pipeline with the same config string and different metadata" do
elasticsearch_client.indices.refresh

pipeline_id = @pipelines.keys[0]
config = @pipelines.values[0]
file = File.join(@temporary_directory, pipeline_id)

Stud.try(max_retry.times, [RSpec::Expectations::ExpectationNotMetError]) do
expect(File.exist?(file)).to be_truthy
end

cleanup_system_indices([pipeline_id])
File.delete(file)
expect(File.exist?(file)).to be_falsey

push_elasticsearch_config(pipeline_id, config, "2")
elasticsearch_client.indices.refresh

Stud.try(max_retry.times, [RSpec::Expectations::ExpectationNotMetError]) do
expect(File.exist?(file)).to be_truthy
end
end

after :each do
@logstash_service.stop if !!@logstash_service
@elasticsearch_service.stop if !!@elasticsearch_service
Expand Down
27 changes: 18 additions & 9 deletions x-pack/qa/integration/support/helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,15 @@ def elasticsearch_client(options = { :url => "http://elastic:#{elastic_password}

def es_version
response = elasticsearch_client.perform_request(:get, "")
response.body["version"]["number"].gsub(/(\d+\.\d+)\..+/, '\1').to_f
major, minor = response.body["version"]["number"].split(".")
[major.to_i, minor.to_i]
end

def push_elasticsearch_config(pipeline_id, config)
if es_version >= 7.10
def push_elasticsearch_config(pipeline_id, config, version="1")
major, minor = es_version
if major >= 8 || (major == 7 && minor >= 10)
elasticsearch_client.perform_request(:put, "_logstash/pipeline/#{pipeline_id}", {},
{ :pipeline => config, :username => "log.stash", :pipeline_metadata => {:version => "1" },
{ :pipeline => config, :username => "log.stash", :pipeline_metadata => {:version => version },
:pipeline_settings => {"pipeline.batch.delay": "50"}, :last_modified => Time.now.utc.iso8601})
else
elasticsearch_client.index :index => '.logstash', :type => "_doc", id: pipeline_id, :body => { :pipeline => config }
Expand All @@ -116,13 +118,20 @@ def cleanup_elasticsearch(index = MONITORING_INDEXES)
end

def cleanup_system_indices(pipeline_ids)
pipeline_ids.each do |id|
begin
elasticsearch_client.perform_request(:delete, "_logstash/pipeline/#{id}")
rescue Elasticsearch::Transport::Transport::Errors::NotFound => e
puts ".logstash can be empty #{e.message}"
major, minor = es_version

if major >= 8 || (major == 7 && minor >= 10)
pipeline_ids.each do |id|
begin
elasticsearch_client.perform_request(:delete, "_logstash/pipeline/#{id}")
rescue Elasticsearch::Transport::Transport::Errors::NotFound => e
puts ".logstash can be empty #{e.message}"
end
end
else
cleanup_elasticsearch(".logstash*")
end

elasticsearch_client.indices.refresh
end

Expand Down

0 comments on commit 606cfe5

Please sign in to comment.