Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compressable:Zstd comp support #4657

Merged
merged 7 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions fluentd.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency("tzinfo-data", ["~> 1.0"])
gem.add_runtime_dependency("strptime", [">= 0.2.4", "< 1.0.0"])
gem.add_runtime_dependency("webrick", ["~> 1.4"])
gem.add_runtime_dependency("zstd-ruby", ["~> 1.5"])

# gems that aren't default gems as of Ruby 3.4
gem.add_runtime_dependency("base64", ["~> 0.2"])
Expand Down
11 changes: 6 additions & 5 deletions lib/fluent/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ def to_msgpack_stream(time_int: false, packer: nil)
out.full_pack
end

def to_compressed_msgpack_stream(time_int: false, packer: nil)
def to_compressed_msgpack_stream(time_int: false, packer: nil, type: :gzip)
packed = to_msgpack_stream(time_int: time_int, packer: packer)
compress(packed)
compress(packed, type: type)
end

def to_msgpack_stream_forced_integer(packer: nil)
Expand Down Expand Up @@ -268,10 +268,11 @@ def to_msgpack_stream(time_int: false, packer: nil)
end

class CompressedMessagePackEventStream < MessagePackEventStream
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil)
super
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: :gzip)
super(data, cached_unpacker, size, unpacked_times: unpacked_times, unpacked_records: unpacked_records)
@decompressed_data = nil
@compressed_data = data
@type = compress
end

def empty?
Expand Down Expand Up @@ -303,7 +304,7 @@ def to_compressed_msgpack_stream(time_int: false, packer: nil)

def ensure_decompressed!
return if @decompressed_data
@data = @decompressed_data = decompress(@data)
@data = @decompressed_data = decompress(@data, type: @type)
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than
config_param :queued_chunks_limit_size, :integer, default: nil

desc 'Compress buffered data.'
config_param :compress, :enum, list: [:text, :gzip], default: :text
config_param :compress, :enum, list: [:text, :gzip, :zstd], default: :text

desc 'If true, chunks are thrown away when unrecoverable error happens'
config_param :disable_chunk_backup, :bool, default: false
Expand Down
71 changes: 64 additions & 7 deletions lib/fluent/plugin/buffer/chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,11 @@ def initialize(metadata, compress: :text)
@size = 0
@created_at = Fluent::Clock.real_now
@modified_at = Fluent::Clock.real_now

extend Decompressable if compress == :gzip
if compress == :gzip
extend GzipDecompressable
elsif compress == :zstd
extend ZstdDecompressable
end
end

attr_reader :unique_id, :metadata, :state
Expand All @@ -85,7 +88,7 @@ def modified_at

# data is array of formatted record string
def append(data, **kwargs)
raise ArgumentError, '`compress: gzip` can be used for Compressable module' if kwargs[:compress] == :gzip
raise ArgumentError, "`compress: #{kwargs[:compress]}` can be used for Compressable module" if kwargs[:compress] == :gzip || kwargs[:compress] == :zstd
adding = ''.b
data.each do |d|
adding << d.b
Expand Down Expand Up @@ -165,23 +168,23 @@ def purge
end

def read(**kwargs)
raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
raise ArgumentError, "`compressed: #{kwargs[:compressed]}` can be used for Compressable module" if kwargs[:compressed] == :gzip || kwargs[:compressed] == :zstd
raise NotImplementedError, "Implement this method in child class"
end

def open(**kwargs, &block)
raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
raise ArgumentError, "`compressed: #{kwargs[:compressed]}` can be used for Compressable module" if kwargs[:compressed] == :gzip || kwargs[:compressed] == :zstd
raise NotImplementedError, "Implement this method in child class"
end

def write_to(io, **kwargs)
raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
raise ArgumentError, "`compressed: #{kwargs[:compressed]}` can be used for Compressable module" if kwargs[:compressed] == :gzip || kwargs[:compressed] == :zstd
open do |i|
IO.copy_stream(i, io)
end
end

module Decompressable
module GzipDecompressable
include Fluent::Plugin::Compressable

def append(data, **kwargs)
Expand Down Expand Up @@ -234,6 +237,60 @@ def write_to(io, **kwargs)
end
end
end

