Support Kafka flow export in Flow Aggregator #2466
Conversation
Codecov Report
@@            Coverage Diff             @@
##             main    #2466       +/-   ##
===========================================
- Coverage   61.43%   40.84%   -20.59%
===========================================
  Files         284      158      -126
  Lines       23470    19557     -3913
===========================================
- Hits        14418     7989     -6429
- Misses       7504    10818     +3314
+ Partials     1548      750      -798
===========================================
Flags with carried forward coverage won't be shown.
build/yamls/flow-aggregator.yml
Outdated
# Provide the Kafka broker address to send the aggregated flow records in the
# given protobuf format. Kafka flow export is only enabled if the broker address
# is provided.
#kafkaBrokerAddress: ""
Do we support multiple broker addresses here? If so, we can specify the format (e.g. a comma-separated string).
Currently, the plan is to support a single broker. In the future, when we have the requirement of multiple brokers, we can look at enhancing this configmap parameter.
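For illustration, a minimal sketch of how this could evolve: sarama already takes a slice of broker addresses, so a comma-separated kafkaBrokerAddress (a hypothetical format, not what this PR implements) could simply be split before constructing the producer.

```go
package kafkaexport

import (
	"strings"

	"github.com/Shopify/sarama"
)

// newProducer is illustrative only: sarama accepts a slice of broker
// addresses, so a future comma-separated kafkaBrokerAddress value could
// be split here. This PR supports a single broker.
func newProducer(kafkaBrokerAddress string) (sarama.SyncProducer, error) {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true // required by SyncProducer
	brokers := strings.Split(kafkaBrokerAddress, ",")
	return sarama.NewSyncProducer(brokers, config)
}
```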
build/yamls/flow-aggregator.yml
Outdated
# of multiple proto formats, this config parameter could take in multiple values.
#kafkaProtoSchema: "AntreaFlowMsg"

# Provide the topic name on Kafka Broker, where the flow messages are intended
Broker -> broker, to keep consistent with other usage in the description.
hack/update-codegen.sh
Outdated
@@ -22,7 +22,7 @@ ANTREA_ROOT="$( cd "$( dirname "${BASH_SOURCE[0]}" )/../" && pwd )"
IMAGE_NAME="antrea/codegen:kubernetes-1.21.0"

function docker_run() {
docker pull ${IMAGE_NAME}
#docker pull ${IMAGE_NAME}
Commented out by mistake?
I am using a local image for codegen with a new version of protoc-gen-go. Therefore, I am commenting this out when running codegen on my local machine. Once the image in the antrea repo on the Docker registry is updated (before merging this PR), I will remove this.
pkg/apis/cni/v1beta1/cni.pb.go
Outdated
@@ -1,29 +1,43 @@
// Copyright 2019 Antrea Authors |
Are the cni file changes related to Kafka flow export support for the Flow Aggregator? If not, I feel they should be put in a separate PR.
This is because of the version change in protoc-gen-go. All of this is generated code.
I agree with @zyiou (left another comment above). The Protobuf version upgrade should be handled in a separate PR, even if the upgrade is only motivated by this change.
Tagging this as v1.3. Will revisit the milestone after one round of reviews if needed.
CACertFile  = "ca.crt"
TLSCertFile = "tls.crt"
TLSKeyFile  = "tls.key"
Just to clarify: ca.crt is to authenticate the Kafka broker, while tls.crt and tls.key are to authenticate the aggregator to the broker? Maybe this should be covered by comments.
There are two different options. Added comments.
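To make the two roles concrete, here is a hedged sketch of how such a tls.Config is typically assembled in Go (file paths come from the constants above; the function itself is illustrative, not the PR's code):

```go
package kafkaexport

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"os"
)

// newTLSConfig is an illustrative sketch: ca.crt verifies the Kafka
// broker's certificate, while tls.crt/tls.key authenticate the
// aggregator (the TLS client) to the broker.
func newTLSConfig(caCertPath, tlsCertPath, tlsKeyPath string) (*tls.Config, error) {
	caCert, err := os.ReadFile(caCertPath)
	if err != nil {
		return nil, err
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caCert) {
		return nil, fmt.Errorf("no valid CA certificate found in %s", caCertPath)
	}
	clientCert, err := tls.LoadX509KeyPair(tlsCertPath, tlsKeyPath)
	if err != nil {
		return nil, err
	}
	return &tls.Config{
		RootCAs:      pool,                          // authenticate the broker
		Certificates: []tls.Certificate{clientCert}, // authenticate the client
	}, nil
}
```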
pkg/flowaggregator/flowaggregator.go
Outdated
	klog.Errorf("Error when initializing kafka producer: %v, will retry in %s", err, fa.activeFlowRecordTimeout)
} else {
	klog.Info("Initialized kafka producer")
please use structured logging for all new log messages
done
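For reference, a sketch of the structured form requested above, using the klog.ErrorS/InfoS APIs from k8s.io/klog/v2 (the "retryInterval" key name is illustrative):

```go
package kafkaexport

import (
	"time"

	"k8s.io/klog/v2"
)

// logInitResult shows the structured-logging form of the messages above;
// the "retryInterval" key name is an assumption, not the PR's exact key.
func logInitResult(err error, retryInterval time.Duration) {
	if err != nil {
		klog.ErrorS(err, "Error when initializing Kafka producer", "retryInterval", retryInterval)
		return
	}
	klog.InfoS("Initialized Kafka producer")
}
```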
Switching milestone to v1.4 after talking with Srikar.
Thanks @antoninbas for the comments. Addressed them. Please take a look at this PR again.
build/yamls/flow-aggregator.yml
Outdated
# Provide the Kafka broker address to send the aggregated flow records in the
# given protobuf format. Kafka flow export is only enabled if the broker address
# is provided.
#kafkaBrokerAddress: ""

# Provide the Kafka proto schema name. Currently, Flow Aggregator supports only
# one Antrea proto format called AntreaFlowMsg. In the future, with support
# of multiple proto formats, this config parameter could take in multiple values.
#kafkaProtoSchema: "AntreaFlowMsg"

# Provide the topic name on Kafka Broker, where the flow messages are intended
# to be received.
#kafkaBrokerTopic: "AntreaTopic"

# Enable TLS communication between the Kafka producer and the Kafka broker.
#kafkaTLSEnable: false

# Enable or disable the TLS certificate verification. This is to support the
# Kafka TLS certs that are either self-signed, mis-configured or expired.
#kafkaTLSSkipVerify: false
I don't know if we need to repeat the kafka prefix for all sub-parameters.
build/yamls/flow-aggregator.yml
Outdated
# KafkaParams contains all configuration parameters required to export flow
# records to a Kafka broker.
kafkaParams:
  # Provide the Kafka broker address to send the aggregated flow records in the
Provide the Kafka broker address to which the aggregated flow records will be sent, in the given Protobuf format
build/yamls/flow-aggregator.yml
Outdated
# Provide the Kafka proto schema name. Currently, Flow Aggregator supports only
# one Antrea proto format called AntreaFlowMsg. In the future, with support
# of multiple proto formats, this config parameter could take in multiple values.
#kafkaProtoSchema: "AntreaFlowMsg"
s/proto/Protobuf
- name: flow-aggregator-kafka-tls
  secret:
    secretName: flow-aggregator-kafka-tls
    defaultMode: 0400
I notice we use 256 for the antrea controller, which is the same value but in decimal. It would be good to harmonize: 0400 makes more sense to me, so not sure why we are using 256 elsewhere. Should be in a follow-up PR though.
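As a quick check of the equivalence mentioned above (0400 in octal is 256 in decimal):

```go
package main

import "fmt"

func main() {
	// Go, like YAML 1.1, treats a leading 0 as an octal literal: 0400 == 256.
	// The octal form makes the owner-read-only permission bits explicit.
	fmt.Println(0400 == 256) // true
}
```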
ci/kind/test-e2e-kind.sh
Outdated
"projects.registry.vmware.com/antrea/ipfix-collector:v0.5.7" \ | ||
"projects.registry.vmware.com/antrea/wireguard-go:0.0.20210424") | ||
FLOW_VISIBILITY_IMAGES_LIST=("projects.registry.vmware.com/antrea/ipfix-collector:latest" \ |
ipfix-collector listed twice with different tags?
I think we should always specify a released version. Even if it means more frequent updates, the tests are less likely to break.
caCertPath = path.Join(certDir, CACertFile)
tlsCertPath = path.Join(certDir, TLSCertFile)
tlsKeyPath = path.Join(certDir, TLSKeyFile)
// The secret may be created after the Pod is created, for example, when cert-manager is used the secret
// is created asynchronously. It waits for a while before it's considered to be failed.
if err = wait.PollImmediate(2*time.Second, certReadyTimeout, func() (bool, error) {
	for _, path := range []string{caCertPath, tlsCertPath, tlsKeyPath} {
		f, err := os.Open(path)
		if err != nil {
			klog.Warningf("Couldn't read %s when applying the kafka TLS certificate, retrying", path)
			return false, nil
		}
		f.Close()
	}
	return true, nil
}); err != nil {
	return fmt.Errorf("error reading Kafka TLS CA cert (%s), cert (%s), and key (%s) files present at \"%s\"", CACertFile, TLSCertFile, TLSKeyFile, certDir)
}
}
There is no reload mechanism for the certificates? What we do for the antrea-controller's apiserver, I think, is that the certificates are reloaded from file periodically and changes are detected.
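One common pattern for a TLS client like this producer, sketched below as an assumption rather than the PR's code: defer certificate loading to handshake time via GetClientCertificate, so a key pair rotated on disk is picked up on the next connection to the broker.

```go
package kafkaexport

import "crypto/tls"

// newReloadingTLSConfig defers loading of the client key pair to handshake
// time, so a certificate rotated on disk is picked up whenever a new
// connection to the broker is established. Illustrative sketch only.
func newReloadingTLSConfig(tlsCertPath, tlsKeyPath string) *tls.Config {
	return &tls.Config{
		GetClientCertificate: func(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
			cert, err := tls.LoadX509KeyPair(tlsCertPath, tlsKeyPath)
			if err != nil {
				return nil, err
			}
			return &cert, nil
		},
	}
}
```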
const (
	AntreaFlowMsg string = "AntreaFlowMsg"
	AntreaTopic   string = "AntreaTopic"
)
This doesn't make it clear what the constant represents when you use it. If possible we should have something like this:

type ProtobufSchema string

const (
	ProtobufSchemaAntreaFlowMsg ProtobufSchema = "AntreaFlowMsg"
)
pkg/flowaggregator/flowaggregator.go
Outdated
if fa.kafkaProducer.GetSaramaProducer() == nil {
	err := fa.kafkaProducer.InitSaramaProducer()
	if err != nil {
		klog.ErrorS(err, "Error when initializing kafka producer", "retry timeout", fa.activeFlowRecordTimeout)
uniformize: s/kafka/Kafka
klog.Warningf("Do not expect source IP: %v to be filled already", flowMsg.SrcIP) | ||
} | ||
flowMsg.SrcIP = ie.Value.(net.IP).String() | ||
case "destinationIPv4Address", "destinationIPv6Address": | ||
if flowMsg.DstIP != "" { | ||
klog.Warningf("Do not expect destination IP: %v to be filled already", flowMsg.DstIP) |
as part of the transition to structured logging, I don't think we use klog.Warningf, but just klog.InfoS. @tnqn is that correct?
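For illustration, the Warningf call above rewritten in the structured style (the "srcIP" key name is an assumption):

```go
package kafkaexport

import "k8s.io/klog/v2"

// warnIPAlreadySet is the klog.InfoS form of the Warningf calls above;
// the "srcIP" key name is illustrative, not the PR's exact key.
func warnIPAlreadySet(srcIP string) {
	klog.InfoS("Do not expect source IP to be filled already", "srcIP", srcIP)
}
```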
@@ -0,0 +1,66 @@
// Copyright 2021 VMware, Inc. |
should the proto file be versioned if you make breaking changes in the future?
cmd/flow-aggregator/config.go
Outdated
// given protobuf format. Kafka flow export is only enabled
// if the broker address is provided.
// Defaults to ""
BrokerAddress string `yaml:"kafkaBrokerAddress,omitempty"`
Field names in this file are not consistent with those defined in the conf file. I think that's why the e2e tests do not pass.
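A sketch of the mismatch being pointed out, with hypothetical key names: the yaml struct tag must match the key actually used in flow-aggregator.conf, or yaml unmarshalling silently leaves the field at its zero value.

```go
package kafkaexport

// KafkaParams mirrors the nested kafkaParams section of the conf file.
// The tag value here is hypothetical; the point is that each tag must
// match the conf key exactly, otherwise unmarshalling silently yields "".
type KafkaParams struct {
	// Under a nested kafkaParams section, the key would be "brokerAddress",
	// not "kafkaBrokerAddress".
	BrokerAddress string `yaml:"brokerAddress,omitempty"`
}
```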
uint64 TimeFlowStartInMilliSecs = 27;
uint32 TimeFlowEndInMilliSecs = 28;
Noticed that the data types are different (uint64 vs uint32). We can remove these as they are not used in our record.
@srikartati Do we still target v1.4 for this PR?
We have been working on a different pipeline, so halted this PR. Removed the release milestone.
Thanks @zyiou for agreeing to take over this PR. I have given you write access to my fork.
This commit supports Kafka flow export in Flow Aggregator
by adding a Kafka producer. Given a Kafka broker, the Kafka producer
converts the aggregated flow record to a Kafka flow message
and sends it to the Kafka broker, which in turn sends it to
the Kafka consumer, where it can be decoded. Our Kafka producer
and consumer depend on the sarama Kafka library.
Our Kafka producer in Flow Aggregator has the capability to support
multiple proto schemas. However, we currently support one proto
schema, named "AntreaFlowMsg".
We add e2e tests by utilizing the Kafka flow collector provided
in the go-ipfix repo. The Kafka flow collector is a combination of a
lean Kafka broker and a Kafka consumer.
Signed-off-by: Srikar Tati [email protected]
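For readers unfamiliar with sarama, a hedged sketch of the producer path the commit message describes (the message would be the PR's AntreaFlowMsg proto; the function itself is illustrative, not the PR's implementation):

```go
package kafkaexport

import (
	"github.com/Shopify/sarama"
	"google.golang.org/protobuf/proto"
)

// sendFlowMsg marshals a flow message with Protobuf and publishes it to
// the given topic, as the commit message describes. Illustrative sketch;
// in this PR msg would be the generated AntreaFlowMsg type.
func sendFlowMsg(producer sarama.SyncProducer, topic string, msg proto.Message) error {
	value, err := proto.Marshal(msg)
	if err != nil {
		return err
	}
	_, _, err = producer.SendMessage(&sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(value),
	})
	return err
}
```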