Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: heanlan <[email protected]>
  • Loading branch information
heanlan committed Sep 8, 2022
1 parent 549b563 commit fb2f81b
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 42 deletions.
3 changes: 2 additions & 1 deletion pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/DATA-DOG/go-sqlmock"
"github.com/gammazero/deque"
"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
ipfixentitiestesting "github.com/vmware/go-ipfix/pkg/entities/testing"
Expand All @@ -40,7 +41,7 @@ func init() {
registry.LoadRegistry()
}

const fakeClusterUUID = "7e2e1de2-c85f-476e-ab1a-fce1bf83ee2c"
var fakeClusterUUID = uuid.New().String()

func TestGetDataSourceName(t *testing.T) {
chInput := ClickHouseInput{
Expand Down
7 changes: 5 additions & 2 deletions pkg/flowaggregator/exporter/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@ func buildClickHouseInput(opt *options.Options) clickhouseclient.ClickHouseInput
func NewClickHouseExporter(k8sClient kubernetes.Interface, opt *options.Options) (*ClickHouseExporter, error) {
chInput := buildClickHouseInput(opt)
klog.InfoS("ClickHouse configuration", "database", chInput.Database, "databaseURL", chInput.DatabaseURL, "debug", chInput.Debug, "compress", *chInput.Compress, "commitInterval", chInput.CommitInterval)
clusterUUID := getClusterUUID(k8sClient)
chExportProcess, err := clickhouseclient.NewClickHouseClient(chInput, clusterUUID)
clusterUUID, err := getClusterUUID(k8sClient)
if err != nil {
return nil, err
}
chExportProcess, err := clickhouseclient.NewClickHouseClient(chInput, clusterUUID.String())
if err != nil {
return nil, err
}
Expand Down
31 changes: 4 additions & 27 deletions pkg/flowaggregator/exporter/ipfix.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,14 @@ import (
"fmt"
"hash/fnv"
"sync"
"time"

"github.com/google/uuid"
ipfixentities "github.com/vmware/go-ipfix/pkg/entities"
"github.com/vmware/go-ipfix/pkg/exporter"
ipfixregistry "github.com/vmware/go-ipfix/pkg/registry"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/clusteridentity"
"antrea.io/antrea/pkg/flowaggregator/infoelements"
"antrea.io/antrea/pkg/flowaggregator/options"
"antrea.io/antrea/pkg/ipfix"
Expand Down Expand Up @@ -59,31 +56,11 @@ type IPFIXExporter struct {
// genObservationDomainID generates an IPFIX Observation Domain ID when one is not provided by the
// user through the flow aggregator configuration. It will first try to generate one
// deterministically based on the cluster UUID (if available, with a timeout of 10s). Otherwise, it
// will generate a random one. The cluster UUID should be available if Antrea is deployed to the
// cluster ahead of the flow aggregator, which is the expectation since when deploying flow
// aggregator as a Pod, networking needs to be configured by the CNI plugin.
// will generate a random one.
func genObservationDomainID(k8sClient kubernetes.Interface) uint32 {
const retryInterval = time.Second
const timeout = 10 * time.Second
const defaultAntreaNamespace = "kube-system"

clusterIdentityProvider := clusteridentity.NewClusterIdentityProvider(
defaultAntreaNamespace,
clusteridentity.DefaultClusterIdentityConfigMapName,
k8sClient,
)
var clusterUUID uuid.UUID
if err := wait.PollImmediate(retryInterval, timeout, func() (bool, error) {
clusterIdentity, _, err := clusterIdentityProvider.Get()
if err != nil {
return false, nil
}
clusterUUID = clusterIdentity.UUID
return true, nil
}); err != nil {
klog.InfoS(
"Unable to retrieve cluster UUID; will generate a random observation domain ID", "timeout", timeout, "ConfigMapNameSpace", defaultAntreaNamespace, "ConfigMapName", clusteridentity.DefaultClusterIdentityConfigMapName,
)
clusterUUID, err := getClusterUUID(k8sClient)
if err != nil {
klog.InfoS("Error when retrieving cluster UUID; will generate a random observation domain ID", "error", err)
clusterUUID = uuid.New()
}
h := fnv.New32()
Expand Down
7 changes: 5 additions & 2 deletions pkg/flowaggregator/exporter/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ func buildS3Input(opt *options.Options) s3uploader.S3Input {
func NewS3Exporter(k8sClient kubernetes.Interface, opt *options.Options) (*S3Exporter, error) {
s3Input := buildS3Input(opt)
klog.InfoS("S3Uploader configuration", "bucketName", s3Input.Config.BucketName, "bucketPrefix", s3Input.Config.BucketPrefix, "region", s3Input.Config.Region, "recordFormat", s3Input.Config.RecordFormat, "compress", *s3Input.Config.Compress, "maxRecordsPerFile", s3Input.Config.MaxRecordsPerFile, "uploadInterval", s3Input.UploadInterval)
clusterUUID := getClusterUUID(k8sClient)
s3UploadProcess, err := s3uploader.NewS3UploadProcess(s3Input, clusterUUID)
clusterUUID, err := getClusterUUID(k8sClient)
if err != nil {
return nil, err
}
s3UploadProcess, err := s3uploader.NewS3UploadProcess(s3Input, clusterUUID.String())
if err != nil {
return nil, err
}
Expand Down
16 changes: 9 additions & 7 deletions pkg/flowaggregator/exporter/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,22 @@
package exporter

import (
"fmt"
"time"

"github.com/google/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/clusteridentity"
)

func getClusterUUID(k8sClient kubernetes.Interface) string {
// getClusterUUID retrieves the cluster UUID (if available, with a timeout of 10s).
// Otherwise, it returns an empty cluster UUID and error. The cluster UUID should
// be available if Antrea is deployed to the cluster ahead of the flow aggregator,
// which is the expectation since when deploying flow aggregator as a Pod,
// networking needs to be configured by the CNI plugin.
func getClusterUUID(k8sClient kubernetes.Interface) (uuid.UUID, error) {
const retryInterval = time.Second
const timeout = 10 * time.Second
const defaultAntreaNamespace = "kube-system"
Expand All @@ -44,10 +49,7 @@ func getClusterUUID(k8sClient kubernetes.Interface) string {
clusterUUID = clusterIdentity.UUID
return true, nil
}); err != nil {
klog.InfoS(
"Unable to retrieve cluster UUID; will generate a random UUID", "timeout", timeout, "ConfigMapNameSpace", defaultAntreaNamespace, "ConfigMapName", clusteridentity.DefaultClusterIdentityConfigMapName,
)
clusterUUID = uuid.New()
return clusterUUID, fmt.Errorf("unable to retrieve cluster UUID, timeout: %v, ConfigMapNameSpace: %s, ConfigMapName: %s", timeout, defaultAntreaNamespace, clusteridentity.DefaultClusterIdentityConfigMapName)
}
return clusterUUID.String()
return clusterUUID, nil
}
8 changes: 5 additions & 3 deletions pkg/flowaggregator/s3uploader/s3uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
s3manager "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
ipfixentitiestesting "github.com/vmware/go-ipfix/pkg/entities/testing"
"github.com/vmware/go-ipfix/pkg/registry"
Expand All @@ -35,13 +36,14 @@ import (
flowaggregatortesting "antrea.io/antrea/pkg/flowaggregator/testing"
)

const (
seed = 1
fakeClusterUUID = "7e2e1de2-c85f-476e-ab1a-fce1bf83ee2c"
var (
fakeClusterUUID = uuid.New().String()
recordStrIPv4 = "1637706961,1637706973,1637706974,1637706975,3,10.10.0.79,10.10.0.80,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,{\"antrea-e2e\":\"perftest-a\",\"app\":\"perftool\"},{\"antrea-e2e\":\"perftest-b\",\"app\":\"perftool\"},15902813472,12381344,15902813473,15902813474,12381345,12381346," + fakeClusterUUID
recordStrIPv6 = "1637706961,1637706973,1637706974,1637706975,3,2001:0:3238:dfe1:63::fefb,2001:0:3238:dfe1:63::fefc,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,2001:0:3238:dfe1:64::a,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,{\"antrea-e2e\":\"perftest-a\",\"app\":\"perftool\"},{\"antrea-e2e\":\"perftest-b\",\"app\":\"perftool\"},15902813472,12381344,15902813473,15902813474,12381345,12381346," + fakeClusterUUID
)

const seed = 1

type mockS3Uploader struct {
testReader *bytes.Buffer
testReaderMutex sync.Mutex
Expand Down

0 comments on commit fb2f81b

Please sign in to comment.