# Chunk mixin used when the chunk's stored payload is zstd-compressed.
# Each method either passes already-compressed data straight through to the
# underlying implementation (when the caller asks for :zstd) or converts
# between the compressed payload and plain data.
module ZstdDecompressable
  include Fluent::Plugin::Compressable

  # Appends +data+ (an enumerable of strings).
  # With `compress: :zstd` every entry is written through a single
  # Zstd::StreamWriter and the compressed result is handed to #concat along
  # with data.size; in every other case the call is forwarded unchanged.
  def append(data, **kwargs)
    return super unless kwargs[:compress] == :zstd

    buffer = StringIO.new
    writer = Zstd::StreamWriter.new(buffer)
    data.each { |entry| writer.write(entry) }
    writer.finish
    concat(buffer.string, data.size)
  end

  # Yields an IO positioned at the start of the chunk data.
  # With `compressed: :zstd` the underlying IO is yielded as-is by the parent
  # implementation; otherwise the content is decompressed into a StringIO
  # (when the chunk IO is a StringIO) or a binary-mode Tempfile, and that IO
  # is yielded instead.
  def open(**kwargs, &block)
    return super if kwargs[:compressed] == :zstd

    super(**kwargs) do |chunk_io|
      in_memory = chunk_io.is_a?(StringIO)
      plain_io = in_memory ? StringIO.new : Tempfile.new('decompressed-data')
      plain_io.binmode unless in_memory
      decompress(input_io: chunk_io, output_io: plain_io, type: :zstd)
      plain_io.seek(0, IO::SEEK_SET)
      yield plain_io
    end
  end

  # Returns the chunk content: the parent's result untouched when
  # `compressed: :zstd` is requested, the zstd-decompressed form otherwise.
  def read(**kwargs)
    return super if kwargs[:compressed] == :zstd

    decompress(super, type: :zstd)
  end

  # Copies the chunk content into +io+. The chunk is always opened in its
  # compressed form; it is copied verbatim for `compressed: :zstd` and
  # decompressed into +io+ otherwise.
  def write_to(io, **kwargs)
    keep_compressed = kwargs[:compressed] == :zstd
    open(compressed: :zstd) do |chunk_io|
      if keep_compressed
        IO.copy_stream(chunk_io, io)
      else
        decompress(input_io: chunk_io, output_io: io, type: :zstd)
      end
    end
  end
end
end
end
end
Expand Down
90 changes: 68 additions & 22 deletions lib/fluent/plugin/compressable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,81 +16,127 @@

require 'stringio'
require 'zlib'
require 'zstd-ruby'

module Fluent
module Plugin
module Compressable
def compress(data, **kwargs)
def compress(data, type: :gzip, **kwargs)
output_io = kwargs[:output_io]
io = output_io || StringIO.new
Zlib::GzipWriter.wrap(io) do |gz|
gz.write data
if type == :gzip
writer = Zlib::GzipWriter.new(io)
elsif type == :zstd
writer = Zstd::StreamWriter.new(io)
else
raise ArgumentError, "Unknown compression type: #{type}"
end

writer.write(data)
writer.finish
output_io || io.string
end

# compressed_data is String like `compress(data1) + compress(data2) + ... + compress(dataN)`
# https://www.ruby-forum.com/topic/971591#979503
def decompress(compressed_data = nil, output_io: nil, input_io: nil)
def decompress(compressed_data = nil, output_io: nil, input_io: nil, type: :gzip)
case
when input_io && output_io
io_decompress(input_io, output_io)
io_decompress(input_io, output_io, type)
when input_io
output_io = StringIO.new
io = io_decompress(input_io, output_io)
io = io_decompress(input_io, output_io, type)
io.string
when compressed_data.nil? || compressed_data.empty?
# check compressed_data(String) is 0 length
compressed_data
when output_io
# execute after checking compressed_data is empty or not
io = StringIO.new(compressed_data)
io_decompress(io, output_io)
io_decompress(io, output_io, type)
else
string_decompress(compressed_data)
string_decompress(compressed_data, type)
end
end

private

