Skip to content

Commit

Permalink
Use helpers package for ydb in e2e tests
Browse files Browse the repository at this point in the history
Use helpers package instead of creating ydb driver for each test
commit_hash:dfd3043d4ea28b6fd6fcb2bdd0733a19b06153d6
  • Loading branch information
DenisEvd committed Dec 6, 2024
1 parent 978ac97 commit 3b929e1
Show file tree
Hide file tree
Showing 12 changed files with 69 additions and 212 deletions.
3 changes: 1 addition & 2 deletions .mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -1529,7 +1529,6 @@
"pkg/providers/ydb/cdc_converter_test.go":"transfer_manager/go/pkg/providers/ydb/cdc_converter_test.go",
"pkg/providers/ydb/cdc_event.go":"transfer_manager/go/pkg/providers/ydb/cdc_event.go",
"pkg/providers/ydb/client.go":"transfer_manager/go/pkg/providers/ydb/client.go",
"pkg/providers/ydb/client2.go":"transfer_manager/go/pkg/providers/ydb/client2.go",
"pkg/providers/ydb/decimal/parse.go":"transfer_manager/go/pkg/providers/ydb/decimal/parse.go",
"pkg/providers/ydb/fallback_date_and_datetime_as_timestamp.go":"transfer_manager/go/pkg/providers/ydb/fallback_date_and_datetime_as_timestamp.go",
"pkg/providers/ydb/gotest/canondata/result.json":"transfer_manager/go/pkg/providers/ydb/gotest/canondata/result.json",
Expand Down Expand Up @@ -2909,7 +2908,7 @@
"tests/helpers/utils.go":"transfer_manager/go/tests/helpers/utils.go",
"tests/helpers/utils/test_read_closer.go":"transfer_manager/go/tests/helpers/utils/test_read_closer.go",
"tests/helpers/ydb.go":"transfer_manager/go/tests/helpers/ydb.go",
"tests/helpers/ydb_topics/recipe.go":"transfer_manager/go/tests/helpers/ydb_topics/recipe.go",
"tests/helpers/ydb_recipe/recipe.go":"transfer_manager/go/tests/helpers/ydb_recipe/recipe.go",
"tests/helpers/yt/yt_helpers.go":"transfer_manager/go/tests/helpers/yt/yt_helpers.go",
"tests/large/docker-compose/README.md":"transfer_manager/go/tests/large/docker-compose/README.md",
"tests/large/docker-compose/canondata/docker-compose.docker-compose.TestElasticToPgSnapshot/extracted":"transfer_manager/go/tests/large/docker-compose/canondata/docker-compose.docker-compose.TestElasticToPgSnapshot/extracted",
Expand Down
58 changes: 52 additions & 6 deletions pkg/providers/ydb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,23 @@ import (

"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/library/go/core/xerrors"
"github.com/doublecloud/transfer/pkg/abstract"
"github.com/doublecloud/transfer/pkg/credentials"
"github.com/doublecloud/transfer/pkg/providers/ydb/logadapter"
ydb3 "github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
"github.com/ydb-platform/ydb-go-sdk/v3"
ydbcreds "github.com/ydb-platform/ydb-go-sdk/v3/credentials"
"github.com/ydb-platform/ydb-go-sdk/v3/sugar"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
"github.com/ydb-platform/ydb-go-yc-metadata"
)

