
logstash repeatedly processes a corrupted compressed file in an endless loop #261

Closed
xo4n opened this issue Feb 28, 2020 · 4 comments

xo4n commented Feb 28, 2020

We had a case where several corrupted compressed files ended up in the input directory of the Logstash pipeline. Logstash then reads and processes some of the lines of a corrupted file, sends that corrupted data to the output, and throws an error before it finishes reading.

Because the read doesn't complete, the file is never marked as processed and is picked up again over and over. The result is a continuous stream of corrupted data being sent to the output.

logstash_1  | [2020-02-28T14:00:21,027][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.5.2"}
logstash_1  | [2020-02-28T14:00:24,963][INFO ][org.reflections.Reflections] Reflections took 100 ms to scan 1 urls, producing 20 keys and 40 values
logstash_1  | [2020-02-28T14:00:27,790][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge] A gauge metric of an unknown type (org.jruby.RubyArray) has been create for key: cluster_uuids. This may result in invalid serialization.  It is recommended to log an issue to the responsible developer/development team.
logstash_1  | [2020-02-28T14:00:27,798][INFO ][logstash.javapipeline    ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>500, "pipeline.sources"=>["/etc/logstash/conf.d/logstash_akamai.conf"], :thread=>"#<Thread:0x5cd406bb run>"}
logstash_1  | [2020-02-28T14:00:28,139][INFO ][logstash.javapipeline    ] Pipeline started {"pipeline.id"=>"main"}
logstash_1  | [2020-02-28T14:00:28,228][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
logstash_1  | [2020-02-28T14:00:28,233][INFO ][filewatch.observingread  ] START, creating Discoverer, Watch with file and sincedb collections
logstash_1  | [2020-02-28T14:00:28,723][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
logstash_1  | [2020-02-28T14:00:29,708][INFO ][logstash.outputs.file    ] Opening file {:path=>"/var/log/akamai_messages.log"}
logstash_1  | [2020-02-28T14:02:03,053][ERROR][filewatch.readmode.handlers.readzipfile] Cannot decompress the gzip file at path: /var/log/akamai/trv_imgcy_698321.esw3ccs_ghostip_S.202002271000-1100-4.gz
logstash_1  | [2020-02-28T14:03:32,609][ERROR][filewatch.readmode.handlers.readzipfile] Cannot decompress the gzip file at path: /var/log/akamai/trv_imgcy_698321.esw3ccs_ghostip_S.202002271000-1100-4.gz
logstash_1  | [2020-02-28T14:04:57,801][ERROR][filewatch.readmode.handlers.readzipfile] Cannot decompress the gzip file at path: /var/log/akamai/trv_imgcy_698321.esw3ccs_ghostip_S.202002271000-1100-4.gz
logstash_1  | [2020-02-28T14:06:21,140][ERROR][filewatch.readmode.handlers.readzipfile] Cannot decompress the gzip file at path: /var/log/akamai/trv_imgcy_698321.esw3ccs_ghostip_S.202002271000-1100-4.gz
^CGracefully stopping... (press Ctrl+C again to force)

The input configuration:

input
{
  file
  {
    path => "/var/log/akamai/trv_imgcy_698321.esw3ccs_ghostip_S.202002271000-1100-4.gz"
    mode => "read"
    start_position => "beginning"
    file_completed_action => "delete"
    sincedb_path => "/dev/null"
  }
}

Tried with two different versions (7.5.2 and 7.1.1); the issue was reproducible in both cases.
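
For anyone trying to reproduce this without the original log: any valid gzip that is truncated mid-stream appears to trigger the same behaviour. A minimal sketch in plain Ruby (the path and contents are hypothetical, not the actual Akamai file):

# Write a valid gzip, then cut it in half so the deflate stream and trailer are damaged.
# Reading it back fails part-way through, after some lines have already been decompressed.
require "zlib"

path = "/var/log/akamai/corrupted_sample.gz"   # hypothetical path
Zlib::GzipWriter.open(path) do |gz|
  1000.times { |i| gz.puts "line #{i}" }
end
File.truncate(path, File.size(path) / 2)       # simulate the corruption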


andsel commented Mar 2, 2020

Hi @xo4n, I wonder which data Logstash loads from a corrupted file. If the gzip file is corrupted you shouldn't be able to decompress it, so no content should be loaded from it at all. Using tail mode could probably help to avoid the continuous failing, but do you have a shareable corrupted gzip that manifests the problem?
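
For context on why partial data can still come through: gzip is decoded as a stream, so the intact leading blocks decompress fine and only the damaged region raises. A quick plain-Ruby check against a truncated gzip (e.g. the hypothetical corrupted_sample.gz from the sketch above; this is not the plugin's JRuby code path, which goes through java.util.zip):

require "zlib"

count = 0
begin
  Zlib::GzipReader.open("/var/log/akamai/corrupted_sample.gz") do |gz|
    gz.each_line { count += 1 }   # the valid leading lines are yielded first...
  end
rescue Zlib::Error, EOFError => e
  warn "decompression failed after #{count} lines: #{e.class}: #{e.message}"  # ...then the read blows up
end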


xo4n commented Mar 19, 2020


andsel commented Mar 20, 2020

Thanks @xo4n for sharing the file; I was able to reproduce it locally. The problem is that when reading a line from the gzipped input stream

while (line = buffered.readLine(false))
throws an error

java.util.zip.InflaterInputStream.read(java/util/zip/InflaterInputStream.java:164)
java.util.zip.GZIPInputStream.read(java/util/zip/GZIPInputStream.java:117)
sun.nio.cs.StreamDecoder.readBytes(sun/nio/cs/StreamDecoder.java:284)
sun.nio.cs.StreamDecoder.implRead(sun/nio/cs/StreamDecoder.java:326)
sun.nio.cs.StreamDecoder.read(sun/nio/cs/StreamDecoder.java:178)
java.io.InputStreamReader.read(java/io/InputStreamReader.java:184)
java.io.BufferedReader.fill(java/io/BufferedReader.java:161)
java.io.BufferedReader.readLine(java/io/BufferedReader.java:324)
java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:498)
org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:456)
org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:317)
.logstash_minus_input_minus_file.lib.filewatch.read_mode.handlers.read_zip_file.handle_specifically(logstash-input-file/lib/filewatch/read_mode/handlers/read_zip_file.rb:26)

then only the ensure block is executed:

ensure
  # rescue each close individually so all close attempts are tried
  close_and_ignore_ioexception(buffered) unless buffered.nil?
  close_and_ignore_ioexception(decoder) unless decoder.nil?
  close_and_ignore_ioexception(gzip_stream) unless gzip_stream.nil?
  close_and_ignore_ioexception(file_stream) unless file_stream.nil?
end

This essentially closes the input resources but doesn't delete the file or mark it as processed, so the next discovery loop finds it again, processes it, produces the same events as before, and so on.

There are two possible strategies to solve this:

  • Before processing the input gzip file, validate whether it's corrupted; if it is, skip it completely.
  • Or process the file as far as possible and, if a java.util.zip.ZipException is raised, move it into a staging directory that the plugin doesn't read (a rough sketch of this follows the list). For this implementation we could imagine some configuration settings: the action to take on error (move|delete), the path the corrupted file is moved to, and possibly a boolean switch to enable/disable the behaviour.
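
A rough standalone sketch of the second option, in plain Ruby rather than the plugin's actual JRuby code (the quarantine directory and function name are made up for illustration):

require "zlib"
require "fileutils"

CORRUPTED_DIR = "/var/log/akamai/corrupted"   # hypothetical staging directory outside the input path

def read_gzip_or_quarantine(path)
  Zlib::GzipReader.open(path) do |gz|
    gz.each_line { |line| yield line }        # emit each line to the caller, as the handler would
  end
  File.delete(path)                           # read completed: mimic file_completed_action => "delete"
rescue Zlib::Error, EOFError => e
  warn "Cannot decompress the gzip file at path: #{path} (#{e.class})"
  FileUtils.mkdir_p(CORRUPTED_DIR)
  FileUtils.mv(path, CORRUPTED_DIR)           # move it aside so the discovery loop never retries it
end

Used as read_gzip_or_quarantine(path) { |line| ... }, a corrupted file produces a single error and is moved out of the way instead of being re-read and re-emitted on every discovery pass.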

andsel added a commit to andsel/logstash-input-file that referenced this issue Mar 25, 2020
andsel added a commit to andsel/logstash-input-file that referenced this issue Mar 31, 2020
andsel added a commit to andsel/logstash-input-file that referenced this issue Apr 3, 2020

andsel commented Apr 3, 2020

Solved with version 4.1.17.