def string_decompress(compressed_data)
def string_decompress_gzip(compressed_data)
io = StringIO.new(compressed_data)

out = ''
loop do
gz = Zlib::GzipReader.new(io)
out << gz.read
unused = gz.unused
gz.finish

reader = Zlib::GzipReader.new(io)
out << reader.read
unused = reader.unused
reader.finish
unless unused.nil?
adjust = unused.length
io.pos -= adjust
end
break if io.eof?
end
out
end

# Decompresses a zstd-compressed String and returns the decompressed bytes
# as a single String.
#
# compressed_data - String of zstd-compressed bytes (callers filter out
#                   nil/empty input before reaching this method).
#
# One Zstd::StreamReader is shared by every read: the reader carries the
# streaming decompression state, so building a new reader per iteration
# would lose that state as soon as the input is longer than one read.
def string_decompress_zstd(compressed_data)
  io = StringIO.new(compressed_data)
  reader = Zstd::StreamReader.new(io)
  out = ''
  loop do
    # Zstd::StreamReader#read needs an explicit read size.
    out << reader.read(1024)
    # The reader exposes no "unused" data, so completion is detected from
    # the underlying IO having been fully consumed.
    break if io.eof?
  end
  out
end

def io_decompress(input, output)
# Dispatches String decompression to the helper for the given +type+
# (:gzip by default, or :zstd) and returns that helper's result.
# Raises ArgumentError for any other type.
def string_decompress(compressed_data, type = :gzip)
  case type
  when :gzip then string_decompress_gzip(compressed_data)
  when :zstd then string_decompress_zstd(compressed_data)
  else raise ArgumentError, "Unknown compression type: #{type}"
  end
end

def io_decompress_gzip(input, output)
loop do
gz = Zlib::GzipReader.new(input)
v = gz.read
reader = Zlib::GzipReader.new(input)
v = reader.read
output.write(v)
unused = gz.unused
gz.finish

unused = reader.unused
reader.finish
unless unused.nil?
adjust = unused.length
input.pos -= adjust
end
break if input.eof?
end
output
end

# Decompresses zstd data read from +input+ and writes the result to +output+.
#
# input  - IO-like object positioned at the compressed data; must respond to
#          #eof? and whatever Zstd::StreamReader calls on it.
# output - IO-like object that receives the decompressed bytes via #write.
#
# Returns +output+.
#
# One Zstd::StreamReader is shared by every read: the reader carries the
# streaming decompression state, so building a new reader per iteration
# would lose that state as soon as the input is longer than one read.
def io_decompress_zstd(input, output)
  reader = Zstd::StreamReader.new(input)
  loop do
    # Zstd::StreamReader#read needs an explicit read size.
    output.write(reader.read(1024))
    # The reader exposes no "unused" data, so completion is detected from
    # the underlying IO having been fully consumed.
    break if input.eof?
  end
  output
end

# Dispatches IO-to-IO decompression to the helper for the given +type+
# (:gzip by default, or :zstd) and returns that helper's result.
# Raises ArgumentError for any other type.
def io_decompress(input, output, type = :gzip)
  case type
  when :gzip then io_decompress_gzip(input, output)
  when :zstd then io_decompress_zstd(input, output)
  else raise ArgumentError, "Unknown compression type: #{type}"
  end
end
end
end
end
12 changes: 8 additions & 4 deletions lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,14 @@ def on_message(msg, chunk_size, conn)
case entries
when String
# PackedForward
option = msg[2]
size = (option && option['size']) || 0
es_class = (option && option['compressed'] == 'gzip') ? Fluent::CompressedMessagePackEventStream : Fluent::MessagePackEventStream
es = es_class.new(entries, nil, size.to_i)
option = msg[2] || {}
size = option['size'] || 0

if option['compressed'] && option['compressed'] != 'text'
es = Fluent::CompressedMessagePackEventStream.new(entries, nil, size.to_i, compress: option['compressed'].to_sym)
else
es = Fluent::MessagePackEventStream.new(entries, nil, size.to_i)
end
es = check_and_skip_invalid_event(tag, es, conn.remote_host) if @skip_invalid_event
if @enable_field_injection
es = add_source_info(es, conn)
Expand Down
Loading
Loading