Skip to content

Commit

Permalink
Fix CLI flag naming (#2070)
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso authored Jan 16, 2025
1 parent 397af3a commit 2515cf5
Show file tree
Hide file tree
Showing 9 changed files with 181 additions and 72 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -250,12 +250,12 @@ the [Processors documentation](https://conduit.io/docs/using/processors/getting-
Conduit exposes a gRPC API and an HTTP API.

The gRPC API is by default running on port 8084. You can define a custom address
using the CLI flag `-grpc.address`. To learn more about the gRPC API please have
using the CLI flag `-api.grpc.address`. To learn more about the gRPC API please have
a look at
the [protobuf file](https://github.com/ConduitIO/conduit/blob/main/proto/api/v1/api.proto).

The HTTP API is by default running on port 8080. You can define a custom address
using the CLI flag `-http.address`. It is generated
using the CLI flag `-api.http.address`. It is generated
using [gRPC gateway](https://github.com/grpc-ecosystem/grpc-gateway) and is thus
providing the same functionality as the gRPC API. To learn more about the HTTP
API please have a look at the [API documentation](https://www.conduit.io/api),
Expand Down
2 changes: 1 addition & 1 deletion cmd/conduit/cecdysis/decorators.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func getGRPCAddress(cmd *cobra.Command) (string, error) {

path, err = cmd.Flags().GetString("config.path")
if err != nil || path == "" {
path = conduit.DefaultConfig().ConduitCfgPath
path = conduit.DefaultConfig().ConduitCfg.Path
}

var usrCfg conduit.Config
Expand Down
14 changes: 9 additions & 5 deletions cmd/conduit/root/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ the set environment variables, and the flags used. This command will show the co
}
}

func printStruct(v reflect.Value, parentPath string) {
func printStruct(ctx context.Context, v reflect.Value, parentPath string) {
if v.Kind() == reflect.Ptr {
v = v.Elem()
}
Expand All @@ -68,22 +68,26 @@ func printStruct(v reflect.Value, parentPath string) {

if fieldValue.Kind() == reflect.Struct ||
(fieldValue.Kind() == reflect.Ptr && !fieldValue.IsNil() && fieldValue.Elem().Kind() == reflect.Struct) {
printStruct(fieldValue, fullPath)
printStruct(ctx, fieldValue, fullPath)
continue
}

if longName != "" {
value := fmt.Sprintf("%v", fieldValue.Interface())
if value != "" {
fmt.Printf("%s: %s\n", fullPath, value)
cobraCmd := ecdysis.CobraCmdFromContext(ctx)
_, err := fmt.Fprintf(cobraCmd.OutOrStdout(), "%s: %s\n", fullPath, value)
if err != nil {
fmt.Printf("failed writing config value to out: %v", err)
}
}
}
}
}

func (c *ConfigCommand) Usage() string { return "config" }

func (c ConfigCommand) Execute(_ context.Context) error {
printStruct(reflect.ValueOf(c.RunCmd.Cfg), "")
func (c ConfigCommand) Execute(ctx context.Context) error {
printStruct(ctx, reflect.ValueOf(c.RunCmd.Cfg), "")
return nil
}
178 changes: 138 additions & 40 deletions cmd/conduit/root/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,59 +18,157 @@ import (
"bytes"
"io"
"os"
"reflect"
"slices"
"strings"
"testing"

"github.com/conduitio/conduit/pkg/conduit"
"github.com/conduitio/conduit/cmd/conduit/root/run"
"github.com/conduitio/ecdysis"
"github.com/matryer/is"
)

func TestPrintStructOutput(t *testing.T) {
is := is.New(t)

cfg := conduit.DefaultConfig()

oldStdout := os.Stdout
defer func() { os.Stdout = oldStdout }()
func TestConfig_WithFlags(t *testing.T) {
testCases := []struct {
name string
args []string
wantLines []string
}{
{
name: "with flags (api, config, connectors, db, log)",
args: []string{
"--api.enabled=false",
"--api.grpc.address", "localhost:9999",
"--api.http.address", "localhost:8888",
"--config.path", "/etc/conduit/config.yaml",
"--connectors.path", "/opt/conduit/connectors",
"--db.badger.path", "/var/lib/conduit/data.db",
"--db.postgres.connection-string", "postgres://user:pass@localhost:5432/conduit",
"--db.postgres.table", "my_conduit_store",
"--db.sqlite.path", "/var/lib/conduit/conduit.sqlite",
"--db.sqlite.table", "my_sqlite_store",
"--db.type", "postgres",
"--log.format", "json",
"--log.level", "debug",
},
wantLines: []string{
"config.path: /etc/conduit/config.yaml",
"db.type: postgres",
"db.postgres.table: my_conduit_store",
"db.sqlite.table: my_sqlite_store",
"api.enabled: false",
"api.http.address: localhost:8888",
"api.grpc.address: localhost:9999",
"log.level: debug",
"log.format: json",
"pipelines.exit-on-degraded: false",
"pipelines.error-recovery.min-delay: 1s",
"pipelines.error-recovery.max-delay: 10m0s",
"pipelines.error-recovery.backoff-factor: 2",
"pipelines.error-recovery.max-retries: -1",
"pipelines.error-recovery.max-retries-window: 5m0s",
"schema-registry.type: builtin",
"preview.pipeline-arch-v2: false",
},
},
{
name: "with flags (pipelines, preview, processors, schema, dev)",
args: []string{
"--pipelines.error-recovery.backoff-factor", "5",
"--pipelines.error-recovery.max-delay", "30m",
"--pipelines.error-recovery.max-retries", "10",
"--pipelines.error-recovery.max-retries-window", "15m",
"--pipelines.error-recovery.min-delay", "5s",
"--pipelines.exit-on-degraded=true",
"--pipelines.path", "/var/lib/conduit/pipelines",
"--preview.pipeline-arch-v2=true",
"--processors.path", "/opt/conduit/processors",
"--schema-registry.confluent.connection-string", "http://localhost:8081",
"--schema-registry.type", "confluent",
"--dev.blockprofile", "/tmp/block.prof",
"--dev.cpuprofile", "/tmp/cpu.prof",
"--dev.memprofile", "/tmp/mem.prof",
},
wantLines: []string{
"db.type: badger",
"db.postgres.table: conduit_kv_store",
"db.sqlite.table: conduit_kv_store",
"api.enabled: true",
"api.http.address: :8080",
"api.grpc.address: :8084",
"log.level: info",
"log.format: cli",
"processors.path: /opt/conduit/processors",
"pipelines.path: /var/lib/conduit/pipelines",
"pipelines.exit-on-degraded: true",
"pipelines.error-recovery.min-delay: 5s",
"pipelines.error-recovery.max-delay: 30m0s",
"pipelines.error-recovery.backoff-factor: 5",
"pipelines.error-recovery.max-retries: 10",
"pipelines.error-recovery.max-retries-window: 15m0s",
"schema-registry.type: confluent",
"schema-registry.confluent.connection-string: http://localhost:8081",
"preview.pipeline-arch-v2: false",
"dev.cpuprofile: /tmp/cpu.prof",
"dev.memprofile: /tmp/mem.prof",
"dev.blockprofile: /tmp/block.prof",
},
},
{
name: "default values (no flags)",
args: []string{},
wantLines: []string{
"db.type: badger",
"db.postgres.table: conduit_kv_store",
"db.sqlite.table: conduit_kv_store",
"api.enabled: true",
"api.http.address: :8080",
"api.grpc.address: :8084",
"log.level: info",
"log.format: cli",
"pipelines.exit-on-degraded: false",
"pipelines.error-recovery.min-delay: 1s",
"pipelines.error-recovery.max-delay: 10m0s",
"pipelines.error-recovery.backoff-factor: 2",
"pipelines.error-recovery.max-retries: -1",
"pipelines.error-recovery.max-retries-window: 5m0s",
"schema-registry.type: builtin",
"preview.pipeline-arch-v2: false",
},
},
}

r, w, err := os.Pipe()
is.NoErr(err)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
is := is.New(t)

os.Stdout = w
t.Cleanup(func() { os.Stdout = oldStdout })
readFrom, writeTo, err := os.Pipe()
is.NoErr(err)

printStruct(reflect.ValueOf(cfg), "")
e := ecdysis.New()
cmd := e.MustBuildCobraCommand(&ConfigCommand{RunCmd: &run.RunCommand{}})
cmd.SetArgs(tc.args)
cmd.SetOut(writeTo)
cmd.SetErr(writeTo)

err = w.Close()
is.NoErr(err)
err = cmd.Execute()
is.NoErr(err)

var buf bytes.Buffer
_, err = io.Copy(&buf, r)
is.NoErr(err)
err = writeTo.Close()
is.NoErr(err)

output := buf.String()
var buf bytes.Buffer
_, err = io.Copy(&buf, readFrom)
is.NoErr(err)

expectedLines := []string{
"db.type: badger",
"db.postgres.table: conduit_kv_store",
"db.sqlite.table: conduit_kv_store",
"api.enabled: true",
"api.http.address: :8080",
"api.grpc.address: :8084",
"log.level: info",
"log.format: cli",
"pipelines.exit-on-degraded: false",
"pipelines.error-recovery.min-delay: 1s",
"pipelines.error-recovery.max-delay: 10m0s",
"pipelines.error-recovery.backoff-factor: 2",
"pipelines.error-recovery.max-retries: -1",
"pipelines.error-recovery.max-retries-window: 5m0s",
"schema-registry.type: builtin",
"preview.pipeline-arch-v2: false",
}
output := buf.String()
is.True(output != "")

for _, line := range expectedLines {
is.True(strings.Contains(output, line))
outputLines := strings.Split(output, "\n")
for _, line := range tc.wantLines {
if !slices.Contains(outputLines, line) {
t.Errorf("output does not contain expected line: %q", line)
}
}
})
}
}
2 changes: 1 addition & 1 deletion cmd/conduit/root/initialize/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type InitCommand struct {

func (c *InitCommand) Flags() []ecdysis.Flag {
flags := ecdysis.BuildFlags(&c.flags)
flags.SetDefault("path", filepath.Dir(c.Cfg.ConduitCfgPath))
flags.SetDefault("path", filepath.Dir(c.Cfg.ConduitCfg.Path))
return flags
}

Expand Down
6 changes: 3 additions & 3 deletions cmd/conduit/root/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ func (c *RunCommand) Execute(_ context.Context) error {
}

func (c *RunCommand) Config() ecdysis.Config {
path := filepath.Dir(c.flags.ConduitCfgPath)
path := filepath.Dir(c.flags.ConduitCfg.Path)

return ecdysis.Config{
EnvPrefix: "CONDUIT",
Parsed: &c.Cfg,
Path: c.flags.ConduitCfgPath,
Path: c.flags.ConduitCfg.Path,
DefaultValues: conduit.DefaultConfigWithBasePath(path),
}
}
Expand All @@ -75,7 +75,7 @@ func (c *RunCommand) Flags() []ecdysis.Flag {
}

c.Cfg = conduit.DefaultConfigWithBasePath(currentPath)
flags.SetDefault("config.path", c.Cfg.ConduitCfgPath)
flags.SetDefault("config.path", c.Cfg.ConduitCfg.Path)
flags.SetDefault("db.type", c.Cfg.DB.Type)
flags.SetDefault("db.badger.path", c.Cfg.DB.Badger.Path)
flags.SetDefault("db.postgres.connection-string", c.Cfg.DB.Postgres.ConnectionString)
Expand Down
21 changes: 12 additions & 9 deletions cmd/conduit/root/run/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
func TestRunCommandFlags(t *testing.T) {
is := is.New(t)

expectedFlags := []struct {
wantFlags := []struct {
longName string
shortName string
usage string
Expand Down Expand Up @@ -66,17 +66,20 @@ func TestRunCommandFlags(t *testing.T) {
persistentFlags := c.PersistentFlags()
cmdFlags := c.Flags()

for _, f := range expectedFlags {
for _, wantFlag := range wantFlags {
var cf *pflag.Flag

if f.persistent {
cf = persistentFlags.Lookup(f.longName)
if wantFlag.persistent {
cf = persistentFlags.Lookup(wantFlag.longName)
} else {
cf = cmdFlags.Lookup(f.longName)
cf = cmdFlags.Lookup(wantFlag.longName)
}
is.True(cf != nil)
is.Equal(f.longName, cf.Name)
is.Equal(f.shortName, cf.Shorthand)
is.Equal(cf.Usage, f.usage)
if cf == nil {
t.Logf("flag %q expected, but not found", wantFlag.longName)
t.FailNow()
}
is.Equal(wantFlag.longName, cf.Name)
is.Equal(wantFlag.shortName, cf.Shorthand)
is.Equal(cf.Usage, wantFlag.usage)
}
}
24 changes: 13 additions & 11 deletions pkg/conduit/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ const (

// Config holds all configurable values for Conduit.
type Config struct {
ConduitCfgPath string `long:"config.path" usage:"global conduit configuration file" default:"./conduit.yaml"`
ConduitCfg struct {
Path string `long:"config.path" usage:"global conduit configuration file" default:"./conduit.yaml"`
} `mapstructure:"config"`

DB struct {
// When Driver is specified it takes precedence over other DB related
Expand Down Expand Up @@ -90,19 +92,19 @@ type Config struct {

Pipelines struct {
Path string `long:"pipelines.path" usage:"path to pipelines' directory"`
ExitOnDegraded bool `long:"pipelines.exit-on-degraded" usage:"exit Conduit if a pipeline is degraded"`
ExitOnDegraded bool `long:"pipelines.exit-on-degraded" mapstructure:"exit-on-degraded" usage:"exit Conduit if a pipeline is degraded"`
ErrorRecovery struct {
// MinDelay is the minimum delay before restart: Default: 1 second
MinDelay time.Duration `long:"pipelines.error-recovery.min-delay" usage:"minimum delay before restart"`
MinDelay time.Duration `long:"pipelines.error-recovery.min-delay" mapstructure:"min-delay" usage:"minimum delay before restart"`
// MaxDelay is the maximum delay before restart: Default: 10 minutes
MaxDelay time.Duration `long:"pipelines.error-recovery.max-delay" usage:"maximum delay before restart"`
MaxDelay time.Duration `long:"pipelines.error-recovery.max-delay" mapstructure:"max-delay" usage:"maximum delay before restart"`
// BackoffFactor is the factor by which the delay is multiplied after each restart: Default: 2
BackoffFactor int `long:"pipelines.error-recovery.backoff-factor" usage:"backoff factor applied to the last delay"`
BackoffFactor int `long:"pipelines.error-recovery.backoff-factor" mapstructure:"backoff-factor" usage:"backoff factor applied to the last delay"`
// MaxRetries is the maximum number of restarts before the pipeline is considered unhealthy: Default: -1 (infinite)
MaxRetries int64 `long:"pipelines.error-recovery.max-retries" usage:"maximum number of retries"`
MaxRetries int64 `long:"pipelines.error-recovery.max-retries" mapstructure:"max-retries" usage:"maximum number of retries"`
// MaxRetriesWindow is the duration window in which the max retries are counted: Default: 5 minutes
MaxRetriesWindow time.Duration `long:"pipelines.error-recovery.max-retries-window" usage:"amount of time running without any errors after which a pipeline is considered healthy"`
}
MaxRetriesWindow time.Duration `long:"pipelines.error-recovery.max-retries-window" mapstructure:"max-retries-window" usage:"amount of time running without any errors after which a pipeline is considered healthy"`
} `mapstructure:"error-recovery"`
}

ConnectorPlugins map[string]sdk.Connector
Expand All @@ -111,9 +113,9 @@ type Config struct {
Type string `long:"schema-registry.type" usage:"schema registry type; accepts builtin,confluent"`

Confluent struct {
ConnectionString string `long:"schema-registry.confluent.connection-string" usage:"confluent schema registry connection string"`
ConnectionString string `long:"schema-registry.confluent.connection-string" mapstructure:"connection-string" usage:"confluent schema registry connection string"`
}
}
} `mapstructure:"schema-registry"`

Preview struct {
// PipelineArchV2 enables the new pipeline architecture.
Expand All @@ -139,7 +141,7 @@ func DefaultConfig() Config {
func DefaultConfigWithBasePath(basePath string) Config {
var cfg Config

cfg.ConduitCfgPath = filepath.Join(basePath, "conduit.yaml")
cfg.ConduitCfg.Path = filepath.Join(basePath, "conduit.yaml")

cfg.DB.Type = DBTypeBadger
cfg.DB.Badger.Path = filepath.Join(basePath, "conduit.db")
Expand Down
Loading

0 comments on commit 2515cf5

Please sign in to comment.