diff --git a/fluentd.gemspec b/fluentd.gemspec index ff225597d3..4d18e3562b 100644 --- a/fluentd.gemspec +++ b/fluentd.gemspec @@ -33,6 +33,7 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency("tzinfo-data", ["~> 1.0"]) gem.add_runtime_dependency("strptime", [">= 0.2.4", "< 1.0.0"]) gem.add_runtime_dependency("webrick", ["~> 1.4"]) + gem.add_runtime_dependency("zstd-ruby", ["~> 1.5"]) # gems that aren't default gems as of Ruby 3.4 gem.add_runtime_dependency("base64", ["~> 0.2"]) diff --git a/lib/fluent/event.rb b/lib/fluent/event.rb index bcb06998ed..471094a6d4 100644 --- a/lib/fluent/event.rb +++ b/lib/fluent/event.rb @@ -203,7 +203,7 @@ class MessagePackEventStream < EventStream # https://github.com/msgpack/msgpack-ruby/issues/119 # Keep cached_unpacker argument for existing plugins - def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil) + def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: nil) @data = data @size = size @unpacked_times = unpacked_times @@ -268,10 +268,11 @@ def to_msgpack_stream(time_int: false, packer: nil) end class CompressedMessagePackEventStream < MessagePackEventStream - def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil) + def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: :gzip) super @decompressed_data = nil @compressed_data = data + @type = compress end def empty? @@ -303,7 +304,7 @@ def to_compressed_msgpack_stream(time_int: false, packer: nil) def ensure_decompressed! return if @decompressed_data - @data = @decompressed_data = decompress(@data) + @data = @decompressed_data = decompress(@data, type: @type) end end diff --git a/lib/fluent/plugin/compressable.rb b/lib/fluent/plugin/compressable.rb index aa242478fe..35c2bb94f7 100644 --- a/lib/fluent/plugin/compressable.rb +++ b/lib/fluent/plugin/compressable.rb @@ -16,29 +16,36 @@ require 'stringio' require 'zlib' +require 'zstd-ruby' module Fluent module Plugin module Compressable - def compress(data, **kwargs) + def compress(data, type: :gzip, **kwargs) output_io = kwargs[:output_io] + writer = nil io = output_io || StringIO.new - Zlib::GzipWriter.wrap(io) do |gz| - gz.write data + if type == :gzip + writer = Zlib::GzipWriter.new(io) + elsif type == :zstd + writer = Zstd::StreamWriter.new(io) + else + raise ArgumentError, "Unknown compression type: #{type}" end - + writer.write(data) + writer.finish output_io || io.string end # compressed_data is String like `compress(data1) + compress(data2) + ... + compress(dataN)` # https://www.ruby-forum.com/topic/971591#979503 - def decompress(compressed_data = nil, output_io: nil, input_io: nil) + def decompress(compressed_data = nil, output_io: nil, input_io: nil, type: :gzip) case when input_io && output_io - io_decompress(input_io, output_io) + io_decompress(input_io, output_io, type) when input_io output_io = StringIO.new - io = io_decompress(input_io, output_io) + io = io_decompress(input_io, output_io, type) io.string when compressed_data.nil? || compressed_data.empty? # check compressed_data(String) is 0 length @@ -46,27 +53,36 @@ def decompress(compressed_data = nil, output_io: nil, input_io: nil) when output_io # execute after checking compressed_data is empty or not io = StringIO.new(compressed_data) - io_decompress(io, output_io) + io_decompress(io, output_io, type) else - string_decompress(compressed_data) + string_decompress(compressed_data, type) end end private - def string_decompress(compressed_data) + def string_decompress(compressed_data, type = :gzip) io = StringIO.new(compressed_data) out = '' loop do - gz = Zlib::GzipReader.new(io) - out << gz.read - unused = gz.unused - gz.finish + if type == :gzip + reader = Zlib::GzipReader.new(io) + out << reader.read + unused = reader.unused + reader.finish - unless unused.nil? - adjust = unused.length - io.pos -= adjust + unless unused.nil? + adjust = unused.length + io.pos -= adjust + end + elsif type == :zstd + reader = Zstd::StreamReader.new(io) + # Zstd::StreamReader needs to specify the size of the buffer + out << reader.read(1024) + # Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position + else + raise ArgumentError, "Unknown compression type: #{type}" end break if io.eof? end @@ -74,17 +90,27 @@ def string_decompress(compressed_data) out end - def io_decompress(input, output) + def io_decompress(input, output, type = :gzip) loop do - gz = Zlib::GzipReader.new(input) - v = gz.read - output.write(v) - unused = gz.unused - gz.finish - - unless unused.nil? - adjust = unused.length - input.pos -= adjust + reader = nil + if type == :gzip + reader = Zlib::GzipReader.new(input) + v = reader.read + output.write(v) + unused = reader.unused + reader.finish + unless unused.nil? + adjust = unused.length + input.pos -= adjust + end + elsif type == :zstd + reader = Zstd::StreamReader.new(input) + # Zstd::StreamReader needs to specify the size of the buffer + v = reader.read(1024) + output.write(v) + # Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position + else + raise ArgumentError, "Unknown compression type: #{type}" end break if input.eof? end diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index eb8b3c629e..05d2262533 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -309,8 +309,7 @@ def on_message(msg, chunk_size, conn) # PackedForward option = msg[2] size = (option && option['size']) || 0 - es_class = (option && option['compressed'] == 'gzip') ? Fluent::CompressedMessagePackEventStream : Fluent::MessagePackEventStream - es = es_class.new(entries, nil, size.to_i) + es = (option && option['compressed']!=nil && option['compressed']!="text") ? Fluent::CompressedMessagePackEventStream.new(entries, nil, size.to_i, compress: option['compressed'].to_sym) : Fluent::MessagePackEventStream.new(entries, nil, size.to_i) es = check_and_skip_invalid_event(tag, es, conn.remote_host) if @skip_invalid_event if @enable_field_injection es = add_source_info(es, conn) diff --git a/test/plugin/test_in_forward.rb b/test/plugin/test_in_forward.rb index 9a3b46fd9a..e32dbe8c1b 100644 --- a/test/plugin/test_in_forward.rb +++ b/test/plugin/test_in_forward.rb @@ -553,7 +553,7 @@ def create_driver(conf=base_config) chunk = ["tag1", entries, { 'compressed' => 'gzip' }].to_msgpack # check CompressedMessagePackEventStream is created - mock(Fluent::CompressedMessagePackEventStream).new(entries, nil, 0) + mock(Fluent::CompressedMessagePackEventStream).new(entries, nil, 0, compress: :gzip) d.run do Fluent::MessagePackFactory.msgpack_unpacker.feed_each(chunk) do |obj|