func NewYDBDriver(ctx context.Context, database, instance string, credentials credentials.Credentials, tlsConfig *tls.Config) (*ydb3.Driver, error) {
func newYDBDriver(ctx context.Context, database, instance string, credentials ydbcreds.Credentials, tlsConfig *tls.Config) (*ydb.Driver, error) {
secure := tlsConfig != nil
driver, err := ydb3.Open(
driver, err := ydb.Open(
ctx,
sugar.DSN(instance, database, sugar.WithSecure(secure)),
ydb3.WithCredentials(credentials),
ydb3.WithTLSConfig(tlsConfig),
ydb.WithCredentials(credentials),
ydb.WithTLSConfig(tlsConfig),
logadapter.WithTraces(logger.Log, trace.DriverEvents),
)
if err != nil {
Expand All @@ -28,3 +31,46 @@ func NewYDBDriver(ctx context.Context, database, instance string, credentials cr

return driver, nil
}

func newYDBSourceDriver(ctx context.Context, cfg *YdbSource) (*ydb.Driver, error) {
dsn, opts, err := ydbSourceToCreds(cfg)
if err != nil {
return nil, xerrors.Errorf("unable to resolve creds: %w", err)
}

if cfg.VerboseSDKLogs {
opts = append(opts, logadapter.WithTraces(logger.Log, trace.DetailsAll))
}

db, err := ydb.Open(ctx, dsn, opts...)
if err != nil {
return nil, xerrors.Errorf("unable to create ydb client, err: %w", err)
}

return db, nil
}

func ydbSourceToCreds(cfg *YdbSource) (string, []ydb.Option, error) {
if cfg.SAKeyContent != "" {
return "", nil, abstract.NewFatalError(xerrors.New("SAKeyContent is not supported for now"))
}

isSecure := false
opts := make([]ydb.Option, 0)

if cfg.TLSEnabled {
isSecure = true
opts = append(opts, yc.WithInternalCA())
}
if cfg.ServiceAccountID != "" {
creds, err := credentials.NewServiceAccountCreds(logger.Log, cfg.ServiceAccountID)
if err != nil {
return "", nil, xerrors.Errorf("could not create service account YDB credentials: %w", err)
}
opts = append(opts, ydb.WithCredentials(creds))
} else {
opts = append(opts, ydb.WithAccessTokenCredentials(string(cfg.Token)))
}

return sugar.DSN(cfg.Instance, cfg.Database, sugar.WithSecure(isSecure)), opts, nil
}
72 changes: 0 additions & 72 deletions pkg/providers/ydb/client2.go

This file was deleted.

2 changes: 1 addition & 1 deletion pkg/providers/ydb/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -1463,7 +1463,7 @@ func NewSinker(lgr log.Logger, cfg *YdbDestination, mtrcs metrics.Registry) (abs
return nil, xerrors.Errorf("Cannot create YDB credentials: %w", err)
}

ydbDriver, err := NewYDBDriver(ctx, cfg.Database, cfg.Instance, creds, tlsConfig)
ydbDriver, err := newYDBDriver(ctx, cfg.Database, cfg.Instance, creds, tlsConfig)
if err != nil {
return nil, xerrors.Errorf("unable to init ydb driver: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/providers/ydb/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func NewSource(transferID string, cfg *YdbSource, logger log.Logger, _ metrics.R
defer rb.Do()
rb.Add(cancelFunc)

ydbClient, err := newClient2(clientCtx, cfg)
ydbClient, err := newYDBSourceDriver(clientCtx, cfg)
if err != nil {
return nil, xerrors.Errorf("unable to create ydb, err: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/providers/ydb/source_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func CreateChangeFeed(cfg *YdbSource, transferID string) error {
clientCtx, cancel := context.WithTimeout(context.Background(), time.Minute*3)
defer cancel()

ydbClient, err := newClient2(clientCtx, cfg)
ydbClient, err := newYDBSourceDriver(clientCtx, cfg)
if err != nil {
return xerrors.Errorf("unable to create ydb, err: %w", err)
}
Expand All @@ -131,7 +131,7 @@ func CreateChangeFeedIfNotExists(cfg *YdbSource, transferID string) error {
clientCtx, cancel := context.WithTimeout(context.Background(), time.Minute*3)
defer cancel()

ydbClient, err := newClient2(clientCtx, cfg)
ydbClient, err := newYDBSourceDriver(clientCtx, cfg)
if err != nil {
return xerrors.Errorf("unable to create ydb, err: %w", err)
}
Expand Down Expand Up @@ -160,7 +160,7 @@ func DropChangeFeed(cfg *YdbSource, transferID string) error {
clientCtx, cancel := context.WithTimeout(context.Background(), time.Minute*3)
defer cancel()

ydbClient, err := newClient2(clientCtx, cfg)
ydbClient, err := newYDBSourceDriver(clientCtx, cfg)
if err != nil {
return xerrors.Errorf("unable to create ydb, err: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/providers/ydb/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/doublecloud/transfer/library/go/test/canon"
"github.com/doublecloud/transfer/pkg/abstract"
"github.com/doublecloud/transfer/pkg/abstract/model"
recipe "github.com/doublecloud/transfer/tests/helpers/ydb_topics"
"github.com/doublecloud/transfer/tests/helpers/ydb_recipe"
"github.com/stretchr/testify/require"
"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
Expand All @@ -41,7 +41,7 @@ func (s asyncSinkMock) Close() error {
}

func TestSourceCDC(t *testing.T) {
db := recipe.Driver(t)
db := ydbrecipe.Driver(t)
transferID := "test_transfer"

srcCfgTemplate := YdbSource{
Expand Down Expand Up @@ -313,7 +313,7 @@ func waitExpectedEvents(t *testing.T, src *Source, expectedItemsCount int) []abs
}

func prepareTableAndFeed(t *testing.T, feedName, tableName string, differentKeysCount, updatesPerKey int) int {
db := recipe.Driver(t)
db := ydbrecipe.Driver(t)
tablePath := formTablePath(tableName)
createTableAndFeed(t, db, feedName, tablePath,
options.WithColumn("id", types.Optional(types.TypeUint64)),
Expand Down
2 changes: 1 addition & 1 deletion pkg/providers/ydb/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewStorage(cfg *YdbStorageParams) (*Storage, error) {
return nil, xerrors.Errorf("Cannot create YDB credentials: %w", err)
}

ydbDriver, err := NewYDBDriver(clientCtx, cfg.Database, cfg.Instance, ydbCreds, tlsConfig)
ydbDriver, err := newYDBDriver(clientCtx, cfg.Database, cfg.Instance, ydbCreds, tlsConfig)
if err != nil {
return nil, xerrors.Errorf("Cannot create YDB driver: %w", err)
}
Expand Down
43 changes: 2 additions & 41 deletions tests/e2e/ydb2ch/replication/add_column/add_column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,63 +2,25 @@ package addcolumn

import (
"context"
"crypto/tls"
"fmt"
"os"
"strings"
"testing"
"time"

"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/library/go/core/xerrors"
"github.com/doublecloud/transfer/pkg/abstract"
dp_model "github.com/doublecloud/transfer/pkg/abstract/model"
"github.com/doublecloud/transfer/pkg/providers/clickhouse/model"
"github.com/doublecloud/transfer/pkg/providers/ydb"
"github.com/doublecloud/transfer/pkg/xtls"
"github.com/doublecloud/transfer/tests/helpers"
ydbrecipe "github.com/doublecloud/transfer/tests/helpers/ydb_recipe"
"github.com/stretchr/testify/require"
ydb3 "github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"go.ytsaurus.tech/library/go/core/log"
)

func NewYDBConnection(cfg *ydb.YdbSource) (*ydb3.Driver, error) {
var err error
var tlsConfig *tls.Config
if cfg.TLSEnabled {
tlsConfig, err = xtls.FromPath(cfg.RootCAFiles)
if err != nil {
return nil, xerrors.Errorf("could not create TLS config: %w", err)
}
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

var creds credentials.Credentials
creds, err = ydb.ResolveCredentials(
cfg.UserdataAuth,
string(cfg.Token),
ydb.JWTAuthParams{
KeyContent: cfg.SAKeyContent,
TokenServiceURL: cfg.TokenServiceURL,
},
cfg.ServiceAccountID,
logger.Log,
)
if err != nil {
return nil, xerrors.Errorf("Cannot create YDB credentials: %w", err)
}

ydbDriver, err := ydb.NewYDBDriver(ctx, cfg.Database, cfg.Instance, creds, tlsConfig)
if err != nil {
return nil, xerrors.Errorf("unable to init ydb driver: %w", err)
}

return ydbDriver, nil
}

func execDDL(t *testing.T, ydbConn *ydb3.Driver, query string) {
err := ydbConn.Table().Do(context.Background(), func(ctx context.Context, session table.Session) (err error) {
return session.ExecuteSchemeQuery(ctx, query)
Expand Down Expand Up @@ -116,8 +78,7 @@ func TestAddColumnOnReplication(t *testing.T) {
transferType := abstract.TransferTypeIncrementOnly
helpers.InitSrcDst(helpers.TransferID, source, &target, transferType) // to WithDefaults() & FillDependentFields(): IsHomo, helpers.TransferID, IsUpdateable

ydbConn, err := NewYDBConnection(source)
require.NoError(t, err)
ydbConn := ydbrecipe.Driver(t)

// defer port checking
defer func() {
Expand Down
44 changes: 2 additions & 42 deletions tests/e2e/ydb2ch/snapshot_and_replication/check_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,63 +2,24 @@ package main

import (
"context"
"crypto/tls"
"fmt"
"os"
"testing"
"time"

"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/library/go/core/metrics/solomon"
"github.com/doublecloud/transfer/library/go/core/xerrors"
"github.com/doublecloud/transfer/pkg/abstract"
dp_model "github.com/doublecloud/transfer/pkg/abstract/model"
"github.com/doublecloud/transfer/pkg/providers/clickhouse/model"
"github.com/doublecloud/transfer/pkg/providers/ydb"
"github.com/doublecloud/transfer/pkg/xtls"
"github.com/doublecloud/transfer/tests/helpers"
"github.com/doublecloud/transfer/tests/helpers/ydb_recipe"
"github.com/stretchr/testify/require"
ydb3 "github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"go.ytsaurus.tech/yt/go/schema"
)

func NewYDBConnection(cfg *ydb.YdbDestination) (*ydb3.Driver, error) {
var err error
var tlsConfig *tls.Config
if cfg.TLSEnabled {
tlsConfig, err = xtls.FromPath(cfg.RootCAFiles)
if err != nil {
return nil, xerrors.Errorf("could not create TLS config: %w", err)
}
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

var creds credentials.Credentials
creds, err = ydb.ResolveCredentials(
cfg.UserdataAuth,
string(cfg.Token),
ydb.JWTAuthParams{
KeyContent: cfg.SAKeyContent,
TokenServiceURL: cfg.TokenServiceURL,
},
cfg.ServiceAccountID,
logger.Log,
)
if err != nil {
return nil, xerrors.Errorf("cannot create YDB credentials: %w", err)
}

ydbDriver, err := ydb.NewYDBDriver(ctx, cfg.Database, cfg.Instance, creds, tlsConfig)
if err != nil {
return nil, xerrors.Errorf("unable to init ydb driver: %w", err)
}

return ydbDriver, nil
}

func customYDBInsertItem(t *testing.T, tablePath string, id int) *abstract.ChangeItem {
res := helpers.YDBStmtInsert(t, tablePath, id)
res.TableSchema = abstract.NewTableSchema(append(res.TableSchema.Columns(),
Expand Down Expand Up @@ -158,8 +119,7 @@ func testSnapshotAndReplicationWithChangeFeedMode(t *testing.T, tableName string
}))

if mode == ydb.ChangeFeedModeNewImage || mode == ydb.ChangeFeedModeNewAndOldImages {
ydbConn, err := NewYDBConnection(Target)
require.NoError(t, err)
ydbConn := ydbrecipe.Driver(t)
err = ydbConn.Table().Do(context.Background(), func(ctx context.Context, session table.Session) (err error) {
return session.ExecuteSchemeQuery(ctx, fmt.Sprintf(`
--!syntax_v1
Expand Down
Loading

0 comments on commit 3b929e1

Please sign in to comment.