Skip to content

Commit

Permalink
Merge branch 'main' into cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Oct 23, 2024
2 parents a89656a + 7af2321 commit b274697
Show file tree
Hide file tree
Showing 12 changed files with 29 additions and 27 deletions.
2 changes: 1 addition & 1 deletion flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func APIMain(ctx context.Context, args *APIServerParams) error {
if peerdbenv.PeerDBTemporalEnableCertAuth() {
slog.Info("Using temporal certificate/key for authentication")

certs, err := parseTemporalCertAndKey()
certs, err := parseTemporalCertAndKey(ctx)
if err != nil {
return fmt.Errorf("unable to base64 decode certificate and key: %w", err)
}
Expand Down
7 changes: 4 additions & 3 deletions flow/cmd/cert.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
package cmd

import (
"context"
"crypto/tls"
"fmt"

"github.com/PeerDB-io/peer-flow/peerdbenv"
)

func parseTemporalCertAndKey() ([]tls.Certificate, error) {
certBytes, err := peerdbenv.PeerDBTemporalClientCert()
func parseTemporalCertAndKey(ctx context.Context) ([]tls.Certificate, error) {
certBytes, err := peerdbenv.PeerDBTemporalClientCert(ctx)
if err != nil {
return nil, fmt.Errorf("unable to get temporal certificate: %w", err)
}

keyBytes, err := peerdbenv.PeerDBTemporalClientKey()
keyBytes, err := peerdbenv.PeerDBTemporalClientKey(ctx)
if err != nil {
return nil, fmt.Errorf("unable to get temporal key: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) (client.Client, worker.Work

if peerdbenv.PeerDBTemporalEnableCertAuth() {
slog.Info("Using temporal certificate/key for authentication")
certs, err := parseTemporalCertAndKey()
certs, err := parseTemporalCertAndKey(context.Background())
if err != nil {
return nil, nil, fmt.Errorf("unable to process certificate and key: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) {

if peerdbenv.PeerDBTemporalEnableCertAuth() {
slog.Info("Using temporal certificate/key for authentication")
certs, err := parseTemporalCertAndKey()
certs, err := parseTemporalCertAndKey(context.Background())
if err != nil {
return nil, fmt.Errorf("unable to process certificate and key: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres_schema_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type PostgresSchemaDeltaTestSuite struct {
func SetupSuite(t *testing.T) PostgresSchemaDeltaTestSuite {
t.Helper()

connector, err := NewPostgresConnector(context.Background(), nil, peerdbenv.GetCatalogPostgresConfigFromEnv())
connector, err := NewPostgresConnector(context.Background(), nil, peerdbenv.GetCatalogPostgresConfigFromEnv(context.Background()))
require.NoError(t, err)

setupTx, err := connector.conn.Begin(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/qrep_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ func BenchmarkQRepQueryExecutor(b *testing.B) {
query := "SELECT * FROM bench.large_table"

ctx := context.Background()
connector, err := NewPostgresConnector(ctx, nil, peerdbenv.GetCatalogPostgresConfigFromEnv())
connector, err := NewPostgresConnector(ctx, nil, peerdbenv.GetCatalogPostgresConfigFromEnv(ctx))
if err != nil {
b.Fatalf("failed to create connection: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/qrep_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func newTestCaseForCTID(schema string, name string, rows uint32, expectedNum int
}

func TestGetQRepPartitions(t *testing.T) {
connStr := peerdbenv.GetCatalogConnectionStringFromEnv()
connStr := peerdbenv.GetCatalogConnectionStringFromEnv(context.Background())

// Setup the DB
config, err := pgx.ParseConfig(connStr)
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/qrep_query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func setupDB(t *testing.T) (*PostgresConnector, string) {
t.Helper()

connector, err := NewPostgresConnector(context.Background(),
nil, peerdbenv.GetCatalogPostgresConfigFromEnv())
nil, peerdbenv.GetCatalogPostgresConfigFromEnv(context.Background()))
if err != nil {
t.Fatalf("unable to create connector: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/congen.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func SetupPostgres(t *testing.T, suffix string) (*connpostgres.PostgresConnector
t.Helper()

connector, err := connpostgres.NewPostgresConnector(context.Background(),
nil, peerdbenv.GetCatalogPostgresConfigFromEnv())
nil, peerdbenv.GetCatalogPostgresConfigFromEnv(context.Background()))
if err != nil {
return nil, fmt.Errorf("failed to create postgres connection: %w", err)
}
Expand Down Expand Up @@ -156,7 +156,7 @@ func GeneratePostgresPeer(t *testing.T) *protos.Peer {
Name: "catalog",
Type: protos.DBType_POSTGRES,
Config: &protos.Peer_PostgresConfig{
PostgresConfig: peerdbenv.GetCatalogPostgresConfigFromEnv(),
PostgresConfig: peerdbenv.GetCatalogPostgresConfigFromEnv(context.Background()),
},
}
CreatePeer(t, peer)
Expand Down
10 changes: 5 additions & 5 deletions flow/peerdbenv/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func GetCatalogConnectionPoolFromEnv(ctx context.Context) (*pgxpool.Pool, error)
poolMutex.Lock()
defer poolMutex.Unlock()
if pool == nil {
catalogConnectionString := GetCatalogConnectionStringFromEnv()
catalogConnectionString := GetCatalogConnectionStringFromEnv(ctx)
pool, err = pgxpool.New(ctx, catalogConnectionString)
if err != nil {
return nil, fmt.Errorf("unable to establish connection with catalog: %w", err)
Expand All @@ -37,16 +37,16 @@ func GetCatalogConnectionPoolFromEnv(ctx context.Context) (*pgxpool.Pool, error)
return pool, nil
}

func GetCatalogConnectionStringFromEnv() string {
return shared.GetPGConnectionString(GetCatalogPostgresConfigFromEnv(), "")
func GetCatalogConnectionStringFromEnv(ctx context.Context) string {
return shared.GetPGConnectionString(GetCatalogPostgresConfigFromEnv(ctx), "")
}

func GetCatalogPostgresConfigFromEnv() *protos.PostgresConfig {
func GetCatalogPostgresConfigFromEnv(ctx context.Context) *protos.PostgresConfig {
return &protos.PostgresConfig{
Host: PeerDBCatalogHost(),
Port: uint32(PeerDBCatalogPort()),
User: PeerDBCatalogUser(),
Password: PeerDBCatalogPassword(),
Password: PeerDBCatalogPassword(ctx),
Database: PeerDBCatalogDatabase(),
}
}
13 changes: 7 additions & 6 deletions flow/peerdbenv/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package peerdbenv

import (
"context"
"fmt"
"log/slog"
"strings"
Expand Down Expand Up @@ -64,8 +65,8 @@ func PeerDBCatalogUser() string {
}

// PEERDB_CATALOG_PASSWORD
func PeerDBCatalogPassword() string {
val, err := GetKMSDecryptedEnvString("PEERDB_CATALOG_PASSWORD", "")
func PeerDBCatalogPassword(ctx context.Context) string {
val, err := GetKMSDecryptedEnvString(ctx, "PEERDB_CATALOG_PASSWORD", "")
if err != nil {
slog.Error("failed to decrypt PEERDB_CATALOG_PASSWORD", "error", err)
panic(err)
Expand Down Expand Up @@ -128,12 +129,12 @@ func PeerDBTemporalEnableCertAuth() bool {
return strings.TrimSpace(cert) != ""
}

func PeerDBTemporalClientCert() ([]byte, error) {
return GetEnvBase64EncodedBytes("TEMPORAL_CLIENT_CERT", nil)
func PeerDBTemporalClientCert(ctx context.Context) ([]byte, error) {
return GetEnvBase64EncodedBytes(ctx, "TEMPORAL_CLIENT_CERT", nil)
}

func PeerDBTemporalClientKey() ([]byte, error) {
return GetEnvBase64EncodedBytes("TEMPORAL_CLIENT_KEY", nil)
func PeerDBTemporalClientKey(ctx context.Context) ([]byte, error) {
return GetEnvBase64EncodedBytes(ctx, "TEMPORAL_CLIENT_KEY", nil)
}

func PeerDBGetIncidentIoUrl() string {
Expand Down
8 changes: 4 additions & 4 deletions flow/peerdbenv/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func decryptWithKMS(ctx context.Context, data []byte) ([]byte, error) {
return decrypted.Plaintext, nil
}

func GetEnvBase64EncodedBytes(name string, defaultValue []byte) ([]byte, error) {
func GetEnvBase64EncodedBytes(ctx context.Context, name string, defaultValue []byte) ([]byte, error) {
val, ok := os.LookupEnv(name)
if !ok {
return defaultValue, nil
Expand All @@ -113,10 +113,10 @@ func GetEnvBase64EncodedBytes(name string, defaultValue []byte) ([]byte, error)
return nil, fmt.Errorf("failed to decode base64 value for %s: %w", name, err)
}

return decryptWithKMS(context.Background(), decoded)
return decryptWithKMS(ctx, decoded)
}

func GetKMSDecryptedEnvString(name string, defaultValue string) (string, error) {
func GetKMSDecryptedEnvString(ctx context.Context, name string, defaultValue string) (string, error) {
val, ok := os.LookupEnv(name)
if !ok {
return defaultValue, nil
Expand All @@ -127,7 +127,7 @@ func GetKMSDecryptedEnvString(name string, defaultValue string) (string, error)
return val, nil
}

ret, err := GetEnvBase64EncodedBytes(name, []byte(defaultValue))
ret, err := GetEnvBase64EncodedBytes(ctx, name, []byte(defaultValue))
if err != nil {
return defaultValue, fmt.Errorf("failed to get base64 encoded bytes for %s: %w", name, err)
}
Expand Down

0 comments on commit b274697

Please sign in to comment.