From fb2f81bbd0b1335a2771da8986f4377fe65f47af Mon Sep 17 00:00:00 2001 From: heanlan Date: Thu, 8 Sep 2022 19:33:15 -0400 Subject: [PATCH] Address comments Signed-off-by: heanlan --- .../clickhouseclient/clickhouseclient_test.go | 3 +- pkg/flowaggregator/exporter/clickhouse.go | 7 +++-- pkg/flowaggregator/exporter/ipfix.go | 31 +++---------------- pkg/flowaggregator/exporter/s3.go | 7 +++-- pkg/flowaggregator/exporter/utils.go | 16 +++++----- .../s3uploader/s3uploader_test.go | 8 +++-- 6 files changed, 30 insertions(+), 42 deletions(-) diff --git a/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go b/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go index 7e25f5b8f00..3d7f46eae06 100644 --- a/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go +++ b/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go @@ -25,6 +25,7 @@ import ( "github.com/DATA-DOG/go-sqlmock" "github.com/gammazero/deque" "github.com/golang/mock/gomock" + "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ipfixentitiestesting "github.com/vmware/go-ipfix/pkg/entities/testing" @@ -40,7 +41,7 @@ func init() { registry.LoadRegistry() } -const fakeClusterUUID = "7e2e1de2-c85f-476e-ab1a-fce1bf83ee2c" +var fakeClusterUUID = uuid.New().String() func TestGetDataSourceName(t *testing.T) { chInput := ClickHouseInput{ diff --git a/pkg/flowaggregator/exporter/clickhouse.go b/pkg/flowaggregator/exporter/clickhouse.go index 57ac84cc6ce..d6cb360f8fa 100644 --- a/pkg/flowaggregator/exporter/clickhouse.go +++ b/pkg/flowaggregator/exporter/clickhouse.go @@ -45,8 +45,11 @@ func buildClickHouseInput(opt *options.Options) clickhouseclient.ClickHouseInput func NewClickHouseExporter(k8sClient kubernetes.Interface, opt *options.Options) (*ClickHouseExporter, error) { chInput := buildClickHouseInput(opt) klog.InfoS("ClickHouse configuration", "database", chInput.Database, "databaseURL", chInput.DatabaseURL, "debug", chInput.Debug, "compress", *chInput.Compress, "commitInterval", chInput.CommitInterval) - clusterUUID := getClusterUUID(k8sClient) - chExportProcess, err := clickhouseclient.NewClickHouseClient(chInput, clusterUUID) + clusterUUID, err := getClusterUUID(k8sClient) + if err != nil { + return nil, err + } + chExportProcess, err := clickhouseclient.NewClickHouseClient(chInput, clusterUUID.String()) if err != nil { return nil, err } diff --git a/pkg/flowaggregator/exporter/ipfix.go b/pkg/flowaggregator/exporter/ipfix.go index 9e19af90199..75f10f051ba 100644 --- a/pkg/flowaggregator/exporter/ipfix.go +++ b/pkg/flowaggregator/exporter/ipfix.go @@ -18,17 +18,14 @@ import ( "fmt" "hash/fnv" "sync" - "time" "github.com/google/uuid" ipfixentities "github.com/vmware/go-ipfix/pkg/entities" "github.com/vmware/go-ipfix/pkg/exporter" ipfixregistry "github.com/vmware/go-ipfix/pkg/registry" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" - "antrea.io/antrea/pkg/clusteridentity" "antrea.io/antrea/pkg/flowaggregator/infoelements" "antrea.io/antrea/pkg/flowaggregator/options" "antrea.io/antrea/pkg/ipfix" @@ -59,31 +56,11 @@ type IPFIXExporter struct { // genObservationDomainID generates an IPFIX Observation Domain ID when one is not provided by the // user through the flow aggregator configuration. It will first try to generate one // deterministically based on the cluster UUID (if available, with a timeout of 10s). Otherwise, it -// will generate a random one. The cluster UUID should be available if Antrea is deployed to the -// cluster ahead of the flow aggregator, which is the expectation since when deploying flow -// aggregator as a Pod, networking needs to be configured by the CNI plugin. +// will generate a random one. func genObservationDomainID(k8sClient kubernetes.Interface) uint32 { - const retryInterval = time.Second - const timeout = 10 * time.Second - const defaultAntreaNamespace = "kube-system" - - clusterIdentityProvider := clusteridentity.NewClusterIdentityProvider( - defaultAntreaNamespace, - clusteridentity.DefaultClusterIdentityConfigMapName, - k8sClient, - ) - var clusterUUID uuid.UUID - if err := wait.PollImmediate(retryInterval, timeout, func() (bool, error) { - clusterIdentity, _, err := clusterIdentityProvider.Get() - if err != nil { - return false, nil - } - clusterUUID = clusterIdentity.UUID - return true, nil - }); err != nil { - klog.InfoS( - "Unable to retrieve cluster UUID; will generate a random observation domain ID", "timeout", timeout, "ConfigMapNameSpace", defaultAntreaNamespace, "ConfigMapName", clusteridentity.DefaultClusterIdentityConfigMapName, - ) + clusterUUID, err := getClusterUUID(k8sClient) + if err != nil { + klog.InfoS("Error when retrieving cluster UUID; will generate a random observation domain ID", "error", err) clusterUUID = uuid.New() } h := fnv.New32() diff --git a/pkg/flowaggregator/exporter/s3.go b/pkg/flowaggregator/exporter/s3.go index 7ad0f074ad1..9713687a1cc 100644 --- a/pkg/flowaggregator/exporter/s3.go +++ b/pkg/flowaggregator/exporter/s3.go @@ -38,8 +38,11 @@ func buildS3Input(opt *options.Options) s3uploader.S3Input { func NewS3Exporter(k8sClient kubernetes.Interface, opt *options.Options) (*S3Exporter, error) { s3Input := buildS3Input(opt) klog.InfoS("S3Uploader configuration", "bucketName", s3Input.Config.BucketName, "bucketPrefix", s3Input.Config.BucketPrefix, "region", s3Input.Config.Region, "recordFormat", s3Input.Config.RecordFormat, "compress", *s3Input.Config.Compress, "maxRecordsPerFile", s3Input.Config.MaxRecordsPerFile, "uploadInterval", s3Input.UploadInterval) - clusterUUID := getClusterUUID(k8sClient) - s3UploadProcess, err := s3uploader.NewS3UploadProcess(s3Input, clusterUUID) + clusterUUID, err := getClusterUUID(k8sClient) + if err != nil { + return nil, err + } + s3UploadProcess, err := s3uploader.NewS3UploadProcess(s3Input, clusterUUID.String()) if err != nil { return nil, err } diff --git a/pkg/flowaggregator/exporter/utils.go b/pkg/flowaggregator/exporter/utils.go index aa7b9a0de3c..25793948b5e 100644 --- a/pkg/flowaggregator/exporter/utils.go +++ b/pkg/flowaggregator/exporter/utils.go @@ -15,17 +15,22 @@ package exporter import ( + "fmt" "time" "github.com/google/uuid" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" - "k8s.io/klog/v2" "antrea.io/antrea/pkg/clusteridentity" ) -func getClusterUUID(k8sClient kubernetes.Interface) string { +// getClusterUUID retrieves the cluster UUID (if available, with a timeout of 10s). +// Otherwise, it returns an empty cluster UUID and error. The cluster UUID should +// be available if Antrea is deployed to the cluster ahead of the flow aggregator, +// which is the expectation since when deploying flow aggregator as a Pod, +// networking needs to be configured by the CNI plugin. +func getClusterUUID(k8sClient kubernetes.Interface) (uuid.UUID, error) { const retryInterval = time.Second const timeout = 10 * time.Second const defaultAntreaNamespace = "kube-system" @@ -44,10 +49,7 @@ func getClusterUUID(k8sClient kubernetes.Interface) string { clusterUUID = clusterIdentity.UUID return true, nil }); err != nil { - klog.InfoS( - "Unable to retrieve cluster UUID; will generate a random UUID", "timeout", timeout, "ConfigMapNameSpace", defaultAntreaNamespace, "ConfigMapName", clusteridentity.DefaultClusterIdentityConfigMapName, - ) - clusterUUID = uuid.New() + return clusterUUID, fmt.Errorf("unable to retrieve cluster UUID, timeout: %v, ConfigMapNameSpace: %s, ConfigMapName: %s", timeout, defaultAntreaNamespace, clusteridentity.DefaultClusterIdentityConfigMapName) } - return clusterUUID.String() + return clusterUUID, nil } diff --git a/pkg/flowaggregator/s3uploader/s3uploader_test.go b/pkg/flowaggregator/s3uploader/s3uploader_test.go index eccda6773fe..a98016bd02d 100644 --- a/pkg/flowaggregator/s3uploader/s3uploader_test.go +++ b/pkg/flowaggregator/s3uploader/s3uploader_test.go @@ -26,6 +26,7 @@ import ( s3manager "github.com/aws/aws-sdk-go-v2/feature/s3/manager" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/golang/mock/gomock" + "github.com/google/uuid" "github.com/stretchr/testify/assert" ipfixentitiestesting "github.com/vmware/go-ipfix/pkg/entities/testing" "github.com/vmware/go-ipfix/pkg/registry" @@ -35,13 +36,14 @@ import ( flowaggregatortesting "antrea.io/antrea/pkg/flowaggregator/testing" ) -const ( - seed = 1 - fakeClusterUUID = "7e2e1de2-c85f-476e-ab1a-fce1bf83ee2c" +var ( + fakeClusterUUID = uuid.New().String() recordStrIPv4 = "1637706961,1637706973,1637706974,1637706975,3,10.10.0.79,10.10.0.80,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,{\"antrea-e2e\":\"perftest-a\",\"app\":\"perftool\"},{\"antrea-e2e\":\"perftest-b\",\"app\":\"perftool\"},15902813472,12381344,15902813473,15902813474,12381345,12381346," + fakeClusterUUID recordStrIPv6 = "1637706961,1637706973,1637706974,1637706975,3,2001:0:3238:dfe1:63::fefb,2001:0:3238:dfe1:63::fefc,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,2001:0:3238:dfe1:64::a,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,{\"antrea-e2e\":\"perftest-a\",\"app\":\"perftool\"},{\"antrea-e2e\":\"perftest-b\",\"app\":\"perftool\"},15902813472,12381344,15902813473,15902813474,12381345,12381346," + fakeClusterUUID ) +const seed = 1 + type mockS3Uploader struct { testReader *bytes.Buffer testReaderMutex sync.Mutex