Skip to content

Commit

Permalink
Added zstd support for msgs
Browse files Browse the repository at this point in the history
Signed-off-by: Athish Pranav D <[email protected]>
  • Loading branch information
Athishpranav2003 committed Oct 3, 2024
1 parent 7f13c7a commit fa939ba
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 33 deletions.
1 change: 1 addition & 0 deletions fluentd.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency("tzinfo-data", ["~> 1.0"])
gem.add_runtime_dependency("strptime", [">= 0.2.4", "< 1.0.0"])
gem.add_runtime_dependency("webrick", ["~> 1.4"])
gem.add_runtime_dependency("zstd-ruby", ["~> 1.5"])

# gems that aren't default gems as of Ruby 3.4
gem.add_runtime_dependency("base64", ["~> 0.2"])
Expand Down
7 changes: 4 additions & 3 deletions lib/fluent/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ class MessagePackEventStream < EventStream
# https://github.com/msgpack/msgpack-ruby/issues/119

# Keep cached_unpacker argument for existing plugins
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil)
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: nil)
@data = data
@size = size
@unpacked_times = unpacked_times
Expand Down Expand Up @@ -268,10 +268,11 @@ def to_msgpack_stream(time_int: false, packer: nil)
end

class CompressedMessagePackEventStream < MessagePackEventStream
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil)
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: :gzip)
super
@decompressed_data = nil
@compressed_data = data
@type = compress
end

def empty?
Expand Down Expand Up @@ -303,7 +304,7 @@ def to_compressed_msgpack_stream(time_int: false, packer: nil)

def ensure_decompressed!
return if @decompressed_data
@data = @decompressed_data = decompress(@data)
@data = @decompressed_data = decompress(@data, type: @type)
end
end

Expand Down
80 changes: 53 additions & 27 deletions lib/fluent/plugin/compressable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,75 +16,101 @@

require 'stringio'
require 'zlib'
require 'zstd-ruby'

module Fluent
module Plugin
module Compressable
def compress(data, **kwargs)
def compress(data, type: :gzip, **kwargs)
output_io = kwargs[:output_io]
writer = nil
io = output_io || StringIO.new
Zlib::GzipWriter.wrap(io) do |gz|
gz.write data
if type == :gzip
writer = Zlib::GzipWriter.new(io)
elsif type == :zstd
writer = Zstd::StreamWriter.new(io)
else
raise ArgumentError, "Unknown compression type: #{type}"
end

writer.write(data)
writer.finish
output_io || io.string
end

# compressed_data is String like `compress(data1) + compress(data2) + ... + compress(dataN)`
# https://www.ruby-forum.com/topic/971591#979503
def decompress(compressed_data = nil, output_io: nil, input_io: nil)
def decompress(compressed_data = nil, output_io: nil, input_io: nil, type: :gzip)
case
when input_io && output_io
io_decompress(input_io, output_io)
io_decompress(input_io, output_io, type)
when input_io
output_io = StringIO.new
io = io_decompress(input_io, output_io)
io = io_decompress(input_io, output_io, type)
io.string
when compressed_data.nil? || compressed_data.empty?
# check compressed_data(String) is 0 length
compressed_data
when output_io
# execute after checking compressed_data is empty or not
io = StringIO.new(compressed_data)
io_decompress(io, output_io)
io_decompress(io, output_io, type)
else
string_decompress(compressed_data)
string_decompress(compressed_data, type)
end
end

private

def string_decompress(compressed_data)
def string_decompress(compressed_data, type = :gzip)
io = StringIO.new(compressed_data)

out = ''
loop do
gz = Zlib::GzipReader.new(io)
out << gz.read
unused = gz.unused
gz.finish
if type == :gzip
reader = Zlib::GzipReader.new(io)
out << reader.read
unused = reader.unused
reader.finish

unless unused.nil?
adjust = unused.length
io.pos -= adjust
unless unused.nil?
adjust = unused.length
io.pos -= adjust
end
elsif type == :zstd
reader = Zstd::StreamReader.new(io)
# Zstd::StreamReader needs to specify the size of the buffer
out << reader.read(1024)
# Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position
else
raise ArgumentError, "Unknown compression type: #{type}"
end
break if io.eof?
end

out
end

def io_decompress(input, output)
def io_decompress(input, output, type = :gzip)
loop do
gz = Zlib::GzipReader.new(input)
v = gz.read
output.write(v)
unused = gz.unused
gz.finish

unless unused.nil?
adjust = unused.length
input.pos -= adjust
reader = nil
if type == :gzip
reader = Zlib::GzipReader.new(input)
v = reader.read
output.write(v)
unused = reader.unused
reader.finish
unless unused.nil?
adjust = unused.length
input.pos -= adjust
end
elsif type == :zstd
reader = Zstd::StreamReader.new(input)
# Zstd::StreamReader needs to specify the size of the buffer
v = reader.read(1024)
output.write(v)
# Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position
else
raise ArgumentError, "Unknown compression type: #{type}"
end
break if input.eof?
end
Expand Down
3 changes: 1 addition & 2 deletions lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,7 @@ def on_message(msg, chunk_size, conn)
# PackedForward
option = msg[2]
size = (option && option['size']) || 0
es_class = (option && option['compressed'] == 'gzip') ? Fluent::CompressedMessagePackEventStream : Fluent::MessagePackEventStream
es = es_class.new(entries, nil, size.to_i)
es = (option && option['compressed']!=nil && option['compressed']!="text") ? Fluent::CompressedMessagePackEventStream.new(entries, nil, size.to_i, compress: option['compressed'].to_sym) : Fluent::MessagePackEventStream.new(entries, nil, size.to_i)
es = check_and_skip_invalid_event(tag, es, conn.remote_host) if @skip_invalid_event
if @enable_field_injection
es = add_source_info(es, conn)
Expand Down
2 changes: 1 addition & 1 deletion test/plugin/test_in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ def create_driver(conf=base_config)
chunk = ["tag1", entries, { 'compressed' => 'gzip' }].to_msgpack

# check CompressedMessagePackEventStream is created
mock(Fluent::CompressedMessagePackEventStream).new(entries, nil, 0)
mock(Fluent::CompressedMessagePackEventStream).new(entries, nil, 0, compress: :gzip)

d.run do
Fluent::MessagePackFactory.msgpack_unpacker.feed_each(chunk) do |obj|
Expand Down

0 comments on commit fa939ba

Please sign in to comment.