From 2ce2ef0ae4d2951357bfbdc6ad613a994d4811d0 Mon Sep 17 00:00:00 2001 From: Yun-Tang Hsu <59460118+yuntanghsu@users.noreply.github.com> Date: Thu, 8 Sep 2022 21:54:23 -0700 Subject: [PATCH] Add unit tests for pkg/flowaggregator/exporter (#4195) For #4142 Signed-off-by: Yun-Tang Hsu --- .../clickhouseclient/clickhouseclient.go | 13 ++- .../clickhouseclient/clickhouseclient_test.go | 2 +- pkg/flowaggregator/exporter/clickhouse.go | 2 +- .../exporter/clickhouse_test.go | 79 +++++++++++++++++++ pkg/flowaggregator/exporter/s3_test.go | 71 +++++++++++++++++ 5 files changed, 162 insertions(+), 5 deletions(-) create mode 100644 pkg/flowaggregator/exporter/clickhouse_test.go create mode 100644 pkg/flowaggregator/exporter/s3_test.go diff --git a/pkg/flowaggregator/clickhouseclient/clickhouseclient.go b/pkg/flowaggregator/clickhouseclient/clickhouseclient.go index f6edeec48f1..dd9776da769 100644 --- a/pkg/flowaggregator/clickhouseclient/clickhouseclient.go +++ b/pkg/flowaggregator/clickhouseclient/clickhouseclient.go @@ -86,6 +86,13 @@ const ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` ) +// PrepareClickHouseConnection is used for unit testing +var ( + PrepareClickHouseConnection = func(input ClickHouseInput) (string, *sql.DB, error) { + return PrepareConnection(input) + } +) + type stopPayload struct { flushQueue bool } @@ -124,7 +131,7 @@ type ClickHouseInput struct { CommitInterval time.Duration } -func (ci *ClickHouseInput) getDataSourceName() (string, error) { +func (ci *ClickHouseInput) GetDataSourceName() (string, error) { if len(ci.DatabaseURL) == 0 || len(ci.Username) == 0 || len(ci.Password) == 0 { return "", fmt.Errorf("URL, Username or Password missing for clickhouse DSN") } @@ -150,7 +157,7 @@ func (ci *ClickHouseInput) getDataSourceName() (string, error) { } func NewClickHouseClient(input ClickHouseInput) (*ClickHouseExportProcess, error) { - dsn, connect, err := PrepareConnection(input) + dsn, connect, err := PrepareClickHouseConnection(input) if err != nil { return nil, err } @@ -370,7 +377,7 @@ func (ch *ClickHouseExportProcess) pushRecordsToFrontOfQueue(records []*flowreco } func PrepareConnection(input ClickHouseInput) (string, *sql.DB, error) { - dsn, err := input.getDataSourceName() + dsn, err := input.GetDataSourceName() if err != nil { return "", nil, fmt.Errorf("error when parsing ClickHouse DSN: %v", err) } diff --git a/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go b/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go index aeb0000d6e5..36cebfefd20 100644 --- a/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go +++ b/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go @@ -65,7 +65,7 @@ func TestGetDataSourceName(t *testing.T) { } for _, tc := range testcases { - dsn, err := tc.input.getDataSourceName() + dsn, err := tc.input.GetDataSourceName() if tc.expectedErr { assert.Errorf(t, err, "ClickHouseInput %v unexpectedly returns no error when getting DSN", tc.input) } diff --git a/pkg/flowaggregator/exporter/clickhouse.go b/pkg/flowaggregator/exporter/clickhouse.go index 9d456c835c5..80faa98c300 100644 --- a/pkg/flowaggregator/exporter/clickhouse.go +++ b/pkg/flowaggregator/exporter/clickhouse.go @@ -69,7 +69,7 @@ func (e *ClickHouseExporter) Stop() { func (e *ClickHouseExporter) UpdateOptions(opt *options.Options) { chInput := buildClickHouseInput(opt) - dsn, connect, err := clickhouseclient.PrepareConnection(chInput) + dsn, connect, err := clickhouseclient.PrepareClickHouseConnection(chInput) if err != nil { klog.ErrorS(err, "Error when checking new connection") return diff --git a/pkg/flowaggregator/exporter/clickhouse_test.go b/pkg/flowaggregator/exporter/clickhouse_test.go new file mode 100644 index 00000000000..f4530e1fdf2 --- /dev/null +++ b/pkg/flowaggregator/exporter/clickhouse_test.go @@ -0,0 +1,79 @@ +// Copyright 2022 Antrea Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exporter + +import ( + "database/sql" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "antrea.io/antrea/pkg/config/flowaggregator" + "antrea.io/antrea/pkg/flowaggregator/clickhouseclient" + "antrea.io/antrea/pkg/flowaggregator/options" +) + +func TestClickHouse_UpdateOptions(t *testing.T) { + os.Setenv("CH_USERNAME", "default") + os.Setenv("CH_PASSWORD", "default") + defer os.Unsetenv("CH_USERNAME") + defer os.Unsetenv("CH_PASSWORD") + PrepareClickHouseConnectionSaved := clickhouseclient.PrepareClickHouseConnection + clickhouseclient.PrepareClickHouseConnection = func(input clickhouseclient.ClickHouseInput) (string, *sql.DB, error) { + dsn, _ := input.GetDataSourceName() + return dsn, nil, nil + } + defer func() { + clickhouseclient.PrepareClickHouseConnection = PrepareClickHouseConnectionSaved + }() + compress := false + opt := &options.Options{ + Config: &flowaggregator.FlowAggregatorConfig{ + ClickHouse: flowaggregator.ClickHouseConfig{ + Enable: true, + Database: "default", + DatabaseURL: "tcp://clickhouse-clickhouse.flow-visibility.svc:9000", + Debug: true, + Compress: &compress, + }, + }, + ClickHouseCommitInterval: 8 * time.Second, + } + clickHouseExporter, err := NewClickHouseExporter(opt) + require.NoError(t, err) + clickHouseExporter.Start() + assert.Equal(t, clickHouseExporter.chExportProcess.GetDsn(), "tcp://clickhouse-clickhouse.flow-visibility.svc:9000?username=default&password=default&database=default&debug=true&compress=false") + assert.Equal(t, clickHouseExporter.chExportProcess.GetCommitInterval().String(), "8s") + compress = true + newOpt := &options.Options{ + Config: &flowaggregator.FlowAggregatorConfig{ + ClickHouse: flowaggregator.ClickHouseConfig{ + Enable: true, + Database: "databaseTest", + DatabaseURL: "databaseTestURL", + Debug: false, + Compress: &compress, + }, + }, + ClickHouseCommitInterval: 5 * time.Second, + } + clickHouseExporter.UpdateOptions(newOpt) + assert.Equal(t, clickHouseExporter.chExportProcess.GetDsn(), "databaseTestURL?username=default&password=default&database=databaseTest&debug=false&compress=true") + assert.Equal(t, clickHouseExporter.chExportProcess.GetCommitInterval().String(), "5s") + clickHouseExporter.Stop() +} diff --git a/pkg/flowaggregator/exporter/s3_test.go b/pkg/flowaggregator/exporter/s3_test.go new file mode 100644 index 00000000000..6f66b96ff63 --- /dev/null +++ b/pkg/flowaggregator/exporter/s3_test.go @@ -0,0 +1,71 @@ +// Copyright 2022 Antrea Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exporter + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "antrea.io/antrea/pkg/config/flowaggregator" + "antrea.io/antrea/pkg/flowaggregator/options" +) + +func TestS3_UpdateOptions(t *testing.T) { + compress := true + opt := &options.Options{ + Config: &flowaggregator.FlowAggregatorConfig{ + S3Uploader: flowaggregator.S3UploaderConfig{ + BucketName: "defaultBucketName", + BucketPrefix: "defaultBucketPrefix", + Region: "us-west-2", + RecordFormat: "CSV", + Compress: &compress, + MaxRecordsPerFile: 0, + }, + }, + S3UploadInterval: 8 * time.Second, + } + s3Exporter, err := NewS3Exporter(opt) + require.NoError(t, err) + s3Exporter.Start() + assert.Equal(t, s3Exporter.s3UploadProcess.GetBucketName(), "defaultBucketName") + assert.Equal(t, s3Exporter.s3UploadProcess.GetBucketPrefix(), "defaultBucketPrefix") + assert.Equal(t, s3Exporter.s3UploadProcess.GetRegion(), "us-west-2") + assert.Equal(t, s3Exporter.s3UploadProcess.GetUploadInterval().String(), "8s") + + compress = true + newOpt := &options.Options{ + Config: &flowaggregator.FlowAggregatorConfig{ + S3Uploader: flowaggregator.S3UploaderConfig{ + BucketName: "testBucketName", + BucketPrefix: "testBucketPrefix", + Region: "us-west-1", + RecordFormat: "CSV", + Compress: &compress, + MaxRecordsPerFile: 0, + }, + }, + S3UploadInterval: 5 * time.Second, + } + s3Exporter.UpdateOptions(newOpt) + assert.Equal(t, s3Exporter.s3UploadProcess.GetBucketName(), "testBucketName") + assert.Equal(t, s3Exporter.s3UploadProcess.GetBucketPrefix(), "testBucketPrefix") + assert.Equal(t, s3Exporter.s3UploadProcess.GetRegion(), "us-west-1") + assert.Equal(t, s3Exporter.s3UploadProcess.GetUploadInterval().String(), "5s") + s3Exporter.Stop() +}