Skip to content

Commit

Permalink
Enforce XOR-relation between one-of fields (#69)
Browse files Browse the repository at this point in the history
- Enforce XOR between options for one-of
- Fix incorrect detection of metadata for one-of fields
- Update protobuf library to 3.23.4
  • Loading branch information
IngaFeick authored Sep 20, 2023
1 parent fe02c99 commit 65454f6
Show file tree
Hide file tree
Showing 14 changed files with 521 additions and 267 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 1.3.0
- Enforce XOR between options for one-of
- fix incorrect detection of metadata for one-of fields
- Update protobuf library to 3.23.4

## 1.2.10
- Update gem platform to be "java" instead of "jruby" [#67](https://github.com/logstash-plugins/logstash-codec-protobuf/pull/67)

Expand Down
179 changes: 118 additions & 61 deletions lib/logstash/codecs/protobuf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
require 'google/protobuf/struct_pb'
require 'protocol_buffers' # https://github.com/codekitchen/ruby-protocol-buffers, for protobuf2


# Monkey-patch the `Google::Protobuf::DescriptorPool` with a mutex for exclusive
# access.
#
Expand Down Expand Up @@ -189,10 +190,8 @@ def register
load_protobuf_definition(@class_file) if should_register and !@class_file.empty?
# load from `include_path`
include_path.each { |path| load_protobuf_definition(path) } if include_path.length > 0 and should_register

if @protobuf_version == 3
@pb_builder = Google::Protobuf::DescriptorPool.generated_pool.lookup(class_name).msgclass

else
@pb_builder = pb2_create_instance(class_name)
end
Expand All @@ -213,28 +212,24 @@ def reloadable?
def decode(data)
if @protobuf_version == 3
decoded = @pb_builder.decode(data.to_s)
if @pb3_set_oneof_metainfo
meta = pb3_get_oneof_metainfo(decoded, @class_name)
end
h = pb3_deep_to_hash(decoded)
else
hashed, meta = pb3_to_hash(decoded)
else # version = 2
decoded = @pb_builder.parse(data.to_s)
h = decoded.to_hash
hashed = decoded.to_hash
end
e = LogStash::Event.new(h)
e = LogStash::Event.new(hashed)
if @protobuf_version == 3 and @pb3_set_oneof_metainfo
e.set("[@metadata][pb_oneof]", meta)
end
yield e if block_given?
rescue => ex
@logger.warn("Couldn't decode protobuf: #{ex.inspect}")
if stop_on_error
if @stop_on_error
raise ex
else # keep original message so that the user can debug it.
yield LogStash::Event.new(
"message" => data, "tags" => ["_protobufdecodefailure"],
"decoder_exception" => "#{ex.inspect}"
)
"decoder_exception" => "#{ex.inspect}")
end
end # def decode

Expand All @@ -251,8 +246,46 @@ def encode(event)
end # def encode


# Get the builder class for any given protobuf object from the descriptor pool
# Exposed for testing
# @param [Object] pb_obj The pb object instance to do the lookup for
# @return [Object] The pb builder class
def pb3_class_for_name(pb_obj)
Google::Protobuf::DescriptorPool.generated_pool.lookup(pb_obj.class.descriptor.name)
end

private
def pb3_deep_to_hash(input)

# Helper function for debugging: print data types for fields of a hash
def print_types(hashy, i = 0)
hashy.each do |key, value|
puts ws(i) + "#{key} " + value.class.name
if value.is_a? ::Hash
print_types(value, i + 1)
end
if value.is_a? ::Array
value.each do |v|
puts ws(i + 1) + "" + v.class.name
if v.is_a? ::Hash
print_types(v, i + 2)
end
end
end
end
end

# Helper function for debugging: indent print statements based on recursion level
def ws(i)
" " * i
end


# Converts the pb class to a hash, including its nested objects.
# @param [Object] input The pb class or any of its nested data structures
# @param [Numeric] i Level of recursion, needed only for whitespace indentation in debug output
# @return [Hash, Hash] The converted data as a hash + meta information about the one-of choices.
def pb3_to_hash(input, i = 0)
meta = {}
case input
when Google::Protobuf::Struct
result = JSON.parse input.to_json({
Expand All @@ -261,34 +294,87 @@ def pb3_deep_to_hash(input)
})
when Google::Protobuf::MessageExts # it's a protobuf class
result = Hash.new
# when we've called to_h here it is already a nested hash, for the
# structs we bubble down the original value instead.
input.to_h.each {|key, value|
value = input[key] if input[key].is_a? Google::Protobuf::Struct
result[key] = pb3_deep_to_hash(value) # the key is required for the class lookup of enums.
input.clone().to_h.keys.each {|key|
# 'class' is a reserved word so we cannot send() it to the pb object.
# It would give the pb definition class instead of the value of a field of such name.
if key.to_s == "class"
value = input[key]
else
value = input.send(key)
end
unless value.nil?
r, m = pb3_to_hash(value, 1 + i)
result[key.to_s] = r unless r.nil?
meta[key] = m unless m.empty?
end
}
result, m = oneof_clean(result, input, i)
meta = meta.merge(m) unless m.empty?
when ::Array
when Google::Protobuf::RepeatedField
result = []
meta = []
input.each {|value|
result << pb3_deep_to_hash(value)
r, m = pb3_to_hash(value, 1 + i)
result << r unless r.nil?
meta << m unless r.nil?
}
when ::Hash
when Google::Protobuf::Map
result = {}
input.each {|key, value|
result[key] = pb3_deep_to_hash(value)
r, m = pb3_to_hash(value, 1 + i)
result[key.to_s] = r unless r.nil?
meta[key] = m unless m.empty?
}
when Symbol # is an Enum
result = input.to_s.sub(':','')
else
else # any other scalar
result = input
end
result
return result, meta
end

def pb3_encode(event)

datahash = event.to_hash
# For one-of options, remove the non-chosen options.
# @param [Hash] datahash The data hash including all options for each one-of field
# @param [Object] pb_obj The protobuf class from which datahash was created
# @param [Numeric] i Level of recursion, needed only for whitespace indentation in debug output
# @return [Hash, Hash] The reduced data as a hash + meta information about the one-of choices.
def oneof_clean(datahash, pb_obj, i = 0)
# If a field is part of a one-of then it must only be set if it's the selected option.
# In codec versions <= 1.2.x this was not the case. The .to_h delivered default values
# for every one-of option regardless of which one had been chosen, instead of respecting the XOR relation between them.
# The selected option's field name can be queried from input[parent_field]
# where parent_field is the name of the one-of field outside the option list.
# It's unclear though how to identify a) if a field is part of a one-of struct
# because the class of the chosen option will always be a scalar,
# and b) the name of the parent field.
# As a workaround we look up the names of the 'parent fields' for this class and then the chosen options for those.
# Then we remove the other options which weren't set by the producer.
pb_class = pb3_class_for_name(pb_obj)
meta = {}
unless pb_class.nil?
pb_class.msgclass.descriptor.each_oneof { |field|
# Find out which one-of option has been set
chosen = pb_obj.send(field.name).to_s
# Go through the options and remove the names of the non-chosen fields from the hash
# Whacky solution, better ideas are welcome.
field.each { | group_option |
if group_option.name != chosen
key = group_option.name
datahash.delete(key)
end
}
meta[field.name.to_s] = chosen
}
end # unless
return datahash, meta
end


def pb3_encode(event)
datahash = event.to_hash
is_recursive_call = !event.get('tags').nil? and event.get('tags').include? @pb3_typeconversion_tag
if is_recursive_call
datahash = pb3_remove_typeconversion_tag(datahash)
Expand All @@ -302,7 +388,6 @@ def pb3_encode(event)
end
pb_obj = @pb_builder.new(datahash)
@pb_builder.encode(pb_obj)

rescue ArgumentError => e
k = event.to_hash.keys.join(", ")
@logger.warn("Protobuf encoding error 1: Argument error (#{e.inspect}). Reason: probably mismatching protobuf definition. \
Expand All @@ -317,15 +402,12 @@ def pb3_encode(event)
end




def pb3_handle_type_errors(event, e, is_recursive_call, datahash)
begin
if is_recursive_call
@logger.warn("Protobuf encoding error 2.1: Type error (#{e.inspect}). Some types could not be converted. The event has been discarded. Type mismatches: #{mismatches}.")
else
if @pb3_encoder_autoconvert_types

msg = "Protobuf encoding error 2.2: Type error (#{e.inspect}). Will try to convert the data types. Original data: #{datahash}"
@logger.warn(msg)
mismatches = pb3_get_type_mismatches(datahash, "", @class_name)
Expand Down Expand Up @@ -371,12 +453,10 @@ def pb3_get_type_mismatches(data, key_prefix, pb_class)

def pb3_get_expected_type(key, pb_class)
pb_descriptor = Google::Protobuf::DescriptorPool.generated_pool.lookup(pb_class)

if !pb_descriptor.nil?
pb_builder = pb_descriptor.msgclass
pb_obj = pb_builder.new({})
v = pb_obj.send(key)

if !v.nil?
v.class
else
Expand All @@ -385,9 +465,9 @@ def pb3_get_expected_type(key, pb_class)
end
end


def pb3_compare_datatypes(value, key, key_prefix, pb_class, expected_type)
mismatches = []

if value.nil?
is_mismatch = false
else
Expand Down Expand Up @@ -485,6 +565,7 @@ def pb3_add_tag(event, tag )
end
end


# Due to recursion on nested fields in the event object this method might be given an event (1st call) or a hash (2nd .. nth call)
# First call will be the event object, child objects will be hashes.
def pb3_convert_mismatched_types(struct, mismatches)
Expand Down Expand Up @@ -531,6 +612,7 @@ def pb3_convert_mismatched_types(struct, mismatches)
struct
end


def pb3_prepare_for_encoding(datahash)
# 0) Remove empty fields.
datahash = datahash.select { |key, value| !value.nil? }
Expand All @@ -547,30 +629,6 @@ def pb3_prepare_for_encoding(datahash)
datahash
end

def pb3_get_oneof_metainfo(pb_object, pb_class_name)
meta = {}
pb_class = Google::Protobuf::DescriptorPool.generated_pool.lookup(pb_class_name).msgclass

pb_class.descriptor.each_oneof { |field|
field.each { | group_option |
if !pb_object.send(group_option.name).nil?
meta[field.name] = group_option.name
end
}
}

pb_class.descriptor.select{ |field| field.type == :message }.each { | field |
# recurse over nested protobuf classes
pb_sub_object = pb_object.send(field.name)
if !pb_sub_object.nil? and !field.subtype.nil?
pb_sub_class = pb3_get_descriptorpool_name(field.subtype.msgclass)
meta[field.name] = pb3_get_oneof_metainfo(pb_sub_object, pb_sub_class)
end
}

meta
end


def pb2_encode(event)
data = pb2_prepare_for_encoding(event.to_hash, @class_name)
Expand Down Expand Up @@ -607,7 +665,8 @@ def pb2_prepare_for_encoding(datahash, class_name)
original_value
else
proto_obj = pb2_create_instance(c)
proto_obj.new(pb2_prepare_for_encoding(original_value, c)) # this line is reached in the colourtest for an enum. Enums should not be instantiated. Should enums even be in the messageclasses? I dont think so! TODO bug
proto_obj.new(pb2_prepare_for_encoding(original_value, c)) # this line is reached in the colourtest for an enum.
# Enums should not be instantiated. Should enums even be in the messageclasses? I dont think so!
end # if is array
end # if datahash_include
end # do
Expand All @@ -629,8 +688,7 @@ def pb2_create_instance(name)


def pb3_metadata_analyis(filename)

regex_class_name = /\s*add_message "(?<name>.+?)" do\s+/ # TODO optimize both regexes for speed (negative lookahead)
regex_class_name = /\s*add_message "(?<name>.+?)" do\s+/
regex_pbdefs = /\s*(optional|repeated)(\s*):(?<name>.+),(\s*):(?<type>\w+),(\s*)(?<position>\d+)(, \"(?<enum_class>.*?)\")?/
class_name = ""
type = ""
Expand All @@ -653,16 +711,15 @@ def pb3_metadata_analyis(filename)
end # if
end # readlines
if class_name.nil?
@logger.warn("Error 4: class name not found in file " + filename)
@logger.error("Error 4: class name not found in file " + filename)
raise ArgumentError, "Invalid protobuf file: " + filename
end
rescue Exception => e
@logger.warn("Error 3: unable to read pb definition from file " + filename+ ". Reason: #{e.inspect}. Last settings were: class #{class_name} field #{field_name} type #{type}. Backtrace: " + e.backtrace.inspect.to_s)
@logger.error("Error 3: unable to read pb definition from file " + filename+ ". Reason: #{e.inspect}. Last settings were: class #{class_name} field #{field_name} type #{type}. Backtrace: " + e.backtrace.inspect.to_s)
raise e
end



def pb2_metadata_analyis(filename)
regex_class_start = /\s*set_fully_qualified_name \"(?<name>.+)\".*?/
regex_enum_name = /\s*include ..ProtocolBuffers..Enum\s*/
Expand Down
9 changes: 7 additions & 2 deletions logstash-codec-protobuf.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-codec-protobuf'
s.version = '1.2.10'
s.version = '1.3.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Reads protobuf messages and converts to Logstash Events"
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
Expand All @@ -21,7 +21,12 @@ Gem::Specification.new do |s|

# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency 'google-protobuf', '3.22.2' # for protobuf 3
s.add_runtime_dependency 'google-protobuf', '3.23.4' # for protobuf 3
# 3.23 is the latest version not requiring a ruby update.
# An upgrade to 3.24.3 would require ruby 2.7
# The earliest jruby supporting 2.7 ruby is 9.4 but
# there's an issue with openssl in that. Also logstash itself is using only version 9.3
# https://github.com/elastic/logstash/blob/main/.ruby-version
s.add_runtime_dependency 'ruby-protocol-buffers' # for protobuf 2
s.add_development_dependency 'logstash-devutils'

Expand Down
Loading

0 comments on commit 65454f6

Please sign in to comment.